diff --git a/sync2/poller.go b/sync2/poller.go index cc456a6d..9d2fba93 100644 --- a/sync2/poller.go +++ b/sync2/poller.go @@ -10,6 +10,8 @@ import ( "sync/atomic" "time" + "golang.org/x/exp/slices" + "github.com/getsentry/sentry-go" "github.com/matrix-org/sliding-sync/internal" @@ -791,8 +793,47 @@ func (p *poller) parseRoomsResponse(ctx context.Context, res *SyncResponse) erro stateCalls++ prependStateEvents, err := p.receiver.Initialise(ctx, roomID, roomData.State.Events) if err != nil { - lastErrs = append(lastErrs, fmt.Errorf("Initialise[%s]: %w", roomID, err)) - continue + _, ok := err.(*internal.DataError) + if ok { + // This typically happens when we are missing an m.room.create event. + // Synapse may sometimes send the m.room.create event erroneously in the timeline, + // so check if that is the case here. See https://github.com/matrix-org/complement/pull/690 + var createEvent json.RawMessage + for i, ev := range roomData.Timeline.Events { + if gjson.ParseBytes(ev).Get("type").Str == "m.room.create" { + createEvent = roomData.Timeline.Events[i] + // remove the create event from the timeline so we don't double process it + roomData.Timeline.Events = append(roomData.Timeline.Events[:i], roomData.Timeline.Events[i+1:]...) + break + } + } + if createEvent != nil { + roomData.State.Events = slices.Insert(roomData.State.Events, 0, createEvent) + // retry the processing of the room state + prependStateEvents, err = p.receiver.Initialise(ctx, roomID, roomData.State.Events) + if err == nil { + const warnMsg = "parseRoomsResponse: m.room.create event was found in the timeline not state" + logger.Warn().Str("user_id", p.userID).Str("room_id", roomID).Int( + "timeline", len(roomData.Timeline.Events), + ).Int("state", len(roomData.State.Events)).Msg(warnMsg) + hub := internal.GetSentryHubFromContextOrDefault(ctx) + hub.WithScope(func(scope *sentry.Scope) { + scope.SetContext(internal.SentryCtxKey, map[string]interface{}{ + "room_id": roomID, + "timeline": len(roomData.Timeline.Events), + "state": len(roomData.State.Events), + }) + hub.CaptureMessage(warnMsg) + }) + } + } + } + // either err isn't a data error OR we retried Initialise and it still returned an error + // either way, give up. + if err != nil { + lastErrs = append(lastErrs, fmt.Errorf("Initialise[%s]: %w", roomID, err)) + continue + } } if len(prependStateEvents) > 0 { // The poller has just learned of these state events due to an diff --git a/tests-integration/timeline_test.go b/tests-integration/timeline_test.go index 09be482f..72c7316b 100644 --- a/tests-integration/timeline_test.go +++ b/tests-integration/timeline_test.go @@ -6,8 +6,11 @@ import ( "testing" "time" + "github.com/jmoiron/sqlx" slidingsync "github.com/matrix-org/sliding-sync" + "github.com/matrix-org/sliding-sync/sqlutil" + "github.com/matrix-org/sliding-sync/state" "github.com/matrix-org/sliding-sync/sync2" "github.com/matrix-org/sliding-sync/sync3" "github.com/matrix-org/sliding-sync/testutils" @@ -1335,3 +1338,69 @@ func TestNumLiveBulk(t *testing.T) { }, )) } + +// Regression test for a thing which Synapse can sometimes send down sync v2. +// See https://github.com/matrix-org/sliding-sync/issues/367 +// This would cause this room to not be processed at all, which is bad. +func TestSeeCreateEvent(t *testing.T) { + // setup code + pqString := testutils.PrepareDBConnectionString() + db, err := sqlx.Open("postgres", pqString) + if err != nil { + t.Fatalf("failed to open postgres: %s", err) + } + v2 := runTestV2Server(t) + v3 := runTestServer(t, v2, pqString) + defer v2.close() + defer v3.close() + + roomID := "!TestSeeCreateEvent:localhost" + userID := "@TestSeeCreateEvent:localhost" + token := "TestSeeCreateEvent_TOKEN" + v2.addAccount(t, userID, token) + v2.queueResponse(userID, sync2.SyncResponse{ + Rooms: sync2.SyncRoomsResponse{ + Join: map[string]sync2.SyncV2JoinResponse{ + roomID: { + State: sync2.EventsResponse{ + Events: []json.RawMessage{ + testutils.NewStateEvent(t, "m.room.join_rules", "", "@someone:somewhere", map[string]interface{}{"join_rule": "invite"}), + testutils.NewStateEvent(t, "m.room.power_levels", "", "@someone:somewhere", map[string]interface{}{"users_default": 0}), + testutils.NewStateEvent(t, "m.room.history_visibility", "", "@someone:somewhere", map[string]interface{}{"history_visibility": "shared"}), + }, + }, + Timeline: sync2.TimelineResponse{ + Events: []json.RawMessage{ + testutils.NewStateEvent(t, "m.room.create", "", "@someone:somewhere", map[string]interface{}{"room_version": "10"}), + testutils.NewJoinEvent(t, "@someone:somewhere"), + testutils.NewJoinEvent(t, "@someone2:somewhere"), + }, + }, + }, + }, + }, + }) + v3.mustDoV3Request(t, token, sync3.Request{}) + // ensure the room exists + roomsTable := state.NewRoomsTable(db) + err = sqlutil.WithTransaction(db, func(txn *sqlx.Tx) error { + nids, err := roomsTable.LatestNIDs(txn, []string{roomID}) + if err != nil { + t.Fatalf("LatestNIDs: %s", err) + } + if len(nids) != 1 { + t.Fatalf("LatestNIDs missing: %+v", nids) + } + nid, ok := nids[roomID] + if !ok { + t.Fatalf("LatestNIDs missing room %s : %+v", roomID, nids) + } + if nid == 0 { + t.Fatalf("LatestNIDs 0 nid for room %s", roomID) + } + return nil + }) + if err != nil { + t.Fatalf("WithTransaction: %s", err) + } +}