Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Inject state blocks from incremental polls into the room timeline #71

Merged
merged 40 commits into from
Apr 24, 2023
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
38c1bda
Integration test
Apr 13, 2023
1d59167
E2E test case draft
Apr 13, 2023
46059df
WIP: update test cases
Apr 14, 2023
419e1ab
Update test case again
Apr 17, 2023
4b90454
Don't reselect NIDs after initialising a room
Apr 17, 2023
a2cba6e
We don't need `eventIDs` either
Apr 17, 2023
c19b561
Merge branch 'dmr/dont-select-after-insert-returning' into dmr/gappy-…
Apr 17, 2023
666823d
Introduce return struct for Initialise
Apr 17, 2023
4ba80d2
Initialise: handle state blocks from a gappy sync
Apr 17, 2023
9fdb001
TODO comment
Apr 17, 2023
33b174d
Fixup test code
Apr 17, 2023
ac9651c
Propagate err message
Apr 17, 2023
5d8560c
Fix db query
Apr 17, 2023
b7a8e7d
Improve logging
Apr 17, 2023
e3008e6
Fix the logging
Apr 17, 2023
2406da5
TODO note
Apr 17, 2023
f28f7d0
Return set of unknown events from the db
Apr 18, 2023
9d53a87
Simply Initialise to bail out early
Apr 18, 2023
2bae784
Propagate prepend events to poller, and inject
Apr 18, 2023
5621423
Fix tests
Apr 18, 2023
9af0471
Always look for unknown state events
Apr 18, 2023
b5893b1
Better error checking in test
Apr 18, 2023
aa07188
Fixup test
Apr 18, 2023
7f03a3d
Use test helper instead
Apr 19, 2023
e6aac43
SelectUnknownEventIDs expects a txn
Apr 19, 2023
b54ba7c
Poller test description
Apr 19, 2023
5dd4315
Test new events table query
Apr 19, 2023
76066f9
Tidy up accumulator event ID handling
Apr 19, 2023
5a9fae1
Fix capitalisation
Apr 19, 2023
b32e5da
Fix test function args too
Apr 19, 2023
00dd396
Fix MatchRoomTimelineMostRecent
Apr 19, 2023
bea931d
Fix running unit test alongside other tests
Apr 19, 2023
1680952
Tidyup unit test
Apr 19, 2023
6e7d0cb
Test memberships are updated after gappy sync
Apr 19, 2023
aa09d12
Check the new query reports multiple unknown IDs
Apr 21, 2023
8324f6e
SelectByIDs has an ordering guarantee
Apr 21, 2023
6c9aa09
Pass room ID to new query
Apr 24, 2023
2755384
Revert "Pass room ID to new query"
Apr 24, 2023
82d1d58
Update integration test
Apr 24, 2023
3274cf2
typo
Apr 24, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 9 additions & 5 deletions state/accumulator.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,17 +178,21 @@ func (a *Accumulator) Initialise(roomID string, state []json.RawMessage) (Initia
// that the poller can log a warning.
logger.Debug().Str("room_id", roomID).Int64("snapshot_id", snapshotID).Msg("Accumulator.Initialise called with incremental state but current snapshot already exists.")
eventIDs := make([]string, len(state))
eventIDToRawEvent := make(map[string]json.RawMessage, len(state))
for i := range state {
eventIDs[i] = gjson.ParseBytes(state[i]).Get("event_id").Str
eventID := gjson.ParseBytes(state[i]).Get("event_id")
if !eventID.Exists() || eventID.Type != gjson.String {
return fmt.Errorf("Event %d lacks an event ID", i)
}
eventIDToRawEvent[eventID.Str] = state[i]
eventIDs[i] = eventID.Str
}
unknownEventIDs, err := a.eventsTable.SelectUnknownEventIDs(txn, eventIDs)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This could be improved if we made use of the room ID as it can cut lookup times down a lot.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, I'd just assumed this was indexed on event id---maybe we should add that index?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Err, the event_id column is UNIQUE, so it will be index will exist. Not sure why the room ID would help here?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The room ID can help as it cuts down the dataset immediately, rather than relying on a the mahoosive event ID index of all events ever. In the past it has been beneficial to include this information when running EXPLAIN.

Copy link
Contributor Author

@DMRobertson DMRobertson Apr 24, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I remain sceptical.

-- Matrix HQ
\set room_id '!OGEhHVWSdvArJzumhm:matrix.org'
-- 20 event Ids taken from Matrix HQ, plus two fake event IDs
\set event_ids {$fakeevent1,$4jteAg_T0BMXWiSRzZbptOF9zETeomXbJoDtg6zs9SU,$Iu-74VzJ0B4Xc6XgBDAKoWYZkFcUGCuJ_vLd47Cf4Ok,$OIqjRnK_rXBApl_8x4s0Iq--OH969cikkvjGYW5Seq8,$7EYkESAKR-t6wKLG7rjPyyVcZdQ-v0YQtaLbzrLJ2vI,$6LYIKlvAlC1ILMhIsWx-M9IDOarFBLM_Ni609bz1NDI,$w1sLe53ZqsRc9vebfLS8JCUyq3ql9DRhN-lERWsZRHQ,$A6k3YCykEOHCzsbzxQjdupwNdt4hr4AeS6IWOnEG1H4,$K1EGNY8C-McIJJ5PBbd1GlUFMuw84S_QlbomqnYMiYY,$QgyACr1TujWs846B5tiAkbM9JkBEzUoMK6QgOfXDc64,$f2aefO_6yYMhSFRtQWYkTDdie87ldgTJD1h3JQ2F_3Q,$pYU_XgUXF0kQXvtSjG0boNdMHtXGBIFfwBDqxZuU1IE,$cZ5XF2o7ObQqROkJZWSubgm4cLgBX8j1nkrcrHv0rZ0,$wXTQjHKMTsMBqj3B4BLN40L0-7tz1NbG8WesWgGcjwY,$csN_i4UoBTk5jFzdDAxnKSmKg0Hp6U7Af_ifht07D-o,$NBsWMM0MJ_XcAt9gk9ZJxV3_v3tPfvmZ1azHI3Wb31A,$bf1IpaCxsC2Te4mWrBXcJZqesoh1HsssdaI_U1OzXuQ,$nsLyOZZRywB1bGwEVNR3qQFLuY3hGOuM-26J6IAvsgk,$-b0T_rqr5-vE1FiqwYOriXkTE0c8oMXj0a9uSeMnWVw,$qx3qMhu6ba_ceFcv9ADA5J45CTUhhAMDz44eOVmJwR8,$ugwvSmk0LfVLpxk0cloQ6tQxjacdp0ojurri3pN4bl4,$notanevent2}

Query without room ID:

EXPLAIN ANALYZE VERBOSE
WITH maybe_unknown_events(event_id) AS (SELECT unnest(:'event_ids'::text[]))
	SELECT *
	FROM maybe_unknown_events LEFT JOIN syncv3_events USING(event_id)
	WHERE event_nid IS NULL;
                                         QUERY PLAN                                                                                                                                                                                                                                         
                                                                                                                                                                                                                                                                                            
                                                                                            
--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
--------------------------------------------------------------------------------------------
 Nested Loop Left Join  (cost=0.56..189.27 rows=1 width=648) (actual time=0.030..0.239 rows=2 loops=1)
   Output: (unnest('{$fakeevent1,$4jteAg_T0BMXWiSRzZbptOF9zETeomXbJoDtg6zs9SU,$Iu-74VzJ0B4Xc6XgBDAKoWYZkFcUGCuJ_vLd47Cf4Ok,$OIqjRnK_rXBApl_8x4s0Iq--OH969cikkvjGYW5Seq8,$7EYkESAKR-t6wKLG7rjPyyVcZdQ-v0YQtaLbzrLJ2vI,$6LYIKlvAlC1ILMhIsWx-M9IDOarFBLM_Ni609bz1NDI,$w1sLe53ZqsRc9vebfLS8JCUyq
3ql9DRhN-lERWsZRHQ,$A6k3YCykEOHCzsbzxQjdupwNdt4hr4AeS6IWOnEG1H4,$K1EGNY8C-McIJJ5PBbd1GlUFMuw84S_QlbomqnYMiYY,$QgyACr1TujWs846B5tiAkbM9JkBEzUoMK6QgOfXDc64,$f2aefO_6yYMhSFRtQWYkTDdie87ldgTJD1h3JQ2F_3Q,$pYU_XgUXF0kQXvtSjG0boNdMHtXGBIFfwBDqxZuU1IE,$cZ5XF2o7ObQqROkJZWSubgm4cLgBX8j1nkrcrHv
0rZ0,$wXTQjHKMTsMBqj3B4BLN40L0-7tz1NbG8WesWgGcjwY,$csN_i4UoBTk5jFzdDAxnKSmKg0Hp6U7Af_ifht07D-o,$NBsWMM0MJ_XcAt9gk9ZJxV3_v3tPfvmZ1azHI3Wb31A,$bf1IpaCxsC2Te4mWrBXcJZqesoh1HsssdaI_U1OzXuQ,$nsLyOZZRywB1bGwEVNR3qQFLuY3hGOuM-26J6IAvsgk,$-b0T_rqr5-vE1FiqwYOriXkTE0c8oMXj0a9uSeMnWVw,$qx3qMhu6
ba_ceFcv9ADA5J45CTUhhAMDz44eOVmJwR8,$ugwvSmk0LfVLpxk0cloQ6tQxjacdp0ojurri3pN4bl4,$notanevent2}'::text[])), syncv3_events.event_nid, syncv3_events.before_state_snapshot_id, syncv3_events.event_replaces_nid, syncv3_events.room_id, syncv3_events.event_type, syncv3_events.state_key, sync
v3_events.prev_batch, syncv3_events.membership, syncv3_events.is_state, syncv3_events.event
   Inner Unique: true
   Filter: (syncv3_events.event_nid IS NULL)
   Rows Removed by Filter: 20
   ->  ProjectSet  (cost=0.00..0.13 rows=22 width=32) (actual time=0.003..0.007 rows=22 loops=1)
         Output: unnest('{$fakeevent1,$4jteAg_T0BMXWiSRzZbptOF9zETeomXbJoDtg6zs9SU,$Iu-74VzJ0B4Xc6XgBDAKoWYZkFcUGCuJ_vLd47Cf4Ok,$OIqjRnK_rXBApl_8x4s0Iq--OH969cikkvjGYW5Seq8,$7EYkESAKR-t6wKLG7rjPyyVcZdQ-v0YQtaLbzrLJ2vI,$6LYIKlvAlC1ILMhIsWx-M9IDOarFBLM_Ni609bz1NDI,$w1sLe53ZqsRc9vebfLS8
JCUyq3ql9DRhN-lERWsZRHQ,$A6k3YCykEOHCzsbzxQjdupwNdt4hr4AeS6IWOnEG1H4,$K1EGNY8C-McIJJ5PBbd1GlUFMuw84S_QlbomqnYMiYY,$QgyACr1TujWs846B5tiAkbM9JkBEzUoMK6QgOfXDc64,$f2aefO_6yYMhSFRtQWYkTDdie87ldgTJD1h3JQ2F_3Q,$pYU_XgUXF0kQXvtSjG0boNdMHtXGBIFfwBDqxZuU1IE,$cZ5XF2o7ObQqROkJZWSubgm4cLgBX8j1nk
rcrHv0rZ0,$wXTQjHKMTsMBqj3B4BLN40L0-7tz1NbG8WesWgGcjwY,$csN_i4UoBTk5jFzdDAxnKSmKg0Hp6U7Af_ifht07D-o,$NBsWMM0MJ_XcAt9gk9ZJxV3_v3tPfvmZ1azHI3Wb31A,$bf1IpaCxsC2Te4mWrBXcJZqesoh1HsssdaI_U1OzXuQ,$nsLyOZZRywB1bGwEVNR3qQFLuY3hGOuM-26J6IAvsgk,$-b0T_rqr5-vE1FiqwYOriXkTE0c8oMXj0a9uSeMnWVw,$qx3
qMhu6ba_ceFcv9ADA5J45CTUhhAMDz44eOVmJwR8,$ugwvSmk0LfVLpxk0cloQ6tQxjacdp0ojurri3pN4bl4,$notanevent2}'::text[])
         ->  Result  (cost=0.00..0.01 rows=1 width=0) (actual time=0.001..0.001 rows=1 loops=1)
   ->  Index Scan using syncv3_events_event_id_key on public.syncv3_events  (cost=0.56..8.58 rows=1 width=658) (actual time=0.010..0.010 rows=1 loops=22)
         Output: syncv3_events.event_nid, syncv3_events.event_id, syncv3_events.before_state_snapshot_id, syncv3_events.event_replaces_nid, syncv3_events.room_id, syncv3_events.event_type, syncv3_events.state_key, syncv3_events.prev_batch, syncv3_events.membership, syncv3_events.is_s
tate, syncv3_events.event
         Index Cond: (syncv3_events.event_id = (unnest('{$fakeevent1,$4jteAg_T0BMXWiSRzZbptOF9zETeomXbJoDtg6zs9SU,$Iu-74VzJ0B4Xc6XgBDAKoWYZkFcUGCuJ_vLd47Cf4Ok,$OIqjRnK_rXBApl_8x4s0Iq--OH969cikkvjGYW5Seq8,$7EYkESAKR-t6wKLG7rjPyyVcZdQ-v0YQtaLbzrLJ2vI,$6LYIKlvAlC1ILMhIsWx-M9IDOarFBLM_Ni
609bz1NDI,$w1sLe53ZqsRc9vebfLS8JCUyq3ql9DRhN-lERWsZRHQ,$A6k3YCykEOHCzsbzxQjdupwNdt4hr4AeS6IWOnEG1H4,$K1EGNY8C-McIJJ5PBbd1GlUFMuw84S_QlbomqnYMiYY,$QgyACr1TujWs846B5tiAkbM9JkBEzUoMK6QgOfXDc64,$f2aefO_6yYMhSFRtQWYkTDdie87ldgTJD1h3JQ2F_3Q,$pYU_XgUXF0kQXvtSjG0boNdMHtXGBIFfwBDqxZuU1IE,$cZ5
XF2o7ObQqROkJZWSubgm4cLgBX8j1nkrcrHv0rZ0,$wXTQjHKMTsMBqj3B4BLN40L0-7tz1NbG8WesWgGcjwY,$csN_i4UoBTk5jFzdDAxnKSmKg0Hp6U7Af_ifht07D-o,$NBsWMM0MJ_XcAt9gk9ZJxV3_v3tPfvmZ1azHI3Wb31A,$bf1IpaCxsC2Te4mWrBXcJZqesoh1HsssdaI_U1OzXuQ,$nsLyOZZRywB1bGwEVNR3qQFLuY3hGOuM-26J6IAvsgk,$-b0T_rqr5-vE1Fiqw
YOriXkTE0c8oMXj0a9uSeMnWVw,$qx3qMhu6ba_ceFcv9ADA5J45CTUhhAMDz44eOVmJwR8,$ugwvSmk0LfVLpxk0cloQ6tQxjacdp0ojurri3pN4bl4,$notanevent2}'::text[])))
 Query Identifier: -467502754092800539
 Planning Time: 0.142 ms
 Execution Time: 0.264 ms

Query also checking room ID (note it is not just a case of appending WHERE room_id = ...):

EXPLAIN ANALYZE VERBOSE
WITH maybe_unknown_events(event_id) AS (SELECT unnest(:'event_ids'::text[]))
	SELECT *
	FROM maybe_unknown_events LEFT JOIN syncv3_events ON maybe_unknown_events.event_id = syncv3_events.event_id AND room_id = :'room_id'
	WHERE event_nid IS NULL;
	                                                     QUERY PLAN                                                                                                                                                                                                                             
                                                                                                                                                                                                                                                                                            
                                                                                                                    
--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
--------------------------------------------------------------------------------------------------------------------
 Nested Loop Left Join  (cost=0.56..189.33 rows=1 width=690) (actual time=0.053..0.262 rows=2 loops=1)
   Output: (unnest('{$fakeevent1,$4jteAg_T0BMXWiSRzZbptOF9zETeomXbJoDtg6zs9SU,$Iu-74VzJ0B4Xc6XgBDAKoWYZkFcUGCuJ_vLd47Cf4Ok,$OIqjRnK_rXBApl_8x4s0Iq--OH969cikkvjGYW5Seq8,$7EYkESAKR-t6wKLG7rjPyyVcZdQ-v0YQtaLbzrLJ2vI,$6LYIKlvAlC1ILMhIsWx-M9IDOarFBLM_Ni609bz1NDI,$w1sLe53ZqsRc9vebfLS8JCUyq
3ql9DRhN-lERWsZRHQ,$A6k3YCykEOHCzsbzxQjdupwNdt4hr4AeS6IWOnEG1H4,$K1EGNY8C-McIJJ5PBbd1GlUFMuw84S_QlbomqnYMiYY,$QgyACr1TujWs846B5tiAkbM9JkBEzUoMK6QgOfXDc64,$f2aefO_6yYMhSFRtQWYkTDdie87ldgTJD1h3JQ2F_3Q,$pYU_XgUXF0kQXvtSjG0boNdMHtXGBIFfwBDqxZuU1IE,$cZ5XF2o7ObQqROkJZWSubgm4cLgBX8j1nkrcrHv
0rZ0,$wXTQjHKMTsMBqj3B4BLN40L0-7tz1NbG8WesWgGcjwY,$csN_i4UoBTk5jFzdDAxnKSmKg0Hp6U7Af_ifht07D-o,$NBsWMM0MJ_XcAt9gk9ZJxV3_v3tPfvmZ1azHI3Wb31A,$bf1IpaCxsC2Te4mWrBXcJZqesoh1HsssdaI_U1OzXuQ,$nsLyOZZRywB1bGwEVNR3qQFLuY3hGOuM-26J6IAvsgk,$-b0T_rqr5-vE1FiqwYOriXkTE0c8oMXj0a9uSeMnWVw,$qx3qMhu6
ba_ceFcv9ADA5J45CTUhhAMDz44eOVmJwR8,$ugwvSmk0LfVLpxk0cloQ6tQxjacdp0ojurri3pN4bl4,$notanevent2}'::text[])), syncv3_events.event_nid, syncv3_events.event_id, syncv3_events.before_state_snapshot_id, syncv3_events.event_replaces_nid, syncv3_events.room_id, syncv3_events.event_type, syncv
3_events.state_key, syncv3_events.prev_batch, syncv3_events.membership, syncv3_events.is_state, syncv3_events.event
   Inner Unique: true
   Filter: (syncv3_events.event_nid IS NULL)
   Rows Removed by Filter: 20
   ->  ProjectSet  (cost=0.00..0.13 rows=22 width=32) (actual time=0.003..0.006 rows=22 loops=1)
         Output: unnest('{$fakeevent1,$4jteAg_T0BMXWiSRzZbptOF9zETeomXbJoDtg6zs9SU,$Iu-74VzJ0B4Xc6XgBDAKoWYZkFcUGCuJ_vLd47Cf4Ok,$OIqjRnK_rXBApl_8x4s0Iq--OH969cikkvjGYW5Seq8,$7EYkESAKR-t6wKLG7rjPyyVcZdQ-v0YQtaLbzrLJ2vI,$6LYIKlvAlC1ILMhIsWx-M9IDOarFBLM_Ni609bz1NDI,$w1sLe53ZqsRc9vebfLS8
JCUyq3ql9DRhN-lERWsZRHQ,$A6k3YCykEOHCzsbzxQjdupwNdt4hr4AeS6IWOnEG1H4,$K1EGNY8C-McIJJ5PBbd1GlUFMuw84S_QlbomqnYMiYY,$QgyACr1TujWs846B5tiAkbM9JkBEzUoMK6QgOfXDc64,$f2aefO_6yYMhSFRtQWYkTDdie87ldgTJD1h3JQ2F_3Q,$pYU_XgUXF0kQXvtSjG0boNdMHtXGBIFfwBDqxZuU1IE,$cZ5XF2o7ObQqROkJZWSubgm4cLgBX8j1nk
rcrHv0rZ0,$wXTQjHKMTsMBqj3B4BLN40L0-7tz1NbG8WesWgGcjwY,$csN_i4UoBTk5jFzdDAxnKSmKg0Hp6U7Af_ifht07D-o,$NBsWMM0MJ_XcAt9gk9ZJxV3_v3tPfvmZ1azHI3Wb31A,$bf1IpaCxsC2Te4mWrBXcJZqesoh1HsssdaI_U1OzXuQ,$nsLyOZZRywB1bGwEVNR3qQFLuY3hGOuM-26J6IAvsgk,$-b0T_rqr5-vE1FiqwYOriXkTE0c8oMXj0a9uSeMnWVw,$qx3
qMhu6ba_ceFcv9ADA5J45CTUhhAMDz44eOVmJwR8,$ugwvSmk0LfVLpxk0cloQ6tQxjacdp0ojurri3pN4bl4,$notanevent2}'::text[])
         ->  Result  (cost=0.00..0.01 rows=1 width=0) (actual time=0.001..0.001 rows=1 loops=1)
   ->  Index Scan using syncv3_events_event_id_key on public.syncv3_events  (cost=0.56..8.58 rows=1 width=658) (actual time=0.011..0.011 rows=1 loops=22)
         Output: syncv3_events.event_nid, syncv3_events.event_id, syncv3_events.before_state_snapshot_id, syncv3_events.event_replaces_nid, syncv3_events.room_id, syncv3_events.event_type, syncv3_events.state_key, syncv3_events.prev_batch, syncv3_events.membership, syncv3_events.is_s
tate, syncv3_events.event
         Index Cond: (syncv3_events.event_id = (unnest('{$fakeevent1,$4jteAg_T0BMXWiSRzZbptOF9zETeomXbJoDtg6zs9SU,$Iu-74VzJ0B4Xc6XgBDAKoWYZkFcUGCuJ_vLd47Cf4Ok,$OIqjRnK_rXBApl_8x4s0Iq--OH969cikkvjGYW5Seq8,$7EYkESAKR-t6wKLG7rjPyyVcZdQ-v0YQtaLbzrLJ2vI,$6LYIKlvAlC1ILMhIsWx-M9IDOarFBLM_Ni
609bz1NDI,$w1sLe53ZqsRc9vebfLS8JCUyq3ql9DRhN-lERWsZRHQ,$A6k3YCykEOHCzsbzxQjdupwNdt4hr4AeS6IWOnEG1H4,$K1EGNY8C-McIJJ5PBbd1GlUFMuw84S_QlbomqnYMiYY,$QgyACr1TujWs846B5tiAkbM9JkBEzUoMK6QgOfXDc64,$f2aefO_6yYMhSFRtQWYkTDdie87ldgTJD1h3JQ2F_3Q,$pYU_XgUXF0kQXvtSjG0boNdMHtXGBIFfwBDqxZuU1IE,$cZ5
XF2o7ObQqROkJZWSubgm4cLgBX8j1nkrcrHv0rZ0,$wXTQjHKMTsMBqj3B4BLN40L0-7tz1NbG8WesWgGcjwY,$csN_i4UoBTk5jFzdDAxnKSmKg0Hp6U7Af_ifht07D-o,$NBsWMM0MJ_XcAt9gk9ZJxV3_v3tPfvmZ1azHI3Wb31A,$bf1IpaCxsC2Te4mWrBXcJZqesoh1HsssdaI_U1OzXuQ,$nsLyOZZRywB1bGwEVNR3qQFLuY3hGOuM-26J6IAvsgk,$-b0T_rqr5-vE1Fiqw
YOriXkTE0c8oMXj0a9uSeMnWVw,$qx3qMhu6ba_ceFcv9ADA5J45CTUhhAMDz44eOVmJwR8,$ugwvSmk0LfVLpxk0cloQ6tQxjacdp0ojurri3pN4bl4,$notanevent2}'::text[])))
         Filter: (syncv3_events.room_id = '!OGEhHVWSdvArJzumhm:matrix.org'::text)
 Query Identifier: 2484314538527637739
 Planning Time: 0.163 ms
 Execution Time: 0.285 ms

AFAICS the only difference is an additional

     Filter: (syncv3_events.room_id = '!OGEhHVWSdvArJzumhm:matrix.org'::text)

at the bottom of the second query plan, which removes no rows anyway. The time diff (~+40 microseconds) is positive. (Though this isn't a statistical analysis; that diff may be just noise.)

if err != nil {
return fmt.Errorf("error determing which event IDs are unknown: %s", err)
}
for i := range state {
if _, unknown := unknownEventIDs[eventIDs[i]]; unknown {
res.PrependTimelineEvents = append(res.PrependTimelineEvents, state[i])
}
for unknownEventID := range unknownEventIDs {
res.PrependTimelineEvents = append(res.PrependTimelineEvents, eventIDToRawEvent[unknownEventID])
}
return nil
}
Expand Down
9 changes: 2 additions & 7 deletions state/event_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,7 @@ func (t *EventTable) SelectStrippedEventsByIDs(txn *sqlx.Tx, verifyAll bool, ids
}

// SelectUnknownEventIDs accepts a list of event IDs and returns the subset of those which are not known to the DB.
// It MUST be called within a transaction, or else will panic.
func (t *EventTable) SelectUnknownEventIDs(txn *sqlx.Tx, maybeUnknownEventIDs []string) (map[string]struct{}, error) {
// Note: in practice, the order of rows returned matches the order of rows of
// array entries. But I don't think that's guaranteed. Return an (unordered) set
Expand All @@ -271,13 +272,7 @@ func (t *EventTable) SelectUnknownEventIDs(txn *sqlx.Tx, maybeUnknownEventIDs []
WHERE event_nid IS NULL;`

var unknownEventIDs []string
var err error
if txn != nil {
err = txn.Select(&unknownEventIDs, queryStr, pq.StringArray(maybeUnknownEventIDs))
} else {
err = t.db.Select(&unknownEventIDs, queryStr, pq.StringArray(maybeUnknownEventIDs))
}
if err != nil {
if err := txn.Select(&unknownEventIDs, queryStr, pq.StringArray(maybeUnknownEventIDs)); err != nil {
return nil, err
}
unknownMap := make(map[string]struct{}, len(unknownEventIDs))
Expand Down
75 changes: 75 additions & 0 deletions state/event_table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -870,3 +870,78 @@ func TestRemoveUnsignedTXNID(t *testing.T) {
}
}
}

func TestEventTableSelectUnknownEventIDs(t *testing.T) {
db, close := connectToDB(t)
defer close()
txn, err := db.Beginx()
if err != nil {
t.Fatalf("failed to start txn: %s", err)
}
defer txn.Rollback()
const roomID = "!1:localhost"

// Note: there shouldn't be any other events with these IDs inserted before this
// transaction. $A and $B seem to be inserted and commit in TestEventTablePrevBatch.
const eventID1 = "$A-SelectUnknownEventIDs"
const eventID2 = "$B-SelectUnknownEventIDs"

knownEvents := []Event{
{
Type: "m.room.create",
StateKey: "",
IsState: true,
ID: eventID1,
RoomID: roomID,
},
{
Type: "m.room.name",
StateKey: "",
IsState: true,
ID: eventID2,
RoomID: roomID,
},
}
table := NewEventTable(db)

// Check the event IDs haven't been added by another test.
gotEvents, err := table.SelectByIDs(txn, true, []string{eventID1, eventID2})
if len(gotEvents) > 0 {
t.Fatalf("Event IDs already in use---commited by another test?")
}

// Insert the events
_, err = table.Insert(txn, knownEvents, false)
if err != nil {
t.Fatalf("failed to insert event: %s", err)
}

gotEvents, err = table.SelectByIDs(txn, true, []string{eventID1, eventID2})
if err != nil {
t.Fatalf("failed to select events: %s", err)
}
if (gotEvents[0].ID == eventID1 && gotEvents[1].ID == eventID2) || (gotEvents[0].ID == eventID2 && gotEvents[1].ID == eventID1) {
t.Logf("Got expected event IDs after insert. NIDS: %s=%d, %s=%d", gotEvents[0].ID, gotEvents[0].NID, gotEvents[1].ID, gotEvents[1].NID)
} else {
DMRobertson marked this conversation as resolved.
Show resolved Hide resolved
t.Fatalf("Event ID mismatch: expected $A-SelectUnknownEventIDs and $B-SelectUnknownEventIDs, got %v", gotEvents)
}

// Someone else tells us the state of the room is {A, C}. Query which of those
// event IDs are unknown.
DMRobertson marked this conversation as resolved.
Show resolved Hide resolved
const unknownEventID = "$C-SelectUnknownEventIDs"
stateBlockIDs := []string{eventID1, unknownEventID}
unknownIDs, err := table.SelectUnknownEventIDs(txn, stateBlockIDs)
t.Logf("unknownIDs=%v", unknownIDs)
if err != nil {
t.Fatalf("failed to select unknown state events: %s", err)
}

// Only event C should be flagged as unknown.
if len(unknownIDs) != 1 {
t.Fatalf("Expected 1 unknown id, got %v", unknownIDs)
}
_, ok := unknownIDs[unknownEventID]
if !ok {
t.Fatalf("Expected $C-SelectUnknownEventIDs to be unknown to the DB, but it wasn't")
}
}
23 changes: 2 additions & 21 deletions tests-e2e/gappy_state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"fmt"
"github.com/matrix-org/sliding-sync/sync3"
"github.com/matrix-org/sliding-sync/testutils/m"
"github.com/tidwall/gjson"
"testing"
)

Expand Down Expand Up @@ -46,16 +45,7 @@ func TestGappyState(t *testing.T) {
m.MatchRoomSubscription(
roomID,
m.MatchRoomName(firstRoomName),
func(r sync3.Room) error {
if len(r.Timeline) == 0 {
return fmt.Errorf("no/empty timeline in response")
}
lastReceivedEventID := gjson.ParseBytes(r.Timeline[len(r.Timeline)-1]).Get("event_id").Str
if lastReceivedEventID == firstMessageID {
return nil
}
return fmt.Errorf("expected end of timeline to be %s but got %s", firstMessageID, lastReceivedEventID)
},
MatchRoomTimelineMostRecent(1, []Event{{ID: firstMessageID}}),
),
)

Expand Down Expand Up @@ -102,16 +92,7 @@ func TestGappyState(t *testing.T) {
m.MatchRoomSubscription(
roomID,
m.MatchRoomName("potato"),
func(r sync3.Room) error {
if len(r.Timeline) == 0 {
return fmt.Errorf("no timeline in response, expected at least one event")
}
lastReceivedEventID := gjson.ParseBytes(r.Timeline[len(r.Timeline)-1]).Get("event_id").Str
if lastReceivedEventID != latestMessageID {
return fmt.Errorf("last message in response is %s, expected %s", lastReceivedEventID, latestMessageID)
}
return nil
},
MatchRoomTimelineMostRecent(1, []Event{{ID: latestMessageID}}),
),
)
}
DMRobertson marked this conversation as resolved.
Show resolved Hide resolved
16 changes: 14 additions & 2 deletions tests-e2e/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,10 +79,22 @@ func eventsEqual(wantList []Event, gotList []json.RawMessage) error {
return nil
}

// MatchRoomTimelineMostRecent builds a matcher which checks that the last `n` elements
// of `events` are the same as the last n elements of the room timeline. If either list
// contains fewer than `n` events, the match fails.
// Events are tested for equality using `eventsEqual`.
func MatchRoomTimelineMostRecent(n int, events []Event) m.RoomMatcher {
subset := events[len(events)-n:]
return func(r sync3.Room) error {
return MatchRoomTimeline(subset)(r)
if len(events) < n {
return fmt.Errorf("list of wanted events has %d events, expected at least %d", len(events), n)
}
wantList := events[len(events)-n:]
if len(r.Timeline) < n {
return fmt.Errorf("timeline has %d events, expected at least %d", len(r.Timeline), n)
}

gotList := r.Timeline[len(r.Timeline)-n:]
return eventsEqual(wantList, gotList)
}
}

Expand Down
122 changes: 114 additions & 8 deletions tests-integration/poller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,33 +113,38 @@ func TestSecondPollerFiltersToDevice(t *testing.T) {
m.MatchResponse(t, res, m.MatchToDeviceMessages([]json.RawMessage{wantMsg}))
}

// TODO test description
// Test that the poller makes a best-effort attempt to integrate state seen in a
// v2 sync state block. Our strategy for doing so is to prepend any unknown state events
// to the start of the v2 sync response's timeline, which should then be visible to
// sync v3 clients as ordinary state events in the room timeline.
func TestPollerHandlesUnknownStateEventsOnIncrementalSync(t *testing.T) {
DMRobertson marked this conversation as resolved.
Show resolved Hide resolved
pqString := testutils.PrepareDBConnectionString()
v2 := runTestV2Server(t)
v3 := runTestServer(t, v2, pqString)
defer v2.close()
defer v3.close()
deviceAToken := "DEVICE_A_TOKEN"
v2.addAccount(alice, deviceAToken)
v2.addAccount(alice, aliceToken)
const roomID = "!unimportant"
v2.queueResponse(deviceAToken, sync2.SyncResponse{
v2.queueResponse(aliceToken, sync2.SyncResponse{
Rooms: sync2.SyncRoomsResponse{
Join: v2JoinTimeline(roomEvents{
roomID: roomID,
events: createRoomState(t, alice, time.Now()),
}),
},
})
res := v3.mustDoV3Request(t, deviceAToken, sync3.Request{
res := v3.mustDoV3Request(t, aliceToken, sync3.Request{
Lists: map[string]sync3.RequestList{
"a": {
Ranges: [][2]int64{{0, 20}},
RoomSubscription: sync3.RoomSubscription{
TimelineLimit: 10,
},
},
},
})

t.Log("The poller receives a gappy incremental sync response with a state block")
t.Log("The poller receives a gappy incremental sync response with a state block. The power levels and room name have changed.")
nameEvent := testutils.NewStateEvent(
t,
"m.room.name",
Expand All @@ -158,7 +163,7 @@ func TestPollerHandlesUnknownStateEventsOnIncrementalSync(t *testing.T) {
},
)
messageEvent := testutils.NewMessageEvent(t, alice, "hello")
v2.queueResponse(deviceAToken, sync2.SyncResponse{
v2.queueResponse(aliceToken, sync2.SyncResponse{
Rooms: sync2.SyncRoomsResponse{
Join: map[string]sync2.SyncV2JoinResponse{
roomID: {
Expand All @@ -175,7 +180,7 @@ func TestPollerHandlesUnknownStateEventsOnIncrementalSync(t *testing.T) {
},
})

res = v3.mustDoV3RequestWithPos(t, deviceAToken, res.Pos, sync3.Request{})
res = v3.mustDoV3RequestWithPos(t, aliceToken, res.Pos, sync3.Request{})
m.MatchResponse(
t,
res,
Expand All @@ -186,3 +191,104 @@ func TestPollerHandlesUnknownStateEventsOnIncrementalSync(t *testing.T) {
),
)
}
DMRobertson marked this conversation as resolved.
Show resolved Hide resolved

// Similar to TestPollerHandlesUnknownStateEventsOnIncrementalSync. Here we are testing
// that if Alice's poller sees Bob leave in a state block, the events seen in that
// timeline are not visible to Bob.
func TestPollerUpdatesRoomMemberTrackerOnGappySyncStateBlock(t *testing.T) {
pqString := testutils.PrepareDBConnectionString()
v2 := runTestV2Server(t)
v3 := runTestServer(t, v2, pqString)
defer v2.close()
defer v3.close()
v2.addAccount(alice, aliceToken)
v2.addAccount(bob, bobToken)
const roomID = "!unimportant"

t.Log("Alice's poller does an initial sync. It sees that Alice and Bob share a room.")
initialTimeline := createRoomState(t, alice, time.Now())
bobJoin := testutils.NewStateEvent(
t,
"m.room.member",
bob,
bob,
map[string]interface{}{"membership": "join"},
)
initialJoinBlock := v2JoinTimeline(roomEvents{
roomID: roomID,
events: append(initialTimeline, bobJoin),
})
v2.queueResponse(aliceToken, sync2.SyncResponse{
Rooms: sync2.SyncRoomsResponse{Join: initialJoinBlock},
})

t.Log("Alice makes an initial sliding sync request.")
aliceRes := v3.mustDoV3Request(t, aliceToken, sync3.Request{
Lists: map[string]sync3.RequestList{
"a": {
Ranges: [][2]int64{{0, 20}},
RoomSubscription: sync3.RoomSubscription{
TimelineLimit: 10,
},
},
},
})

t.Log("Alice's poller receives a gappy incremental sync response. Bob has left in the gap. The timeline includes a message from Alice.")
bobLeave := testutils.NewStateEvent(
t,
"m.room.member",
bob,
bob,
map[string]interface{}{"membership": "leave"},
)
aliceMessage := testutils.NewMessageEvent(t, alice, "hello")
v2.queueResponse(aliceToken, sync2.SyncResponse{
Rooms: sync2.SyncRoomsResponse{
Join: map[string]sync2.SyncV2JoinResponse{
roomID: {
State: sync2.EventsResponse{
Events: []json.RawMessage{bobLeave},
},
Timeline: sync2.TimelineResponse{
Events: []json.RawMessage{aliceMessage},
Limited: true,
PrevBatch: "batchymcbatchface",
},
},
},
},
})

t.Log("Alice makes an incremental sliding sync request.")
aliceRes = v3.mustDoV3RequestWithPos(t, aliceToken, aliceRes.Pos, sync3.Request{})

t.Log("She should see Bob's leave event and her message at the end of the room timeline.")
m.MatchResponse(
t,
aliceRes,
m.MatchRoomSubscription(
roomID,
m.MatchRoomTimelineMostRecent(2, []json.RawMessage{bobLeave, aliceMessage}),
),
)

t.Log("Bob makes an initial sliding sync request.")
bobRes := v3.mustDoV3Request(t, bobToken, sync3.Request{
Lists: map[string]sync3.RequestList{
"a": {
Ranges: [][2]int64{{0, 20}},
RoomSubscription: sync3.RoomSubscription{
TimelineLimit: 10,
},
},
},
})
t.Log("He should not see himself in the room.")
m.MatchResponse(
DMRobertson marked this conversation as resolved.
Show resolved Hide resolved
t,
bobRes,
m.MatchList("a", m.MatchV3Count(0)),
)

}