Add ability to wait for replication streams (#7542)
The idea here is that when an instance persists an event via the replication HTTP API, the call can return before this instance receives that event back over replication. That leads to races where code assumes that persisting an event immediately updates various caches (e.g. the current state of the room).

Most of Synapse doesn't hit such races, so we don't do the waiting automagically; instead we wait only where necessary, to avoid needless delays. We may change our minds here if it turns out there are a lot of subtle races going on.

People probably want to look at this commit-by-commit.
erikjohnston authored May 22, 2020
1 parent 06a02bc commit 1531b21
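
For reviewers skimming the diff, the core pattern is: persist the event (possibly by proxying to the master over the replication HTTP API), take the stream position that persistence now returns, and wait until this instance has seen that position come down replication before reading any caches. A minimal sketch of the shape, assuming a handler that has grabbed hs.get_replication_data_handler() as in the diff below; the method name _persist_then_read is illustrative, not part of this change:

    async def _persist_then_read(self, event, context):
        # Persisting may be proxied to the master over the replication
        # HTTP API; either way it now returns the event's stream position.
        stream_id = await self.persist_events_and_notify([(event, context)])

        # Block until this instance has caught up to that position on the
        # "events" stream (written by the "master" instance at the time of
        # this commit), so that caches such as the current state of the
        # room include the newly persisted event.
        await self._replication.wait_for_stream_position(
            "master", "events", stream_id
        )

        # Cache reads from here on will observe the event.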
Showing 24 changed files with 304 additions and 112 deletions.
1 change: 1 addition & 0 deletions changelog.d/7542.misc
@@ -0,0 +1 @@
+Add ability to wait for replication streams.
33 changes: 23 additions & 10 deletions synapse/handlers/federation.py
@@ -126,6 +126,7 @@ def __init__(self, hs):
         self._server_notices_mxid = hs.config.server_notices_mxid
         self.config = hs.config
         self.http_client = hs.get_simple_http_client()
+        self._replication = hs.get_replication_data_handler()

         self._send_events_to_master = ReplicationFederationSendEventsRestServlet.make_client(
             hs
@@ -1221,7 +1222,7 @@ async def on_event_auth(self, event_id: str) -> List[EventBase]:

     async def do_invite_join(
         self, target_hosts: Iterable[str], room_id: str, joinee: str, content: JsonDict
-    ) -> None:
+    ) -> Tuple[str, int]:
         """ Attempts to join the `joinee` to the room `room_id` via the
         servers contained in `target_hosts`.
@@ -1304,15 +1305,23 @@ async def do_invite_join(
                 room_id=room_id, room_version=room_version_obj,
             )

-            await self._persist_auth_tree(
+            max_stream_id = await self._persist_auth_tree(
                 origin, auth_chain, state, event, room_version_obj
             )

+            # We wait here until this instance has seen the events come down
+            # replication (if we're using replication) as the below uses caches.
+            #
+            # TODO: Currently the events stream is written to from master
+            await self._replication.wait_for_stream_position(
+                "master", "events", max_stream_id
+            )
+
             # Check whether this room is the result of an upgrade of a room we already know
             # about. If so, migrate over user information
             predecessor = await self.store.get_room_predecessor(room_id)
             if not predecessor or not isinstance(predecessor.get("room_id"), str):
-                return
+                return event.event_id, max_stream_id
             old_room_id = predecessor["room_id"]
             logger.debug(
                 "Found predecessor for %s during remote join: %s", room_id, old_room_id
@@ -1325,6 +1334,7 @@ async def do_invite_join(
             )

             logger.debug("Finished joining %s to %s", joinee, room_id)
+            return event.event_id, max_stream_id
         finally:
             room_queue = self.room_queues[room_id]
             del self.room_queues[room_id]
@@ -1554,7 +1564,7 @@ async def on_invite_request(

     async def do_remotely_reject_invite(
         self, target_hosts: Iterable[str], room_id: str, user_id: str, content: JsonDict
-    ) -> EventBase:
+    ) -> Tuple[EventBase, int]:
         origin, event, room_version = await self._make_and_verify_event(
             target_hosts, room_id, user_id, "leave", content=content
         )
@@ -1574,9 +1584,9 @@ async def do_remotely_reject_invite(
         await self.federation_client.send_leave(target_hosts, event)

         context = await self.state_handler.compute_event_context(event)
-        await self.persist_events_and_notify([(event, context)])
+        stream_id = await self.persist_events_and_notify([(event, context)])

-        return event
+        return event, stream_id

     async def _make_and_verify_event(
         self,
@@ -1888,7 +1898,7 @@ async def _persist_auth_tree(
         state: List[EventBase],
         event: EventBase,
         room_version: RoomVersion,
-    ) -> None:
+    ) -> int:
         """Checks the auth chain is valid (and passes auth checks) for the
         state and event. Then persists the auth chain and state atomically.
         Persists the event separately. Notifies about the persisted events
@@ -1982,7 +1992,7 @@ async def _persist_auth_tree(
             event, old_state=state
         )

-        await self.persist_events_and_notify([(event, new_event_context)])
+        return await self.persist_events_and_notify([(event, new_event_context)])

     async def _prep_event(
         self,
@@ -2835,7 +2845,7 @@ async def persist_events_and_notify(
         self,
         event_and_contexts: Sequence[Tuple[EventBase, EventContext]],
         backfilled: bool = False,
-    ) -> None:
+    ) -> int:
         """Persists events and tells the notifier/pushers about them, if
         necessary.
@@ -2845,11 +2855,12 @@ async def persist_events_and_notify(
                 backfilling or not
         """
         if self.config.worker_app:
-            await self._send_events_to_master(
+            result = await self._send_events_to_master(
                 store=self.store,
                 event_and_contexts=event_and_contexts,
                 backfilled=backfilled,
             )
+            return result["max_stream_id"]
         else:
             max_stream_id = await self.storage.persistence.persist_events(
                 event_and_contexts, backfilled=backfilled
@@ -2864,6 +2875,8 @@ async def persist_events_and_notify(
                 for event, _ in event_and_contexts:
                     await self._notify_persisted_event(event, max_stream_id)

+            return max_stream_id
+
     async def _notify_persisted_event(
         self, event: EventBase, max_stream_id: int
     ) -> None:
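
The persist_events_and_notify change above is what threads stream positions out to callers: do_invite_join and do_remotely_reject_invite now return (event_id, stream_id) / (event, stream_id) pairs instead of nothing. A sketch of how a caller might consume that, in line with the opt-in waiting described in the commit message; join_and_wait and the self.federation_handler attribute are illustrative assumptions, not part of this diff:

    async def join_and_wait(self, target_hosts, room_id, joinee, content):
        # do_invite_join now reports the stream position of the join event.
        event_id, stream_id = await self.federation_handler.do_invite_join(
            target_hosts, room_id, joinee, content
        )

        # Waiting is opt-in: only callers that go on to read caches pay
        # the cost of catching up to the event's position over replication.
        await self._replication.wait_for_stream_position(
            "master", "events", stream_id
        )
        return event_id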
36 changes: 25 additions & 11 deletions synapse/handlers/message.py
@@ -15,7 +15,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 import logging
-from typing import Optional
+from typing import Optional, Tuple

 from six import iteritems, itervalues, string_types

@@ -42,6 +42,7 @@
 )
 from synapse.api.room_versions import KNOWN_ROOM_VERSIONS, RoomVersions
 from synapse.api.urls import ConsentURIBuilder
+from synapse.events import EventBase
 from synapse.events.validator import EventValidator
 from synapse.logging.context import run_in_background
 from synapse.metrics.background_process_metrics import run_as_background_process
@@ -630,7 +631,9 @@ def assert_accepted_privacy_policy(self, requester):
         msg = self._block_events_without_consent_error % {"consent_uri": consent_uri}
         raise ConsentNotGivenError(msg=msg, consent_uri=consent_uri)

-    async def send_nonmember_event(self, requester, event, context, ratelimit=True):
+    async def send_nonmember_event(
+        self, requester, event, context, ratelimit=True
+    ) -> int:
         """
         Persists and notifies local clients and federation of an event.
@@ -639,6 +642,9 @@ async def send_nonmember_event(self, requester, event, context, ratelimit=True):
             context (Context) the context of the event.
             ratelimit (bool): Whether to rate limit this send.
             is_guest (bool): Whether the sender is a guest.
+
+        Return:
+            The stream_id of the persisted event.
         """
         if event.type == EventTypes.Member:
             raise SynapseError(
@@ -659,7 +665,7 @@ async def send_nonmember_event(self, requester, event, context, ratelimit=True):
             )
             return prev_state

-        await self.handle_new_client_event(
+        return await self.handle_new_client_event(
             requester=requester, event=event, context=context, ratelimit=ratelimit
         )

@@ -688,7 +694,7 @@ def deduplicate_state_event(self, event, context):

     async def create_and_send_nonmember_event(
         self, requester, event_dict, ratelimit=True, txn_id=None
-    ):
+    ) -> Tuple[EventBase, int]:
         """
         Creates an event, then sends it.
@@ -711,10 +717,10 @@ async def create_and_send_nonmember_event(
                 spam_error = "Spam is not permitted here"
             raise SynapseError(403, spam_error, Codes.FORBIDDEN)

-        await self.send_nonmember_event(
+        stream_id = await self.send_nonmember_event(
             requester, event, context, ratelimit=ratelimit
         )
-        return event
+        return event, stream_id

     @measure_func("create_new_client_event")
     @defer.inlineCallbacks
@@ -774,7 +780,7 @@ def create_new_client_event(
     @measure_func("handle_new_client_event")
     async def handle_new_client_event(
         self, requester, event, context, ratelimit=True, extra_users=[]
-    ):
+    ) -> int:
         """Processes a new event. This includes checking auth, persisting it,
         notifying users, sending to remote servers, etc.
@@ -787,6 +793,9 @@ async def handle_new_client_event(
             context (EventContext)
             ratelimit (bool)
             extra_users (list(UserID)): Any extra users to notify about event
+
+        Return:
+            The stream_id of the persisted event.
         """

         if event.is_state() and (event.type, event.state_key) == (
@@ -827,7 +836,7 @@ async def handle_new_client_event(
         try:
             # If we're a worker we need to hit out to the master.
             if self.config.worker_app:
-                await self.send_event_to_master(
+                result = await self.send_event_to_master(
                     event_id=event.event_id,
                     store=self.store,
                     requester=requester,
@@ -836,14 +845,17 @@ async def handle_new_client_event(
                     ratelimit=ratelimit,
                     extra_users=extra_users,
                 )
+                stream_id = result["stream_id"]
+                event.internal_metadata.stream_ordering = stream_id
                 success = True
-                return
+                return stream_id

-            await self.persist_and_notify_client_event(
+            stream_id = await self.persist_and_notify_client_event(
                 requester, event, context, ratelimit=ratelimit, extra_users=extra_users
             )

             success = True
+            return stream_id
         finally:
             if not success:
                 # Ensure that we actually remove the entries in the push actions
@@ -886,7 +898,7 @@ def _validate_canonical_alias(

     async def persist_and_notify_client_event(
         self, requester, event, context, ratelimit=True, extra_users=[]
-    ):
+    ) -> int:
         """Called when we have fully built the event, have already
         calculated the push actions for the event, and checked auth.
@@ -1076,6 +1088,8 @@ def _notify():
         # matters as sometimes presence code can take a while.
         run_in_background(self._bump_active_time, requester.user)

+        return event_stream_id
+
     async def _bump_active_time(self, user):
         try:
             presence = self.hs.get_presence_handler()
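
The client-message path follows the same convention: create_and_send_nonmember_event now hands back (event, stream_id), and handle_new_client_event returns the stream position whether the event was persisted locally or via the master. A sketch of a hypothetical REST-layer caller; on_send and the self.event_creation_handler / self._replication attributes are assumptions for illustration:

    async def on_send(self, requester, event_dict):
        # Creating and sending now also reports the stream position.
        event, stream_id = await self.event_creation_handler.create_and_send_nonmember_event(
            requester, event_dict, ratelimit=True
        )

        # A caller that immediately reads room state can first wait for
        # replication to catch up to the event's position.
        await self._replication.wait_for_stream_position(
            "master", "events", stream_id
        )
        return 200, {"event_id": event.event_id}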