Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
remove retry_on_integrity_error wrapper for persist_events (#7848)
Browse files Browse the repository at this point in the history
As far as I can tell from the sentry logs, the only time this has actually done
anything in the last two years is when we had two master workers running at
once, and even then, it made a bit of a mess of it (see
#7845 (comment)).

Generally I feel like this code is doing more harm than good.
  • Loading branch information
richvdh authored Jul 15, 2020
1 parent 8d0097b commit 1d9dca0
Show file tree
Hide file tree
Showing 2 changed files with 1 addition and 67 deletions.
1 change: 1 addition & 0 deletions changelog.d/7848.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Remove redundant `retry_on_integrity_error` wrapper for event persistence code.
67 changes: 0 additions & 67 deletions synapse/storage/data_stores/main/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
import itertools
import logging
from collections import OrderedDict, namedtuple
from functools import wraps
from typing import TYPE_CHECKING, Dict, Iterable, List, Tuple

import attr
Expand Down Expand Up @@ -69,27 +68,6 @@ def encode_json(json_object):
_EventCacheEntry = namedtuple("_EventCacheEntry", ("event", "redacted_event"))


def _retry_on_integrity_error(func):
"""Wraps a database function so that it gets retried on IntegrityError,
with `delete_existing=True` passed in.
Args:
func: function that returns a Deferred and accepts a `delete_existing` arg
"""

@wraps(func)
@defer.inlineCallbacks
def f(self, *args, **kwargs):
try:
res = yield func(self, *args, delete_existing=False, **kwargs)
except self.database_engine.module.IntegrityError:
logger.exception("IntegrityError, retrying.")
res = yield func(self, *args, delete_existing=True, **kwargs)
return res

return f


@attr.s(slots=True)
class DeltaState:
"""Deltas to use to update the `current_state_events` table.
Expand Down Expand Up @@ -134,7 +112,6 @@ def __init__(self, hs: "HomeServer", db: Database, main_data_store: "DataStore")
hs.config.worker.writers.events == hs.get_instance_name()
), "Can only instantiate EventsStore on master"

@_retry_on_integrity_error
@defer.inlineCallbacks
def _persist_events_and_state_updates(
self,
Expand All @@ -143,7 +120,6 @@ def _persist_events_and_state_updates(
state_delta_for_room: Dict[str, DeltaState],
new_forward_extremeties: Dict[str, List[str]],
backfilled: bool = False,
delete_existing: bool = False,
):
"""Persist a set of events alongside updates to the current state and
forward extremities tables.
Expand All @@ -157,7 +133,6 @@ def _persist_events_and_state_updates(
new_forward_extremities: Map from room_id to list of event IDs
that are the new forward extremities of the room.
backfilled
delete_existing
Returns:
Deferred: resolves when the events have been persisted
Expand Down Expand Up @@ -197,7 +172,6 @@ def _persist_events_and_state_updates(
self._persist_events_txn,
events_and_contexts=events_and_contexts,
backfilled=backfilled,
delete_existing=delete_existing,
state_delta_for_room=state_delta_for_room,
new_forward_extremeties=new_forward_extremeties,
)
Expand Down Expand Up @@ -341,7 +315,6 @@ def _persist_events_txn(
txn: LoggingTransaction,
events_and_contexts: List[Tuple[EventBase, EventContext]],
backfilled: bool,
delete_existing: bool = False,
state_delta_for_room: Dict[str, DeltaState] = {},
new_forward_extremeties: Dict[str, List[str]] = {},
):
Expand Down Expand Up @@ -393,13 +366,6 @@ def _persist_events_txn(
# From this point onwards the events are only events that we haven't
# seen before.

if delete_existing:
# For paranoia reasons, we go and delete all the existing entries
# for these events so we can reinsert them.
# This gets around any problems with some tables already having
# entries.
self._delete_existing_rows_txn(txn, events_and_contexts=events_and_contexts)

self._store_event_txn(txn, events_and_contexts=events_and_contexts)

# Insert into event_to_state_groups.
Expand Down Expand Up @@ -797,39 +763,6 @@ def _update_outliers_txn(self, txn, events_and_contexts):

return [ec for ec in events_and_contexts if ec[0] not in to_remove]

@classmethod
def _delete_existing_rows_txn(cls, txn, events_and_contexts):
if not events_and_contexts:
# nothing to do here
return

logger.info("Deleting existing")

for table in (
"events",
"event_auth",
"event_json",
"event_edges",
"event_forward_extremities",
"event_reference_hashes",
"event_search",
"event_to_state_groups",
"state_events",
"rejections",
"redactions",
"room_memberships",
):
txn.executemany(
"DELETE FROM %s WHERE event_id = ?" % (table,),
[(ev.event_id,) for ev, _ in events_and_contexts],
)

for table in ("event_push_actions",):
txn.executemany(
"DELETE FROM %s WHERE room_id = ? AND event_id = ?" % (table,),
[(ev.room_id, ev.event_id) for ev, _ in events_and_contexts],
)

def _store_event_txn(self, txn, events_and_contexts):
"""Insert new events into the event and event_json tables
Expand Down

0 comments on commit 1d9dca0

Please sign in to comment.