diff --git a/changelog.d/6714.bugfix b/changelog.d/6714.bugfix
new file mode 100644
index 000000000000..410516694fc6
--- /dev/null
+++ b/changelog.d/6714.bugfix
@@ -0,0 +1 @@
+Fix a bug causing Synapse to not always purge quiet rooms with a low `max_lifetime` in their message retention policies when running the automated purge jobs.
diff --git a/changelog.d/6717.misc b/changelog.d/6717.misc
new file mode 100644
index 000000000000..a2a7776126b0
--- /dev/null
+++ b/changelog.d/6717.misc
@@ -0,0 +1 @@
+Add more logging around message retention policies support.
diff --git a/synapse/config/server.py b/synapse/config/server.py
index 9ac112233b0c..0ec1b0fadd78 100644
--- a/synapse/config/server.py
+++ b/synapse/config/server.py
@@ -294,6 +294,14 @@ def read_config(self, config, **kwargs):
             self.retention_default_min_lifetime = None
             self.retention_default_max_lifetime = None
 
+        if self.retention_enabled:
+            logger.info(
+                "Message retention policies support enabled with the following default"
+                " policy: min_lifetime = %s ; max_lifetime = %s",
+                self.retention_default_min_lifetime,
+                self.retention_default_max_lifetime,
+            )
+
         self.retention_allowed_lifetime_min = retention_config.get(
             "allowed_lifetime_min"
         )
diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py
index 00a6afc963d4..71d76202c93d 100644
--- a/synapse/handlers/pagination.py
+++ b/synapse/handlers/pagination.py
@@ -88,6 +88,8 @@ def __init__(self, hs):
         if hs.config.retention_enabled:
             # Run the purge jobs described in the configuration file.
             for job in hs.config.retention_purge_jobs:
+                logger.info("Setting up purge job with config: %s", job)
+
                 self.clock.looping_call(
                     run_as_background_process,
                     job["interval"],
@@ -130,11 +132,22 @@ def purge_history_for_rooms_in_range(self, min_ms, max_ms):
         else:
             include_null = False
 
+        logger.info(
+            "[purge] Running purge job for %s < max_lifetime <= %s (include NULLs = %s)",
+            min_ms,
+            max_ms,
+            include_null,
+        )
+
         rooms = yield self.store.get_rooms_for_retention_period_in_range(
             min_ms, max_ms, include_null
         )
 
+        logger.debug("[purge] Rooms to purge: %s", rooms)
+
         for room_id, retention_policy in iteritems(rooms):
+            logger.info("[purge] Attempting to purge messages in room %s", room_id)
+
             if room_id in self._purges_in_progress_by_room:
                 logger.warning(
                     "[purge] not purging room %s as there's an ongoing purge running"
@@ -156,7 +169,7 @@ def purge_history_for_rooms_in_range(self, min_ms, max_ms):
 
             stream_ordering = yield self.store.find_first_stream_ordering_after_ts(ts)
 
-            r = yield self.store.get_room_event_after_stream_ordering(
+            r = yield self.store.get_room_event_before_stream_ordering(
                 room_id, stream_ordering,
             )
             if not r:
diff --git a/synapse/rest/admin/__init__.py b/synapse/rest/admin/__init__.py
index a10b4a9b7263..2932fe2123d8 100644
--- a/synapse/rest/admin/__init__.py
+++ b/synapse/rest/admin/__init__.py
@@ -107,7 +107,7 @@ async def on_POST(self, request, room_id, event_id):
 
             stream_ordering = await self.store.find_first_stream_ordering_after_ts(ts)
 
-            r = await self.store.get_room_event_after_stream_ordering(
+            r = await self.store.get_room_event_before_stream_ordering(
                 room_id, stream_ordering
             )
             if not r:
diff --git a/synapse/storage/data_stores/main/stream.py b/synapse/storage/data_stores/main/stream.py
index 140da8dad686..056b25b13a3f 100644
--- a/synapse/storage/data_stores/main/stream.py
+++ b/synapse/storage/data_stores/main/stream.py
@@ -525,8 +525,8 @@ def get_recent_event_ids_for_room(self, room_id, limit, end_token):
 
         return rows, token
 
-    def get_room_event_after_stream_ordering(self, room_id, stream_ordering):
-        """Gets details of the first event in a room at or after a stream ordering
+    def get_room_event_before_stream_ordering(self, room_id, stream_ordering):
+        """Gets details of the first event in a room at or before a stream ordering
 
         Args:
             room_id (str):
@@ -541,15 +541,15 @@ def _f(txn):
             sql = (
                 "SELECT stream_ordering, topological_ordering, event_id"
                 " FROM events"
-                " WHERE room_id = ? AND stream_ordering >= ?"
+                " WHERE room_id = ? AND stream_ordering <= ?"
                 " AND NOT outlier"
-                " ORDER BY stream_ordering"
+                " ORDER BY stream_ordering DESC"
                 " LIMIT 1"
             )
             txn.execute(sql, (room_id, stream_ordering))
             return txn.fetchone()
 
-        return self.db.runInteraction("get_room_event_after_stream_ordering", _f)
+        return self.db.runInteraction("get_room_event_before_stream_ordering", _f)
 
     @defer.inlineCallbacks
     def get_room_events_max_id(self, room_id=None):