From 27c00f29f9a9befe72a0dae6cc3e6a74217abd9c Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Fri, 11 Mar 2022 11:09:15 +0000 Subject: [PATCH 1/4] Clarify and log when the stream goes backwards --- synapse/handlers/typing.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py index 3b8912652856..6854428b7ca5 100644 --- a/synapse/handlers/typing.py +++ b/synapse/handlers/typing.py @@ -160,8 +160,9 @@ def process_replication_rows( """Should be called whenever we receive updates for typing stream.""" if self._latest_room_serial > token: - # The master has gone backwards. To prevent inconsistent data, just - # clear everything. + # The typing worker has gone backwards (e.g. it may have restarted). + # To prevent inconsistent data, just clear everything. + logger.info("Typing handler stream went backwards; resetting") self._reset() # Set the latest serial token to whatever the server gave us. From c90d4b53891d0ab47c100dda838bacb56ce35cb0 Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Fri, 11 Mar 2022 11:09:27 +0000 Subject: [PATCH 2/4] Document what TypingStreamRow means --- synapse/replication/tcp/streams/_base.py | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/synapse/replication/tcp/streams/_base.py b/synapse/replication/tcp/streams/_base.py index 23d631a76944..495f2f0285ba 100644 --- a/synapse/replication/tcp/streams/_base.py +++ b/synapse/replication/tcp/streams/_base.py @@ -316,7 +316,19 @@ def __init__(self, hs: "HomeServer"): class TypingStream(Stream): @attr.s(slots=True, frozen=True, auto_attribs=True) class TypingStreamRow: + """ + An entry in the typing stream. + Describes all the users that are 'typing' right now in one room. + + When a user stops typing, it will be streamed as a new update with that + user absent; you can think of the `user_ids` list as overwriting the + entire list that was there previously. + """ + + # The room that this update is for. room_id: str + + # All the users that are 'typing' right now in the specified room. user_ids: List[str] NAME = "typing" From c00b3fa9b67afe60c8b974a3ef4c5181ab96a3df Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Fri, 11 Mar 2022 11:10:15 +0000 Subject: [PATCH 3/4] TCP clients have been replaced by Redis subscribers I know this isn't generic, but generic words are probably less useful than concrete ones for newbies such as myself anyway... maybe revisit if we have alternatives to Redis again? --- synapse/replication/tcp/handler.py | 2 +- synapse/replication/tcp/resource.py | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py index 0d2013a3cfc5..adc5b2a99d86 100644 --- a/synapse/replication/tcp/handler.py +++ b/synapse/replication/tcp/handler.py @@ -711,7 +711,7 @@ def send_remote_server_up(self, server: str) -> None: self.send_command(RemoteServerUpCommand(server)) def stream_update(self, stream_name: str, token: Optional[int], data: Any) -> None: - """Called when a new update is available to stream to clients. + """Called when a new update is available to stream to Redis subscribers. We need to check if the client is interested in the stream or not """ diff --git a/synapse/replication/tcp/resource.py b/synapse/replication/tcp/resource.py index 494e42a2be8f..a18e90a4b33a 100644 --- a/synapse/replication/tcp/resource.py +++ b/synapse/replication/tcp/resource.py @@ -67,8 +67,8 @@ def buildProtocol(self, addr: IAddress) -> ServerReplicationStreamProtocol: class ReplicationStreamer: """Handles replication connections. - This needs to be poked when new replication data may be available. When new - data is available it will propagate to all connected clients. + This needs to be poked when new replication data may be available. + When new data is available it will propagate to all Redis subscribers. """ def __init__(self, hs: "HomeServer"): @@ -109,7 +109,7 @@ def __init__(self, hs: "HomeServer"): def on_notifier_poke(self) -> None: """Checks if there is actually any new data and sends it to the - connections if there are. + Redis subscribers if there are. This should get called each time new data is available, even if it is currently being executed, so that nothing gets missed From cf59faf71f2bb5197145e7e652dfec0bf5d0c170 Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Fri, 11 Mar 2022 11:12:40 +0000 Subject: [PATCH 4/4] Newsfile Signed-off-by: Olivier Wilkinson (reivilibre) --- changelog.d/12211.misc | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/12211.misc diff --git a/changelog.d/12211.misc b/changelog.d/12211.misc new file mode 100644 index 000000000000..d11634a1ee0f --- /dev/null +++ b/changelog.d/12211.misc @@ -0,0 +1 @@ +Improve code documentation for the typing stream over replication. \ No newline at end of file