diff --git a/syncapi/storage/postgres/account_data_table.go b/syncapi/storage/postgres/account_data_table.go index d1e3b527f..58fb21983 100644 --- a/syncapi/storage/postgres/account_data_table.go +++ b/syncapi/storage/postgres/account_data_table.go @@ -21,6 +21,7 @@ import ( "github.com/lib/pq" "github.com/matrix-org/dendrite/common" + "github.com/matrix-org/dendrite/syncapi/storage/tables" "github.com/matrix-org/dendrite/syncapi/types" "github.com/matrix-org/gomatrixserverlib" ) @@ -70,32 +71,33 @@ type accountDataStatements struct { selectMaxAccountDataIDStmt *sql.Stmt } -func (s *accountDataStatements) prepare(db *sql.DB) (err error) { - _, err = db.Exec(accountDataSchema) +func NewPostgresAccountDataTable(db *sql.DB) (tables.AccountData, error) { + s := &accountDataStatements{} + _, err := db.Exec(accountDataSchema) if err != nil { - return + return nil, err } if s.insertAccountDataStmt, err = db.Prepare(insertAccountDataSQL); err != nil { - return + return nil, err } if s.selectAccountDataInRangeStmt, err = db.Prepare(selectAccountDataInRangeSQL); err != nil { - return + return nil, err } if s.selectMaxAccountDataIDStmt, err = db.Prepare(selectMaxAccountDataIDSQL); err != nil { - return + return nil, err } - return + return s, nil } -func (s *accountDataStatements) insertAccountData( - ctx context.Context, +func (s *accountDataStatements) InsertAccountData( + ctx context.Context, txn *sql.Tx, userID, roomID, dataType string, ) (pos types.StreamPosition, err error) { err = s.insertAccountDataStmt.QueryRowContext(ctx, userID, roomID, dataType).Scan(&pos) return } -func (s *accountDataStatements) selectAccountDataInRange( +func (s *accountDataStatements) SelectAccountDataInRange( ctx context.Context, userID string, oldPos, newPos types.StreamPosition, @@ -137,7 +139,7 @@ func (s *accountDataStatements) selectAccountDataInRange( return data, rows.Err() } -func (s *accountDataStatements) selectMaxAccountDataID( +func (s *accountDataStatements) SelectMaxAccountDataID( ctx context.Context, txn *sql.Tx, ) (id int64, err error) { var nullableID sql.NullInt64 diff --git a/syncapi/storage/postgres/output_room_events_table.go b/syncapi/storage/postgres/output_room_events_table.go index 0b53dfa9e..5870bfd52 100644 --- a/syncapi/storage/postgres/output_room_events_table.go +++ b/syncapi/storage/postgres/output_room_events_table.go @@ -22,6 +22,7 @@ import ( "sort" "github.com/matrix-org/dendrite/roomserver/api" + "github.com/matrix-org/dendrite/syncapi/storage/tables" "github.com/matrix-org/dendrite/syncapi/types" "github.com/lib/pq" @@ -120,39 +121,40 @@ type outputRoomEventsStatements struct { selectStateInRangeStmt *sql.Stmt } -func (s *outputRoomEventsStatements) prepare(db *sql.DB) (err error) { - _, err = db.Exec(outputRoomEventsSchema) +func NewPostgresEventsTable(db *sql.DB) (tables.Events, error) { + s := &outputRoomEventsStatements{} + _, err := db.Exec(outputRoomEventsSchema) if err != nil { - return + return nil, err } if s.insertEventStmt, err = db.Prepare(insertEventSQL); err != nil { - return + return nil, err } if s.selectEventsStmt, err = db.Prepare(selectEventsSQL); err != nil { - return + return nil, err } if s.selectMaxEventIDStmt, err = db.Prepare(selectMaxEventIDSQL); err != nil { - return + return nil, err } if s.selectRecentEventsStmt, err = db.Prepare(selectRecentEventsSQL); err != nil { - return + return nil, err } if s.selectRecentEventsForSyncStmt, err = db.Prepare(selectRecentEventsForSyncSQL); err != nil { - return + return nil, err } if s.selectEarlyEventsStmt, err = db.Prepare(selectEarlyEventsSQL); err != nil { - return + return nil, err } if s.selectStateInRangeStmt, err = db.Prepare(selectStateInRangeSQL); err != nil { - return + return nil, err } - return + return s, nil } // selectStateInRange returns the state events between the two given PDU stream positions, exclusive of oldPos, inclusive of newPos. // Results are bucketed based on the room ID. If the same state is overwritten multiple times between the // two positions, only the most recent state is returned. -func (s *outputRoomEventsStatements) selectStateInRange( +func (s *outputRoomEventsStatements) SelectStateInRange( ctx context.Context, txn *sql.Tx, oldPos, newPos types.StreamPosition, stateFilter *gomatrixserverlib.StateFilter, ) (map[string]map[string]bool, map[string]types.StreamEvent, error) { @@ -233,7 +235,7 @@ func (s *outputRoomEventsStatements) selectStateInRange( // MaxID returns the ID of the last inserted event in this table. 'txn' is optional. If it is not supplied, // then this function should only ever be used at startup, as it will race with inserting events if it is // done afterwards. If there are no inserted events, 0 is returned. -func (s *outputRoomEventsStatements) selectMaxEventID( +func (s *outputRoomEventsStatements) SelectMaxEventID( ctx context.Context, txn *sql.Tx, ) (id int64, err error) { var nullableID sql.NullInt64 @@ -247,7 +249,7 @@ func (s *outputRoomEventsStatements) selectMaxEventID( // InsertEvent into the output_room_events table. addState and removeState are an optional list of state event IDs. Returns the position // of the inserted event. -func (s *outputRoomEventsStatements) insertEvent( +func (s *outputRoomEventsStatements) InsertEvent( ctx context.Context, txn *sql.Tx, event *gomatrixserverlib.HeaderedEvent, addState, removeState []string, transactionID *api.TransactionID, excludeFromSync bool, @@ -294,7 +296,7 @@ func (s *outputRoomEventsStatements) insertEvent( // selectRecentEvents returns the most recent events in the given room, up to a maximum of 'limit'. // If onlySyncEvents has a value of true, only returns the events that aren't marked as to exclude // from sync. -func (s *outputRoomEventsStatements) selectRecentEvents( +func (s *outputRoomEventsStatements) SelectRecentEvents( ctx context.Context, txn *sql.Tx, roomID string, fromPos, toPos types.StreamPosition, limit int, chronologicalOrder bool, onlySyncEvents bool, @@ -327,7 +329,7 @@ func (s *outputRoomEventsStatements) selectRecentEvents( // selectEarlyEvents returns the earliest events in the given room, starting // from a given position, up to a maximum of 'limit'. -func (s *outputRoomEventsStatements) selectEarlyEvents( +func (s *outputRoomEventsStatements) SelectEarlyEvents( ctx context.Context, txn *sql.Tx, roomID string, fromPos, toPos types.StreamPosition, limit int, ) ([]types.StreamEvent, error) { @@ -352,7 +354,7 @@ func (s *outputRoomEventsStatements) selectEarlyEvents( // selectEvents returns the events for the given event IDs. If an event is // missing from the database, it will be omitted. -func (s *outputRoomEventsStatements) selectEvents( +func (s *outputRoomEventsStatements) SelectEvents( ctx context.Context, txn *sql.Tx, eventIDs []string, ) ([]types.StreamEvent, error) { stmt := common.TxStmt(txn, s.selectEventsStmt) diff --git a/syncapi/storage/postgres/syncserver.go b/syncapi/storage/postgres/syncserver.go index 9883c3629..4fa08ce5b 100644 --- a/syncapi/storage/postgres/syncserver.go +++ b/syncapi/storage/postgres/syncserver.go @@ -20,9 +20,6 @@ import ( "database/sql" "encoding/json" "fmt" - "time" - - "github.com/sirupsen/logrus" "github.com/matrix-org/dendrite/clientapi/auth/authtypes" "github.com/matrix-org/dendrite/internal/sqlutil" @@ -50,15 +47,12 @@ type stateDelta struct { // SyncServerDatasource represents a sync server datasource which manages // both the database for PDUs and caches for EDUs. type SyncServerDatasource struct { + shared.Database db *sql.DB common.PartitionOffsetStatements - accountData accountDataStatements - events outputRoomEventsStatements roomstate currentRoomStateStatements - eduCache *cache.EDUCache topology outputRoomEventsTopologyStatements backwardExtremities tables.BackwardsExtremities - shared *shared.Database } // NewSyncServerDatasource creates a new sync server database @@ -71,10 +65,12 @@ func NewSyncServerDatasource(dbDataSourceName string, dbProperties common.DbProp if err = d.PartitionOffsetStatements.Prepare(d.db, "syncapi"); err != nil { return nil, err } - if err = d.accountData.prepare(d.db); err != nil { + accountData, err := NewPostgresAccountDataTable(d.db) + if err != nil { return nil, err } - if err = d.events.prepare(d.db); err != nil { + events, err := NewPostgresEventsTable(d.db) + if err != nil { return nil, err } if err = d.roomstate.prepare(d.db); err != nil { @@ -91,10 +87,12 @@ func NewSyncServerDatasource(dbDataSourceName string, dbProperties common.DbProp if err != nil { return nil, err } - d.eduCache = cache.New() - d.shared = &shared.Database{ - DB: d.db, - Invites: invites, + d.Database = shared.Database{ + DB: d.db, + Invites: invites, + AccountData: accountData, + OutputEvents: events, + EDUCache: cache.New(), } return &d, nil } @@ -103,17 +101,6 @@ func (d *SyncServerDatasource) AllJoinedUsersInRooms(ctx context.Context) (map[s return d.roomstate.selectJoinedUsers(ctx) } -func (d *SyncServerDatasource) Events(ctx context.Context, eventIDs []string) ([]gomatrixserverlib.HeaderedEvent, error) { - streamEvents, err := d.events.selectEvents(ctx, nil, eventIDs) - if err != nil { - return nil, err - } - - // We don't include a device here as we only include transaction IDs in - // incremental syncs. - return d.StreamEventsToEvents(nil, streamEvents), nil -} - // handleBackwardExtremities adds this event as a backwards extremity if and only if we do not have all of // the events listed in the event's 'prev_events'. This function also updates the backwards extremities table // to account for the fact that the given event is no longer a backwards extremity, but may be marked as such. @@ -124,7 +111,7 @@ func (d *SyncServerDatasource) handleBackwardExtremities(ctx context.Context, tx // Check if we have all of the event's previous events. If an event is // missing, add it to the room's backward extremities. - prevEvents, err := d.events.selectEvents(ctx, txn, ev.PrevEventIDs()) + prevEvents, err := d.Database.OutputEvents.SelectEvents(ctx, txn, ev.PrevEventIDs()) if err != nil { return err } @@ -157,7 +144,7 @@ func (d *SyncServerDatasource) WriteEvent( ) (pduPosition types.StreamPosition, returnErr error) { returnErr = common.WithTransaction(d.db, func(txn *sql.Tx) error { var err error - pos, err := d.events.insertEvent( + pos, err := d.Database.OutputEvents.InsertEvent( ctx, txn, ev, addStateEventIDs, removeStateEventIDs, transactionID, excludeFromSync, ) if err != nil { @@ -265,36 +252,10 @@ func (d *SyncServerDatasource) GetEventsInTopologicalRange( } // Retrieve the events' contents using their IDs. - events, err = d.events.selectEvents(ctx, nil, eIDs) + events, err = d.Database.OutputEvents.SelectEvents(ctx, nil, eIDs) return } -// GetEventsInStreamingRange retrieves all of the events on a given ordering using the -// given extremities and limit. -func (d *SyncServerDatasource) GetEventsInStreamingRange( - ctx context.Context, - from, to *types.StreamingToken, - roomID string, limit int, - backwardOrdering bool, -) (events []types.StreamEvent, err error) { - if backwardOrdering { - // When using backward ordering, we want the most recent events first. - if events, err = d.events.selectRecentEvents( - ctx, nil, roomID, to.PDUPosition(), from.PDUPosition(), limit, false, false, - ); err != nil { - return - } - } else { - // When using forward ordering, we want the least recent events first. - if events, err = d.events.selectEarlyEvents( - ctx, nil, roomID, from.PDUPosition(), to.PDUPosition(), limit, - ); err != nil { - return - } - } - return events, err -} - func (d *SyncServerDatasource) SyncPosition(ctx context.Context) (types.StreamingToken, error) { return d.syncPositionTx(ctx, nil) } @@ -319,7 +280,7 @@ func (d *SyncServerDatasource) EventsAtTopologicalPosition( return nil, err } - return d.events.selectEvents(ctx, nil, eIDs) + return d.Database.OutputEvents.SelectEvents(ctx, nil, eIDs) } func (d *SyncServerDatasource) EventPositionInTopology( @@ -328,57 +289,29 @@ func (d *SyncServerDatasource) EventPositionInTopology( return d.topology.selectPositionInTopology(ctx, eventID) } -func (d *SyncServerDatasource) SyncStreamPosition(ctx context.Context) (types.StreamPosition, error) { - return d.syncStreamPositionTx(ctx, nil) -} - -func (d *SyncServerDatasource) syncStreamPositionTx( - ctx context.Context, txn *sql.Tx, -) (types.StreamPosition, error) { - maxID, err := d.events.selectMaxEventID(ctx, txn) - if err != nil { - return 0, err - } - maxAccountDataID, err := d.accountData.selectMaxAccountDataID(ctx, txn) - if err != nil { - return 0, err - } - if maxAccountDataID > maxID { - maxID = maxAccountDataID - } - maxInviteID, err := d.shared.Invites.SelectMaxInviteID(ctx, txn) - if err != nil { - return 0, err - } - if maxInviteID > maxID { - maxID = maxInviteID - } - return types.StreamPosition(maxID), nil -} - func (d *SyncServerDatasource) syncPositionTx( ctx context.Context, txn *sql.Tx, ) (sp types.StreamingToken, err error) { - maxEventID, err := d.events.selectMaxEventID(ctx, txn) + maxEventID, err := d.Database.OutputEvents.SelectMaxEventID(ctx, txn) if err != nil { return sp, err } - maxAccountDataID, err := d.accountData.selectMaxAccountDataID(ctx, txn) + maxAccountDataID, err := d.Database.AccountData.SelectMaxAccountDataID(ctx, txn) if err != nil { return sp, err } if maxAccountDataID > maxEventID { maxEventID = maxAccountDataID } - maxInviteID, err := d.shared.Invites.SelectMaxInviteID(ctx, txn) + maxInviteID, err := d.Database.Invites.SelectMaxInviteID(ctx, txn) if err != nil { return sp, err } if maxInviteID > maxEventID { maxEventID = maxInviteID } - sp = types.NewStreamToken(types.StreamPosition(maxEventID), types.StreamPosition(d.eduCache.GetLatestSyncPosition())) + sp = types.NewStreamToken(types.StreamPosition(maxEventID), types.StreamPosition(d.Database.EDUCache.GetLatestSyncPosition())) return } @@ -451,7 +384,7 @@ func (d *SyncServerDatasource) addTypingDeltaToResponse( var ok bool var err error for _, roomID := range joinedRoomIDs { - if typingUsers, updated := d.eduCache.GetTypingUsersIfUpdatedAfter( + if typingUsers, updated := d.Database.EDUCache.GetTypingUsersIfUpdatedAfter( roomID, int64(since.EDUPosition()), ); updated { ev := gomatrixserverlib.ClientEvent{ @@ -580,7 +513,7 @@ func (d *SyncServerDatasource) getResponseWithPDUsForCompleteSync( // TODO: When filters are added, we may need to call this multiple times to get enough events. // See: https://github.com/matrix-org/synapse/blob/v0.19.3/synapse/handlers/sync.py#L316 var recentStreamEvents []types.StreamEvent - recentStreamEvents, err = d.events.selectRecentEvents( + recentStreamEvents, err = d.Database.OutputEvents.SelectRecentEvents( ctx, txn, roomID, types.StreamPosition(0), toPos.PDUPosition(), numRecentEventsPerRoom, true, true, ) @@ -653,54 +586,13 @@ var txReadOnlySnapshot = sql.TxOptions{ ReadOnly: true, } -func (d *SyncServerDatasource) GetAccountDataInRange( - ctx context.Context, userID string, oldPos, newPos types.StreamPosition, - accountDataFilterPart *gomatrixserverlib.EventFilter, -) (map[string][]string, error) { - return d.accountData.selectAccountDataInRange(ctx, userID, oldPos, newPos, accountDataFilterPart) -} - -func (d *SyncServerDatasource) UpsertAccountData( - ctx context.Context, userID, roomID, dataType string, -) (types.StreamPosition, error) { - return d.accountData.insertAccountData(ctx, userID, roomID, dataType) -} - -func (d *SyncServerDatasource) AddInviteEvent( - ctx context.Context, inviteEvent gomatrixserverlib.HeaderedEvent, -) (sp types.StreamPosition, err error) { - return d.shared.AddInviteEvent(ctx, inviteEvent) -} - -func (d *SyncServerDatasource) RetireInviteEvent( - ctx context.Context, inviteEventID string, -) error { - return d.shared.RetireInviteEvent(ctx, inviteEventID) -} - -func (d *SyncServerDatasource) SetTypingTimeoutCallback(fn cache.TimeoutCallbackFn) { - d.eduCache.SetTimeoutCallback(fn) -} - -func (d *SyncServerDatasource) AddTypingUser( - userID, roomID string, expireTime *time.Time, -) types.StreamPosition { - return types.StreamPosition(d.eduCache.AddTypingUser(userID, roomID, expireTime)) -} - -func (d *SyncServerDatasource) RemoveTypingUser( - userID, roomID string, -) types.StreamPosition { - return types.StreamPosition(d.eduCache.RemoveUser(userID, roomID)) -} - func (d *SyncServerDatasource) addInvitesToResponse( ctx context.Context, txn *sql.Tx, userID string, fromPos, toPos types.StreamPosition, res *types.Response, ) error { - invites, err := d.shared.Invites.SelectInviteEventsInRange( + invites, err := d.Database.Invites.SelectInviteEventsInRange( ctx, txn, userID, fromPos, toPos, ) if err != nil { @@ -750,7 +642,7 @@ func (d *SyncServerDatasource) addRoomDeltaToResponse( // This is all "okay" assuming history_visibility == "shared" which it is by default. endPos = delta.membershipPos } - recentStreamEvents, err := d.events.selectRecentEvents( + recentStreamEvents, err := d.Database.OutputEvents.SelectRecentEvents( ctx, txn, delta.roomID, types.StreamPosition(fromPos), types.StreamPosition(endPos), numRecentEventsPerRoom, true, true, ) @@ -841,7 +733,7 @@ func (d *SyncServerDatasource) fetchMissingStateEvents( ) ([]types.StreamEvent, error) { // Fetch from the events table first so we pick up the stream ID for the // event. - events, err := d.events.selectEvents(ctx, txn, eventIDs) + events, err := d.Database.OutputEvents.SelectEvents(ctx, txn, eventIDs) if err != nil { return nil, err } @@ -895,7 +787,7 @@ func (d *SyncServerDatasource) getStateDeltas( var deltas []stateDelta // get all the state events ever between these two positions - stateNeeded, eventMap, err := d.events.selectStateInRange(ctx, txn, fromPos, toPos, stateFilter) + stateNeeded, eventMap, err := d.Database.OutputEvents.SelectStateInRange(ctx, txn, fromPos, toPos, stateFilter) if err != nil { return nil, nil, err } @@ -981,7 +873,7 @@ func (d *SyncServerDatasource) getStateDeltasForFullStateSync( } // Get all the state events ever between these two positions - stateNeeded, eventMap, err := d.events.selectStateInRange(ctx, txn, fromPos, toPos, stateFilter) + stateNeeded, eventMap, err := d.Database.OutputEvents.SelectStateInRange(ctx, txn, fromPos, toPos, stateFilter) if err != nil { return nil, nil, err } @@ -1025,26 +917,6 @@ func (d *SyncServerDatasource) currentStateStreamEventsForRoom( return s, nil } -func (d *SyncServerDatasource) StreamEventsToEvents(device *authtypes.Device, in []types.StreamEvent) []gomatrixserverlib.HeaderedEvent { - out := make([]gomatrixserverlib.HeaderedEvent, len(in)) - for i := 0; i < len(in); i++ { - out[i] = in[i].HeaderedEvent - if device != nil && in[i].TransactionID != nil { - if device.UserID == in[i].Sender() && device.SessionID == in[i].TransactionID.SessionID { - err := out[i].SetUnsignedField( - "transaction_id", in[i].TransactionID.TransactionID, - ) - if err != nil { - logrus.WithFields(logrus.Fields{ - "event_id": out[i].EventID(), - }).WithError(err).Warnf("Failed to add transaction ID to event") - } - } - } - } - return out -} - // There may be some overlap where events in stateEvents are already in recentEvents, so filter // them out so we don't include them twice in the /sync response. They should be in recentEvents // only, so clients get to the correct state once they have rolled forward. diff --git a/syncapi/storage/shared/syncserver.go b/syncapi/storage/shared/syncserver.go index e89976df6..e9fed758b 100644 --- a/syncapi/storage/shared/syncserver.go +++ b/syncapi/storage/shared/syncserver.go @@ -3,20 +3,117 @@ package shared import ( "context" "database/sql" + "time" + "github.com/matrix-org/dendrite/clientapi/auth/authtypes" "github.com/matrix-org/dendrite/common" + "github.com/matrix-org/dendrite/eduserver/cache" "github.com/matrix-org/dendrite/syncapi/storage/tables" "github.com/matrix-org/dendrite/syncapi/types" "github.com/matrix-org/gomatrixserverlib" + "github.com/sirupsen/logrus" ) // Database is a temporary struct until we have made syncserver.go the same for both pq/sqlite // For now this contains the shared functions type Database struct { - DB *sql.DB - Invites tables.Invites + DB *sql.DB + Invites tables.Invites + AccountData tables.AccountData + OutputEvents tables.Events + EDUCache *cache.EDUCache } +// Events lookups a list of event by their event ID. +// Returns a list of events matching the requested IDs found in the database. +// If an event is not found in the database then it will be omitted from the list. +// Returns an error if there was a problem talking with the database. +// Does not include any transaction IDs in the returned events. +func (d *Database) Events(ctx context.Context, eventIDs []string) ([]gomatrixserverlib.HeaderedEvent, error) { + streamEvents, err := d.OutputEvents.SelectEvents(ctx, nil, eventIDs) + if err != nil { + return nil, err + } + + // We don't include a device here as we only include transaction IDs in + // incremental syncs. + return d.StreamEventsToEvents(nil, streamEvents), nil +} + +// GetEventsInStreamingRange retrieves all of the events on a given ordering using the +// given extremities and limit. +func (d *Database) GetEventsInStreamingRange( + ctx context.Context, + from, to *types.StreamingToken, + roomID string, limit int, + backwardOrdering bool, +) (events []types.StreamEvent, err error) { + if backwardOrdering { + // When using backward ordering, we want the most recent events first. + if events, err = d.OutputEvents.SelectRecentEvents( + ctx, nil, roomID, to.PDUPosition(), from.PDUPosition(), limit, false, false, + ); err != nil { + return + } + } else { + // When using forward ordering, we want the least recent events first. + if events, err = d.OutputEvents.SelectEarlyEvents( + ctx, nil, roomID, from.PDUPosition(), to.PDUPosition(), limit, + ); err != nil { + return + } + } + return events, err +} + +func (d *Database) AddTypingUser( + userID, roomID string, expireTime *time.Time, +) types.StreamPosition { + return types.StreamPosition(d.EDUCache.AddTypingUser(userID, roomID, expireTime)) +} + +func (d *Database) RemoveTypingUser( + userID, roomID string, +) types.StreamPosition { + return types.StreamPosition(d.EDUCache.RemoveUser(userID, roomID)) +} + +func (d *Database) SetTypingTimeoutCallback(fn cache.TimeoutCallbackFn) { + d.EDUCache.SetTimeoutCallback(fn) +} + +func (d *Database) SyncStreamPosition(ctx context.Context) (types.StreamPosition, error) { + var maxID int64 + var err error + err = common.WithTransaction(d.DB, func(txn *sql.Tx) error { + maxID, err = d.OutputEvents.SelectMaxEventID(ctx, txn) + if err != nil { + return err + } + var maxAccountDataID int64 + maxAccountDataID, err = d.AccountData.SelectMaxAccountDataID(ctx, txn) + if err != nil { + return err + } + if maxAccountDataID > maxID { + maxID = maxAccountDataID + } + var maxInviteID int64 + maxInviteID, err = d.Invites.SelectMaxInviteID(ctx, txn) + if err != nil { + return err + } + if maxInviteID > maxID { + maxID = maxInviteID + } + return nil + }) + return types.StreamPosition(maxID), err +} + +// AddInviteEvent stores a new invite event for a user. +// If the invite was successfully stored this returns the stream ID it was stored at. +// Returns an error if there was a problem communicating with the database. func (d *Database) AddInviteEvent( ctx context.Context, inviteEvent gomatrixserverlib.HeaderedEvent, ) (sp types.StreamPosition, err error) { @@ -27,6 +124,8 @@ func (d *Database) AddInviteEvent( return } +// RetireInviteEvent removes an old invite event from the database. +// Returns an error if there was a problem communicating with the database. func (d *Database) RetireInviteEvent( ctx context.Context, inviteEventID string, ) error { @@ -35,3 +134,51 @@ func (d *Database) RetireInviteEvent( err := d.Invites.DeleteInviteEvent(ctx, inviteEventID) return err } + +// GetAccountDataInRange returns all account data for a given user inserted or +// updated between two given positions +// Returns a map following the format data[roomID] = []dataTypes +// If no data is retrieved, returns an empty map +// If there was an issue with the retrieval, returns an error +func (d *Database) GetAccountDataInRange( + ctx context.Context, userID string, oldPos, newPos types.StreamPosition, + accountDataFilterPart *gomatrixserverlib.EventFilter, +) (map[string][]string, error) { + return d.AccountData.SelectAccountDataInRange(ctx, userID, oldPos, newPos, accountDataFilterPart) +} + +// UpsertAccountData keeps track of new or updated account data, by saving the type +// of the new/updated data, and the user ID and room ID the data is related to (empty) +// room ID means the data isn't specific to any room) +// If no data with the given type, user ID and room ID exists in the database, +// creates a new row, else update the existing one +// Returns an error if there was an issue with the upsert +func (d *Database) UpsertAccountData( + ctx context.Context, userID, roomID, dataType string, +) (sp types.StreamPosition, err error) { + err = common.WithTransaction(d.DB, func(txn *sql.Tx) error { + sp, err = d.AccountData.InsertAccountData(ctx, txn, userID, roomID, dataType) + return err + }) + return +} + +func (d *Database) StreamEventsToEvents(device *authtypes.Device, in []types.StreamEvent) []gomatrixserverlib.HeaderedEvent { + out := make([]gomatrixserverlib.HeaderedEvent, len(in)) + for i := 0; i < len(in); i++ { + out[i] = in[i].HeaderedEvent + if device != nil && in[i].TransactionID != nil { + if device.UserID == in[i].Sender() && device.SessionID == in[i].TransactionID.SessionID { + err := out[i].SetUnsignedField( + "transaction_id", in[i].TransactionID.TransactionID, + ) + if err != nil { + logrus.WithFields(logrus.Fields{ + "event_id": out[i].EventID(), + }).WithError(err).Warnf("Failed to add transaction ID to event") + } + } + } + } + return out +} diff --git a/syncapi/storage/sqlite3/account_data_table.go b/syncapi/storage/sqlite3/account_data_table.go index 3dbf961b4..e5f2417b8 100644 --- a/syncapi/storage/sqlite3/account_data_table.go +++ b/syncapi/storage/sqlite3/account_data_table.go @@ -21,6 +21,7 @@ import ( "github.com/matrix-org/dendrite/common" + "github.com/matrix-org/dendrite/syncapi/storage/tables" "github.com/matrix-org/dendrite/syncapi/types" "github.com/matrix-org/gomatrixserverlib" ) @@ -55,25 +56,27 @@ type accountDataStatements struct { selectAccountDataInRangeStmt *sql.Stmt } -func (s *accountDataStatements) prepare(db *sql.DB, streamID *streamIDStatements) (err error) { - s.streamIDStatements = streamID - _, err = db.Exec(accountDataSchema) +func NewSqliteAccountDataTable(db *sql.DB, streamID *streamIDStatements) (tables.AccountData, error) { + s := &accountDataStatements{ + streamIDStatements: streamID, + } + _, err := db.Exec(accountDataSchema) if err != nil { - return + return nil, err } if s.insertAccountDataStmt, err = db.Prepare(insertAccountDataSQL); err != nil { - return + return nil, err } if s.selectMaxAccountDataIDStmt, err = db.Prepare(selectMaxAccountDataIDSQL); err != nil { - return + return nil, err } if s.selectAccountDataInRangeStmt, err = db.Prepare(selectAccountDataInRangeSQL); err != nil { - return + return nil, err } - return + return s, nil } -func (s *accountDataStatements) insertAccountData( +func (s *accountDataStatements) InsertAccountData( ctx context.Context, txn *sql.Tx, userID, roomID, dataType string, ) (pos types.StreamPosition, err error) { @@ -85,7 +88,7 @@ func (s *accountDataStatements) insertAccountData( return } -func (s *accountDataStatements) selectAccountDataInRange( +func (s *accountDataStatements) SelectAccountDataInRange( ctx context.Context, userID string, oldPos, newPos types.StreamPosition, @@ -146,7 +149,7 @@ func (s *accountDataStatements) selectAccountDataInRange( return data, nil } -func (s *accountDataStatements) selectMaxAccountDataID( +func (s *accountDataStatements) SelectMaxAccountDataID( ctx context.Context, txn *sql.Tx, ) (id int64, err error) { var nullableID sql.NullInt64 diff --git a/syncapi/storage/sqlite3/output_room_events_table.go b/syncapi/storage/sqlite3/output_room_events_table.go index 08299f64b..d3e88a549 100644 --- a/syncapi/storage/sqlite3/output_room_events_table.go +++ b/syncapi/storage/sqlite3/output_room_events_table.go @@ -22,6 +22,7 @@ import ( "sort" "github.com/matrix-org/dendrite/roomserver/api" + "github.com/matrix-org/dendrite/syncapi/storage/tables" "github.com/matrix-org/dendrite/syncapi/types" "github.com/matrix-org/dendrite/common" @@ -109,40 +110,42 @@ type outputRoomEventsStatements struct { selectStateInRangeStmt *sql.Stmt } -func (s *outputRoomEventsStatements) prepare(db *sql.DB, streamID *streamIDStatements) (err error) { - s.streamIDStatements = streamID - _, err = db.Exec(outputRoomEventsSchema) +func NewSqliteEventsTable(db *sql.DB, streamID *streamIDStatements) (tables.Events, error) { + s := &outputRoomEventsStatements{ + streamIDStatements: streamID, + } + _, err := db.Exec(outputRoomEventsSchema) if err != nil { - return + return nil, err } if s.insertEventStmt, err = db.Prepare(insertEventSQL); err != nil { - return + return nil, err } if s.selectEventsStmt, err = db.Prepare(selectEventsSQL); err != nil { - return + return nil, err } if s.selectMaxEventIDStmt, err = db.Prepare(selectMaxEventIDSQL); err != nil { - return + return nil, err } if s.selectRecentEventsStmt, err = db.Prepare(selectRecentEventsSQL); err != nil { - return + return nil, err } if s.selectRecentEventsForSyncStmt, err = db.Prepare(selectRecentEventsForSyncSQL); err != nil { - return + return nil, err } if s.selectEarlyEventsStmt, err = db.Prepare(selectEarlyEventsSQL); err != nil { - return + return nil, err } if s.selectStateInRangeStmt, err = db.Prepare(selectStateInRangeSQL); err != nil { - return + return nil, err } - return + return s, nil } // selectStateInRange returns the state events between the two given PDU stream positions, exclusive of oldPos, inclusive of newPos. // Results are bucketed based on the room ID. If the same state is overwritten multiple times between the // two positions, only the most recent state is returned. -func (s *outputRoomEventsStatements) selectStateInRange( +func (s *outputRoomEventsStatements) SelectStateInRange( ctx context.Context, txn *sql.Tx, oldPos, newPos types.StreamPosition, stateFilterPart *gomatrixserverlib.StateFilter, ) (map[string]map[string]bool, map[string]types.StreamEvent, error) { @@ -229,7 +232,7 @@ func (s *outputRoomEventsStatements) selectStateInRange( // MaxID returns the ID of the last inserted event in this table. 'txn' is optional. If it is not supplied, // then this function should only ever be used at startup, as it will race with inserting events if it is // done afterwards. If there are no inserted events, 0 is returned. -func (s *outputRoomEventsStatements) selectMaxEventID( +func (s *outputRoomEventsStatements) SelectMaxEventID( ctx context.Context, txn *sql.Tx, ) (id int64, err error) { var nullableID sql.NullInt64 @@ -243,7 +246,7 @@ func (s *outputRoomEventsStatements) selectMaxEventID( // InsertEvent into the output_room_events table. addState and removeState are an optional list of state event IDs. Returns the position // of the inserted event. -func (s *outputRoomEventsStatements) insertEvent( +func (s *outputRoomEventsStatements) InsertEvent( ctx context.Context, txn *sql.Tx, event *gomatrixserverlib.HeaderedEvent, addState, removeState []string, transactionID *api.TransactionID, excludeFromSync bool, @@ -306,7 +309,7 @@ func (s *outputRoomEventsStatements) insertEvent( // selectRecentEvents returns the most recent events in the given room, up to a maximum of 'limit'. // If onlySyncEvents has a value of true, only returns the events that aren't marked as to exclude // from sync. -func (s *outputRoomEventsStatements) selectRecentEvents( +func (s *outputRoomEventsStatements) SelectRecentEvents( ctx context.Context, txn *sql.Tx, roomID string, fromPos, toPos types.StreamPosition, limit int, chronologicalOrder bool, onlySyncEvents bool, @@ -340,7 +343,7 @@ func (s *outputRoomEventsStatements) selectRecentEvents( // selectEarlyEvents returns the earliest events in the given room, starting // from a given position, up to a maximum of 'limit'. -func (s *outputRoomEventsStatements) selectEarlyEvents( +func (s *outputRoomEventsStatements) SelectEarlyEvents( ctx context.Context, txn *sql.Tx, roomID string, fromPos, toPos types.StreamPosition, limit int, ) ([]types.StreamEvent, error) { @@ -365,7 +368,7 @@ func (s *outputRoomEventsStatements) selectEarlyEvents( // selectEvents returns the events for the given event IDs. If an event is // missing from the database, it will be omitted. -func (s *outputRoomEventsStatements) selectEvents( +func (s *outputRoomEventsStatements) SelectEvents( ctx context.Context, txn *sql.Tx, eventIDs []string, ) ([]types.StreamEvent, error) { var returnEvents []types.StreamEvent diff --git a/syncapi/storage/sqlite3/syncserver.go b/syncapi/storage/sqlite3/syncserver.go index a2253dcd1..0da05eab3 100644 --- a/syncapi/storage/sqlite3/syncserver.go +++ b/syncapi/storage/sqlite3/syncserver.go @@ -22,7 +22,6 @@ import ( "errors" "fmt" "net/url" - "time" "github.com/sirupsen/logrus" @@ -53,16 +52,13 @@ type stateDelta struct { // SyncServerDatasource represents a sync server datasource which manages // both the database for PDUs and caches for EDUs. type SyncServerDatasource struct { + shared.Database db *sql.DB common.PartitionOffsetStatements streamID streamIDStatements - accountData accountDataStatements - events outputRoomEventsStatements roomstate currentRoomStateStatements - eduCache *cache.EDUCache topology outputRoomEventsTopologyStatements backwardExtremities tables.BackwardsExtremities - shared *shared.Database } // NewSyncServerDatasource creates a new sync server database @@ -87,7 +83,6 @@ func NewSyncServerDatasource(dataSourceName string) (*SyncServerDatasource, erro if err = d.prepare(); err != nil { return nil, err } - d.eduCache = cache.New() return &d, nil } @@ -98,10 +93,12 @@ func (d *SyncServerDatasource) prepare() (err error) { if err = d.streamID.prepare(d.db); err != nil { return err } - if err = d.accountData.prepare(d.db, &d.streamID); err != nil { + accountData, err := NewSqliteAccountDataTable(d.db, &d.streamID) + if err != nil { return err } - if err = d.events.prepare(d.db, &d.streamID); err != nil { + events, err := NewSqliteEventsTable(d.db, &d.streamID) + if err != nil { return err } if err = d.roomstate.prepare(d.db, &d.streamID); err != nil { @@ -118,9 +115,12 @@ func (d *SyncServerDatasource) prepare() (err error) { if err != nil { return err } - d.shared = &shared.Database{ - DB: d.db, - Invites: invites, + d.Database = shared.Database{ + DB: d.db, + Invites: invites, + AccountData: accountData, + OutputEvents: events, + EDUCache: cache.New(), } return nil } @@ -130,22 +130,6 @@ func (d *SyncServerDatasource) AllJoinedUsersInRooms(ctx context.Context) (map[s return d.roomstate.selectJoinedUsers(ctx) } -// Events lookups a list of event by their event ID. -// Returns a list of events matching the requested IDs found in the database. -// If an event is not found in the database then it will be omitted from the list. -// Returns an error if there was a problem talking with the database. -// Does not include any transaction IDs in the returned events. -func (d *SyncServerDatasource) Events(ctx context.Context, eventIDs []string) ([]gomatrixserverlib.HeaderedEvent, error) { - streamEvents, err := d.events.selectEvents(ctx, nil, eventIDs) - if err != nil { - return nil, err - } - - // We don't include a device here as we only include transaction IDs in - // incremental syncs. - return d.StreamEventsToEvents(nil, streamEvents), nil -} - // handleBackwardExtremities adds this event as a backwards extremity if and only if we do not have all of // the events listed in the event's 'prev_events'. This function also updates the backwards extremities table // to account for the fact that the given event is no longer a backwards extremity, but may be marked as such. @@ -156,7 +140,7 @@ func (d *SyncServerDatasource) handleBackwardExtremities(ctx context.Context, tx // Check if we have all of the event's previous events. If an event is // missing, add it to the room's backward extremities. - prevEvents, err := d.events.selectEvents(ctx, txn, ev.PrevEventIDs()) + prevEvents, err := d.Database.OutputEvents.SelectEvents(ctx, txn, ev.PrevEventIDs()) if err != nil { return err } @@ -192,7 +176,7 @@ func (d *SyncServerDatasource) WriteEvent( ) (pduPosition types.StreamPosition, returnErr error) { returnErr = common.WithTransaction(d.db, func(txn *sql.Tx) error { var err error - pos, err := d.events.insertEvent( + pos, err := d.Database.OutputEvents.InsertEvent( ctx, txn, ev, addStateEventIDs, removeStateEventIDs, transactionID, excludeFromSync, ) if err != nil { @@ -253,6 +237,19 @@ func (d *SyncServerDatasource) updateRoomState( return nil } +// SyncPosition returns the latest positions for syncing. +func (d *SyncServerDatasource) SyncPosition(ctx context.Context) (tok types.StreamingToken, err error) { + err = common.WithTransaction(d.db, func(txn *sql.Tx) error { + pos, err := d.syncPositionTx(ctx, txn) + if err != nil { + return err + } + tok = *pos + return nil + }) + return +} + // GetStateEvent returns the Matrix state event of a given type for a given room with a given state key // If no event could be found, returns nil // If there was an issue during the retrieval, returns an error @@ -309,46 +306,7 @@ func (d *SyncServerDatasource) GetEventsInTopologicalRange( } // Retrieve the events' contents using their IDs. - events, err = d.events.selectEvents(ctx, nil, eIDs) - return -} - -// GetEventsInStreamingRange retrieves all of the events on a given ordering using the -// given extremities and limit. -func (d *SyncServerDatasource) GetEventsInStreamingRange( - ctx context.Context, - from, to *types.StreamingToken, - roomID string, limit int, - backwardOrdering bool, -) (events []types.StreamEvent, err error) { - if backwardOrdering { - // When using backward ordering, we want the most recent events first. - if events, err = d.events.selectRecentEvents( - ctx, nil, roomID, to.PDUPosition(), from.PDUPosition(), limit, false, false, - ); err != nil { - return - } - } else { - // When using forward ordering, we want the least recent events first. - if events, err = d.events.selectEarlyEvents( - ctx, nil, roomID, from.PDUPosition(), to.PDUPosition(), limit, - ); err != nil { - return - } - } - return events, err -} - -// SyncPosition returns the latest positions for syncing. -func (d *SyncServerDatasource) SyncPosition(ctx context.Context) (tok types.StreamingToken, err error) { - err = common.WithTransaction(d.db, func(txn *sql.Tx) error { - pos, err := d.syncPositionTx(ctx, txn) - if err != nil { - return err - } - tok = *pos - return nil - }) + events, err = d.Database.OutputEvents.SelectEvents(ctx, nil, eIDs) return } @@ -378,7 +336,7 @@ func (d *SyncServerDatasource) EventsAtTopologicalPosition( return nil, err } - return d.events.selectEvents(ctx, nil, eIDs) + return d.Database.OutputEvents.SelectEvents(ctx, nil, eIDs) } func (d *SyncServerDatasource) EventPositionInTopology( @@ -399,18 +357,18 @@ func (d *SyncServerDatasource) SyncStreamPosition(ctx context.Context) (pos type func (d *SyncServerDatasource) syncStreamPositionTx( ctx context.Context, txn *sql.Tx, ) (types.StreamPosition, error) { - maxID, err := d.events.selectMaxEventID(ctx, txn) + maxID, err := d.Database.OutputEvents.SelectMaxEventID(ctx, txn) if err != nil { return 0, err } - maxAccountDataID, err := d.accountData.selectMaxAccountDataID(ctx, txn) + maxAccountDataID, err := d.Database.AccountData.SelectMaxAccountDataID(ctx, txn) if err != nil { return 0, err } if maxAccountDataID > maxID { maxID = maxAccountDataID } - maxInviteID, err := d.shared.Invites.SelectMaxInviteID(ctx, txn) + maxInviteID, err := d.Database.Invites.SelectMaxInviteID(ctx, txn) if err != nil { return 0, err } @@ -424,18 +382,18 @@ func (d *SyncServerDatasource) syncPositionTx( ctx context.Context, txn *sql.Tx, ) (*types.StreamingToken, error) { - maxEventID, err := d.events.selectMaxEventID(ctx, txn) + maxEventID, err := d.Database.OutputEvents.SelectMaxEventID(ctx, txn) if err != nil { return nil, err } - maxAccountDataID, err := d.accountData.selectMaxAccountDataID(ctx, txn) + maxAccountDataID, err := d.Database.AccountData.SelectMaxAccountDataID(ctx, txn) if err != nil { return nil, err } if maxAccountDataID > maxEventID { maxEventID = maxAccountDataID } - maxInviteID, err := d.shared.Invites.SelectMaxInviteID(ctx, txn) + maxInviteID, err := d.Database.Invites.SelectMaxInviteID(ctx, txn) if err != nil { return nil, err } @@ -444,7 +402,7 @@ func (d *SyncServerDatasource) syncPositionTx( } sp := types.NewStreamToken( types.StreamPosition(maxEventID), - types.StreamPosition(d.eduCache.GetLatestSyncPosition()), + types.StreamPosition(d.Database.EDUCache.GetLatestSyncPosition()), ) return &sp, nil } @@ -518,7 +476,7 @@ func (d *SyncServerDatasource) addTypingDeltaToResponse( var ok bool var err error for _, roomID := range joinedRoomIDs { - if typingUsers, updated := d.eduCache.GetTypingUsersIfUpdatedAfter( + if typingUsers, updated := d.Database.EDUCache.GetTypingUsersIfUpdatedAfter( roomID, int64(since.EDUPosition()), ); updated { ev := gomatrixserverlib.ClientEvent{ @@ -654,7 +612,7 @@ func (d *SyncServerDatasource) getResponseWithPDUsForCompleteSync( // TODO: When filters are added, we may need to call this multiple times to get enough events. // See: https://github.com/matrix-org/synapse/blob/v0.19.3/synapse/handlers/sync.py#L316 var recentStreamEvents []types.StreamEvent - recentStreamEvents, err = d.events.selectRecentEvents( + recentStreamEvents, err = d.Database.OutputEvents.SelectRecentEvents( ctx, txn, roomID, types.StreamPosition(0), toPos.PDUPosition(), numRecentEventsPerRoom, true, true, ) @@ -729,78 +687,13 @@ var txReadOnlySnapshot = sql.TxOptions{ ReadOnly: true, } -// GetAccountDataInRange returns all account data for a given user inserted or -// updated between two given positions -// Returns a map following the format data[roomID] = []dataTypes -// If no data is retrieved, returns an empty map -// If there was an issue with the retrieval, returns an error -func (d *SyncServerDatasource) GetAccountDataInRange( - ctx context.Context, userID string, oldPos, newPos types.StreamPosition, - accountDataFilterPart *gomatrixserverlib.EventFilter, -) (map[string][]string, error) { - return d.accountData.selectAccountDataInRange(ctx, userID, oldPos, newPos, accountDataFilterPart) -} - -// UpsertAccountData keeps track of new or updated account data, by saving the type -// of the new/updated data, and the user ID and room ID the data is related to (empty) -// room ID means the data isn't specific to any room) -// If no data with the given type, user ID and room ID exists in the database, -// creates a new row, else update the existing one -// Returns an error if there was an issue with the upsert -func (d *SyncServerDatasource) UpsertAccountData( - ctx context.Context, userID, roomID, dataType string, -) (sp types.StreamPosition, err error) { - err = common.WithTransaction(d.db, func(txn *sql.Tx) error { - sp, err = d.accountData.insertAccountData(ctx, txn, userID, roomID, dataType) - return err - }) - return -} - -// AddInviteEvent stores a new invite event for a user. -// If the invite was successfully stored this returns the stream ID it was stored at. -// Returns an error if there was a problem communicating with the database. -func (d *SyncServerDatasource) AddInviteEvent( - ctx context.Context, inviteEvent gomatrixserverlib.HeaderedEvent, -) (sp types.StreamPosition, err error) { - return d.shared.AddInviteEvent(ctx, inviteEvent) -} - -// RetireInviteEvent removes an old invite event from the database. -// Returns an error if there was a problem communicating with the database. -func (d *SyncServerDatasource) RetireInviteEvent( - ctx context.Context, inviteEventID string, -) error { - return d.shared.RetireInviteEvent(ctx, inviteEventID) -} - -func (d *SyncServerDatasource) SetTypingTimeoutCallback(fn cache.TimeoutCallbackFn) { - d.eduCache.SetTimeoutCallback(fn) -} - -// AddTypingUser adds a typing user to the typing cache. -// Returns the newly calculated sync position for typing notifications. -func (d *SyncServerDatasource) AddTypingUser( - userID, roomID string, expireTime *time.Time, -) types.StreamPosition { - return types.StreamPosition(d.eduCache.AddTypingUser(userID, roomID, expireTime)) -} - -// RemoveTypingUser removes a typing user from the typing cache. -// Returns the newly calculated sync position for typing notifications. -func (d *SyncServerDatasource) RemoveTypingUser( - userID, roomID string, -) types.StreamPosition { - return types.StreamPosition(d.eduCache.RemoveUser(userID, roomID)) -} - func (d *SyncServerDatasource) addInvitesToResponse( ctx context.Context, txn *sql.Tx, userID string, fromPos, toPos types.StreamPosition, res *types.Response, ) error { - invites, err := d.shared.Invites.SelectInviteEventsInRange( + invites, err := d.Database.Invites.SelectInviteEventsInRange( ctx, txn, userID, fromPos, toPos, ) if err != nil { @@ -853,7 +746,7 @@ func (d *SyncServerDatasource) addRoomDeltaToResponse( // This is all "okay" assuming history_visibility == "shared" which it is by default. endPos = delta.membershipPos } - recentStreamEvents, err := d.events.selectRecentEvents( + recentStreamEvents, err := d.Database.OutputEvents.SelectRecentEvents( ctx, txn, delta.roomID, types.StreamPosition(fromPos), types.StreamPosition(endPos), numRecentEventsPerRoom, true, true, ) @@ -943,7 +836,7 @@ func (d *SyncServerDatasource) fetchMissingStateEvents( ) ([]types.StreamEvent, error) { // Fetch from the events table first so we pick up the stream ID for the // event. - events, err := d.events.selectEvents(ctx, txn, eventIDs) + events, err := d.Database.OutputEvents.SelectEvents(ctx, txn, eventIDs) if err != nil { return nil, err } @@ -997,7 +890,7 @@ func (d *SyncServerDatasource) getStateDeltas( var deltas []stateDelta // get all the state events ever between these two positions - stateNeeded, eventMap, err := d.events.selectStateInRange(ctx, txn, fromPos, toPos, stateFilterPart) + stateNeeded, eventMap, err := d.Database.OutputEvents.SelectStateInRange(ctx, txn, fromPos, toPos, stateFilterPart) if err != nil { return nil, nil, err } @@ -1083,7 +976,7 @@ func (d *SyncServerDatasource) getStateDeltasForFullStateSync( } // Get all the state events ever between these two positions - stateNeeded, eventMap, err := d.events.selectStateInRange(ctx, txn, fromPos, toPos, stateFilterPart) + stateNeeded, eventMap, err := d.Database.OutputEvents.SelectStateInRange(ctx, txn, fromPos, toPos, stateFilterPart) if err != nil { return nil, nil, err } diff --git a/syncapi/storage/tables/interface.go b/syncapi/storage/tables/interface.go index 1a9940524..ffd502165 100644 --- a/syncapi/storage/tables/interface.go +++ b/syncapi/storage/tables/interface.go @@ -4,13 +4,29 @@ import ( "context" "database/sql" + "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/syncapi/types" "github.com/matrix-org/gomatrixserverlib" ) +type AccountData interface { + InsertAccountData(ctx context.Context, txn *sql.Tx, userID, roomID, dataType string) (pos types.StreamPosition, err error) + SelectAccountDataInRange(ctx context.Context, userID string, oldPos, newPos types.StreamPosition, accountDataEventFilter *gomatrixserverlib.EventFilter) (data map[string][]string, err error) + SelectMaxAccountDataID(ctx context.Context, txn *sql.Tx) (id int64, err error) +} + type Invites interface { InsertInviteEvent(ctx context.Context, txn *sql.Tx, inviteEvent gomatrixserverlib.HeaderedEvent) (streamPos types.StreamPosition, err error) DeleteInviteEvent(ctx context.Context, inviteEventID string) error SelectInviteEventsInRange(ctx context.Context, txn *sql.Tx, targetUserID string, startPos, endPos types.StreamPosition) (map[string]gomatrixserverlib.HeaderedEvent, error) SelectMaxInviteID(ctx context.Context, txn *sql.Tx) (id int64, err error) } + +type Events interface { + SelectStateInRange(ctx context.Context, txn *sql.Tx, oldPos, newPos types.StreamPosition, stateFilter *gomatrixserverlib.StateFilter) (map[string]map[string]bool, map[string]types.StreamEvent, error) + SelectMaxEventID(ctx context.Context, txn *sql.Tx) (id int64, err error) + InsertEvent(ctx context.Context, txn *sql.Tx, event *gomatrixserverlib.HeaderedEvent, addState, removeState []string, transactionID *api.TransactionID, excludeFromSync bool) (streamPos types.StreamPosition, err error) + SelectRecentEvents(ctx context.Context, txn *sql.Tx, roomID string, fromPos, toPos types.StreamPosition, limit int, chronologicalOrder bool, onlySyncEvents bool) ([]types.StreamEvent, error) + SelectEarlyEvents(ctx context.Context, txn *sql.Tx, roomID string, fromPos, toPos types.StreamPosition, limit int) ([]types.StreamEvent, error) + SelectEvents(ctx context.Context, txn *sql.Tx, eventIDs []string) ([]types.StreamEvent, error) +}