Skip to content

Commit

Permalink
Merge pull request #366 from matrix-org/kegan/invite-join-gap
Browse files Browse the repository at this point in the history
Modify which events get returned when requesting initial timeline events
  • Loading branch information
kegsay authored Nov 6, 2023
2 parents 30586cf + 6f2a00a commit 421a572
Show file tree
Hide file tree
Showing 4 changed files with 201 additions and 80 deletions.
82 changes: 42 additions & 40 deletions state/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"os"
"strings"

"golang.org/x/exp/slices"

"github.com/matrix-org/sliding-sync/sync2"

"github.com/getsentry/sentry-go"
Expand Down Expand Up @@ -701,39 +703,33 @@ func (s *Storage) RoomStateAfterEventPosition(ctx context.Context, roomIDs []str
// - with NIDs <= `to`.
// Up to `limit` events are chosen per room.
func (s *Storage) LatestEventsInRooms(userID string, roomIDs []string, to int64, limit int) (map[string]*LatestEvents, error) {
roomIDToRanges, err := s.visibleEventNIDsBetweenForRooms(userID, roomIDs, 0, to)
roomIDToRange, err := s.visibleEventNIDsBetweenForRooms(userID, roomIDs, 0, to)
if err != nil {
return nil, err
}
result := make(map[string]*LatestEvents, len(roomIDs))
err = sqlutil.WithTransaction(s.Accumulator.db, func(txn *sqlx.Tx) error {
for roomID, ranges := range roomIDToRanges {
for roomID, r := range roomIDToRange {
var earliestEventNID int64
var latestEventNID int64
var roomEvents []json.RawMessage
// start at the most recent range as we want to return the most recent `limit` events
for i := len(ranges) - 1; i >= 0; i-- {
// the most recent event will be first
events, err := s.EventsTable.SelectLatestEventsBetween(txn, roomID, r[0]-1, r[1], limit)
if err != nil {
return fmt.Errorf("room %s failed to SelectEventsBetween: %s", roomID, err)
}
for _, ev := range events {
if latestEventNID == 0 { // set first time and never again
latestEventNID = ev.NID
}
roomEvents = append(roomEvents, ev.JSON)
earliestEventNID = ev.NID
if len(roomEvents) >= limit {
break
}
r := ranges[i]
// the most recent event will be first
events, err := s.EventsTable.SelectLatestEventsBetween(txn, roomID, r[0]-1, r[1], limit)
if err != nil {
return fmt.Errorf("room %s failed to SelectEventsBetween: %s", roomID, err)
}
// keep pushing to the front so we end up with A,B,C
for _, ev := range events {
if latestEventNID == 0 { // set first time and never again
latestEventNID = ev.NID
}
roomEvents = append([]json.RawMessage{ev.JSON}, roomEvents...)
earliestEventNID = ev.NID
if len(roomEvents) >= limit {
break
}
}
}
// we want the most recent event to be last, so reverse the slice now in-place.
slices.Reverse(roomEvents)
latestEvents := LatestEvents{
LatestNID: latestEventNID,
Timeline: roomEvents,
Expand All @@ -754,9 +750,9 @@ func (s *Storage) LatestEventsInRooms(userID string, roomIDs []string, to int64,
}

// visibleEventNIDsBetweenForRooms determines which events a given user has permission to see.
// It accepts a nid range [from, to]. For each given room, it calculates the NID ranges
// [A1, B1], [A2, B2], ... within [from, to] in which the user has permission to see events.
func (s *Storage) visibleEventNIDsBetweenForRooms(userID string, roomIDs []string, from, to int64) (map[string][][2]int64, error) {
// It accepts a nid range [from, to]. For each given room, it calculates the NID range
// [A1, B1] within [from, to] in which the user has permission to see events.
func (s *Storage) visibleEventNIDsBetweenForRooms(userID string, roomIDs []string, from, to int64) (map[string][2]int64, error) {
// load *THESE* joined rooms for this user at from (inclusive)
var membershipEvents []Event
var err error
Expand All @@ -782,7 +778,7 @@ func (s *Storage) visibleEventNIDsBetweenForRooms(userID string, roomIDs []strin
}

// Work out the NID ranges to pull events from for this user. Given a from and to event nid stream position,
// this function returns a map of room ID to a slice of 2-element from|to positions. These positions are
// this function returns a map of room ID to a 2-element from|to positions. These positions are
// all INCLUSIVE, and the client should be informed of these events at some point. For example:
//
// Stream Positions
Expand All @@ -793,20 +789,23 @@ func (s *Storage) visibleEventNIDsBetweenForRooms(userID string, roomIDs []strin
//
// E=message event, M=membership event, followed by user letter, followed by 'i' or 'j' or 'l' for invite|join|leave
//
// - For Room A: from=1, to=10, returns { RoomA: [ [1,10] ]} (tests events in joined room)
// - For Room B: from=1, to=10, returns { RoomB: [ [5,10] ]} (tests joining a room starts events)
// - For Room C: from=1, to=10, returns { RoomC: [ [0,9] ]} (tests leaving a room stops events)
// - For Room A: from=1, to=10, returns { RoomA: [ 1,10 ]} (tests events in joined room)
// - For Room B: from=1, to=10, returns { RoomB: [ 5,10 ]} (tests joining a room starts events)
// - For Room C: from=1, to=10, returns { RoomC: [ 0,9 ]} (tests leaving a room stops events)
//
// Multiple slices can occur when a user leaves and re-joins the same room, and invites are same-element positions:
// In cases where a user joins/leaves a room multiple times in the nid range, only the last range is returned.
// This is critical to ensure we don't skip out timeline events due to history visibility (which the proxy defers
// to the upstream HS for). See https://github.com/matrix-org/sliding-sync/issues/365 for what happens if we returned
// all ranges.
//
// Stream Positions
// 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
// Room D Maj E Mal E Maj E Mal E
// Room E E Mai E E Maj E E
//
// - For Room D: from=1, to=15 returns { RoomD: [ [1,6], [8,10] ] } (tests multi-join/leave)
// - For Room E: from=1, to=15 returns { RoomE: [ [3,3], [13,15] ] } (tests invites)
func (s *Storage) VisibleEventNIDsBetween(userID string, from, to int64) (map[string][][2]int64, error) {
// - For Room D: from=1, to=15 returns { RoomD: [ 8,10 ] } (tests multi-join/leave)
// - For Room E: from=1, to=15 returns { RoomE: [ 13,15 ] } (tests invites)
func (s *Storage) VisibleEventNIDsBetween(userID string, from, to int64) (map[string][2]int64, error) {
// load *ALL* joined rooms for this user at from (inclusive)
joinTimingsAtFromByRoomID, err := s.JoinedRoomsAfterPosition(userID, from)
if err != nil {
Expand All @@ -822,7 +821,7 @@ func (s *Storage) VisibleEventNIDsBetween(userID string, from, to int64) (map[st
return s.visibleEventNIDsWithData(joinTimingsAtFromByRoomID, membershipEvents, userID, from, to)
}

func (s *Storage) visibleEventNIDsWithData(joinTimingsAtFromByRoomID map[string]internal.EventMetadata, membershipEvents []Event, userID string, from, to int64) (map[string][][2]int64, error) {
func (s *Storage) visibleEventNIDsWithData(joinTimingsAtFromByRoomID map[string]internal.EventMetadata, membershipEvents []Event, userID string, from, to int64) (map[string][2]int64, error) {
// load membership events in order and bucket based on room ID
roomIDToLogs := make(map[string][]membershipEvent)
for _, ev := range membershipEvents {
Expand All @@ -835,13 +834,11 @@ func (s *Storage) visibleEventNIDsWithData(joinTimingsAtFromByRoomID map[string]
}

// Performs the algorithm
calculateVisibleEventNIDs := func(isJoined bool, fromIncl, toIncl int64, logs []membershipEvent) [][2]int64 {
calculateVisibleEventNIDs := func(isJoined bool, fromIncl, toIncl int64, logs []membershipEvent) [2]int64 {
// short circuit when there are no membership deltas
if len(logs) == 0 {
return [][2]int64{
{
fromIncl, toIncl,
},
return [2]int64{
fromIncl, toIncl, // TODO: is this actually valid? Surely omitting it is the right answer?
}
}
var result [][2]int64
Expand Down Expand Up @@ -879,11 +876,16 @@ func (s *Storage) visibleEventNIDsWithData(joinTimingsAtFromByRoomID map[string]
if isJoined {
result = append(result, [2]int64{startIndex, toIncl})
}
return result
if len(result) == 0 {
return [2]int64{}
}
// we only care about the LAST nid range, otherwise we can end up with gaps being returned in the
// timeline. See https://github.com/matrix-org/sliding-sync/issues/365
return result[len(result)-1]
}

// For each joined room, perform the algorithm and delete the logs afterwards
result := make(map[string][][2]int64)
result := make(map[string][2]int64)
for joinedRoomID, _ := range joinTimingsAtFromByRoomID {
roomResult := calculateVisibleEventNIDs(true, from, to, roomIDToLogs[joinedRoomID])
result[joinedRoomID] = roomResult
Expand Down
70 changes: 30 additions & 40 deletions state/storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,13 @@ import (
"context"
"encoding/json"
"fmt"
"github.com/matrix-org/sliding-sync/sync2"
"reflect"
"sort"
"testing"
"time"

"github.com/matrix-org/sliding-sync/sync2"

"github.com/jmoiron/sqlx"
"github.com/matrix-org/gomatrixserverlib/spec"
"github.com/matrix-org/sliding-sync/internal"
Expand Down Expand Up @@ -362,17 +363,15 @@ func TestVisibleEventNIDsBetween(t *testing.T) {
t.Fatalf("LatestEventNID: %s", err)
}
t.Logf("ABC Start=%d Latest=%d", startPos, latestPos)
roomIDToVisibleRanges, err := store.VisibleEventNIDsBetween(alice, startPos, latestPos)
roomIDToVisibleRange, err := store.VisibleEventNIDsBetween(alice, startPos, latestPos)
if err != nil {
t.Fatalf("VisibleEventNIDsBetween to %d: %s", latestPos, err)
}
for roomID, ranges := range roomIDToVisibleRanges {
for _, r := range ranges {
t.Logf("%v => [%d,%d]", roomID, r[0]-startPos, r[1]-startPos)
}
for roomID, r := range roomIDToVisibleRange {
t.Logf("%v => [%d,%d]", roomID, r[0]-startPos, r[1]-startPos)
}
if len(roomIDToVisibleRanges) != 3 {
t.Errorf("VisibleEventNIDsBetween: wrong number of rooms, want 3 got %+v", roomIDToVisibleRanges)
if len(roomIDToVisibleRange) != 3 {
t.Errorf("VisibleEventNIDsBetween: wrong number of rooms, want 3 got %+v", roomIDToVisibleRange)
}

// check that we can query subsets too
Expand All @@ -381,23 +380,23 @@ func TestVisibleEventNIDsBetween(t *testing.T) {
t.Fatalf("VisibleEventNIDsBetweenForRooms to %d: %s", latestPos, err)
}
if len(roomIDToVisibleRangesSubset) != 2 {
t.Errorf("VisibleEventNIDsBetweenForRooms: wrong number of rooms, want 2 got %+v", roomIDToVisibleRanges)
t.Errorf("VisibleEventNIDsBetweenForRooms: wrong number of rooms, want 2 got %+v", roomIDToVisibleRange)
}

// For Room A: from=1, to=10, returns { RoomA: [ [1,10] ]} (tests events in joined room)
verifyRange(t, roomIDToVisibleRanges, roomA, [][2]int64{
{1 + startPos, 10 + startPos},
verifyRange(t, roomIDToVisibleRange, roomA, [2]int64{
1 + startPos, 10 + startPos,
})

// For Room B: from=1, to=10, returns { RoomB: [ [5,10] ]} (tests joining a room starts events)
verifyRange(t, roomIDToVisibleRanges, roomB, [][2]int64{
{5 + startPos, 10 + startPos},
verifyRange(t, roomIDToVisibleRange, roomB, [2]int64{
5 + startPos, 10 + startPos,
})

// For Room C: from=1, to=10, returns { RoomC: [ [0,9] ]} (tests leaving a room stops events)
// We start at 0 because it's the earliest event (we were joined since the beginning of the room state)
verifyRange(t, roomIDToVisibleRanges, roomC, [][2]int64{
{0 + startPos, 9 + startPos},
verifyRange(t, roomIDToVisibleRange, roomC, [2]int64{
0 + startPos, 9 + startPos,
})

// change the users else we will still have some rooms from A,B,C present if the user is still joined
Expand Down Expand Up @@ -465,29 +464,25 @@ func TestVisibleEventNIDsBetween(t *testing.T) {
t.Fatalf("LatestEventNID: %s", err)
}
t.Logf("DE Start=%d Latest=%d", startPos, latestPos)
roomIDToVisibleRanges, err = store.VisibleEventNIDsBetween(alice, startPos, latestPos)
roomIDToVisibleRange, err = store.VisibleEventNIDsBetween(alice, startPos, latestPos)
if err != nil {
t.Fatalf("VisibleEventNIDsBetween to %d: %s", latestPos, err)
}
for roomID, ranges := range roomIDToVisibleRanges {
for _, r := range ranges {
t.Logf("%v => [%d,%d]", roomID, r[0]-startPos, r[1]-startPos)
}
for roomID, r := range roomIDToVisibleRange {
t.Logf("%v => [%d,%d]", roomID, r[0]-startPos, r[1]-startPos)
}
if len(roomIDToVisibleRanges) != 2 {
t.Errorf("VisibleEventNIDsBetween: wrong number of rooms, want 2 got %+v", roomIDToVisibleRanges)
if len(roomIDToVisibleRange) != 2 {
t.Errorf("VisibleEventNIDsBetween: wrong number of rooms, want 2 got %+v", roomIDToVisibleRange)
}

// For Room D: from=1, to=15 returns { RoomD: [ [1,6], [8,10] ] } (tests multi-join/leave)
verifyRange(t, roomIDToVisibleRanges, roomD, [][2]int64{
{1 + startPos, 6 + startPos},
{8 + startPos, 10 + startPos},
// For Room D: from=1, to=15 returns { RoomD: [ 8,10 ] } (tests multi-join/leave)
verifyRange(t, roomIDToVisibleRange, roomD, [2]int64{
8 + startPos, 10 + startPos,
})

// For Room E: from=1, to=15 returns { RoomE: [ [3,3], [13,15] ] } (tests invites)
verifyRange(t, roomIDToVisibleRanges, roomE, [][2]int64{
{3 + startPos, 3 + startPos},
{13 + startPos, 15 + startPos},
// For Room E: from=1, to=15 returns { RoomE: [ 13,15 ] } (tests invites)
verifyRange(t, roomIDToVisibleRange, roomE, [2]int64{
13 + startPos, 15 + startPos,
})

}
Expand Down Expand Up @@ -961,18 +956,13 @@ func sortHeroes(heroes []internal.Hero) []internal.Hero {
return heroes
}

func verifyRange(t *testing.T, result map[string][][2]int64, roomID string, wantRanges [][2]int64) {
func verifyRange(t *testing.T, result map[string][2]int64, roomID string, wantRange [2]int64) {
t.Helper()
gotRanges := result[roomID]
if gotRanges == nil {
gotRange := result[roomID]
if gotRange == [2]int64{} {
t.Fatalf("no range was returned for room %s", roomID)
}
if len(gotRanges) != len(wantRanges) {
t.Fatalf("%s range count mismatch, got %d ranges, want %d :: GOT=%+v WANT=%+v", roomID, len(gotRanges), len(wantRanges), gotRanges, wantRanges)
}
for i := range gotRanges {
if !reflect.DeepEqual(gotRanges[i], wantRanges[i]) {
t.Errorf("%s range at index %d got %v want %v", roomID, i, gotRanges[i], wantRanges[i])
}
if !reflect.DeepEqual(gotRange, wantRange) {
t.Errorf("%s range got %v want %v", roomID, gotRange, wantRange)
}
}
15 changes: 15 additions & 0 deletions tests-e2e/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,13 @@ import (
"io"
"net/http"
"net/url"
"strconv"
"testing"
"time"

"github.com/matrix-org/complement/b"
"github.com/matrix-org/complement/client"
"github.com/matrix-org/complement/must"
"github.com/matrix-org/sliding-sync/sync3"
"github.com/tidwall/gjson"
)
Expand Down Expand Up @@ -43,6 +45,19 @@ type CSAPI struct {
AvatarURL string
}

// TODO: put this in Complement at some point? Check usage.
func (c *CSAPI) Scrollback(t *testing.T, roomID, prevBatch string, limit int) gjson.Result {
t.Helper()
res := c.MustDo(t, "GET", []string{
"_matrix", "client", "v3", "rooms", roomID, "messages",
}, client.WithQueries(url.Values{
"dir": []string{"b"},
"from": []string{prevBatch},
"limit": []string{strconv.Itoa(limit)},
}))
return must.ParseJSON(t, res.Body)
}

// SlidingSync performs a single sliding sync request. Fails on non 2xx
func (c *CSAPI) SlidingSync(t *testing.T, data sync3.Request, opts ...client.RequestOpt) (resBody *sync3.Response) {
t.Helper()
Expand Down
Loading

0 comments on commit 421a572

Please sign in to comment.