
Track in memory events using weakrefs #10533

Merged · 9 commits · May 17, 2022
1 change: 1 addition & 0 deletions changelog.d/10533.misc
@@ -0,0 +1 @@
+Improve event caching mechanism to avoid having multiple copies of an event in memory at a time.
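The idea in miniature (a standalone sketch with hypothetical names, not Synapse's actual API): intern every event loaded from the database in a `weakref.WeakValueDictionary`, so that as long as *anything* still references an event, further loads hand back that same object instead of materialising another copy.

```python
import weakref

class Event:
    """Stand-in for Synapse's EventBase."""

    def __init__(self, event_id: str) -> None:
        self.event_id = event_id

# Maps event ID -> the live in-memory copy, without keeping it alive itself.
_event_refs: "weakref.WeakValueDictionary[str, Event]" = weakref.WeakValueDictionary()

def load_event(event_id: str) -> Event:
    ev = _event_refs.get(event_id)
    if ev is None:
        ev = Event(event_id)  # stands in for the real DB fetch
        _event_refs[event_id] = ev
    return ev

a = load_event("$event:example.org")
b = load_event("$event:example.org")
assert a is b  # one shared object rather than two equal copies
```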
6 changes: 5 additions & 1 deletion synapse/events/__init__.py
@@ -97,7 +97,7 @@ def __get__(self, instance, owner=None):
 
 
 class _EventInternalMetadata:
-    __slots__ = ["_dict", "stream_ordering", "outlier"]
+    __slots__ = ["_dict", "stream_ordering", "outlier", "redacted_by"]
 
     def __init__(self, internal_metadata_dict: JsonDict):
         # we have to copy the dict, because it turns out that the same dict is
@@ -111,6 +111,10 @@ def __init__(self, internal_metadata_dict: JsonDict):
         # in the DAG)
         self.outlier = False
 
+        # The event ID of the redaction event that redacted this event, if
+        # any (i.e. it should be redacted before being given to clients).
+        self.redacted_by: Optional[str] = None
+
     out_of_band_membership: bool = DictProperty("out_of_band_membership")
     send_on_behalf_of: str = DictProperty("send_on_behalf_of")
     recheck_redaction: bool = DictProperty("recheck_redaction")
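For context on the surrounding class: the `DictProperty` attributes next to the new slot are descriptors that proxy attribute access through to the instance's `_dict` (the persisted metadata), whereas `redacted_by` lives in a plain slot, presumably because it is derived, in-memory-only state. A rough sketch of the descriptor pattern (simplified; not the exact Synapse implementation):

```python
class DictProperty:
    """Descriptor that reads/writes an attribute via the instance's `_dict`."""

    def __init__(self, key: str) -> None:
        self.key = key

    def __get__(self, instance, owner=None):
        if instance is None:
            return self  # attribute looked up on the class itself
        try:
            return instance._dict[self.key]
        except KeyError:
            raise AttributeError(self.key) from None

    def __set__(self, instance, value) -> None:
        instance._dict[self.key] = value
```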
43 changes: 40 additions & 3 deletions synapse/storage/databases/main/events_worker.py
@@ -14,6 +14,7 @@

import logging
import threading
import weakref
from typing import (
Collection,
Container,
@@ -177,6 +178,10 @@ def __init__(self, database: DatabasePool, db_conn, hs):
             str, ObservableDeferred[Dict[str, _EventCacheEntry]]
         ] = {}
 
+        # We keep track of the events we have currently loaded in memory so that
+        # we can reuse them even if they've been evicted from the cache.
+        self._event_ref: Dict[str, EventBase] = weakref.WeakValueDictionary()
+
         self._event_fetch_lock = threading.Condition()
         self._event_fetch_list = []
         self._event_fetch_ongoing = 0
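The property that makes `WeakValueDictionary` the right tool here (illustrative snippet, not Synapse code): values are held only weakly, so the map never keeps an event alive by itself, and an entry evaporates once the last strong reference elsewhere is dropped.

```python
import gc
import weakref

class Event:
    pass

refs = weakref.WeakValueDictionary()
ev = Event()
refs["$ev"] = ev
assert refs.get("$ev") is ev    # alive while `ev` still references it

del ev
gc.collect()  # CPython frees it immediately anyway; be explicit for other runtimes
assert refs.get("$ev") is None  # the entry vanished with the object
```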
@@ -593,6 +598,8 @@ async def _get_events_from_cache_or_db(

     def _invalidate_get_event_cache(self, event_id):
         self._get_event_cache.invalidate((event_id,))
+        self._event_ref.pop(event_id, None)
+        self._current_event_fetches.pop(event_id, None)
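Presumably the weakref map has to be cleared here too: otherwise a later cache miss could resurrect a stale copy of an event that had just been redacted or purged. Invalidation therefore touches the LRU cache, the weakref map, and any in-flight fetch.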

     def _get_events_from_cache(
         self, events: Iterable[str], update_metrics: bool = True
@@ -608,13 +615,36 @@ def _get_events_from_cache(
         event_map = {}
 
         for event_id in events:
+            # First check if it's in the event cache
             ret = self._get_event_cache.get(
                 (event_id,), None, update_metrics=update_metrics
             )
-            if not ret:
-                continue
-
-            event_map[event_id] = ret
+            if ret:
+                event_map[event_id] = ret
+                continue
+
+            # Otherwise check if we still have the event in memory.
+            event = self._event_ref.get(event_id)
+            if event:
+                # Reconstruct an event cache entry
+
+                redacted_event = None
+                if event.internal_metadata.redacted_by is not None:
+                    # The event has been redacted, so we generate a redacted
+                    # version.
+                    redacted_event = prune_event(event)
+                    redacted_event.unsigned[
+                        "redacted_by"
+                    ] = event.internal_metadata.redacted_by
+
+                cache_entry = _EventCacheEntry(
+                    event=event,
+                    redacted_event=redacted_event,
+                )
+                event_map[event_id] = cache_entry
+
+                # We add the entry back into the cache as we want to keep
+                # recently queried events in the cache.
+                self._get_event_cache.set((event_id,), cache_entry)
 
         return event_map
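Taken together, the read path now has three tiers. A condensed paraphrase of the lookup order above (free-standing hypothetical names, not the real method):

```python
def get_cached_or_in_memory(event_id, event_cache, event_refs, rebuild_entry):
    """1) LRU cache -> 2) weakref map -> 3) fall through to the DB."""
    entry = event_cache.get(event_id)
    if entry is not None:
        return entry

    event = event_refs.get(event_id)
    if event is not None:
        entry = rebuild_entry(event)   # re-derive the redacted copy if needed
        event_cache[event_id] = entry  # promote back into the LRU cache
        return entry

    return None  # miss: the caller fetches from the DB and repopulates both maps
```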

@@ -886,13 +916,20 @@ async def _get_events_from_db(
                 original_ev, redactions, event_map
             )
 
+            if redacted_event:
+                original_ev.internal_metadata.redacted_by = redacted_event.unsigned[
+                    "redacted_by"
+                ]
+
             cache_entry = _EventCacheEntry(
                 event=original_ev, redacted_event=redacted_event
             )
 
             self._get_event_cache.set((event_id,), cache_entry)
             result_map[event_id] = cache_entry
 
+            self._event_ref[event_id] = original_ev
+
         return result_map
 
     async def _enqueue_events(self, events):
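A detail worth flagging (my observation, not something the PR states): only objects whose type supports weak references can go into the map. A class that defines `__slots__`, as `_EventInternalMetadata` above does, is not weakly referenceable unless `__weakref__` is listed among the slots; this works out here because the map stores `EventBase` instances, not the slotted metadata objects.

```python
import weakref

class Slotted:
    __slots__ = ["x"]                 # no __weakref__ slot

class SlottedWeakrefable:
    __slots__ = ["x", "__weakref__"]  # opts back in to weak references

weakref.ref(SlottedWeakrefable())     # fine

try:
    weakref.ref(Slotted())
except TypeError as e:
    print(e)  # cannot create weak reference to 'Slotted' object
```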
25 changes: 25 additions & 0 deletions tests/storage/databases/main/test_events_worker.py
@@ -131,6 +131,31 @@ def test_simple(self):
         # We should have fetched the event from the DB
         self.assertEqual(ctx.get_resource_usage().evt_db_fetch_count, 1)
 
+    def test_event_ref(self):
+        """Test that we reuse events that are still in memory but have fallen
+        out of the cache, rather than requesting them from the DB.
+        """
+
+        # Reset the event cache
+        self.store._get_event_cache.clear()
+
+        with LoggingContext("test") as ctx:
+            # We keep hold of the event even though we never use it.
+            event = self.get_success(self.store.get_event(self.event_id))  # noqa: F841
+
+            # We should have fetched the event from the DB
+            self.assertEqual(ctx.get_resource_usage().evt_db_fetch_count, 1)
+
+        # Reset the event cache
+        self.store._get_event_cache.clear()
+
+        with LoggingContext("test") as ctx:
+            self.get_success(self.store.get_event(self.event_id))
+
+            # Since the event is still in memory we shouldn't have fetched it
+            # from the DB
+            self.assertEqual(ctx.get_resource_usage().evt_db_fetch_count, 0)
+
     def test_dedupe(self):
         """Test that if we request the same event multiple times we only pull it
         out once.
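A footnote on `test_event_ref` above: the unused `event` local (hence the `# noqa: F841`) is what keeps the weakref-map entry alive across the cache clear; drop it and the second read would go back to the database. A sketch of both outcomes with stand-in names:

```python
import gc
import weakref

class Event:
    pass

cache: dict = {}                      # stands in for the strong-ref LRU cache
refs = weakref.WeakValueDictionary()  # stands in for _event_ref

def load(event_id: str) -> Event:
    ev = Event()
    cache[event_id] = ev
    refs[event_id] = ev
    return ev

# Without holding the returned event, clearing the cache drops the last
# strong reference and the weak entry evaporates with it.
load("$one")
cache.clear()
gc.collect()
assert refs.get("$one") is None

# Holding it in a local (like `event` in the test) keeps the entry alive.
ev = load("$two")
cache.clear()
gc.collect()
assert refs.get("$two") is ev
```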