Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
David Robertson committed Apr 17, 2023
1 parent dba5ce3 commit e1a49bb
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 7 deletions.
51 changes: 44 additions & 7 deletions state/accumulator.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,19 +155,25 @@ func (a *Accumulator) Initialise(roomID string, state []json.RawMessage) (res In
return res, nil
}
err = sqlutil.WithTransaction(a.db, func(txn *sqlx.Tx) error {
// Attempt to short-circuit. This has to be done inside a transaction to make sure
// This has to be done inside a transaction to make sure
// we don't race with multiple calls to Initialise with the same room ID.
snapshotID, err := a.roomsTable.CurrentAfterSnapshotID(txn, roomID)
if err != nil {
return fmt.Errorf("error fetching snapshot id for room %s: %s", roomID, err)
}
if snapshotID > 0 {
// we only initialise rooms once
logger.Info().Str("room_id", roomID).Int64("snapshot_id", snapshotID).Msg("Accumulator.Initialise called but current snapshot already exists, bailing early")
return nil
const warningMsg = "Accumulator.Initialise called when current snapshot already exists. Patching in events"
logger.Warn().Str("room_id", roomID).Int64("snapshot_id", snapshotID).Msg(warningMsg)
sentry.WithScope(func(scope *sentry.Scope) {
scope.SetContext("sliding-sync", map[string]interface{}{
"room_id": roomID,
"snapshot_id": snapshotID,
})
sentry.CaptureException(fmt.Errorf(warningMsg))
})
}

// Insert the events
// Parse the events
events := make([]Event, len(state))
for i := range events {
events[i] = Event{
Expand All @@ -179,7 +185,38 @@ func (a *Accumulator) Initialise(roomID string, state []json.RawMessage) (res In
if err := ensureFieldsSet(events); err != nil {
return fmt.Errorf("events malformed: %s", err)
}
eventIDToNID, err := a.eventsTable.Insert(txn, events, false)

// Determine which events should be inserted.
var insertEvents []Event
if snapshotID == 0 {
insertEvents = events
} else {
// Select the events which do not have a NID
eventIDs := make([]string, len(events))
for i := range events {
eventIDs[i] = events[i].ID
}
unknownEventIDs, err := a.eventsTable.SelectUnknownEventIDs(txn, eventIDs)
if err != nil {
return fmt.Errorf("error determing which event IDs are unknown")
}
if len(unknownEventIDs) == 0 {
// All events known. Odd, but nothing to do.
return nil
}
Outer:
for i := range events {
for j := range unknownEventIDs {
if events[i].ID == unknownEventIDs[j] {
insertEvents = append(insertEvents, events[i])
continue Outer
}
}
}
}

// Insert new events
eventIDToNID, err := a.eventsTable.Insert(txn, insertEvents, false)
if err != nil {
return fmt.Errorf("failed to insert events: %w", err)
}
Expand All @@ -192,7 +229,7 @@ func (a *Accumulator) Initialise(roomID string, state []json.RawMessage) (res In
return nil
}

// pull out the event NIDs we just inserted
// Split the new NIDs in to membership and nonmemberships.
membershipEventIDs := make(map[string]struct{}, len(events))
for _, event := range events {
if event.Type == "m.room.member" {
Expand Down
19 changes: 19 additions & 0 deletions state/event_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,25 @@ func (t *EventTable) SelectStrippedEventsByIDs(txn *sqlx.Tx, verifyAll bool, ids

}

// SelectUnknownEventIDs accepts a list of event IDs and returns the subset of those which are not known to the DB.
// The order of event IDs in the return value is not guaranteed.
func (t *EventTable) SelectUnknownEventIDs(txn *sqlx.Tx, maybeUnknownEventIDs []string) ([]string, error) {
queryStr := `
WITH maybe_unknown_events(event_id) AS (SELECT unnest($1::text[]))
SELECT event_id
FROM maybe_unknown_events LEFT JOIN syncv3_events USING(event_id)
WHERE event_nid IS NULL;`

var unknownEventIDs []string
var err error
if txn != nil {
err = txn.Select(&unknownEventIDs, queryStr, maybeUnknownEventIDs)
} else {
err = t.db.Select(&unknownEventIDs, queryStr, maybeUnknownEventIDs)
}
return unknownEventIDs, err
}

// UpdateBeforeSnapshotID sets the before_state_snapshot_id field to `snapID` for the given NIDs.
func (t *EventTable) UpdateBeforeSnapshotID(txn *sqlx.Tx, eventNID, snapID, replacesNID int64) error {
_, err := txn.Exec(
Expand Down

0 comments on commit e1a49bb

Please sign in to comment.