This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Make StreamToken.room_key be a RoomStreamToken instance. (#8281)
erikjohnston authored Sep 11, 2020
1 parent c312ee3 commit fe8ed1b
Showing 16 changed files with 114 additions and 99 deletions.
1 change: 1 addition & 0 deletions changelog.d/8281.misc
@@ -0,0 +1 @@
Change `StreamToken.room_key` to be a `RoomStreamToken` instance.
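
For context: a `RoomStreamToken` pairs an optional topological position with a stream ordering, and serializes to the `s…`/`t…-…` strings that the hunks below stop passing around. A minimal stand-in, assuming only the serialization visible in this diff (the real `synapse.types` class has more helpers, e.g. `parse_stream_token`):

```python
from typing import Optional

import attr


@attr.s(slots=True, frozen=True)
class RoomStreamToken:
    """Simplified stand-in for synapse.types.RoomStreamToken."""

    topological = attr.ib(type=Optional[int])  # None for "live" tokens
    stream = attr.ib(type=int)  # the stream ordering

    @classmethod
    def parse(cls, string: str) -> "RoomStreamToken":
        # "s<stream>" marks a live token, "t<topological>-<stream>" a
        # historic one, matching the literals visible in the diff below.
        if string.startswith("s"):
            return cls(None, int(string[1:]))
        if string.startswith("t"):
            topo, _, stream = string[1:].partition("-")
            return cls(int(topo), int(stream))
        raise ValueError("Invalid room stream token %r" % (string,))

    def __str__(self) -> str:
        if self.topological is not None:
            return "t%d-%d" % (self.topological, self.stream)
        return "s%d" % (self.stream,)
```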
2 changes: 2 additions & 0 deletions mypy.ini
@@ -46,10 +46,12 @@ files =
synapse/server_notices,
synapse/spam_checker_api,
synapse/state,
synapse/storage/databases/main/events.py,
synapse/storage/databases/main/stream.py,
synapse/storage/databases/main/ui_auth.py,
synapse/storage/database.py,
synapse/storage/engines,
synapse/storage/persist_events.py,
synapse/storage/state.py,
synapse/storage/util,
synapse/streams,
6 changes: 3 additions & 3 deletions synapse/handlers/admin.py
@@ -125,8 +125,8 @@ async def export_user_data(self, user_id, writer):
else:
stream_ordering = room.stream_ordering

-from_key = str(RoomStreamToken(0, 0))
-to_key = str(RoomStreamToken(None, stream_ordering))
+from_key = RoomStreamToken(0, 0)
+to_key = RoomStreamToken(None, stream_ordering)

written_events = set() # Events that we've processed in this room

@@ -153,7 +153,7 @@ async def export_user_data(self, user_id, writer):
if not events:
break

-from_key = events[-1].internal_metadata.after
+from_key = RoomStreamToken.parse(events[-1].internal_metadata.after)

events = await filter_events_for_client(self.storage, user_id, events)

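
For readers following the control flow: this hunk sits in a loop that pages forward through a room until the store returns nothing. A hedged sketch of that pattern, reusing the stand-in token above; the `paginate_room_events` signature here is an assumption, not the exact storage API:

```python
async def export_room_events(store, room_id: str, stream_ordering: int):
    """Hedged sketch of the export loop around this hunk."""
    from_key = RoomStreamToken(0, 0)  # the very start of the room
    to_key = RoomStreamToken(None, stream_ordering)  # where the user left

    while True:
        # Assumed signature; the real storage method takes more arguments.
        events, _ = await store.paginate_room_events(
            room_id, from_key, to_key, limit=100
        )
        if not events:
            break
        # internal_metadata.after is still a serialized string, hence parse().
        from_key = RoomStreamToken.parse(events[-1].internal_metadata.after)
        yield events
```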
12 changes: 5 additions & 7 deletions synapse/handlers/device.py
@@ -29,6 +29,7 @@
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.types import (
RoomStreamToken,
+StreamToken,
get_domain_from_id,
get_verify_key_from_cross_signing_key,
)
@@ -104,18 +105,15 @@ async def get_device(self, user_id: str, device_id: str) -> Dict[str, Any]:

@trace
@measure_func("device.get_user_ids_changed")
-async def get_user_ids_changed(self, user_id, from_token):
+async def get_user_ids_changed(self, user_id: str, from_token: StreamToken):
"""Get list of users that have had the devices updated, or have newly
joined a room, that `user_id` may be interested in.
Args:
user_id (str)
from_token (StreamToken)
"""

set_tag("user_id", user_id)
set_tag("from_token", from_token)
-now_room_key = await self.store.get_room_events_max_id()
+now_room_id = self.store.get_room_max_stream_ordering()
+now_room_key = RoomStreamToken(None, now_room_id)

room_ids = await self.store.get_rooms_for_user(user_id)

@@ -142,7 +140,7 @@ async def get_user_ids_changed(self, user_id, from_token):
)
rooms_changed.update(event.room_id for event in member_events)

-stream_ordering = RoomStreamToken.parse_stream_token(from_token.room_key).stream
+stream_ordering = from_token.room_key.stream

possibly_changed = set(changed)
possibly_left = set()
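
The device-handler hunks show the payoff of the change: no more re-parsing of serialized tokens at use sites. With the stand-in class above:

```python
# Before: room_key was a string, re-parsed at every use site.
stream_ordering = RoomStreamToken.parse("s1234").stream

# After: the token is already structured, so the attribute access is direct.
from_key = RoomStreamToken(None, 1234)
assert from_key.stream == stream_ordering == 1234
```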
4 changes: 2 additions & 2 deletions synapse/handlers/initial_sync.py
@@ -25,7 +25,7 @@
from synapse.logging.context import make_deferred_yieldable, run_in_background
from synapse.storage.roommember import RoomsForUser
from synapse.streams.config import PaginationConfig
-from synapse.types import JsonDict, Requester, StreamToken, UserID
+from synapse.types import JsonDict, Requester, RoomStreamToken, StreamToken, UserID
from synapse.util import unwrapFirstError
from synapse.util.async_helpers import concurrently_execute
from synapse.util.caches.response_cache import ResponseCache
@@ -167,7 +167,7 @@ async def handle_room(event: RoomsForUser):
self.state_handler.get_current_state, event.room_id
)
elif event.membership == Membership.LEAVE:
room_end_token = "s%d" % (event.stream_ordering,)
room_end_token = RoomStreamToken(None, event.stream_ordering,)
deferred_room_state = run_in_background(
self.state_store.get_state_for_events, [event.event_id]
)
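
The old and new expressions in this hunk denote the same stream position; only the string round-trip disappears. Checked against the stand-in above:

```python
# The formatted string and the constructed token serialize identically.
stream_ordering = 42
assert str(RoomStreamToken(None, stream_ordering)) == "s%d" % (stream_ordering,)
```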
1 change: 1 addition & 0 deletions synapse/handlers/message.py
@@ -973,6 +973,7 @@ async def persist_and_notify_client_event(
This should only be run on the instance in charge of persisting events.
"""
assert self._is_event_writer
+assert self.storage.persistence is not None

if ratelimit:
# We check if this is a room admin redacting an event so that we
4 changes: 2 additions & 2 deletions synapse/handlers/pagination.py
@@ -344,7 +344,7 @@ async def get_messages(
# gets called.
raise Exception("limit not set")

-room_token = RoomStreamToken.parse(from_token.room_key)
+room_token = from_token.room_key

with await self.pagination_lock.read(room_id):
(
@@ -381,7 +381,7 @@ async def get_messages(

if leave_token.topological < max_topo:
from_token = from_token.copy_and_replace(
"room_key", leave_token_str
"room_key", leave_token
)

await self.hs.get_handlers().federation_handler.maybe_backfill(
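
`copy_and_replace` swaps a single field on the composite `StreamToken`, and after this commit the `room_key` field holds a `RoomStreamToken` object rather than its string form. A trimmed stand-in (the real `StreamToken` is a namedtuple with several more per-stream fields):

```python
from typing import NamedTuple


class StreamToken(NamedTuple):
    """Trimmed stand-in for synapse.types.StreamToken."""

    room_key: RoomStreamToken
    typing_key: int
    receipt_key: int

    def copy_and_replace(self, key: str, new_value) -> "StreamToken":
        # Return a copy with exactly one field swapped out.
        return self._replace(**{key: new_value})


from_token = StreamToken(RoomStreamToken(None, 100), 0, 0)
leave_token = RoomStreamToken(5, 57)
assert from_token.copy_and_replace("room_key", leave_token).room_key is leave_token
```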
15 changes: 7 additions & 8 deletions synapse/handlers/room.py
@@ -1091,20 +1091,19 @@ def __init__(self, hs: "HomeServer"):
async def get_new_events(
self,
user: UserID,
-from_key: str,
+from_key: RoomStreamToken,
limit: int,
room_ids: List[str],
is_guest: bool,
explicit_room_id: Optional[str] = None,
-) -> Tuple[List[EventBase], str]:
+) -> Tuple[List[EventBase], RoomStreamToken]:
# We just ignore the key for now.

to_key = self.get_current_key()

-from_token = RoomStreamToken.parse(from_key)
-if from_token.topological:
+if from_key.topological:
logger.warning("Stream has topological part!!!! %r", from_key)
-from_key = "s%s" % (from_token.stream,)
+from_key = RoomStreamToken(None, from_key.stream)

app_service = self.store.get_app_service_by_user_id(user.to_string())
if app_service:
@@ -1133,14 +1132,14 @@ async def get_new_events(
events[:] = events[:limit]

if events:
-end_key = events[-1].internal_metadata.after
+end_key = RoomStreamToken.parse(events[-1].internal_metadata.after)
else:
end_key = to_key

return (events, end_key)

-def get_current_key(self) -> str:
-    return "s%d" % (self.store.get_room_max_stream_ordering(),)
+def get_current_key(self) -> RoomStreamToken:
+    return RoomStreamToken(None, self.store.get_room_max_stream_ordering())

def get_current_key_for_room(self, room_id: str) -> Awaitable[str]:
return self.store.get_room_events_max_id(room_id)
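
Live streams are ordered purely by stream position, which is why `get_new_events` strips any topological part from an incoming token. A sketch of that normalization with the stand-in class (the real code tests truthiness rather than `is not None`):

```python
def to_live_token(from_key: RoomStreamToken) -> RoomStreamToken:
    """Drop the topological part, keeping only the stream ordering."""
    if from_key.topological is not None:
        return RoomStreamToken(None, from_key.stream)
    return from_key


assert str(to_live_token(RoomStreamToken.parse("t10-99"))) == "s99"
assert str(to_live_token(RoomStreamToken.parse("s99"))) == "s99"
```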
11 changes: 6 additions & 5 deletions synapse/handlers/sync.py
@@ -378,7 +378,7 @@ async def ephemeral_by_room(
sync_config = sync_result_builder.sync_config

with Measure(self.clock, "ephemeral_by_room"):
-typing_key = since_token.typing_key if since_token else "0"
+typing_key = since_token.typing_key if since_token else 0

room_ids = sync_result_builder.joined_room_ids

@@ -402,7 +402,7 @@ async def ephemeral_by_room(
event_copy = {k: v for (k, v) in event.items() if k != "room_id"}
ephemeral_by_room.setdefault(room_id, []).append(event_copy)

-receipt_key = since_token.receipt_key if since_token else "0"
+receipt_key = since_token.receipt_key if since_token else 0

receipt_source = self.event_sources.sources["receipt"]
receipts, receipt_key = await receipt_source.get_new_events(
@@ -533,7 +533,7 @@ async def _load_filtered_recents(
if len(recents) > timeline_limit:
limited = True
recents = recents[-timeline_limit:]
-room_key = recents[0].internal_metadata.before
+room_key = RoomStreamToken.parse(recents[0].internal_metadata.before)

prev_batch_token = now_token.copy_and_replace("room_key", room_key)

@@ -1322,6 +1322,7 @@ async def _generate_sync_entry_for_presence(
is_guest=sync_config.is_guest,
include_offline=include_offline,
)
+assert presence_key
sync_result_builder.now_token = now_token.copy_and_replace(
"presence_key", presence_key
)
@@ -1484,7 +1485,7 @@ async def _have_rooms_changed(
if rooms_changed:
return True

-stream_id = RoomStreamToken.parse_stream_token(since_token.room_key).stream
+stream_id = since_token.room_key.stream
for room_id in sync_result_builder.joined_room_ids:
if self.store.has_room_changed_since(room_id, stream_id):
return True
@@ -1750,7 +1751,7 @@ async def _get_all_rooms(
continue

leave_token = now_token.copy_and_replace(
"room_key", "s%d" % (event.stream_ordering,)
"room_key", RoomStreamToken(None, event.stream_ordering)
)
room_entries.append(
RoomSyncResultBuilder(
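
`_load_filtered_recents` builds the `prev_batch` token by splicing the position just before the oldest returned event into the current token. Reusing both stand-ins sketched above:

```python
now_token = StreamToken(RoomStreamToken(None, 100), 0, 0)

# recents[0].internal_metadata.before is still serialized, hence parse().
room_key = RoomStreamToken.parse("s57")
prev_batch_token = now_token.copy_and_replace("room_key", room_key)

assert prev_batch_token.room_key.stream == 57
assert prev_batch_token.typing_key == 0  # all other fields are untouched
```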
16 changes: 12 additions & 4 deletions synapse/notifier.py
@@ -25,6 +25,7 @@
Set,
Tuple,
TypeVar,
+Union,
)

from prometheus_client import Counter
@@ -41,7 +42,7 @@
from synapse.metrics import LaterGauge
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.streams.config import PaginationConfig
-from synapse.types import Collection, StreamToken, UserID
+from synapse.types import Collection, RoomStreamToken, StreamToken, UserID
from synapse.util.async_helpers import ObservableDeferred, timeout_deferred
from synapse.util.metrics import Measure
from synapse.visibility import filter_events_for_client
@@ -111,7 +112,9 @@ def __init__(
with PreserveLoggingContext():
self.notify_deferred = ObservableDeferred(defer.Deferred())

-def notify(self, stream_key: str, stream_id: int, time_now_ms: int):
+def notify(
+    self, stream_key: str, stream_id: Union[int, RoomStreamToken], time_now_ms: int,
+):
"""Notify any listeners for this user of a new event from an
event source.
Args:
@@ -294,7 +297,12 @@ def _notify_pending_new_room_events(self, max_room_stream_id: int):
rooms.add(event.room_id)

if users or rooms:
self.on_new_event("room_key", max_room_stream_id, users=users, rooms=rooms)
self.on_new_event(
"room_key",
RoomStreamToken(None, max_room_stream_id),
users=users,
rooms=rooms,
)
self._on_updated_room_token(max_room_stream_id)

def _on_updated_room_token(self, max_room_stream_id: int):
@@ -329,7 +337,7 @@ async def _notify_pusher_pool(self, max_room_stream_id: int):
def on_new_event(
self,
stream_key: str,
-new_token: int,
+new_token: Union[int, RoomStreamToken],
users: Collection[UserID] = [],
rooms: Collection[str] = [],
):
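
The widened `Union[int, RoomStreamToken]` reflects that only the room stream now carries structured tokens; every other stream still reports a bare integer. A hedged illustration of a consumer distinguishing the two (hypothetical helper, not real `Notifier` code):

```python
from typing import Union


def describe_token(stream_key: str, new_token: Union[int, RoomStreamToken]) -> str:
    """Format a stream position for logging, depending on the stream."""
    if stream_key == "room_key":
        assert isinstance(new_token, RoomStreamToken)
        return "room stream at %s" % (new_token,)
    assert isinstance(new_token, int)
    return "%s at %d" % (stream_key, new_token)


assert describe_token("room_key", RoomStreamToken(None, 1234)) == "room stream at s1234"
assert describe_token("typing_key", 7) == "typing_key at 7"
```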
5 changes: 4 additions & 1 deletion synapse/storage/__init__.py
@@ -47,6 +47,9 @@ def __init__(self, hs, stores: Databases):
# interfaces.
self.main = stores.main

-self.persistence = EventsPersistenceStorage(hs, stores)
self.purge_events = PurgeEventsStorage(hs, stores)
self.state = StateGroupStorage(hs, stores)

+self.persistence = None
+if stores.persist_events:
+    self.persistence = EventsPersistenceStorage(hs, stores)
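
`Storage.persistence` becomes `Optional`: only processes configured to persist events get an `EventsPersistenceStorage`, which is why `persist_and_notify_client_event` gains its assert above. A minimal sketch of the consuming pattern, with stand-in names:

```python
from typing import Optional


class Storage:
    """Stand-in: `persistence` is None on workers that do not write events."""

    def __init__(self, persistence: Optional["EventsPersistenceStorage"] = None):
        self.persistence = persistence


def persist_event(storage: Storage):
    # Mirrors the new assert in synapse/handlers/message.py: narrowing the
    # Optional up front satisfies mypy and fails fast on mis-wired workers.
    assert storage.persistence is not None
    return storage.persistence
```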
21 changes: 15 additions & 6 deletions synapse/storage/databases/main/events.py
@@ -213,7 +213,7 @@ async def _get_events_which_are_prevs(self, event_ids: Iterable[str]) -> List[str]:
Returns:
Filtered event ids
"""
-results = []
+results = []  # type: List[str]

def _get_events_which_are_prevs_txn(txn, batch):
sql = """
@@ -631,7 +631,9 @@ def _update_forward_extremities_txn(
)

@classmethod
-def _filter_events_and_contexts_for_duplicates(cls, events_and_contexts):
+def _filter_events_and_contexts_for_duplicates(
+    cls, events_and_contexts: List[Tuple[EventBase, EventContext]]
+) -> List[Tuple[EventBase, EventContext]]:
"""Ensure that we don't have the same event twice.
Pick the earliest non-outlier if there is one, else the earliest one.
@@ -641,7 +643,9 @@ def _filter_events_and_contexts_for_duplicates(cls, events_and_contexts):
Returns:
list[(EventBase, EventContext)]: filtered list
"""
-new_events_and_contexts = OrderedDict()
+new_events_and_contexts = (
+    OrderedDict()
+)  # type: OrderedDict[str, Tuple[EventBase, EventContext]]
for event, context in events_and_contexts:
prev_event_context = new_events_and_contexts.get(event.event_id)
if prev_event_context:
@@ -655,7 +659,12 @@ def _filter_events_and_contexts_for_duplicates(cls, events_and_contexts):
new_events_and_contexts[event.event_id] = (event, context)
return list(new_events_and_contexts.values())

-def _update_room_depths_txn(self, txn, events_and_contexts, backfilled):
+def _update_room_depths_txn(
+    self,
+    txn,
+    events_and_contexts: List[Tuple[EventBase, EventContext]],
+    backfilled: bool,
+):
"""Update min_depth for each room
Args:
Expand All @@ -664,7 +673,7 @@ def _update_room_depths_txn(self, txn, events_and_contexts, backfilled):
we are persisting
backfilled (bool): True if the events were backfilled
"""
-depth_updates = {}
+depth_updates = {}  # type: Dict[str, int]
for event, context in events_and_contexts:
# Remove the any existing cache entries for the event_ids
txn.call_after(self.store._invalidate_get_event_cache, event.event_id)
@@ -1436,7 +1445,7 @@ def _update_backward_extremeties(self, txn, events):
Forward extremities are handled when we first start persisting the events.
"""
-events_by_room = {}
+events_by_room = {}  # type: Dict[str, List[EventBase]]
for ev in events:
events_by_room.setdefault(ev.room_id, []).append(ev)

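
The annotations added in this file are PEP 484 type comments for locals — the style that lets a module newly listed in `mypy.ini` (first hunk above) come under type checking without changing runtime behaviour. A self-contained illustration:

```python
from collections import OrderedDict
from typing import Dict, List, Tuple

# mypy reads the comments; CPython ignores them, so behaviour is unchanged.
results = []  # type: List[str]
depth_updates = {}  # type: Dict[str, int]
events_by_room = {}  # type: Dict[str, List[str]]
new_events_and_contexts = (
    OrderedDict()
)  # type: OrderedDict[str, Tuple[str, str]]
```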

