Skip to content

Commit

Permalink
bugfix: handle malformed state/timeline responses
Browse files Browse the repository at this point in the history
Specifically, look for the create event in the timeline as this
has been seen in the wild on Synapse. Fixes #367.
  • Loading branch information
kegsay committed Nov 8, 2023
1 parent 421a572 commit c868e54
Show file tree
Hide file tree
Showing 2 changed files with 112 additions and 2 deletions.
45 changes: 43 additions & 2 deletions sync2/poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import (
"sync/atomic"
"time"

"golang.org/x/exp/slices"

"github.com/getsentry/sentry-go"

"github.com/matrix-org/sliding-sync/internal"
Expand Down Expand Up @@ -791,8 +793,47 @@ func (p *poller) parseRoomsResponse(ctx context.Context, res *SyncResponse) erro
stateCalls++
prependStateEvents, err := p.receiver.Initialise(ctx, roomID, roomData.State.Events)
if err != nil {
lastErrs = append(lastErrs, fmt.Errorf("Initialise[%s]: %w", roomID, err))
continue
_, ok := err.(*internal.DataError)
if ok {
// This typically happens when we are missing an m.room.create event.
// Synapse may sometimes send the m.room.create event erroneously in the timeline,
// so check if that is the case here. See https://github.com/matrix-org/complement/pull/690
var createEvent json.RawMessage
for i, ev := range roomData.Timeline.Events {
if gjson.ParseBytes(ev).Get("type").Str == "m.room.create" {
createEvent = roomData.Timeline.Events[i]
// remove the create event from the timeline so we don't double process it
roomData.Timeline.Events = append(roomData.Timeline.Events[:i], roomData.Timeline.Events[i+1:]...)
break
}
}
if createEvent != nil {
roomData.State.Events = slices.Insert(roomData.State.Events, 0, createEvent)
// retry the processing of the room state
prependStateEvents, err = p.receiver.Initialise(ctx, roomID, roomData.State.Events)
if err == nil {
const warnMsg = "parseRoomsResponse: m.room.create event was found in the timeline not state"
logger.Warn().Str("user_id", p.userID).Str("room_id", roomID).Int(
"timeline", len(roomData.Timeline.Events),
).Int("state", len(roomData.State.Events)).Msg(warnMsg)
hub := internal.GetSentryHubFromContextOrDefault(ctx)
hub.WithScope(func(scope *sentry.Scope) {
scope.SetContext(internal.SentryCtxKey, map[string]interface{}{
"room_id": roomID,
"timeline": len(roomData.Timeline.Events),
"state": len(roomData.State.Events),
})
hub.CaptureMessage(warnMsg)
})
}
}
}
// err is still set here if it wasn't a data error, if no m.room.create event was found
// in the timeline (so no retry happened), or if the retried Initialise also failed.
// In all of these cases, give up.
if err != nil {
lastErrs = append(lastErrs, fmt.Errorf("Initialise[%s]: %w", roomID, err))
continue
}
}
if len(prependStateEvents) > 0 {
// The poller has just learned of these state events due to an
Expand Down
69 changes: 69 additions & 0 deletions tests-integration/timeline_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,11 @@ import (
"testing"
"time"

"github.com/jmoiron/sqlx"
slidingsync "github.com/matrix-org/sliding-sync"

"github.com/matrix-org/sliding-sync/sqlutil"
"github.com/matrix-org/sliding-sync/state"
"github.com/matrix-org/sliding-sync/sync2"
"github.com/matrix-org/sliding-sync/sync3"
"github.com/matrix-org/sliding-sync/testutils"
Expand Down Expand Up @@ -1335,3 +1338,69 @@ func TestNumLiveBulk(t *testing.T) {
},
))
}

// TestSeeCreateEvent is a regression test for a malformed sync v2 response which
// Synapse can sometimes send: the m.room.create event appears in the room's
// timeline instead of in its state block.
// See https://github.com/matrix-org/sliding-sync/issues/367
// This would cause this room to not be processed at all, which is bad.
func TestSeeCreateEvent(t *testing.T) {
	// setup code
	pqString := testutils.PrepareDBConnectionString()
	db, err := sqlx.Open("postgres", pqString)
	if err != nil {
		t.Fatalf("failed to open postgres: %s", err)
	}
	// Release this test's own connection pool when the test finishes. Deferred
	// first so that it runs last, after the v2/v3 test servers have been closed.
	defer db.Close()
	v2 := runTestV2Server(t)
	v3 := runTestServer(t, v2, pqString)
	defer v2.close()
	defer v3.close()

	roomID := "!TestSeeCreateEvent:localhost"
	userID := "@TestSeeCreateEvent:localhost"
	token := "TestSeeCreateEvent_TOKEN"
	v2.addAccount(t, userID, token)
	// Queue a v2 response whose state block has no m.room.create event; the
	// create event is instead the first event of the timeline.
	v2.queueResponse(userID, sync2.SyncResponse{
		Rooms: sync2.SyncRoomsResponse{
			Join: map[string]sync2.SyncV2JoinResponse{
				roomID: {
					State: sync2.EventsResponse{
						Events: []json.RawMessage{
							testutils.NewStateEvent(t, "m.room.join_rules", "", "@someone:somewhere", map[string]interface{}{"join_rule": "invite"}),
							testutils.NewStateEvent(t, "m.room.power_levels", "", "@someone:somewhere", map[string]interface{}{"users_default": 0}),
							testutils.NewStateEvent(t, "m.room.history_visibility", "", "@someone:somewhere", map[string]interface{}{"history_visibility": "shared"}),
						},
					},
					Timeline: sync2.TimelineResponse{
						Events: []json.RawMessage{
							testutils.NewStateEvent(t, "m.room.create", "", "@someone:somewhere", map[string]interface{}{"room_version": "10"}),
							testutils.NewJoinEvent(t, "@someone:somewhere"),
							testutils.NewJoinEvent(t, "@someone2:somewhere"),
						},
					},
				},
			},
		},
	})
	v3.mustDoV3Request(t, token, sync3.Request{})
	// ensure the room exists: the rooms table must report exactly one entry,
	// keyed by our room ID, with a non-zero latest NID.
	roomsTable := state.NewRoomsTable(db)
	err = sqlutil.WithTransaction(db, func(txn *sqlx.Tx) error {
		nids, err := roomsTable.LatestNIDs(txn, []string{roomID})
		if err != nil {
			t.Fatalf("LatestNIDs: %s", err)
		}
		if len(nids) != 1 {
			t.Fatalf("LatestNIDs missing: %+v", nids)
		}
		nid, ok := nids[roomID]
		if !ok {
			t.Fatalf("LatestNIDs missing room %s : %+v", roomID, nids)
		}
		if nid == 0 {
			t.Fatalf("LatestNIDs 0 nid for room %s", roomID)
		}
		return nil
	})
	if err != nil {
		t.Fatalf("WithTransaction: %s", err)
	}
}

0 comments on commit c868e54

Please sign in to comment.