Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Add more logging around message retention policies support #6717

Merged
merged 15 commits into from
Jan 17, 2020
Merged
1 change: 1 addition & 0 deletions changelog.d/6714.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix a bug causing Synapse to not always purge quiet rooms with a low `max_lifetime` in their message retention policies.
8 changes: 8 additions & 0 deletions synapse/config/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,14 @@ def read_config(self, config, **kwargs):
self.retention_default_min_lifetime = None
self.retention_default_max_lifetime = None

if self.retention_enabled:
logger.info(
"Message retention policies support enabled with the following default"
" policy: min_lifetime = %s ; max_lifetime = %s",
self.retention_default_min_lifetime,
self.retention_default_max_lifetime,
)

self.retention_allowed_lifetime_min = retention_config.get(
"allowed_lifetime_min"
)
Expand Down
15 changes: 14 additions & 1 deletion synapse/handlers/pagination.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,8 @@ def __init__(self, hs):
if hs.config.retention_enabled:
# Run the purge jobs described in the configuration file.
for job in hs.config.retention_purge_jobs:
logger.info("Setting up purge job with config: %s", job)

self.clock.looping_call(
run_as_background_process,
job["interval"],
Expand Down Expand Up @@ -130,11 +132,22 @@ def purge_history_for_rooms_in_range(self, min_ms, max_ms):
else:
include_null = False

logger.info(
"[purge] Running purge job for %d < max_lifetime <= %d (include NULLs = %s)",
min_ms,
max_ms,
include_null,
)

rooms = yield self.store.get_rooms_for_retention_period_in_range(
min_ms, max_ms, include_null
)

logger.debug("[purge] Rooms to purge: %s", rooms)

for room_id, retention_policy in iteritems(rooms):
logger.info("[purge] Attempting to purge messages in room %s", room_id)

if room_id in self._purges_in_progress_by_room:
logger.warning(
"[purge] not purging room %s as there's an ongoing purge running"
Expand All @@ -156,7 +169,7 @@ def purge_history_for_rooms_in_range(self, min_ms, max_ms):

stream_ordering = yield self.store.find_first_stream_ordering_after_ts(ts)

r = yield self.store.get_room_event_after_stream_ordering(
r = yield self.store.get_room_event_before_stream_ordering(
room_id, stream_ordering,
)
if not r:
Expand Down
68 changes: 56 additions & 12 deletions synapse/storage/data_stores/main/stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -536,20 +536,64 @@ def get_room_event_after_stream_ordering(self, room_id, stream_ordering):
Deferred[(int, int, str)]:
(stream ordering, topological ordering, event_id)
"""
return self.db.runInteraction(
"get_room_event_after_stream_ordering",
self.get_room_event_around_stream_ordering_txn,
room_id,
stream_ordering,
"f",
)

def _f(txn):
sql = (
"SELECT stream_ordering, topological_ordering, event_id"
" FROM events"
" WHERE room_id = ? AND stream_ordering >= ?"
" AND NOT outlier"
" ORDER BY stream_ordering"
" LIMIT 1"
)
txn.execute(sql, (room_id, stream_ordering))
return txn.fetchone()
def get_room_event_before_stream_ordering(self, room_id, stream_ordering):
    """Gets details of the latest event in a room at or before a stream ordering.

    The lookup is delegated to get_room_event_around_stream_ordering_txn with
    the backward ("b") direction, i.e. among the room's non-outlier events
    whose stream ordering is <= the given one, the one with the highest stream
    ordering is selected.

    Args:
        room_id (str): ID of the room to look in.
        stream_ordering (int): Stream ordering to search backwards from
            (inclusive upper bound).

    Returns:
        Deferred[(int, int, str)]:
            (stream ordering, topological ordering, event_id)
            Presumably resolves to None when the room has no such event;
            confirm against the database cursor's fetchone() behaviour.
    """
    return self.db.runInteraction(
        "get_room_event_before_stream_ordering",
        self.get_room_event_around_stream_ordering_txn,
        room_id,
        stream_ordering,
        "b",
    )

def get_room_event_around_stream_ordering_txn(
    self, txn, room_id, stream_ordering, dir="f"
):
    """Gets details of the event in a room that is closest to a stream ordering,
    looking either forwards or backwards from it depending on the direction.

    This is a transaction function: it runs the query on the given cursor and
    returns the fetched row directly (not a Deferred).

    Args:
        txn: Database cursor to run the query on; it must provide `execute`
            and `fetchone`.
        room_id (str): ID of the room to look in.
        stream_ordering (int): Stream ordering to search from (inclusive).
        dir (str): Direction in which to look in the room's history. "b"
            (backward) selects the latest event at or before
            `stream_ordering`. Any other value, including the default "f"
            (forward), selects the earliest event at or after it.

    Returns:
        The row returned by `txn.fetchone()`, expected to be
        (stream ordering, topological ordering, event_id). Presumably None
        when no non-outlier event matches, as with DB-API cursors.
    """
    # Only the comparison operator and the sort order are interpolated into
    # the statement, and both come from the fixed literals below; room_id and
    # stream_ordering are always passed as bound parameters.
    if dir == "b":
        op, order = "<=", "DESC"
    else:
        op, order = ">=", "ASC"

    sql = (
        "SELECT stream_ordering, topological_ordering, event_id"
        " FROM events"
        " WHERE room_id = ? AND stream_ordering %s ?"
        " AND NOT outlier"
        " ORDER BY stream_ordering %s"
        " LIMIT 1"
    ) % (op, order)
    txn.execute(sql, (room_id, stream_ordering))
    return txn.fetchone()

@defer.inlineCallbacks
def get_room_events_max_id(self, room_id=None):
Expand Down