Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Clear event caches when we purge history #15609

Merged
merged 12 commits into from
Jun 8, 2023
1 change: 1 addition & 0 deletions changelog.d/15609.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Correctly clear caches when we delete a room.
113 changes: 113 additions & 0 deletions synapse/storage/databases/main/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@
# based on the current state when notifying workers over replication.
CURRENT_STATE_CACHE_NAME = "cs_cache_fake"

# As above, but for invalidating
DELETE_ROOM_CACHE_NAME = "dr_cache_fake"


class CacheInvalidationWorkerStore(SQLBaseStore):
def __init__(
Expand Down Expand Up @@ -175,6 +178,14 @@ def process_replication_rows(
room_id = row.keys[0]
members_changed = set(row.keys[1:])
self._invalidate_state_caches(room_id, members_changed)
elif row.cache_func == DELETE_ROOM_CACHE_NAME:
if row.keys is None:
raise Exception(
"Can't send an 'invalidate all' for 'delete room' cache"
)

room_id = row.keys[0]
self._invalidate_caches_for_room(room_id)
else:
self._attempt_to_invalidate_cache(row.cache_func, row.keys)

Expand Down Expand Up @@ -226,6 +237,9 @@ def _invalidate_caches_for_event(
relates_to: Optional[str],
backfilled: bool,
) -> None:
# XXX: If you add something to this function make sure you add it to
# `_invalidate_caches_for_room` as well.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this accurate? You now call explictly call _invalidate_caches_for_events and _invalidate_caches_for_room in the delete room case.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Whoops, that should say _invalidate_caches_for_events

# This invalidates any local in-memory cached event objects, the original
# process triggering the invalidation is responsible for clearing any external
# cached objects.
Expand Down Expand Up @@ -271,6 +285,102 @@ def _invalidate_caches_for_event(
self._attempt_to_invalidate_cache("get_thread_participated", (relates_to,))
self._attempt_to_invalidate_cache("get_threads", (room_id,))

def _invalidate_caches_for_room_and_stream(
self, txn: LoggingTransaction, room_id: str
) -> None:
"""Invalidate caches associated with rooms, and stream to replication.

Used when we delete events in rooms, but don't know which events we've
deleted.
"""

self._send_invalidation_to_replication(txn, DELETE_ROOM_CACHE_NAME, [room_id])
txn.call_after(self._invalidate_caches_for_room, room_id)

def _invalidate_caches_for_room(self, room_id: str) -> None:
"""Invalidate caches associated with rooms.

Used when we delete events in rooms, but don't know which events we've
deleted.
"""

self._invalidate_local_get_event_cache_all() # type: ignore[attr-defined]
DMRobertson marked this conversation as resolved.
Show resolved Hide resolved

self._attempt_to_invalidate_cache("have_seen_event", (room_id,))
self._attempt_to_invalidate_cache("get_latest_event_ids_in_room", (room_id,))
self._attempt_to_invalidate_cache(
"get_unread_event_push_actions_by_room_for_user", (room_id,)
)

self._attempt_to_invalidate_cache("_get_membership_from_event_id", None)
self._attempt_to_invalidate_cache("get_relations_for_event", None)
self._attempt_to_invalidate_cache("get_applicable_edit", None)
self._attempt_to_invalidate_cache("get_thread_id", None)
self._attempt_to_invalidate_cache("get_thread_id_for_receipts", None)
self._attempt_to_invalidate_cache("get_invited_rooms_for_local_user", None)
self._attempt_to_invalidate_cache(
"get_rooms_for_user_with_stream_ordering", None
)
self._attempt_to_invalidate_cache("get_rooms_for_user", None)
self._attempt_to_invalidate_cache("get_references_for_event", None)
self._attempt_to_invalidate_cache("get_thread_summary", None)
self._attempt_to_invalidate_cache("get_thread_participated", None)
self._attempt_to_invalidate_cache("get_threads", (room_id,))

self._attempt_to_invalidate_cache("_get_state_group_for_event", None)

# Also invalidate room based caches

self._attempt_to_invalidate_cache("get_account_data_for_room", None)
self._attempt_to_invalidate_cache("get_account_data_for_room_and_type", None)
self._attempt_to_invalidate_cache("get_aliases_for_room", (room_id,))
self._attempt_to_invalidate_cache("get_latest_event_ids_in_room", (room_id,))
self._attempt_to_invalidate_cache("_get_forward_extremeties_for_room", None)
self._attempt_to_invalidate_cache(
"get_unread_event_push_actions_by_room_for_user", (room_id,)
)
self._attempt_to_invalidate_cache("get_event_ordering", None)
self._attempt_to_invalidate_cache("is_partial_state_event", None)
self._attempt_to_invalidate_cache(
"_get_linearized_receipts_for_room", (room_id,)
)
self._attempt_to_invalidate_cache("is_room_blocked", (room_id,))
self._attempt_to_invalidate_cache("get_retention_policy_for_room", (room_id,))
self._attempt_to_invalidate_cache(
"_get_partial_state_servers_at_join", (room_id,)
)
self._attempt_to_invalidate_cache("is_partial_state_room", (room_id,))
self._attempt_to_invalidate_cache("get_invited_rooms_for_local_user", None)
self._attempt_to_invalidate_cache("_get_joined_profile_from_event_id", None)
self._attempt_to_invalidate_cache("is_host_invited", None)
self._attempt_to_invalidate_cache("is_host_joined", None)
self._attempt_to_invalidate_cache(
"get_current_hosts_in_room_ordered", (room_id,)
)
self._attempt_to_invalidate_cache("did_forget", None)
self._attempt_to_invalidate_cache("get_forgotten_rooms_for_user", None)
self._attempt_to_invalidate_cache("_get_membership_from_event_id", None)
self._attempt_to_invalidate_cache("get_room_version_id", (room_id,))

# And delete state caches.

self._attempt_to_invalidate_cache("get_partial_current_state_ids", (room_id,))
self._attempt_to_invalidate_cache("get_users_in_room", (room_id,))
self._attempt_to_invalidate_cache("get_current_hosts_in_room", (room_id,))
self._attempt_to_invalidate_cache("get_users_in_room_with_profiles", (room_id,))
self._attempt_to_invalidate_cache("get_number_joined_users_in_room", (room_id,))
self._attempt_to_invalidate_cache("get_local_users_in_room", (room_id,))
self._attempt_to_invalidate_cache("does_pair_of_users_share_a_room", None)
self._attempt_to_invalidate_cache("get_user_in_room_with_profile", None)
self._attempt_to_invalidate_cache(
"get_rooms_for_user_with_stream_ordering", None
)
self._attempt_to_invalidate_cache("get_rooms_for_user", None)
self._attempt_to_invalidate_cache("get_room_summary", (room_id,))
self._attempt_to_invalidate_cache("get_partial_current_state_ids", (room_id,))
DMRobertson marked this conversation as resolved.
Show resolved Hide resolved

# Plus we should clear the state cache in the state handler.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this TODO?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, yeah, I convinced myself that we don't need to worry about the state_cache in the state handler for now. As we do check before we insert new state groups that we haven't deleted the referenced state groups.

Will remove.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks. Can you update the PR description to reflect this too?


async def invalidate_cache_and_stream(
self, cache_name: str, keys: Tuple[Any, ...]
) -> None:
Expand Down Expand Up @@ -377,6 +487,9 @@ def _send_invalidation_to_replication(
"Can't stream invalidate all with magic current state cache"
)

if cache_name == DELETE_ROOM_CACHE_NAME and keys is None:
raise Exception("Can't stream invalidate all with magic delete room cache")

if isinstance(self.database_engine, PostgresEngine):
assert self._cache_id_gen is not None

Expand Down
9 changes: 9 additions & 0 deletions synapse/storage/databases/main/events_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -903,6 +903,15 @@ def _invalidate_local_get_event_cache(self, event_id: str) -> None:
self._event_ref.pop(event_id, None)
self._current_event_fetches.pop(event_id, None)

def _invalidate_local_get_event_cache_all(self) -> None:
"""Clears the in-memory get event caches.

Used when we purge room history.
"""
self._get_event_cache.clear()
self._event_ref.clear()
self._current_event_fetches.clear()
DMRobertson marked this conversation as resolved.
Show resolved Hide resolved

async def _get_events_from_cache(
self, events: Iterable[str], update_metrics: bool = True
) -> Dict[str, EventCacheEntry]:
Expand Down
8 changes: 3 additions & 5 deletions synapse/storage/databases/main/purge_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,8 @@ def _purge_history_txn(

logger.info("[purge] done")

self._invalidate_caches_for_room_and_stream(txn, room_id)

return referenced_state_groups

async def purge_room(self, room_id: str) -> List[int]:
Expand Down Expand Up @@ -485,10 +487,6 @@ def _purge_room_txn(self, txn: LoggingTransaction, room_id: str) -> List[int]:
# index on them. In any case we should be clearing out 'stream' tables
# periodically anyway (#5888)

# TODO: we could probably usefully do a bunch more cache invalidation here

# XXX: as with purge_history, this is racy, but no worse than other races
# that already exist.
self._invalidate_cache_and_stream(txn, self.have_seen_event, (room_id,))
self._invalidate_caches_for_room_and_stream(txn, room_id)

return state_groups
2 changes: 1 addition & 1 deletion synapse/util/caches/lrucache.py
Original file line number Diff line number Diff line change
Expand Up @@ -864,5 +864,5 @@ def invalidate_local(self, key: KT) -> None:
async def contains(self, key: KT) -> bool:
return self._lru_cache.contains(key)

async def clear(self) -> None:
def clear(self) -> None:
self._lru_cache.clear()
2 changes: 1 addition & 1 deletion tests/handlers/test_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ def test_unknown_room_version(self) -> None:
# Blow away caches (supported room versions can only change due to a restart).
self.store.get_rooms_for_user_with_stream_ordering.invalidate_all()
self.store.get_rooms_for_user.invalidate_all()
self.get_success(self.store._get_event_cache.clear())
self.store._get_event_cache.clear()
self.store._event_ref.clear()

# The rooms should be excluded from the sync response.
Expand Down
3 changes: 0 additions & 3 deletions tests/rest/client/test_read_marker.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,9 +131,6 @@ def send_message() -> str:
event = self.get_success(self.store.get_event(event_id_1, allow_none=True))
assert event is None

# TODO See https://github.com/matrix-org/synapse/issues/13476
self.store.get_event_ordering.invalidate_all()

# Test moving the read marker to a newer event
event_id_2 = send_message()
channel = self.make_request(
Expand Down
8 changes: 4 additions & 4 deletions tests/storage/databases/main/test_events_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
self.event_id = res["event_id"]

# Reset the event cache so the tests start with it empty
self.get_success(self.store._get_event_cache.clear())
self.store._get_event_cache.clear()

def test_simple(self) -> None:
"""Test that we cache events that we pull from the DB."""
Expand All @@ -205,7 +205,7 @@ def test_event_ref(self) -> None:
"""

# Reset the event cache
self.get_success(self.store._get_event_cache.clear())
self.store._get_event_cache.clear()

with LoggingContext("test") as ctx:
# We keep hold of the event event though we never use it.
Expand All @@ -215,7 +215,7 @@ def test_event_ref(self) -> None:
self.assertEqual(ctx.get_resource_usage().evt_db_fetch_count, 1)

# Reset the event cache
self.get_success(self.store._get_event_cache.clear())
self.store._get_event_cache.clear()

with LoggingContext("test") as ctx:
self.get_success(self.store.get_event(self.event_id))
Expand Down Expand Up @@ -390,7 +390,7 @@ def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
self.event_id = res["event_id"]

# Reset the event cache so the tests start with it empty
self.get_success(self.store._get_event_cache.clear())
self.store._get_event_cache.clear()

@contextmanager
def blocking_get_event_calls(
Expand Down