
Faster remote room joins: invalidate caches and unblock requests when receiving un-partial-stated event notifications over replication. [rei:frrj/streams/unpsr] #14546

Merged: 4 commits, Dec 19, 2022
1 change: 1 addition & 0 deletions changelog.d/14546.misc
@@ -0,0 +1 @@
Faster remote room joins: stream the un-partial-stating of events over replication.
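
The handlers added below are the worker-side consumers of this new stream: each one receives un-partial-stated-event rows and invalidates the relevant caches (and wakes any waiting requests). As a rough sketch of the row shape they rely on — only the two fields this PR actually reads, under a placeholder class name; the real definition lives in synapse/replication/tcp/streams/partial_state.py:

import attr


# Sketch only: mirrors just the two fields the handlers in this PR read from
# each replicated row; not the real Synapse class.
@attr.s(slots=True, frozen=True, auto_attribs=True)
class UnPartialStatedEventStreamRowSketch:
    # The event whose state is no longer partial.
    event_id: str
    # Whether resolving the full state flipped the event between rejected and
    # accepted, which requires extra cache invalidation on every worker.
    rejection_status_changed: bool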
14 changes: 13 additions & 1 deletion synapse/replication/tcp/client.py
@@ -36,14 +36,18 @@
TagAccountDataStream,
ToDeviceStream,
TypingStream,
UnPartialStatedEventStream,
UnPartialStatedRoomStream,
)
from synapse.replication.tcp.streams.events import (
EventsStream,
EventsStreamEventRow,
EventsStreamRow,
)
from synapse.replication.tcp.streams.partial_state import UnPartialStatedRoomStreamRow
from synapse.replication.tcp.streams.partial_state import (
UnPartialStatedEventStreamRow,
UnPartialStatedRoomStreamRow,
)
from synapse.types import PersistedEventPosition, ReadReceipt, StreamKeyType, UserID
from synapse.util.async_helpers import Linearizer, timeout_deferred
from synapse.util.metrics import Measure
@@ -247,6 +251,14 @@ async def on_rdata(
self._state_storage_controller.notify_room_un_partial_stated(
row.room_id
)
elif stream_name == UnPartialStatedEventStream.NAME:
for row in rows:
assert isinstance(row, UnPartialStatedEventStreamRow)

# Wake up any tasks waiting for the event to be un-partial-stated.
self._state_storage_controller.notify_event_un_partial_stated(
row.event_id
)

await self._presence_handler.process_replication_rows(
stream_name, instance_name, token, rows
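
The comment in the handler above says it wakes up any tasks waiting for the event to be un-partial-stated. A simplified sketch of that notify/wait pattern, using asyncio futures for brevity (Synapse itself is Twisted-based, and the real bookkeeping sits behind the notify_event_un_partial_stated call shown in the diff; the class and method names below are illustrative):

import asyncio
from collections import defaultdict
from typing import DefaultDict, List


class UnPartialStateNotifierSketch:
    """Illustrative only: lets callers block until an event is un-partial-stated."""

    def __init__(self) -> None:
        self._waiters: DefaultDict[str, List[asyncio.Future]] = defaultdict(list)

    async def wait_for_event(self, event_id: str) -> None:
        # Park the caller on a future keyed by the event ID.
        fut: asyncio.Future = asyncio.get_running_loop().create_future()
        self._waiters[event_id].append(fut)
        await fut

    def notify(self, event_id: str) -> None:
        # Called when a row for event_id arrives over replication: resolve
        # every parked future so the waiting requests can proceed.
        for fut in self._waiters.pop(event_id, []):
            if not fut.done():
                fut.set_result(None)

In the real handler, the notify step corresponds to the notify_event_un_partial_stated(row.event_id) call made for each replicated row.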
27 changes: 15 additions & 12 deletions synapse/storage/databases/main/events_worker.py
Expand Up @@ -59,8 +59,9 @@
run_as_background_process,
wrap_as_background_process,
)
from synapse.replication.tcp.streams import BackfillStream
from synapse.replication.tcp.streams import BackfillStream, UnPartialStatedEventStream
from synapse.replication.tcp.streams.events import EventsStream
from synapse.replication.tcp.streams.partial_state import UnPartialStatedEventStreamRow
from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause
from synapse.storage.database import (
DatabasePool,
@@ -391,6 +392,16 @@ def process_replication_rows(
self._stream_id_gen.advance(instance_name, token)
elif stream_name == BackfillStream.NAME:
self._backfill_id_gen.advance(instance_name, -token)
elif stream_name == UnPartialStatedEventStream.NAME:
for row in rows:
assert isinstance(row, UnPartialStatedEventStreamRow)

self.is_partial_state_event.invalidate((row.event_id,))

if row.rejection_status_changed:
# If the partial-stated event became rejected or unrejected
# when it wasn't before, we need to invalidate this cache.
self._invalidate_local_get_event_cache(row.event_id)

super().process_replication_rows(stream_name, instance_name, token, rows)

@@ -2380,6 +2391,9 @@ def mark_event_rejected_txn(

This can happen, for example, when resyncing state during a faster join.

It is the caller's responsibility to ensure that other workers are
sent a notification so that they call `_invalidate_local_get_event_cache()`.

Args:
txn:
event_id: ID of event to update
@@ -2418,14 +2432,3 @@ def mark_event_rejected_txn(
)

self.invalidate_get_event_cache_after_txn(txn, event_id)

# TODO(faster_joins): invalidate the cache on workers. Ideally we'd just
# call '_send_invalidation_to_replication', but we actually need the other
# end to call _invalidate_local_get_event_cache() rather than (just)
# _get_event_cache.invalidate().
#
# One solution might be to (somehow) get the workers to call
# _invalidate_caches_for_event() (though that will invalidate more than
# strictly necessary).
#
# https://github.com/matrix-org/synapse/issues/12994
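
The TODO removed at the end of this file is what the PR resolves: mark_event_rejected_txn can only clear the event cache on the worker that runs the transaction, so the new docstring makes cross-worker invalidation the caller's responsibility, and the process_replication_rows branch added above carries it out when the stream row arrives. A toy model of that split, with illustrative names:

from typing import Dict


class WorkerEventCacheSketch:
    """Each worker holds its own in-memory copy of recently fetched events."""

    def __init__(self) -> None:
        self._events: Dict[str, dict] = {}

    def invalidate_local(self, event_id: str) -> None:
        # Roughly what _invalidate_local_get_event_cache() does: drop this
        # worker's cached copy so the next read refetches from the database.
        self._events.pop(event_id, None)


def on_un_partial_stated_event_row(
    cache: WorkerEventCacheSketch, event_id: str, rejection_status_changed: bool
) -> None:
    # Mirrors the handling added above: the cached event body only becomes
    # stale if its rejection status flipped, so that is the only case where
    # the full event-cache entry is dropped on the receiving worker.
    if rejection_status_changed:
        cache.invalidate_local(event_id)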
18 changes: 17 additions & 1 deletion synapse/storage/databases/main/state.py
@@ -14,7 +14,7 @@
# limitations under the License.
import collections.abc
import logging
from typing import TYPE_CHECKING, Collection, Dict, Iterable, Optional, Set, Tuple
from typing import TYPE_CHECKING, Any, Collection, Dict, Iterable, Optional, Set, Tuple

import attr

@@ -24,6 +24,8 @@
from synapse.events import EventBase
from synapse.events.snapshot import EventContext
from synapse.logging.opentracing import trace
from synapse.replication.tcp.streams import UnPartialStatedEventStream
from synapse.replication.tcp.streams.partial_state import UnPartialStatedEventStreamRow
from synapse.storage._base import SQLBaseStore
from synapse.storage.database import (
DatabasePool,
@@ -82,6 +84,20 @@ def __init__(
super().__init__(database, db_conn, hs)
self._instance_name: str = hs.get_instance_name()

def process_replication_rows(
self,
stream_name: str,
instance_name: str,
token: int,
rows: Iterable[Any],
) -> None:
if stream_name == UnPartialStatedEventStream.NAME:
for row in rows:
assert isinstance(row, UnPartialStatedEventStreamRow)
self._get_state_group_for_event.invalidate((row.event_id,))

super().process_replication_rows(stream_name, instance_name, token, rows)

async def get_room_version(self, room_id: str) -> RoomVersion:
"""Get the room_version of a given room
Raises:
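
Both new process_replication_rows branches call .invalidate((row.event_id,)) with a one-element tuple because Synapse's cached-method descriptors key their entries on the tuple of call arguments. A toy sketch of that idiom (not the real synapse.util.caches.descriptors implementation):

from typing import Any, Awaitable, Callable, Dict, Tuple


class CachedMethodSketch:
    """Toy stand-in for a cached async function that supports .invalidate((key,))."""

    def __init__(self, fn: Callable[..., Awaitable[Any]]) -> None:
        self._fn = fn
        self._cache: Dict[Tuple[Any, ...], Any] = {}

    async def __call__(self, *key: Any) -> Any:
        # Entries are keyed on the tuple of call arguments, which is why
        # invalidation also takes a tuple: invalidate((event_id,)).
        if key not in self._cache:
            self._cache[key] = await self._fn(*key)
        return self._cache[key]

    def invalidate(self, key: Tuple[Any, ...]) -> None:
        self._cache.pop(key, None)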