Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle malformed events more gracefully #224

Merged
merged 4 commits into from
Jul 26, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 7 additions & 3 deletions state/accumulator.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,8 +207,9 @@ func (a *Accumulator) Initialise(roomID string, state []json.RawMessage) (Initia
IsState: true,
}
}
if err := ensureFieldsSet(events); err != nil {
return fmt.Errorf("events malformed: %s", err)
events = filterAndEnsureFieldsSet(events)
if len(events) == 0 {
return fmt.Errorf("failed to insert events, all events were filtered out: %w", err)
}
eventIDToNID, err := a.eventsTable.Insert(txn, events, false)
if err != nil {
Expand Down Expand Up @@ -415,7 +416,10 @@ func (a *Accumulator) filterAndParseTimelineEvents(txn *sqlx.Tx, roomID string,
RoomID: roomID,
}
if err := e.ensureFieldsSetOnEvent(); err != nil {
return nil, fmt.Errorf("event malformed: %s", err)
logger.Warn().Str("event_id", e.ID).Str("room_id", roomID).Err(err).Msg(
"Accumulator.filterAndParseTimelineEvents: failed to parse event, ignoring",
)
continue
}
if _, ok := seenEvents[e.ID]; ok {
logger.Warn().Str("event_id", e.ID).Str("room_id", roomID).Msg(
Expand Down
16 changes: 10 additions & 6 deletions state/event_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func (ev *Event) ensureFieldsSetOnEvent() error {
}
if ev.Type == "" {
typeResult := evJSON.Get("type")
if !typeResult.Exists() || typeResult.Str == "" {
if !typeResult.Exists() || typeResult.Type != gjson.String { // empty strings for 'type' are valid apparently
return fmt.Errorf("event JSON missing type key")
}
ev.Type = typeResult.Str
Expand Down Expand Up @@ -153,7 +153,7 @@ func (t *EventTable) SelectHighestNID() (highest int64, err error) {
// we insert new events A and B in that order, then NID(A) < NID(B).
func (t *EventTable) Insert(txn *sqlx.Tx, events []Event, checkFields bool) (map[string]int64, error) {
if checkFields {
ensureFieldsSet(events)
events = filterAndEnsureFieldsSet(events)
}
result := make(map[string]int64)
for i := range events {
Expand Down Expand Up @@ -449,14 +449,18 @@ func (c EventChunker) Subslice(i, j int) sqlutil.Chunker {
return c[i:j]
}

func ensureFieldsSet(events []Event) error {
func filterAndEnsureFieldsSet(events []Event) []Event {
result := make([]Event, 0, len(events))
// ensure fields are set
for i := range events {
ev := events[i]
if err := ev.ensureFieldsSetOnEvent(); err != nil {
return err
logger.Warn().Str("event_id", ev.ID).Err(err).Msg(
"filterAndEnsureFieldsSet: failed to parse event, ignoring",
)
Comment on lines +458 to +460
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably want to log room ID here too.

continue
}
events[i] = ev
result = append(result, ev)
}
return nil
return result
}
118 changes: 118 additions & 0 deletions tests-integration/regressions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,3 +110,121 @@ func TestBackfillInviteDoesntCorruptState(t *testing.T) {
},
))
}

func TestMalformedEventsTimeline(t *testing.T) {
pqString := testutils.PrepareDBConnectionString()
// setup code
v2 := runTestV2Server(t)
v3 := runTestServer(t, v2, pqString)
defer v2.close()
defer v3.close()

// unusual events ARE VALID EVENTS and should be sent to the client, but are unusual for some reason.
unusualEvents := []json.RawMessage{
testutils.NewStateEvent(t, "", "", alice, map[string]interface{}{
"empty string": "for event type",
}),
}
// malformed events are INVALID and should be ignored by the proxy.
malformedEvents := []json.RawMessage{
json.RawMessage(`{}`), // empty object
json.RawMessage(`{"type":5}`), // type is an integer
json.RawMessage(`{"type":"foo","content":{},"event_id":""}`), // 0-length string as event ID
json.RawMessage(`{"type":"foo","content":{}}`), // missing event ID
}

room := roomEvents{
roomID: "!TestMalformedEventsTimeline:localhost",
// append malformed after unusual. All malformed events should be dropped,
// leaving only unusualEvents.
events: append(unusualEvents, malformedEvents...),
state: createRoomState(t, alice, time.Now()),
}
v2.addAccount(t, alice, aliceToken)
v2.queueResponse(alice, sync2.SyncResponse{
Rooms: sync2.SyncRoomsResponse{
Join: v2JoinTimeline(room),
},
})

// alice syncs and should see the room.
aliceRes := v3.mustDoV3Request(t, aliceToken, sync3.Request{
Lists: map[string]sync3.RequestList{
"a": {
Ranges: sync3.SliceRanges{{0, 20}},
RoomSubscription: sync3.RoomSubscription{
TimelineLimit: int64(len(unusualEvents)),
},
},
},
})
m.MatchResponse(t, aliceRes, m.MatchList("a", m.MatchV3Count(1), m.MatchV3Ops(m.MatchV3SyncOp(0, 0, []string{room.roomID}))),
m.MatchRoomSubscriptionsStrict(map[string][]m.RoomMatcher{
room.roomID: {
m.MatchJoinCount(1),
m.MatchRoomTimeline(unusualEvents),
},
}))
}

func TestMalformedEventsState(t *testing.T) {
pqString := testutils.PrepareDBConnectionString()
// setup code
v2 := runTestV2Server(t)
v3 := runTestServer(t, v2, pqString)
defer v2.close()
defer v3.close()

// unusual events ARE VALID EVENTS and should be sent to the client, but are unusual for some reason.
unusualEvents := []json.RawMessage{
testutils.NewStateEvent(t, "", "", alice, map[string]interface{}{
"empty string": "for event type",
}),
}
// malformed events are INVALID and should be ignored by the proxy.
malformedEvents := []json.RawMessage{
json.RawMessage(`{}`), // empty object
json.RawMessage(`{"type":5,"content":{},"event_id":"f","state_key":""}`), // type is an integer
json.RawMessage(`{"type":"foo","content":{},"event_id":"","state_key":""}`), // 0-length string as event ID
json.RawMessage(`{"type":"foo","content":{},"state_key":""}`), // missing event ID
}

latestEvent := testutils.NewEvent(t, "m.room.message", alice, map[string]interface{}{"body": "hi"})

room := roomEvents{
roomID: "!TestMalformedEventsState:localhost",
events: []json.RawMessage{latestEvent},
// append malformed after unusual. All malformed events should be dropped,
// leaving only unusualEvents.
state: append(createRoomState(t, alice, time.Now()), append(unusualEvents, malformedEvents...)...),
}
v2.addAccount(t, alice, aliceToken)
v2.queueResponse(alice, sync2.SyncResponse{
Rooms: sync2.SyncRoomsResponse{
Join: v2JoinTimeline(room),
},
})

// alice syncs and should see the room.
aliceRes := v3.mustDoV3Request(t, aliceToken, sync3.Request{
Lists: map[string]sync3.RequestList{
"a": {
Ranges: sync3.SliceRanges{{0, 20}},
RoomSubscription: sync3.RoomSubscription{
TimelineLimit: int64(len(unusualEvents)),
RequiredState: [][2]string{{"", ""}},
},
},
},
})
m.MatchResponse(t, aliceRes, m.MatchList("a", m.MatchV3Count(1), m.MatchV3Ops(m.MatchV3SyncOp(0, 0, []string{room.roomID}))),
m.MatchRoomSubscriptionsStrict(map[string][]m.RoomMatcher{
room.roomID: {
m.MatchJoinCount(1),
m.MatchRoomTimeline([]json.RawMessage{latestEvent}),
m.MatchRoomRequiredState([]json.RawMessage{
unusualEvents[0],
}),
},
}))
}