Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Convert streams to async. (#8014)
Browse files Browse the repository at this point in the history
  • Loading branch information
clokep authored Aug 4, 2020
1 parent 916cf2d commit e19de43
Show file tree
Hide file tree
Showing 10 changed files with 27 additions and 30 deletions.
1 change: 1 addition & 0 deletions changelog.d/8014.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Convert various parts of the codebase to async/await.
4 changes: 2 additions & 2 deletions synapse/handlers/initial_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ async def _snapshot_all_rooms(

rooms_ret = []

now_token = await self.hs.get_event_sources().get_current_token()
now_token = self.hs.get_event_sources().get_current_token()

presence_stream = self.hs.get_event_sources().sources["presence"]
pagination_config = PaginationConfig(from_token=now_token)
Expand Down Expand Up @@ -360,7 +360,7 @@ async def _room_initial_sync_joined(
current_state.values(), time_now
)

now_token = await self.hs.get_event_sources().get_current_token()
now_token = self.hs.get_event_sources().get_current_token()

limit = pagin_config.limit if pagin_config else None
if limit is None:
Expand Down
2 changes: 1 addition & 1 deletion synapse/handlers/pagination.py
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,7 @@ async def get_messages(
room_token = pagin_config.from_token.room_key
else:
pagin_config.from_token = (
await self.hs.get_event_sources().get_current_token_for_pagination()
self.hs.get_event_sources().get_current_token_for_pagination()
)
room_token = pagin_config.from_token.room_key

Expand Down
10 changes: 5 additions & 5 deletions synapse/handlers/room.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import math
import string
from collections import OrderedDict
from typing import Optional, Tuple
from typing import Awaitable, Optional, Tuple

from synapse.api.constants import (
EventTypes,
Expand Down Expand Up @@ -1041,7 +1041,7 @@ async def get_new_events(
):
# We just ignore the key for now.

to_key = await self.get_current_key()
to_key = self.get_current_key()

from_token = RoomStreamToken.parse(from_key)
if from_token.topological:
Expand Down Expand Up @@ -1081,10 +1081,10 @@ async def get_new_events(

return (events, end_key)

def get_current_key(self):
return self.store.get_room_events_max_id()
def get_current_key(self) -> str:
return "s%d" % (self.store.get_room_max_stream_ordering(),)

def get_current_key_for_room(self, room_id):
def get_current_key_for_room(self, room_id: str) -> Awaitable[str]:
return self.store.get_room_events_max_id(room_id)


Expand Down
2 changes: 1 addition & 1 deletion synapse/handlers/search.py
Original file line number Diff line number Diff line change
Expand Up @@ -340,7 +340,7 @@ async def search(self, user, content, batch=None):
# If client has asked for "context" for each event (i.e. some surrounding
# events and state), fetch that
if event_context is not None:
now_token = await self.hs.get_event_sources().get_current_token()
now_token = self.hs.get_event_sources().get_current_token()

contexts = {}
for event in allowed_events:
Expand Down
2 changes: 1 addition & 1 deletion synapse/handlers/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -961,7 +961,7 @@ async def generate_sync_result(
# this is due to some of the underlying streams not supporting the ability
# to query up to a given point.
# Always use the `now_token` in `SyncResultBuilder`
now_token = await self.event_sources.get_current_token()
now_token = self.event_sources.get_current_token()

logger.debug(
"Calculating sync response for %r between %s and %s",
Expand Down
4 changes: 2 additions & 2 deletions synapse/notifier.py
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,7 @@ async def wait_for_events(
"""
user_stream = self.user_to_user_stream.get(user_id)
if user_stream is None:
current_token = await self.event_sources.get_current_token()
current_token = self.event_sources.get_current_token()
if room_ids is None:
room_ids = await self.store.get_rooms_for_user(user_id)
user_stream = _NotifierUserStream(
Expand Down Expand Up @@ -397,7 +397,7 @@ async def get_events_for(
"""
from_token = pagination_config.from_token
if not from_token:
from_token = await self.event_sources.get_current_token()
from_token = self.event_sources.get_current_token()

limit = pagination_config.limit

Expand Down
8 changes: 4 additions & 4 deletions synapse/storage/data_stores/main/stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import abc
import logging
from collections import namedtuple
from typing import Optional

from twisted.internet import defer

Expand Down Expand Up @@ -557,19 +558,18 @@ def _f(txn):

return self.db.runInteraction("get_room_event_before_stream_ordering", _f)

@defer.inlineCallbacks
def get_room_events_max_id(self, room_id=None):
async def get_room_events_max_id(self, room_id: Optional[str] = None) -> str:
"""Returns the current token for rooms stream.
By default, it returns the current global stream token. Specifying a
`room_id` causes it to return the current room specific topological
token.
"""
token = yield self.get_room_max_stream_ordering()
token = self.get_room_max_stream_ordering()
if room_id is None:
return "s%d" % (token,)
else:
topo = yield self.db.runInteraction(
topo = await self.db.runInteraction(
"_get_max_topological_txn", self._get_max_topological_txn, room_id
)
return "t%d-%d" % (topo, token)
Expand Down
22 changes: 9 additions & 13 deletions synapse/streams/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@

from typing import Any, Dict

from twisted.internet import defer

from synapse.handlers.account_data import AccountDataEventSource
from synapse.handlers.presence import PresenceEventSource
from synapse.handlers.receipts import ReceiptEventSource
Expand All @@ -40,39 +38,37 @@ def __init__(self, hs):
} # type: Dict[str, Any]
self.store = hs.get_datastore()

@defer.inlineCallbacks
def get_current_token(self):
def get_current_token(self) -> StreamToken:
push_rules_key, _ = self.store.get_push_rules_stream_token()
to_device_key = self.store.get_to_device_stream_token()
device_list_key = self.store.get_device_stream_token()
groups_key = self.store.get_group_stream_token()

token = StreamToken(
room_key=(yield self.sources["room"].get_current_key()),
presence_key=(yield self.sources["presence"].get_current_key()),
typing_key=(yield self.sources["typing"].get_current_key()),
receipt_key=(yield self.sources["receipt"].get_current_key()),
account_data_key=(yield self.sources["account_data"].get_current_key()),
room_key=self.sources["room"].get_current_key(),
presence_key=self.sources["presence"].get_current_key(),
typing_key=self.sources["typing"].get_current_key(),
receipt_key=self.sources["receipt"].get_current_key(),
account_data_key=self.sources["account_data"].get_current_key(),
push_rules_key=push_rules_key,
to_device_key=to_device_key,
device_list_key=device_list_key,
groups_key=groups_key,
)
return token

@defer.inlineCallbacks
def get_current_token_for_pagination(self):
def get_current_token_for_pagination(self) -> StreamToken:
"""Get the current token for a given room to be used to paginate
events.
The returned token does not have the current values for fields other
than `room`, since they are not used during pagination.
Returns:
Deferred[StreamToken]
The current token for pagination.
"""
token = StreamToken(
room_key=(yield self.sources["room"].get_current_key()),
room_key=self.sources["room"].get_current_key(),
presence_key=0,
typing_key=0,
receipt_key=0,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,7 @@ def test_server_notice_only_sent_once(self):
self.server_notices_manager.get_or_create_notice_room_for_user(self.user_id)
)

token = self.get_success(self.event_source.get_current_token())
token = self.event_source.get_current_token()
events, _ = self.get_success(
self.store.get_recent_events_for_room(
room_id, limit=100, end_token=token.room_key
Expand Down

0 comments on commit e19de43

Please sign in to comment.