From 1a77fe301c8dc597ea821805ee29410a604e35f1 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 2 Jun 2021 16:44:09 +0100 Subject: [PATCH 01/15] Hoist up allow_rejected --- .../storage/databases/main/events_worker.py | 29 +++++++++---------- synapse/storage/databases/main/roommember.py | 6 ++-- 2 files changed, 16 insertions(+), 19 deletions(-) diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py index 3c86adab5650..35084bd1639c 100644 --- a/synapse/storage/databases/main/events_worker.py +++ b/synapse/storage/databases/main/events_worker.py @@ -495,7 +495,7 @@ async def _get_events_from_cache_or_db(self, event_ids, allow_rejected=False): map from event id to result """ event_entry_map = self._get_events_from_cache( - event_ids, allow_rejected=allow_rejected + event_ids, ) missing_events_ids = [e for e in event_ids if e not in event_entry_map] @@ -510,22 +510,30 @@ async def _get_events_from_cache_or_db(self, event_ids, allow_rejected=False): # of the database to check it. # missing_events = await self._get_events_from_db( - missing_events_ids, allow_rejected=allow_rejected + missing_events_ids, ) event_entry_map.update(missing_events) + if not allow_rejected: + event_entry_map = { + event_id: entry + for event_id, entry in event_entry_map.items() + if not entry.event.rejected_reason + } + return event_entry_map def _invalidate_get_event_cache(self, event_id): self._get_event_cache.invalidate((event_id,)) - def _get_events_from_cache(self, events, allow_rejected, update_metrics=True): + def _get_events_from_cache( + self, events, update_metrics=True + ) -> Dict[str, _EventCacheEntry]: """Fetch events from the caches Args: events (Iterable[str]): list of event_ids to fetch - allow_rejected (bool): Whether to return events that were rejected update_metrics (bool): Whether to update the cache hit ratio metrics Returns: @@ -542,10 +550,7 @@ def _get_events_from_cache(self, events, allow_rejected, update_metrics=True): if not ret: continue - if allow_rejected or not ret.event.rejected_reason: - event_map[event_id] = ret - else: - event_map[event_id] = None + event_map[event_id] = ret return event_map @@ -672,7 +677,7 @@ def fire(evs, exc): with PreserveLoggingContext(): self.hs.get_reactor().callFromThread(fire, event_list, e) - async def _get_events_from_db(self, event_ids, allow_rejected=False): + async def _get_events_from_db(self, event_ids): """Fetch a bunch of events from the database. Returned events will be added to the cache for future lookups. @@ -682,9 +687,6 @@ async def _get_events_from_db(self, event_ids, allow_rejected=False): Args: event_ids (Iterable[str]): The event_ids of the events to fetch - allow_rejected (bool): Whether to include rejected events. If False, - rejected events are omitted from the response. - Returns: Dict[str, _EventCacheEntry]: map from event id to result. May return extra events which @@ -717,9 +719,6 @@ async def _get_events_from_db(self, event_ids, allow_rejected=False): rejected_reason = row["rejected_reason"] - if not allow_rejected and rejected_reason: - continue - # If the event or metadata cannot be parsed, log the error and act # as if the event is unknown. try: diff --git a/synapse/storage/databases/main/roommember.py b/synapse/storage/databases/main/roommember.py index 68f1b40ea693..e8157ba3d4eb 100644 --- a/synapse/storage/databases/main/roommember.py +++ b/synapse/storage/databases/main/roommember.py @@ -629,14 +629,12 @@ async def _get_joined_users_from_context( # We don't update the event cache hit ratio as it completely throws off # the hit ratio counts. After all, we don't populate the cache if we # miss it here - event_map = self._get_events_from_cache( - member_event_ids, allow_rejected=False, update_metrics=False - ) + event_map = self._get_events_from_cache(member_event_ids, update_metrics=False) missing_member_event_ids = [] for event_id in member_event_ids: ev_entry = event_map.get(event_id) - if ev_entry: + if ev_entry and not ev_entry.event.rejected_reason: if ev_entry.event.membership == Membership.JOIN: users_in_room[ev_entry.event.state_key] = ProfileInfo( display_name=ev_entry.event.content.get("displayname", None), From 27e4f5cba7bc3f832365295d59fb5078a1670ce7 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 2 Jun 2021 16:50:57 +0100 Subject: [PATCH 02/15] Type hints --- .../storage/databases/main/events_worker.py | 40 +++++++++---------- 1 file changed, 20 insertions(+), 20 deletions(-) diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py index 35084bd1639c..2e2a6be9684f 100644 --- a/synapse/storage/databases/main/events_worker.py +++ b/synapse/storage/databases/main/events_worker.py @@ -14,7 +14,6 @@ import logging import threading -from collections import namedtuple from typing import ( Collection, Container, @@ -27,6 +26,7 @@ overload, ) +import attr from constantly import NamedConstant, Names from typing_extensions import Literal @@ -74,7 +74,10 @@ EVENT_QUEUE_TIMEOUT_S = 0.1 # Timeout when waiting for requests for events -_EventCacheEntry = namedtuple("_EventCacheEntry", ("event", "redacted_event")) +@attr.s(slots=True, auto_attribs=True) +class _EventCacheEntry: + event: EventBase + redacted_event: Optional[EventBase] class EventRedactBehaviour(Names): @@ -476,7 +479,9 @@ async def get_events_as_list( return events - async def _get_events_from_cache_or_db(self, event_ids, allow_rejected=False): + async def _get_events_from_cache_or_db( + self, event_ids: Iterable[str], allow_rejected: bool = False + ) -> Dict[str, _EventCacheEntry]: """Fetch a bunch of events from the cache or the database. If events are pulled from the database, they will be cached for future lookups. @@ -485,14 +490,13 @@ async def _get_events_from_cache_or_db(self, event_ids, allow_rejected=False): Args: - event_ids (Iterable[str]): The event_ids of the events to fetch + event_ids: The event_ids of the events to fetch - allow_rejected (bool): Whether to include rejected events. If False, + allow_rejected: Whether to include rejected events. If False, rejected events are omitted from the response. Returns: - Dict[str, _EventCacheEntry]: - map from event id to result + map from event id to result """ event_entry_map = self._get_events_from_cache( event_ids, @@ -528,18 +532,13 @@ def _invalidate_get_event_cache(self, event_id): self._get_event_cache.invalidate((event_id,)) def _get_events_from_cache( - self, events, update_metrics=True + self, events: Iterable[str], update_metrics: bool = True ) -> Dict[str, _EventCacheEntry]: """Fetch events from the caches Args: - events (Iterable[str]): list of event_ids to fetch - update_metrics (bool): Whether to update the cache hit ratio metrics - - Returns: - dict of event_id -> _EventCacheEntry for each event_id in cache. If - allow_rejected is `False` then there will still be an entry but it - will be `None` + events: list of event_ids to fetch + update_metrics: Whether to update the cache hit ratio metrics """ event_map = {} @@ -677,7 +676,9 @@ def fire(evs, exc): with PreserveLoggingContext(): self.hs.get_reactor().callFromThread(fire, event_list, e) - async def _get_events_from_db(self, event_ids): + async def _get_events_from_db( + self, event_ids: Iterable[str] + ) -> Dict[str, _EventCacheEntry]: """Fetch a bunch of events from the database. Returned events will be added to the cache for future lookups. @@ -685,12 +686,11 @@ async def _get_events_from_db(self, event_ids): Unknown events are omitted from the response. Args: - event_ids (Iterable[str]): The event_ids of the events to fetch + event_ids: The event_ids of the events to fetch Returns: - Dict[str, _EventCacheEntry]: - map from event id to result. May return extra events which - weren't asked for. + map from event id to result. May return extra events which + weren't asked for. """ fetched_events = {} events_to_fetch = event_ids From d472a08997f66d87d3428872c55866bb31599fe4 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 2 Jun 2021 17:10:17 +0100 Subject: [PATCH 03/15] Track in flight get events --- .../storage/databases/main/events_worker.py | 70 +++++++++++++++++-- 1 file changed, 64 insertions(+), 6 deletions(-) diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py index 2e2a6be9684f..22d91c3eb97e 100644 --- a/synapse/storage/databases/main/events_worker.py +++ b/synapse/storage/databases/main/events_worker.py @@ -42,7 +42,11 @@ from synapse.events import EventBase, make_event_from_dict from synapse.events.snapshot import EventContext from synapse.events.utils import prune_event -from synapse.logging.context import PreserveLoggingContext, current_context +from synapse.logging.context import ( + PreserveLoggingContext, + current_context, + make_deferred_yieldable, +) from synapse.metrics.background_process_metrics import ( run_as_background_process, wrap_as_background_process, @@ -56,6 +60,8 @@ from synapse.storage.util.id_generators import MultiWriterIdGenerator, StreamIdGenerator from synapse.storage.util.sequence import build_sequence_generator from synapse.types import JsonDict, get_domain_from_id +from synapse.util import unwrapFirstError +from synapse.util.async_helpers import ObservableDeferred from synapse.util.caches.descriptors import cached, cachedList from synapse.util.caches.lrucache import LruCache from synapse.util.iterutils import batch_iter @@ -164,6 +170,10 @@ def __init__(self, database: DatabasePool, db_conn, hs): max_size=hs.config.caches.event_cache_size, ) + # Map from event ID to a deferred that will result in an + # Dict[str, _EventCacheEntry]. + self._current_event_fetches: Dict[str, ObservableDeferred] = {} + self._event_fetch_lock = threading.Condition() self._event_fetch_list = [] self._event_fetch_ongoing = 0 @@ -502,7 +512,36 @@ async def _get_events_from_cache_or_db( event_ids, ) - missing_events_ids = [e for e in event_ids if e not in event_entry_map] + missing_events_ids = {e for e in event_ids if e not in event_entry_map} + + # We now look up if we're already fetching some of the events in the DB, + # if so we wait for those lookups to finish instead of pulling the same + # events out of the DB multiple times. + already_fetching: Dict[str, defer.Deferred] = {} + + # We also add entries to `self._current_event_fetches` for each event + # we're going to pull from the DB. We use a single deferred that + # resolves to all the events we pulled from the DB (this will result in + # this function returning more events than requested, but that can + # happen already due to `_get_events_from_db`). + fetching_deferred = ObservableDeferred(defer.Deferred()) + + for event_id in missing_events_ids: + deferred = self._current_event_fetches.get(event_id) + if deferred is not None: + # We're already pulling the event out of the DB, ad the deferred + # to the collection of deferreds to wait on. + already_fetching[event_id] = deferred.observe() + else: + # We're not already pulling the event from the DB, so add our + # deferred to the the map of events that are being fetched. + self._current_event_fetches[event_id] = fetching_deferred + fetching_deferred.observe().addBoth( + lambda _, event_id: self._current_event_fetches.pop(event_id, None), + event_id, + ) + + missing_events_ids.difference_update(already_fetching) if missing_events_ids: log_ctx = current_context() @@ -513,11 +552,30 @@ async def _get_events_from_cache_or_db( # the events have been redacted, and if so pulling the redaction event out # of the database to check it. # - missing_events = await self._get_events_from_db( - missing_events_ids, - ) + try: + missing_events = await self._get_events_from_db( + missing_events_ids, + ) + + event_entry_map.update(missing_events) + except Exception as e: + fetching_deferred.errback(e) + raise e + + fetching_deferred.callback(missing_events) + + if already_fetching: + # Wait for the other event requests to finish and add their results + # to ours. + results = await make_deferred_yieldable( + defer.gatherResults( + already_fetching.values(), + consumeErrors=True, + ) + ).addErrback(unwrapFirstError) - event_entry_map.update(missing_events) + for result in results: + event_entry_map.update(result) if not allow_rejected: event_entry_map = { From 5afe3d23d61bed90d854253fc2033b60a4b33d5b Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 2 Jun 2021 17:37:33 +0100 Subject: [PATCH 04/15] Track in memory events using weakrefs --- synapse/events/__init__.py | 6 ++- .../storage/databases/main/events_worker.py | 41 +++++++++++++++++-- 2 files changed, 43 insertions(+), 4 deletions(-) diff --git a/synapse/events/__init__.py b/synapse/events/__init__.py index 0298af4c02d7..c6db7a10b5a6 100644 --- a/synapse/events/__init__.py +++ b/synapse/events/__init__.py @@ -97,7 +97,7 @@ def __get__(self, instance, owner=None): class _EventInternalMetadata: - __slots__ = ["_dict", "stream_ordering", "outlier"] + __slots__ = ["_dict", "stream_ordering", "outlier", "redacted_by"] def __init__(self, internal_metadata_dict: JsonDict): # we have to copy the dict, because it turns out that the same dict is @@ -111,6 +111,10 @@ def __init__(self, internal_metadata_dict: JsonDict): # in the DAG) self.outlier = False + # Whether this event has a valid redaction event pointing at it (i.e. + # whether it should be redacted before giving to clients). + self.redacted_by: Optional[str] = None + out_of_band_membership: bool = DictProperty("out_of_band_membership") send_on_behalf_of: str = DictProperty("send_on_behalf_of") recheck_redaction: bool = DictProperty("recheck_redaction") diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py index 22d91c3eb97e..f910a132e0bf 100644 --- a/synapse/storage/databases/main/events_worker.py +++ b/synapse/storage/databases/main/events_worker.py @@ -14,6 +14,7 @@ import logging import threading +import weakref from typing import ( Collection, Container, @@ -174,6 +175,10 @@ def __init__(self, database: DatabasePool, db_conn, hs): # Dict[str, _EventCacheEntry]. self._current_event_fetches: Dict[str, ObservableDeferred] = {} + # We keep track of the events we have currently loaded in memory so that + # we can reuse them even if they've been evicted from the cache. + self._event_ref: Dict[str, EventBase] = weakref.WeakValueDictionary() + self._event_fetch_lock = threading.Condition() self._event_fetch_list = [] self._event_fetch_ongoing = 0 @@ -588,6 +593,8 @@ async def _get_events_from_cache_or_db( def _invalidate_get_event_cache(self, event_id): self._get_event_cache.invalidate((event_id,)) + self._event_ref.pop(event_id, None) + self._current_event_fetches.pop(event_id, None) def _get_events_from_cache( self, events: Iterable[str], update_metrics: bool = True @@ -601,13 +608,34 @@ def _get_events_from_cache( event_map = {} for event_id in events: + # First check if its in the event cache ret = self._get_event_cache.get( (event_id,), None, update_metrics=update_metrics ) - if not ret: - continue + if ret: + event_map[event_id] = ret + + # Otherwise check if we still have the event in memory. + event = self._event_ref.get(event_id) + if event: + redacted_event = None + if event.internal_metadata.redacted_by is not None: + # The event has been redacted, so we generate a redacted + # version. + redacted_event = prune_event(event) + redacted_event.unsigned[ + "redacted_by" + ] = event.internal_metadata.redacted_by + + cache_entry = _EventCacheEntry( + event=event, + redacted_event=redacted_event, + ) + event_map[event_id] = cache_entry - event_map[event_id] = ret + # We add the entry back into the cache as we want to keep + # recently queried events in the cache. + self._get_event_cache.set((event_id,), cache_entry) return event_map @@ -877,6 +905,11 @@ async def _get_events_from_db( original_ev, redactions, event_map ) + if redacted_event: + original_ev.internal_metadata.redacted_by = redacted_event.unsigned[ + "redacted_by" + ] + cache_entry = _EventCacheEntry( event=original_ev, redacted_event=redacted_event ) @@ -884,6 +917,8 @@ async def _get_events_from_db( self._get_event_cache.set((event_id,), cache_entry) result_map[event_id] = cache_entry + self._event_ref[event_id] = original_ev + return result_map async def _enqueue_events(self, events): From b292141b9dff435cf863536e2dad6deb4cdec2e1 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 3 Jun 2021 16:26:57 +0100 Subject: [PATCH 05/15] Tests --- .../databases/main/test_events_worker.py | 75 +++++++++++++++++++ 1 file changed, 75 insertions(+) diff --git a/tests/storage/databases/main/test_events_worker.py b/tests/storage/databases/main/test_events_worker.py index 932970fd9ad1..2ace4197ae69 100644 --- a/tests/storage/databases/main/test_events_worker.py +++ b/tests/storage/databases/main/test_events_worker.py @@ -14,7 +14,10 @@ import json from synapse.logging.context import LoggingContext +from synapse.rest import admin +from synapse.rest.client.v1 import login, room from synapse.storage.databases.main.events_worker import EventsWorkerStore +from synapse.util.async_helpers import yieldable_gather_results from tests import unittest @@ -94,3 +97,75 @@ def test_query_via_event_cache(self): res = self.get_success(self.store.have_seen_events("room1", ["event10"])) self.assertEquals(res, {"event10"}) self.assertEquals(ctx.get_resource_usage().db_txn_count, 0) + + +class EventCacheTestCase(unittest.HomeserverTestCase): + """Test that the various layers of event cache works.""" + + servlets = [ + admin.register_servlets, + room.register_servlets, + login.register_servlets, + ] + + def prepare(self, reactor, clock, hs): + self.store: EventsWorkerStore = hs.get_datastore() + + self.user = self.register_user("user", "pass") + self.token = self.login(self.user, "pass") + + self.room = self.helper.create_room_as(self.user, tok=self.token) + + res = self.helper.send(self.room, tok=self.token) + self.event_id = res["event_id"] + + # Reset the event cache so the tests start with it empty + self.store._get_event_cache.clear() + + def test_simple(self): + """Test that we cache events that we pull from the DB.""" + + with LoggingContext("test") as ctx: + self.get_success(self.store.get_event(self.event_id)) + + # We should have fetched the event from the DB + self.assertEqual(ctx.get_resource_usage().evt_db_fetch_count, 1) + + def test_event_ref(self): + """Test that we reuse events that are still in memory but have fallen + out of the cache, rather than requesting them from the DB. + """ + + # Reset the event cache + self.store._get_event_cache.clear() + + with LoggingContext("test") as ctx: + # We keep hold of the event event though we never use it. + event = self.get_success(self.store.get_event(self.event_id)) # noqa: F841 + + # We should have fetched the event from the DB + self.assertEqual(ctx.get_resource_usage().evt_db_fetch_count, 1) + + # Reset the event cache + self.store._get_event_cache.clear() + + with LoggingContext("test") as ctx: + self.get_success(self.store.get_event(self.event_id)) + + # Since the event is still in memory we shouldn't have fetched it + # from the DB + self.assertEqual(ctx.get_resource_usage().evt_db_fetch_count, 0) + + def test_dedupe(self): + """Test that if we request the same event multiple times we only pull it + out once. + """ + + with LoggingContext("test") as ctx: + d = yieldable_gather_results( + self.store.get_event, [self.event_id, self.event_id] + ) + self.get_success(d) + + # We should have fetched the event from the DB + self.assertEqual(ctx.get_resource_usage().evt_db_fetch_count, 1) From 9d6a77baa241bf705e6c79a10cdbfd6e0c163f80 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 4 Jun 2021 11:00:06 +0100 Subject: [PATCH 06/15] Newsfile --- changelog.d/10119.misc | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/10119.misc diff --git a/changelog.d/10119.misc b/changelog.d/10119.misc new file mode 100644 index 000000000000..21bf2c32f18d --- /dev/null +++ b/changelog.d/10119.misc @@ -0,0 +1 @@ +Try and ensure we only have one copy of an event in memory at a time. From 918f7be53f1aba77a5f2b958a013b5085ca42a01 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 3 Aug 2021 09:48:51 +0100 Subject: [PATCH 07/15] Revert "Track in memory events using weakrefs" This reverts commit 5afe3d23d61bed90d854253fc2033b60a4b33d5b. --- synapse/events/__init__.py | 6 +-- .../storage/databases/main/events_worker.py | 41 ++----------------- 2 files changed, 4 insertions(+), 43 deletions(-) diff --git a/synapse/events/__init__.py b/synapse/events/__init__.py index c6db7a10b5a6..0298af4c02d7 100644 --- a/synapse/events/__init__.py +++ b/synapse/events/__init__.py @@ -97,7 +97,7 @@ def __get__(self, instance, owner=None): class _EventInternalMetadata: - __slots__ = ["_dict", "stream_ordering", "outlier", "redacted_by"] + __slots__ = ["_dict", "stream_ordering", "outlier"] def __init__(self, internal_metadata_dict: JsonDict): # we have to copy the dict, because it turns out that the same dict is @@ -111,10 +111,6 @@ def __init__(self, internal_metadata_dict: JsonDict): # in the DAG) self.outlier = False - # Whether this event has a valid redaction event pointing at it (i.e. - # whether it should be redacted before giving to clients). - self.redacted_by: Optional[str] = None - out_of_band_membership: bool = DictProperty("out_of_band_membership") send_on_behalf_of: str = DictProperty("send_on_behalf_of") recheck_redaction: bool = DictProperty("recheck_redaction") diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py index f910a132e0bf..22d91c3eb97e 100644 --- a/synapse/storage/databases/main/events_worker.py +++ b/synapse/storage/databases/main/events_worker.py @@ -14,7 +14,6 @@ import logging import threading -import weakref from typing import ( Collection, Container, @@ -175,10 +174,6 @@ def __init__(self, database: DatabasePool, db_conn, hs): # Dict[str, _EventCacheEntry]. self._current_event_fetches: Dict[str, ObservableDeferred] = {} - # We keep track of the events we have currently loaded in memory so that - # we can reuse them even if they've been evicted from the cache. - self._event_ref: Dict[str, EventBase] = weakref.WeakValueDictionary() - self._event_fetch_lock = threading.Condition() self._event_fetch_list = [] self._event_fetch_ongoing = 0 @@ -593,8 +588,6 @@ async def _get_events_from_cache_or_db( def _invalidate_get_event_cache(self, event_id): self._get_event_cache.invalidate((event_id,)) - self._event_ref.pop(event_id, None) - self._current_event_fetches.pop(event_id, None) def _get_events_from_cache( self, events: Iterable[str], update_metrics: bool = True @@ -608,34 +601,13 @@ def _get_events_from_cache( event_map = {} for event_id in events: - # First check if its in the event cache ret = self._get_event_cache.get( (event_id,), None, update_metrics=update_metrics ) - if ret: - event_map[event_id] = ret - - # Otherwise check if we still have the event in memory. - event = self._event_ref.get(event_id) - if event: - redacted_event = None - if event.internal_metadata.redacted_by is not None: - # The event has been redacted, so we generate a redacted - # version. - redacted_event = prune_event(event) - redacted_event.unsigned[ - "redacted_by" - ] = event.internal_metadata.redacted_by - - cache_entry = _EventCacheEntry( - event=event, - redacted_event=redacted_event, - ) - event_map[event_id] = cache_entry + if not ret: + continue - # We add the entry back into the cache as we want to keep - # recently queried events in the cache. - self._get_event_cache.set((event_id,), cache_entry) + event_map[event_id] = ret return event_map @@ -905,11 +877,6 @@ async def _get_events_from_db( original_ev, redactions, event_map ) - if redacted_event: - original_ev.internal_metadata.redacted_by = redacted_event.unsigned[ - "redacted_by" - ] - cache_entry = _EventCacheEntry( event=original_ev, redacted_event=redacted_event ) @@ -917,8 +884,6 @@ async def _get_events_from_db( self._get_event_cache.set((event_id,), cache_entry) result_map[event_id] = cache_entry - self._event_ref[event_id] = original_ev - return result_map async def _enqueue_events(self, events): From d120766395dc8db996b30a71cbe3140ea87147c8 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 3 Aug 2021 09:49:46 +0100 Subject: [PATCH 08/15] Remove weakref test --- .../databases/main/test_events_worker.py | 25 ------------------- 1 file changed, 25 deletions(-) diff --git a/tests/storage/databases/main/test_events_worker.py b/tests/storage/databases/main/test_events_worker.py index 2ace4197ae69..d05d36768535 100644 --- a/tests/storage/databases/main/test_events_worker.py +++ b/tests/storage/databases/main/test_events_worker.py @@ -131,31 +131,6 @@ def test_simple(self): # We should have fetched the event from the DB self.assertEqual(ctx.get_resource_usage().evt_db_fetch_count, 1) - def test_event_ref(self): - """Test that we reuse events that are still in memory but have fallen - out of the cache, rather than requesting them from the DB. - """ - - # Reset the event cache - self.store._get_event_cache.clear() - - with LoggingContext("test") as ctx: - # We keep hold of the event event though we never use it. - event = self.get_success(self.store.get_event(self.event_id)) # noqa: F841 - - # We should have fetched the event from the DB - self.assertEqual(ctx.get_resource_usage().evt_db_fetch_count, 1) - - # Reset the event cache - self.store._get_event_cache.clear() - - with LoggingContext("test") as ctx: - self.get_success(self.store.get_event(self.event_id)) - - # Since the event is still in memory we shouldn't have fetched it - # from the DB - self.assertEqual(ctx.get_resource_usage().evt_db_fetch_count, 0) - def test_dedupe(self): """Test that if we request the same event multiple times we only pull it out once. From 059c85ee1f6d0e49f7bf0245bb18e39e0a875590 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 3 Aug 2021 09:50:31 +0100 Subject: [PATCH 09/15] Doc funcs may return rejected events. --- synapse/storage/databases/main/events_worker.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py index 22d91c3eb97e..ea41a586740e 100644 --- a/synapse/storage/databases/main/events_worker.py +++ b/synapse/storage/databases/main/events_worker.py @@ -592,7 +592,7 @@ def _invalidate_get_event_cache(self, event_id): def _get_events_from_cache( self, events: Iterable[str], update_metrics: bool = True ) -> Dict[str, _EventCacheEntry]: - """Fetch events from the caches + """Fetch events from the caches, may return rejected events. Args: events: list of event_ids to fetch @@ -737,7 +737,8 @@ def fire(evs, exc): async def _get_events_from_db( self, event_ids: Iterable[str] ) -> Dict[str, _EventCacheEntry]: - """Fetch a bunch of events from the database. + """Fetch a bunch of events from the database, may return rejected + events. Returned events will be added to the cache for future lookups. From a8b777da69c3c448345deb06db84e6dc4df9af4d Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 3 Aug 2021 09:51:37 +0100 Subject: [PATCH 10/15] Fixup comment --- synapse/storage/databases/main/events_worker.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py index ea41a586740e..0fb9f8aabd3e 100644 --- a/synapse/storage/databases/main/events_worker.py +++ b/synapse/storage/databases/main/events_worker.py @@ -529,7 +529,7 @@ async def _get_events_from_cache_or_db( for event_id in missing_events_ids: deferred = self._current_event_fetches.get(event_id) if deferred is not None: - # We're already pulling the event out of the DB, ad the deferred + # We're already pulling the event out of the DB. Add the deferred # to the collection of deferreds to wait on. already_fetching[event_id] = deferred.observe() else: From af02613fe3574dcc2c3ba9d01c525f7fad6974bb Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 3 Aug 2021 09:55:35 +0100 Subject: [PATCH 11/15] Refactor _current_event_fetches --- .../storage/databases/main/events_worker.py | 28 +++++++++---------- 1 file changed, 13 insertions(+), 15 deletions(-) diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py index 0fb9f8aabd3e..e5a27680b84c 100644 --- a/synapse/storage/databases/main/events_worker.py +++ b/synapse/storage/databases/main/events_worker.py @@ -519,27 +519,12 @@ async def _get_events_from_cache_or_db( # events out of the DB multiple times. already_fetching: Dict[str, defer.Deferred] = {} - # We also add entries to `self._current_event_fetches` for each event - # we're going to pull from the DB. We use a single deferred that - # resolves to all the events we pulled from the DB (this will result in - # this function returning more events than requested, but that can - # happen already due to `_get_events_from_db`). - fetching_deferred = ObservableDeferred(defer.Deferred()) - for event_id in missing_events_ids: deferred = self._current_event_fetches.get(event_id) if deferred is not None: # We're already pulling the event out of the DB. Add the deferred # to the collection of deferreds to wait on. already_fetching[event_id] = deferred.observe() - else: - # We're not already pulling the event from the DB, so add our - # deferred to the the map of events that are being fetched. - self._current_event_fetches[event_id] = fetching_deferred - fetching_deferred.observe().addBoth( - lambda _, event_id: self._current_event_fetches.pop(event_id, None), - event_id, - ) missing_events_ids.difference_update(already_fetching) @@ -547,6 +532,15 @@ async def _get_events_from_cache_or_db( log_ctx = current_context() log_ctx.record_event_fetch(len(missing_events_ids)) + # Add entries to `self._current_event_fetches` for each event we're + # going to pull from the DB. We use a single deferred that resolves + # to all the events we pulled from the DB (this will result in this + # function returning more events than requested, but that can happen + # already due to `_get_events_from_db`). + fetching_deferred = ObservableDeferred(defer.Deferred()) + for event_id in missing_events_ids: + self._current_event_fetches[event_id] = fetching_deferred + # Note that _get_events_from_db is also responsible for turning db rows # into FrozenEvents (via _get_event_from_row), which involves seeing if # the events have been redacted, and if so pulling the redaction event out @@ -561,6 +555,10 @@ async def _get_events_from_cache_or_db( except Exception as e: fetching_deferred.errback(e) raise e + finally: + # Ensure that we mark these events as no longer being fetched. + for event_id in missing_events_ids: + self._current_event_fetches.pop(event_id, None) fetching_deferred.callback(missing_events) From 0f9e19cd1100c974bb4d32265a575a63a10f3f3e Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 3 Aug 2021 09:56:20 +0100 Subject: [PATCH 12/15] Correctly drop logging contexts --- synapse/storage/databases/main/events_worker.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py index e5a27680b84c..f17eb9d49c61 100644 --- a/synapse/storage/databases/main/events_worker.py +++ b/synapse/storage/databases/main/events_worker.py @@ -553,14 +553,16 @@ async def _get_events_from_cache_or_db( event_entry_map.update(missing_events) except Exception as e: - fetching_deferred.errback(e) + with PreserveLoggingContext(): + fetching_deferred.errback(e) raise e finally: # Ensure that we mark these events as no longer being fetched. for event_id in missing_events_ids: self._current_event_fetches.pop(event_id, None) - fetching_deferred.callback(missing_events) + with PreserveLoggingContext(): + fetching_deferred.callback(missing_events) if already_fetching: # Wait for the other event requests to finish and add their results From 3d46c4a19fcc67d12eac3e2e85278b719f74b373 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 3 Aug 2021 10:04:36 +0100 Subject: [PATCH 13/15] Add types to ObservableDeferred --- synapse/storage/databases/main/events_worker.py | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py index f17eb9d49c61..21169eb2eab5 100644 --- a/synapse/storage/databases/main/events_worker.py +++ b/synapse/storage/databases/main/events_worker.py @@ -170,9 +170,12 @@ def __init__(self, database: DatabasePool, db_conn, hs): max_size=hs.config.caches.event_cache_size, ) - # Map from event ID to a deferred that will result in an - # Dict[str, _EventCacheEntry]. - self._current_event_fetches: Dict[str, ObservableDeferred] = {} + # Map from event ID to a deferred that will result in a map from event + # ID to cache entry. Note that the returned dict may not have the + # requested event in it if the event isn't in the DB. + self._current_event_fetches: Dict[ + str, ObservableDeferred[Dict[str, _EventCacheEntry]] + ] = {} self._event_fetch_lock = threading.Condition() self._event_fetch_list = [] @@ -537,7 +540,9 @@ async def _get_events_from_cache_or_db( # to all the events we pulled from the DB (this will result in this # function returning more events than requested, but that can happen # already due to `_get_events_from_db`). - fetching_deferred = ObservableDeferred(defer.Deferred()) + fetching_deferred: ObservableDeferred[ + Dict[str, _EventCacheEntry] + ] = ObservableDeferred(defer.Deferred()) for event_id in missing_events_ids: self._current_event_fetches[event_id] = fetching_deferred From b095d2126e17df35777e5c831db0546ffff46563 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 4 Aug 2021 13:26:11 +0100 Subject: [PATCH 14/15] Apply suggestions from code review Co-authored-by: Richard van der Hoff <1389908+richvdh@users.noreply.github.com> --- changelog.d/10119.misc | 2 +- synapse/storage/databases/main/events_worker.py | 9 ++++++--- 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/changelog.d/10119.misc b/changelog.d/10119.misc index 21bf2c32f18d..f70dc6496fcf 100644 --- a/changelog.d/10119.misc +++ b/changelog.d/10119.misc @@ -1 +1 @@ -Try and ensure we only have one copy of an event in memory at a time. +Improve event caching mechanism to avoid having multiple copies of an event in memory at a time. diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py index 21169eb2eab5..486506e5292b 100644 --- a/synapse/storage/databases/main/events_worker.py +++ b/synapse/storage/databases/main/events_worker.py @@ -597,7 +597,9 @@ def _invalidate_get_event_cache(self, event_id): def _get_events_from_cache( self, events: Iterable[str], update_metrics: bool = True ) -> Dict[str, _EventCacheEntry]: - """Fetch events from the caches, may return rejected events. + """Fetch events from the caches. + + May return rejected events. Args: events: list of event_ids to fetch @@ -742,8 +744,9 @@ def fire(evs, exc): async def _get_events_from_db( self, event_ids: Iterable[str] ) -> Dict[str, _EventCacheEntry]: - """Fetch a bunch of events from the database, may return rejected - events. + """Fetch a bunch of events from the database. + + May return rejected events. Returned events will be added to the cache for future lookups. From 8ea267a7c00f7647dfbd97d1f9877cb3cb84a3d2 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 4 Aug 2021 13:32:48 +0100 Subject: [PATCH 15/15] Fix lint --- synapse/storage/databases/main/events_worker.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py index 486506e5292b..375463e4e979 100644 --- a/synapse/storage/databases/main/events_worker.py +++ b/synapse/storage/databases/main/events_worker.py @@ -598,7 +598,7 @@ def _get_events_from_cache( self, events: Iterable[str], update_metrics: bool = True ) -> Dict[str, _EventCacheEntry]: """Fetch events from the caches. - + May return rejected events. Args: @@ -745,7 +745,7 @@ async def _get_events_from_db( self, event_ids: Iterable[str] ) -> Dict[str, _EventCacheEntry]: """Fetch a bunch of events from the database. - + May return rejected events. Returned events will be added to the cache for future lookups.