From 688e958ed731d21e37c30e67125613e26d6e05f5 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 14 May 2024 11:52:06 +0100 Subject: [PATCH 1/2] Reduce pauses on large device list changes For large accounts waking up all the relevant notifier streams can cause pauses of the reactor. --- synapse/replication/tcp/client.py | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py index ba257d34e61..5e5387fdcb7 100644 --- a/synapse/replication/tcp/client.py +++ b/synapse/replication/tcp/client.py @@ -55,6 +55,7 @@ ) from synapse.types import PersistedEventPosition, ReadReceipt, StreamKeyType, UserID from synapse.util.async_helpers import Linearizer, timeout_deferred +from synapse.util.iterutils import batch_iter from synapse.util.metrics import Measure if TYPE_CHECKING: @@ -150,9 +151,15 @@ async def on_rdata( if row.entity.startswith("@") and not row.is_signature: room_ids = await self.store.get_rooms_for_user(row.entity) all_room_ids.update(room_ids) - self.notifier.on_new_event( - StreamKeyType.DEVICE_LIST, token, rooms=all_room_ids - ) + + # `all_room_ids` can be large, so let's wake up those streams in batches + for batched_room_ids in batch_iter(all_room_ids, 100): + self.notifier.on_new_event( + StreamKeyType.DEVICE_LIST, token, rooms=batched_room_ids + ) + + # Yield to reactor so that we don't block. + await self._clock.sleep(0) elif stream_name == PushersStream.NAME: for row in rows: if row.deleted: From 023e7fdefdeecced1a48540e757344d3515dcd7a Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 14 May 2024 11:54:43 +0100 Subject: [PATCH 2/2] Newsfile --- changelog.d/17192.misc | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/17192.misc diff --git a/changelog.d/17192.misc b/changelog.d/17192.misc new file mode 100644 index 00000000000..25e157a50a8 --- /dev/null +++ b/changelog.d/17192.misc @@ -0,0 +1 @@ +Improve performance by fixing a reactor pause.