Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Track in memory events using weakrefs #10533

Merged
merged 9 commits into from
May 17, 2022
1 change: 1 addition & 0 deletions changelog.d/10533.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Improve event caching mechanism to avoid having multiple copies of an event in memory at a time.
35 changes: 33 additions & 2 deletions synapse/storage/databases/main/events_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

import logging
import threading
import weakref
from enum import Enum, auto
from typing import (
TYPE_CHECKING,
Expand All @@ -23,6 +24,7 @@
Dict,
Iterable,
List,
MutableMapping,
Optional,
Set,
Tuple,
Expand Down Expand Up @@ -248,6 +250,12 @@ def __init__(
str, ObservableDeferred[Dict[str, EventCacheEntry]]
] = {}

# We keep track of the events we have currently loaded in memory so that
# we can reuse them even if they've been evicted from the cache. We only
# track events that don't need redacting in here (as then we don't need
# to track redaction status).
self._event_ref: MutableMapping[str, EventBase] = weakref.WeakValueDictionary()

self._event_fetch_lock = threading.Condition()
self._event_fetch_list: List[
Tuple[Iterable[str], "defer.Deferred[Dict[str, _EventRow]]"]
Expand Down Expand Up @@ -723,6 +731,8 @@ async def get_missing_events_from_db() -> Dict[str, EventCacheEntry]:

def _invalidate_get_event_cache(self, event_id: str) -> None:
self._get_event_cache.invalidate((event_id,))
self._event_ref.pop(event_id, None)
self._current_event_fetches.pop(event_id, None)

def _get_events_from_cache(
self, events: Iterable[str], update_metrics: bool = True
Expand All @@ -738,13 +748,30 @@ def _get_events_from_cache(
event_map = {}

for event_id in events:
# First check if it's in the event cache
ret = self._get_event_cache.get(
(event_id,), None, update_metrics=update_metrics
)
if not ret:
if ret:
event_map[event_id] = ret
continue

event_map[event_id] = ret
# Otherwise check if we still have the event in memory.
DMRobertson marked this conversation as resolved.
Show resolved Hide resolved
event = self._event_ref.get(event_id)
if event:
# Reconstruct an event cache entry

cache_entry = EventCacheEntry(
event=event,
# We don't cache weakrefs to redacted events, so we know
# this is None.
redacted_event=None,
)
event_map[event_id] = cache_entry

# We add the entry back into the cache as we want to keep
# recently queried events in the cache.
self._get_event_cache.set((event_id,), cache_entry)

return event_map

Expand Down Expand Up @@ -1124,6 +1151,10 @@ async def _get_events_from_db(
self._get_event_cache.set((event_id,), cache_entry)
result_map[event_id] = cache_entry

if not redacted_event:
# We only cache references to unredacted events.
self._event_ref[event_id] = original_ev

return result_map

async def _enqueue_events(self, events: Collection[str]) -> Dict[str, _EventRow]:
Expand Down
1 change: 1 addition & 0 deletions tests/handlers/test_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,7 @@ def test_unknown_room_version(self):
# Blow away caches (supported room versions can only change due to a restart).
self.store.get_rooms_for_user_with_stream_ordering.invalidate_all()
self.store._get_event_cache.clear()
self.store._event_ref.clear()

# The rooms should be excluded from the sync response.
# Get a new request key.
Expand Down
25 changes: 25 additions & 0 deletions tests/storage/databases/main/test_events_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,31 @@ def test_simple(self):
# We should have fetched the event from the DB
self.assertEqual(ctx.get_resource_usage().evt_db_fetch_count, 1)

def test_event_ref(self):
"""Test that we reuse events that are still in memory but have fallen
out of the cache, rather than requesting them from the DB.
"""

# Reset the event cache
self.store._get_event_cache.clear()

with LoggingContext("test") as ctx:
# We keep hold of the event event though we never use it.
event = self.get_success(self.store.get_event(self.event_id)) # noqa: F841

# We should have fetched the event from the DB
self.assertEqual(ctx.get_resource_usage().evt_db_fetch_count, 1)

# Reset the event cache
self.store._get_event_cache.clear()

with LoggingContext("test") as ctx:
self.get_success(self.store.get_event(self.event_id))

# Since the event is still in memory we shouldn't have fetched it
# from the DB
self.assertEqual(ctx.get_resource_usage().evt_db_fetch_count, 0)

def test_dedupe(self):
"""Test that if we request the same event multiple times we only pull it
out once.
Expand Down