diff --git a/src/github.com/matrix-org/dendrite/clientapi/storage/current_room_state_table.go b/src/github.com/matrix-org/dendrite/clientapi/storage/current_room_state_table.go new file mode 100644 index 000000000..bff60972f --- /dev/null +++ b/src/github.com/matrix-org/dendrite/clientapi/storage/current_room_state_table.go @@ -0,0 +1,91 @@ +package storage + +import ( + "database/sql" + "encoding/json" + + "github.com/matrix-org/dendrite/clientapi/events" + "github.com/matrix-org/gomatrixserverlib" +) + +const currentRoomStateSchema = ` +-- Stores the current room state for every room. +CREATE TABLE IF NOT EXISTS current_room_state ( + -- The 'room_id' key for the state event. + room_id TEXT NOT NULL, + -- The state event ID + event_id TEXT NOT NULL, + -- The state event type e.g 'm.room.member' + type TEXT NOT NULL, + -- The state_key value for this state event e.g '' + state_key TEXT NOT NULL, + -- The JSON for the event. Stored as TEXT because this should be valid UTF-8. + event_json TEXT NOT NULL, + -- The 'content.membership' value if this event is an m.room.member event. For other + -- events, this will be NULL. + membership TEXT, + -- Clobber based on 3-uple of room_id, type and state_key + CONSTRAINT room_state_unique UNIQUE (room_id, type, state_key) +); +-- for event deletion +CREATE UNIQUE INDEX IF NOT EXISTS event_id_idx ON current_room_state(event_id); +` + +const upsertRoomStateSQL = "" + + "INSERT INTO current_room_state (room_id, event_id, type, state_key, event_json, membership) VALUES ($1, $2, $3, $4, $5, $6)" + + " ON CONFLICT ON CONSTRAINT room_state_unique" + + " DO UPDATE SET event_id = $2, event_json = $5, membership = $6" + +const deleteRoomStateByEventIDSQL = "" + + "DELETE FROM current_room_state WHERE event_id = $1" + +type currentRoomStateStatements struct { + upsertRoomStateStmt *sql.Stmt + deleteRoomStateByEventIDStmt *sql.Stmt +} + +func (s *currentRoomStateStatements) prepare(db *sql.DB) (err error) { + _, err = db.Exec(currentRoomStateSchema) + if err != nil { + return + } + if s.upsertRoomStateStmt, err = db.Prepare(upsertRoomStateSQL); err != nil { + return + } + if s.deleteRoomStateByEventIDStmt, err = db.Prepare(deleteRoomStateByEventIDSQL); err != nil { + return + } + return +} + +func (s *currentRoomStateStatements) UpdateRoomState(txn *sql.Tx, added []gomatrixserverlib.Event, removedEventIDs []string) error { + // remove first, then add, as we do not ever delete state, but do replace state which is a remove followed by an add. + for _, eventID := range removedEventIDs { + _, err := txn.Stmt(s.deleteRoomStateByEventIDStmt).Exec(eventID) + if err != nil { + return err + } + } + + for _, event := range added { + if event.StateKey() == nil { + // ignore non state events + continue + } + var membership *string + if event.Type() == "m.room.member" { + var memberContent events.MemberContent + if err := json.Unmarshal(event.Content(), &memberContent); err != nil { + return err + } + membership = &memberContent.Membership + } + _, err := txn.Stmt(s.upsertRoomStateStmt).Exec( + event.RoomID(), event.EventID(), event.Type(), *event.StateKey(), event.JSON(), membership, + ) + if err != nil { + return err + } + } + return nil +} diff --git a/src/github.com/matrix-org/dendrite/clientapi/storage/output_room_events_table.go b/src/github.com/matrix-org/dendrite/clientapi/storage/output_room_events_table.go index c3be0a3a2..a31759c3a 100644 --- a/src/github.com/matrix-org/dendrite/clientapi/storage/output_room_events_table.go +++ b/src/github.com/matrix-org/dendrite/clientapi/storage/output_room_events_table.go @@ -2,8 +2,10 @@ package storage import ( "database/sql" + "fmt" "github.com/lib/pq" + "github.com/matrix-org/gomatrixserverlib" ) const outputRoomEventsSchema = ` @@ -13,21 +15,30 @@ CREATE TABLE IF NOT EXISTS output_room_events ( -- NB: 'serial' makes no guarantees to increment by 1 every time, only that it increments. -- This isn't a problem for us since we just want to order by this field. id BIGSERIAL PRIMARY KEY, + -- The event ID for the event + event_id TEXT NOT NULL, -- The 'room_id' key for the event. room_id TEXT NOT NULL, -- The JSON for the event. Stored as TEXT because this should be valid UTF-8. event_json TEXT NOT NULL, - -- A list of event IDs which represent a delta of added/removed room state. - add_state_ids TEXT[] NOT NULL, - remove_state_ids TEXT[] NOT NULL + -- A list of event IDs which represent a delta of added/removed room state. This can be NULL + -- if there is no delta. + add_state_ids TEXT[], + remove_state_ids TEXT[] ); +-- for event selection +CREATE UNIQUE INDEX IF NOT EXISTS event_id_idx ON output_room_events(event_id); ` const insertEventSQL = "" + - "INSERT INTO output_room_events (room_id, event_json, add_state_ids, remove_state_ids) VALUES ($1, $2, $3, $4)" + "INSERT INTO output_room_events (room_id, event_id, event_json, add_state_ids, remove_state_ids) VALUES ($1, $2, $3, $4, $5)" + +const selectEventsSQL = "" + + "SELECT event_json FROM output_room_events WHERE event_id = ANY($1)" type outputRoomEventsStatements struct { - insertEventStmt *sql.Stmt + insertEventStmt *sql.Stmt + selectEventsStmt *sql.Stmt } func (s *outputRoomEventsStatements) prepare(db *sql.DB) (err error) { @@ -38,11 +49,44 @@ func (s *outputRoomEventsStatements) prepare(db *sql.DB) (err error) { if s.insertEventStmt, err = db.Prepare(insertEventSQL); err != nil { return } + if s.selectEventsStmt, err = db.Prepare(selectEventsSQL); err != nil { + return + } return } // InsertEvent into the output_room_events table. addState and removeState are an optional list of state event IDs. -func (s *outputRoomEventsStatements) InsertEvent(roomID string, eventJSON []byte, addState, removeState []string) error { - _, err := s.insertEventStmt.Exec(roomID, eventJSON, pq.StringArray(addState), pq.StringArray(removeState)) +func (s *outputRoomEventsStatements) InsertEvent(txn *sql.Tx, event *gomatrixserverlib.Event, addState, removeState []string) error { + _, err := txn.Stmt(s.insertEventStmt).Exec( + event.RoomID(), event.EventID(), event.JSON(), pq.StringArray(addState), pq.StringArray(removeState), + ) return err } + +// Events returns the events for the given event IDs. Returns an error if any one of the event IDs given are missing +// from the database. +func (s *outputRoomEventsStatements) Events(txn *sql.Tx, eventIDs []string) ([]gomatrixserverlib.Event, error) { + rows, err := txn.Stmt(s.selectEventsStmt).Query(pq.StringArray(eventIDs)) + if err != nil { + return nil, err + } + defer rows.Close() + + result := make([]gomatrixserverlib.Event, len(eventIDs)) + i := 0 + for ; rows.Next(); i++ { + var eventBytes []byte + if err := rows.Scan(&eventBytes); err != nil { + return nil, err + } + ev, err := gomatrixserverlib.NewEventFromTrustedJSON(eventBytes, false) + if err != nil { + return nil, err + } + result = append(result, ev) + } + if i != len(eventIDs) { + return nil, fmt.Errorf("failed to map all event IDs to events: (%d != %d)", i, len(eventIDs)) + } + return result, nil +} diff --git a/src/github.com/matrix-org/dendrite/clientapi/storage/syncserver.go b/src/github.com/matrix-org/dendrite/clientapi/storage/syncserver.go index 0a911d98f..39df640b1 100644 --- a/src/github.com/matrix-org/dendrite/clientapi/storage/syncserver.go +++ b/src/github.com/matrix-org/dendrite/clientapi/storage/syncserver.go @@ -13,6 +13,7 @@ type SyncServerDatabase struct { db *sql.DB partitions common.PartitionOffsetStatements events outputRoomEventsStatements + roomstate currentRoomStateStatements } // NewSyncServerDatabase creates a new sync server database @@ -30,13 +31,44 @@ func NewSyncServerDatabase(dataSourceName string) (*SyncServerDatabase, error) { if err = events.prepare(db); err != nil { return nil, err } - return &SyncServerDatabase{db, partitions, events}, nil + state := currentRoomStateStatements{} + if err := state.prepare(db); err != nil { + return nil, err + } + return &SyncServerDatabase{db, partitions, events, state}, nil } // WriteEvent into the database. It is not safe to call this function from multiple goroutines, as it would create races // when generating the stream position for this event. Returns an error if there was a problem inserting this event. func (d *SyncServerDatabase) WriteEvent(ev *gomatrixserverlib.Event, addStateEventIDs, removeStateEventIDs []string) error { - return d.events.InsertEvent(ev.RoomID(), ev.JSON(), addStateEventIDs, removeStateEventIDs) + return runTransaction(d.db, func(txn *sql.Tx) error { + if err := d.events.InsertEvent(txn, ev, addStateEventIDs, removeStateEventIDs); err != nil { + return err + } + + if len(addStateEventIDs) == 0 && len(removeStateEventIDs) == 0 { + // Nothing to do, the event may have just been a message event. + return nil + } + + // Update the current room state based on the added/removed state event IDs. + // In the common case there is a single added event ID which is the state event itself, assuming `ev` is a state event. + // However, conflict resolution may result in there being different events being added, or even some removed. + if len(removeStateEventIDs) == 0 && len(addStateEventIDs) == 1 && addStateEventIDs[0] == ev.EventID() { + // common case + if err := d.roomstate.UpdateRoomState(txn, []gomatrixserverlib.Event{*ev}, nil); err != nil { + return err + } + return nil + } + + // uncommon case: we need to fetch the full event for each event ID mentioned, then update room state + added, err := d.events.Events(txn, addStateEventIDs) + if err != nil { + return err + } + return d.roomstate.UpdateRoomState(txn, added, removeStateEventIDs) + }) } // PartitionOffsets implements common.PartitionStorer @@ -48,3 +80,22 @@ func (d *SyncServerDatabase) PartitionOffsets(topic string) ([]common.PartitionO func (d *SyncServerDatabase) SetPartitionOffset(topic string, partition int32, offset int64) error { return d.partitions.UpsertPartitionOffset(topic, partition, offset) } + +func runTransaction(db *sql.DB, fn func(txn *sql.Tx) error) (err error) { + txn, err := db.Begin() + if err != nil { + return + } + defer func() { + if r := recover(); r != nil { + txn.Rollback() + panic(r) + } else if err != nil { + txn.Rollback() + } else { + err = txn.Commit() + } + }() + err = fn(txn) + return +} diff --git a/src/github.com/matrix-org/dendrite/clientapi/sync/syncserver.go b/src/github.com/matrix-org/dendrite/clientapi/sync/syncserver.go index 1c97c6a3b..1a0d0d610 100644 --- a/src/github.com/matrix-org/dendrite/clientapi/sync/syncserver.go +++ b/src/github.com/matrix-org/dendrite/clientapi/sync/syncserver.go @@ -65,7 +65,12 @@ func (s *Server) onMessage(msg *sarama.ConsumerMessage) error { if err := s.db.WriteEvent(&ev, output.AddsStateEventIDs, output.RemovesStateEventIDs); err != nil { // panic rather than continue with an inconsistent database - log.WithError(err).WithField("OutputRoomEvent", output).Panicf("roomserver output log: write event failure") + log.WithFields(log.Fields{ + "event": string(ev.JSON()), + log.ErrorKey: err, + "add": output.AddsStateEventIDs, + "del": output.RemovesStateEventIDs, + }).Panicf("roomserver output log: write event failure") return nil }