Skip to content

Commit

Permalink
Merge branch 'main' into dmr/debug-from-stable
Browse files Browse the repository at this point in the history
  • Loading branch information
kegsay authored Jul 12, 2023
2 parents ea25b81 + 57c25ab commit b72ad3b
Show file tree
Hide file tree
Showing 24 changed files with 719 additions and 205 deletions.
12 changes: 9 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,11 @@

Run a sliding sync proxy. An implementation of [MSC3575](https://github.com/matrix-org/matrix-doc/blob/kegan/sync-v3/proposals/3575-sync.md).

Proxy version to MSC API specification:
## Proxy version to MSC API specification

This describes which proxy versions implement which version of the API drafted
in MSC3575. See https://github.com/matrix-org/sliding-sync/releases for the
changes in the proxy itself.

- Version 0.1.x: [2022/04/01](https://github.com/matrix-org/matrix-spec-proposals/blob/615e8f5a7bfe4da813bc2db661ed0bd00bccac20/proposals/3575-sync.md)
- First release
Expand All @@ -21,10 +25,12 @@ Proxy version to MSC API specification:
- Support for `errcode` when sessions expire.
- Version 0.99.1 [2023/01/20](https://github.com/matrix-org/matrix-spec-proposals/blob/b4b4e7ff306920d2c862c6ff4d245110f6fa5bc7/proposals/3575-sync.md)
- Preparing for major v1.x release: lists-as-keys support.
- Version 0.99.2 [2024/07/27](https://github.com/matrix-org/matrix-spec-proposals/blob/eab643cb3ca63b03537a260fa343e1fb2d1ee284/proposals/3575-sync.md)
- Version 0.99.2 [2023/03/31](https://github.com/matrix-org/matrix-spec-proposals/blob/eab643cb3ca63b03537a260fa343e1fb2d1ee284/proposals/3575-sync.md)
- Experimental support for `bump_event_types` when ordering rooms by recency.
- Support for opting in to extensions on a per-list and per-room basis.
- Sentry support.
- Version 0.99.3 [2023/05/23](https://github.com/matrix-org/matrix-spec-proposals/blob/4103ee768a4a3e1decee80c2987f50f4c6b3d539/proposals/3575-sync.md)
- Support for per-list `bump_event_types`.
- Support for [`conn_id`](https://github.com/matrix-org/matrix-spec-proposals/blob/4103ee768a4a3e1decee80c2987f50f4c6b3d539/proposals/3575-sync.md#concurrent-connections) for distinguishing multiple concurrent connections.

## Usage

Expand Down
19 changes: 11 additions & 8 deletions cmd/syncv3/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,14 @@ package main

import (
"fmt"
"net/http"
_ "net/http/pprof"
"os"
"os/signal"
"strings"
"syscall"
"time"

"github.com/getsentry/sentry-go"
sentryhttp "github.com/getsentry/sentry-go/http"
syncv3 "github.com/matrix-org/sliding-sync"
Expand All @@ -10,18 +18,11 @@ import (
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/rs/zerolog"
"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
"net/http"
_ "net/http/pprof"
"os"
"os/signal"
"strings"
"syscall"
"time"
)

var GitCommit string

const version = "0.99.2"
const version = "0.99.3"

const (
// Required fields
Expand Down Expand Up @@ -163,6 +164,8 @@ func main() {

h2, h3 := syncv3.Setup(args[EnvServer], args[EnvDB], args[EnvSecret], syncv3.Opts{
AddPrometheusMetrics: args[EnvPrometheus] != "",
DBMaxConns: 100,
DBConnMaxIdleTime: time.Hour,
})

go h2.StartV2Pollers()
Expand Down
8 changes: 4 additions & 4 deletions state/device_data_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func NewDeviceDataTable(db *sqlx.DB) *DeviceDataTable {
func (t *DeviceDataTable) Select(userID, deviceID string, swap bool) (result *internal.DeviceData, err error) {
err = sqlutil.WithTransaction(t.db, func(txn *sqlx.Tx) error {
var row DeviceDataRow
err = t.db.Get(&row, `SELECT data FROM syncv3_device_data WHERE user_id=$1 AND device_id=$2`, userID, deviceID)
err = txn.Get(&row, `SELECT data FROM syncv3_device_data WHERE user_id=$1 AND device_id=$2`, userID, deviceID)
if err != nil {
if err == sql.ErrNoRows {
// if there is no device data for this user, it's not an error.
Expand Down Expand Up @@ -78,7 +78,7 @@ func (t *DeviceDataTable) Select(userID, deviceID string, swap bool) (result *in
// the device_data table.
return nil
}
_, err = t.db.Exec(`UPDATE syncv3_device_data SET data=$1 WHERE user_id=$2 AND device_id=$3`, data, userID, deviceID)
_, err = txn.Exec(`UPDATE syncv3_device_data SET data=$1 WHERE user_id=$2 AND device_id=$3`, data, userID, deviceID)
return err
})
return
Expand All @@ -94,7 +94,7 @@ func (t *DeviceDataTable) Upsert(dd *internal.DeviceData) (pos int64, err error)
err = sqlutil.WithTransaction(t.db, func(txn *sqlx.Tx) error {
// select what already exists
var row DeviceDataRow
err = t.db.Get(&row, `SELECT data FROM syncv3_device_data WHERE user_id=$1 AND device_id=$2`, dd.UserID, dd.DeviceID)
err = txn.Get(&row, `SELECT data FROM syncv3_device_data WHERE user_id=$1 AND device_id=$2`, dd.UserID, dd.DeviceID)
if err != nil && err != sql.ErrNoRows {
return err
}
Expand All @@ -119,7 +119,7 @@ func (t *DeviceDataTable) Upsert(dd *internal.DeviceData) (pos int64, err error)
if err != nil {
return err
}
err = t.db.QueryRow(
err = txn.QueryRow(
`INSERT INTO syncv3_device_data(user_id, device_id, data) VALUES($1,$2,$3)
ON CONFLICT (user_id, device_id) DO UPDATE SET data=$3, id=nextval('syncv3_device_data_seq') RETURNING id`,
dd.UserID, dd.DeviceID, data,
Expand Down
23 changes: 21 additions & 2 deletions state/event_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,25 @@ func (t *EventTable) LatestEventInRooms(txn *sqlx.Tx, roomIDs []string, highestN
return
}

func (t *EventTable) LatestEventNIDInRooms(roomIDs []string, highestNID int64) (roomToNID map[string]int64, err error) {
// the position (event nid) may be for a random different room, so we need to find the highest nid <= this position for this room
var events []Event
err = t.db.Select(
&events,
`SELECT event_nid, room_id FROM syncv3_events
WHERE event_nid IN (SELECT max(event_nid) FROM syncv3_events WHERE event_nid <= $1 AND room_id = ANY($2) GROUP BY room_id)`,
highestNID, pq.StringArray(roomIDs),
)
if err == sql.ErrNoRows {
err = nil
}
roomToNID = make(map[string]int64)
for _, ev := range events {
roomToNID[ev.RoomID] = ev.NID
}
return
}

func (t *EventTable) SelectEventsBetween(txn *sqlx.Tx, roomID string, lowerExclusive, upperInclusive int64, limit int) ([]Event, error) {
var events []Event
err := txn.Select(&events, `SELECT event_nid, event FROM syncv3_events WHERE event_nid > $1 AND event_nid <= $2 AND room_id = $3 ORDER BY event_nid ASC LIMIT $4`,
Expand Down Expand Up @@ -419,8 +438,8 @@ func (t *EventTable) SelectClosestPrevBatchByID(roomID string, eventID string) (

// Select the closest prev batch token for the provided event NID. Returns the empty string if there
// is no closest.
func (t *EventTable) SelectClosestPrevBatch(roomID string, eventNID int64) (prevBatch string, err error) {
err = t.db.QueryRow(
func (t *EventTable) SelectClosestPrevBatch(txn *sqlx.Tx, roomID string, eventNID int64) (prevBatch string, err error) {
err = txn.QueryRow(
`SELECT prev_batch FROM syncv3_events WHERE prev_batch IS NOT NULL AND room_id=$1 AND event_nid >= $2 LIMIT 1`, roomID, eventNID,
).Scan(&prevBatch)
if err == sql.ErrNoRows {
Expand Down
101 changes: 97 additions & 4 deletions state/event_table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@ import (
"bytes"
"database/sql"
"fmt"
"reflect"
"testing"

"github.com/jmoiron/sqlx"
"github.com/tidwall/gjson"

"github.com/matrix-org/sliding-sync/sqlutil"
Expand Down Expand Up @@ -776,10 +778,14 @@ func TestEventTablePrevBatch(t *testing.T) {
}

assertPrevBatch := func(roomID string, index int, wantPrevBatch string) {
gotPrevBatch, err := table.SelectClosestPrevBatch(roomID, int64(idToNID[events[index].ID]))
if err != nil {
t.Fatalf("failed to SelectClosestPrevBatch: %s", err)
}
var gotPrevBatch string
_ = sqlutil.WithTransaction(db, func(txn *sqlx.Tx) error {
gotPrevBatch, err = table.SelectClosestPrevBatch(txn, roomID, int64(idToNID[events[index].ID]))
if err != nil {
t.Fatalf("failed to SelectClosestPrevBatch: %s", err)
}
return nil
})
if wantPrevBatch != "" {
if gotPrevBatch == "" || gotPrevBatch != wantPrevBatch {
t.Fatalf("SelectClosestPrevBatch: got %v want %v", gotPrevBatch, wantPrevBatch)
Expand Down Expand Up @@ -871,6 +877,93 @@ func TestRemoveUnsignedTXNID(t *testing.T) {
}
}

func TestLatestEventNIDInRooms(t *testing.T) {
db, close := connectToDB(t)
defer close()
table := NewEventTable(db)

var result map[string]int64
var err error
// Insert the following:
// - Room FIRST: [N]
// - Room SECOND: [N+1, N+2, N+3] (replace)
// - Room THIRD: [N+4] (max)
first := "!FIRST"
second := "!SECOND"
third := "!THIRD"
err = sqlutil.WithTransaction(db, func(txn *sqlx.Tx) error {
result, err = table.Insert(txn, []Event{
{
ID: "$N",
Type: "message",
RoomID: first,
JSON: []byte(`{}`),
},
{
ID: "$N+1",
Type: "message",
RoomID: second,
JSON: []byte(`{}`),
},
{
ID: "$N+2",
Type: "message",
RoomID: second,
JSON: []byte(`{}`),
},
{
ID: "$N+3",
Type: "message",
RoomID: second,
JSON: []byte(`{}`),
},
{
ID: "$N+4",
Type: "message",
RoomID: third,
JSON: []byte(`{}`),
},
}, false)
return err
})
assertNoError(t, err)

testCases := []struct {
roomIDs []string
highestNID int64
wantMap map[string]string
}{
// We should see FIRST=N, SECOND=N+3, THIRD=N+4 when querying LatestEventNIDInRooms with N+4
{
roomIDs: []string{first, second, third},
highestNID: result["$N+4"],
wantMap: map[string]string{
first: "$N", second: "$N+3", third: "$N+4",
},
},
// We should see FIRST=N, SECOND=N+2 when querying LatestEventNIDInRooms with N+2
{
roomIDs: []string{first, second, third},
highestNID: result["$N+2"],
wantMap: map[string]string{
first: "$N", second: "$N+2",
},
},
}
for _, tc := range testCases {
gotRoomToNID, err := table.LatestEventNIDInRooms(tc.roomIDs, int64(tc.highestNID))
assertNoError(t, err)
want := make(map[string]int64) // map event IDs to nids
for roomID, eventID := range tc.wantMap {
want[roomID] = int64(result[eventID])
}
if !reflect.DeepEqual(gotRoomToNID, want) {
t.Errorf("%+v: got %v want %v", tc, gotRoomToNID, want)
}
}

}

func TestEventTableSelectUnknownEventIDs(t *testing.T) {
db, close := connectToDB(t)
defer close()
Expand Down
43 changes: 28 additions & 15 deletions state/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,12 @@ type StartupSnapshot struct {
AllJoinedMembers map[string][]string // room_id -> [user_id]
}

type LatestEvents struct {
Timeline []json.RawMessage
PrevBatch string
LatestNID int64
}

type Storage struct {
Accumulator *Accumulator
EventsTable *EventTable
Expand Down Expand Up @@ -535,7 +541,7 @@ func (s *Storage) RoomStateAfterEventPosition(ctx context.Context, roomIDs []str
if err != nil {
return fmt.Errorf("failed to form sql query: %s", err)
}
rows, err := s.Accumulator.db.Query(s.Accumulator.db.Rebind(query), args...)
rows, err := txn.Query(txn.Rebind(query), args...)
if err != nil {
return fmt.Errorf("failed to execute query: %s", err)
}
Expand Down Expand Up @@ -580,16 +586,16 @@ func (s *Storage) RoomStateAfterEventPosition(ctx context.Context, roomIDs []str
return
}

func (s *Storage) LatestEventsInRooms(userID string, roomIDs []string, to int64, limit int) (map[string][]json.RawMessage, map[string]string, error) {
func (s *Storage) LatestEventsInRooms(userID string, roomIDs []string, to int64, limit int) (map[string]*LatestEvents, error) {
roomIDToRanges, err := s.visibleEventNIDsBetweenForRooms(userID, roomIDs, 0, to)
if err != nil {
return nil, nil, err
return nil, err
}
result := make(map[string][]json.RawMessage, len(roomIDs))
prevBatches := make(map[string]string, len(roomIDs))
result := make(map[string]*LatestEvents, len(roomIDs))
err = sqlutil.WithTransaction(s.Accumulator.db, func(txn *sqlx.Tx) error {
for roomID, ranges := range roomIDToRanges {
var earliestEventNID int64
var latestEventNID int64
var roomEvents []json.RawMessage
// start at the most recent range as we want to return the most recent `limit` events
for i := len(ranges) - 1; i >= 0; i-- {
Expand All @@ -604,26 +610,33 @@ func (s *Storage) LatestEventsInRooms(userID string, roomIDs []string, to int64,
}
// keep pushing to the front so we end up with A,B,C
for _, ev := range events {
if latestEventNID == 0 { // set first time and never again
latestEventNID = ev.NID
}
roomEvents = append([]json.RawMessage{ev.JSON}, roomEvents...)
earliestEventNID = ev.NID
if len(roomEvents) >= limit {
break
}
}
}
latestEvents := LatestEvents{
LatestNID: latestEventNID,
Timeline: roomEvents,
}
if earliestEventNID != 0 {
// the oldest event needs a prev batch token, so find one now
prevBatch, err := s.EventsTable.SelectClosestPrevBatch(roomID, earliestEventNID)
prevBatch, err := s.EventsTable.SelectClosestPrevBatch(txn, roomID, earliestEventNID)
if err != nil {
return fmt.Errorf("failed to select prev_batch for room %s : %s", roomID, err)
}
prevBatches[roomID] = prevBatch
latestEvents.PrevBatch = prevBatch
}
result[roomID] = roomEvents
result[roomID] = &latestEvents
}
return nil
})
return result, prevBatches, err
return result, err
}

func (s *Storage) visibleEventNIDsBetweenForRooms(userID string, roomIDs []string, from, to int64) (map[string][][2]int64, error) {
Expand All @@ -637,7 +650,7 @@ func (s *Storage) visibleEventNIDsBetweenForRooms(userID string, roomIDs []strin
return nil, fmt.Errorf("VisibleEventNIDsBetweenForRooms.SelectEventsWithTypeStateKeyInRooms: %s", err)
}
}
joinTimingsByRoomID, err := s.determineJoinedRoomsFromMemberships(membershipEvents)
joinTimingsAtFromByRoomID, err := s.determineJoinedRoomsFromMemberships(membershipEvents)
if err != nil {
return nil, fmt.Errorf("failed to work out joined rooms for %s at pos %d: %s", userID, from, err)
}
Expand All @@ -648,7 +661,7 @@ func (s *Storage) visibleEventNIDsBetweenForRooms(userID string, roomIDs []strin
return nil, fmt.Errorf("failed to load membership events: %s", err)
}

return s.visibleEventNIDsWithData(joinTimingsByRoomID, membershipEvents, userID, from, to)
return s.visibleEventNIDsWithData(joinTimingsAtFromByRoomID, membershipEvents, userID, from, to)
}

// Work out the NID ranges to pull events from for this user. Given a from and to event nid stream position,
Expand Down Expand Up @@ -678,7 +691,7 @@ func (s *Storage) visibleEventNIDsBetweenForRooms(userID string, roomIDs []strin
// - For Room E: from=1, to=15 returns { RoomE: [ [3,3], [13,15] ] } (tests invites)
func (s *Storage) VisibleEventNIDsBetween(userID string, from, to int64) (map[string][][2]int64, error) {
// load *ALL* joined rooms for this user at from (inclusive)
joinTimingsByRoomID, err := s.JoinedRoomsAfterPosition(userID, from)
joinTimingsAtFromByRoomID, err := s.JoinedRoomsAfterPosition(userID, from)
if err != nil {
return nil, fmt.Errorf("failed to work out joined rooms for %s at pos %d: %s", userID, from, err)
}
Expand All @@ -689,10 +702,10 @@ func (s *Storage) VisibleEventNIDsBetween(userID string, from, to int64) (map[st
return nil, fmt.Errorf("failed to load membership events: %s", err)
}

return s.visibleEventNIDsWithData(joinTimingsByRoomID, membershipEvents, userID, from, to)
return s.visibleEventNIDsWithData(joinTimingsAtFromByRoomID, membershipEvents, userID, from, to)
}

func (s *Storage) visibleEventNIDsWithData(joinTimingsByRoomID map[string]internal.EventMetadata, membershipEvents []Event, userID string, from, to int64) (map[string][][2]int64, error) {
func (s *Storage) visibleEventNIDsWithData(joinTimingsAtFromByRoomID map[string]internal.EventMetadata, membershipEvents []Event, userID string, from, to int64) (map[string][][2]int64, error) {
// load membership events in order and bucket based on room ID
roomIDToLogs := make(map[string][]membershipEvent)
for _, ev := range membershipEvents {
Expand Down Expand Up @@ -754,7 +767,7 @@ func (s *Storage) visibleEventNIDsWithData(joinTimingsByRoomID map[string]intern

// For each joined room, perform the algorithm and delete the logs afterwards
result := make(map[string][][2]int64)
for joinedRoomID, _ := range joinTimingsByRoomID {
for joinedRoomID, _ := range joinTimingsAtFromByRoomID {
roomResult := calculateVisibleEventNIDs(true, from, to, roomIDToLogs[joinedRoomID])
result[joinedRoomID] = roomResult
delete(roomIDToLogs, joinedRoomID)
Expand Down
Loading

0 comments on commit b72ad3b

Please sign in to comment.