Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Fix recording of federation stream token
Browse files Browse the repository at this point in the history
A couple of changes of significance:

 * remove the `_last_ack < federation_position` condition, so that
   updates will still be correctly processed after restart

 * Correctly wire up send_federation_ack to the right class.
  • Loading branch information
richvdh committed May 23, 2020
1 parent f426969 commit 5c12410
Show file tree
Hide file tree
Showing 3 changed files with 116 additions and 24 deletions.
1 change: 1 addition & 0 deletions changelog.d/7565.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix exception `'GenericWorkerReplicationHandler' object has no attribute 'send_federation_ack'`, introduced in v1.13.0.
68 changes: 44 additions & 24 deletions synapse/app/generic_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
import contextlib
import logging
import sys
from typing import Dict, Iterable
from typing import Dict, Iterable, Optional, Set

from typing_extensions import ContextManager

Expand Down Expand Up @@ -677,10 +677,9 @@ def __init__(self, hs):
self.notify_pushers = hs.config.start_pushers
self.pusher_pool = hs.get_pusherpool()

self.send_handler = None # type: Optional[FederationSenderHandler]
if hs.config.send_federation:
self.send_handler = FederationSenderHandler(hs, self)
else:
self.send_handler = None
self.send_handler = FederationSenderHandler(hs)

async def on_rdata(self, stream_name, instance_name, token, rows):
await super().on_rdata(stream_name, instance_name, token, rows)
Expand Down Expand Up @@ -718,7 +717,7 @@ async def _process_and_notify(self, stream_name, instance_name, token, rows):
if entities:
self.notifier.on_new_event("to_device_key", token, users=entities)
elif stream_name == DeviceListsStream.NAME:
all_room_ids = set()
all_room_ids = set() # type: Set[str]
for row in rows:
if row.entity.startswith("@"):
room_ids = await self.store.get_rooms_for_user(row.entity)
Expand Down Expand Up @@ -769,24 +768,33 @@ def on_remote_server_up(self, server: str):


class FederationSenderHandler(object):
"""Processes the replication stream and forwards the appropriate entries
to the federation sender.
"""Processes the fedration replication stream
This class is only instantiate on the worker responsible for sending outbound
federation transactions. It receives rows from the replication stream and forwards
the appropriate entries to the FederationSender class.
"""

def __init__(self, hs: GenericWorkerServer, replication_client):
def __init__(self, hs: GenericWorkerServer):
self.store = hs.get_datastore()
self._is_mine_id = hs.is_mine_id
self.federation_sender = hs.get_federation_sender()
self.replication_client = replication_client

self._hs = hs

# if the worker is restarted, we want to pick up where we left off in
# the replication stream, so load the position from the database.
#
# XXX is this actually worthwhile? Whenever the master is restarted, we'll
# drop some rows anyway (which is mostly fine because we're only dropping
# typing and presence notifications). If the replication stream is
# unreliable, why do we do all this hoop-jumping to store the position in the
# database? See also https://github.com/matrix-org/synapse/issues/7535.
#
self.federation_position = self.store.federation_out_pos_startup
self._fed_position_linearizer = Linearizer(name="_fed_position_linearizer")

self._fed_position_linearizer = Linearizer(name="_fed_position_linearizer")
self._last_ack = self.federation_position

self._room_serials = {}
self._room_typing = {}

def on_start(self):
# There may be some events that are persisted but haven't been sent,
# so send them now.
Expand Down Expand Up @@ -849,22 +857,34 @@ async def _on_new_receipts(self, rows):
await self.federation_sender.send_read_receipt(receipt_info)

async def update_token(self, token):
"""Update the record of where we have processed to in the federation stream.
Called after we have processed a an update received over replication. Sends
a FEDERATION_ACK back to the master, and stores the token that we have processed
in `federation_stream_position` so that we can restart where we left off.
"""
try:
self.federation_position = token

# We linearize here to ensure we don't have races updating the token
#
# XXX this appears to be redundant, since the ReplicationCommandHandler
# has a linearizer which ensures that we only process one line of
# replication data at a time. Should we remove it, or is it doing useful
# service for robustness? Or could we replace it with an assertion that
# we're not being re-entered?

with (await self._fed_position_linearizer.queue(None)):
if self._last_ack < self.federation_position:
await self.store.update_federation_out_pos(
"federation", self.federation_position
)
await self.store.update_federation_out_pos(
"federation", self.federation_position
)

# We ACK this token over replication so that the master can drop
# its in memory queues
self.replication_client.send_federation_ack(
self.federation_position
)
self._last_ack = self.federation_position
# We ACK this token over replication so that the master can drop
# its in memory queues
self._hs.get_tcp_replication().send_federation_ack(
self.federation_position
)
self._last_ack = self.federation_position
except Exception:
logger.exception("Error updating federation stream position")

Expand Down
71 changes: 71 additions & 0 deletions tests/replication/test_federation_ack.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
# -*- coding: utf-8 -*-
# Copyright 2020 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import mock

from synapse.app.generic_worker import GenericWorkerServer
from synapse.replication.tcp.commands import FederationAckCommand
from synapse.replication.tcp.protocol import AbstractConnection
from synapse.replication.tcp.streams.federation import FederationStream

from tests.unittest import HomeserverTestCase


class FederationAckTestCase(HomeserverTestCase):
def default_config(self) -> dict:
config = super().default_config()
config["worker_app"] = "synapse.app.federation_sender"
config["send_federation"] = True
return config

def make_homeserver(self, reactor, clock):
hs = self.setup_test_homeserver(homeserverToUse=GenericWorkerServer)
return hs

def test_federation_ack_sent(self):
"""A FEDERATION_ACK should be sent back after each RDATA federation
This test checks that the federation sender is correctly sending back
FEDERATION_ACK messages. The test works by spinning up a federation_sender
worker server, and then fishing out its ReplicationCommandHandler. We wire
the RCH up to a mock connection (so that we can observe the command being sent)
and then poke in an RDATA row.
XXX: it might be nice to do this by pretending to be a synapse master worker
(or a redis server), and having the worker connect to us via a mocked-up TCP
transport, rather than assuming that the implementation has a
ReplicationCommandHandler.
"""
rch = self.hs.get_tcp_replication()

# wire up the ReplicationCommandHandler to a mock connection
mock_connection = mock.Mock(spec=AbstractConnection)
rch.new_connection(mock_connection)

# tell it it received an RDATA row
self.get_success(
rch.on_rdata(
"federation",
"master",
token=10,
rows=[FederationStream.FederationStreamRow(type="x", data=[1, 2, 3])],
)
)

# now check that the FEDERATION_ACK was sent
mock_connection.send_command.assert_called_once()
cmd = mock_connection.send_command.call_args[0][0]
assert isinstance(cmd, FederationAckCommand)
self.assertEqual(cmd.token, 10)

0 comments on commit 5c12410

Please sign in to comment.