Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Fix spinloop during partial state sync when a prev event is in backoff #15351

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/15351.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix a bug introduced in Synapse 1.70.0 where the background sync from a faster join could spin for hours when one of the events involved had been marked for backoff.
17 changes: 13 additions & 4 deletions synapse/api/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@

if typing.TYPE_CHECKING:
from synapse.config.homeserver import HomeServerConfig
from synapse.types import JsonDict
from synapse.types import JsonDict, StrCollection

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -682,18 +682,27 @@ class FederationPullAttemptBackoffError(RuntimeError):
Attributes:
event_id: The event_id which we are refusing to pull
message: A custom error message that gives more context
retry_after_ms: The remaining backoff interval, in milliseconds
"""

def __init__(self, event_ids: List[str], message: Optional[str]):
self.event_ids = event_ids
def __init__(
self, event_ids: "StrCollection", message: Optional[str], retry_after_ms: int
):
event_ids = list(event_ids)

if message:
error_message = message
else:
error_message = f"Not attempting to pull event_ids={self.event_ids} because we already tried to pull them recently (backing off)."
error_message = (
f"Not attempting to pull event_ids={event_ids} because we already "
"tried to pull them recently (backing off)."
)

super().__init__(error_message)

self.event_ids = event_ids
self.retry_after_ms = retry_after_ms


class HttpResponseException(CodeMessageException):
"""
Expand Down
36 changes: 18 additions & 18 deletions synapse/handlers/federation.py
Original file line number Diff line number Diff line change
Expand Up @@ -1949,27 +1949,25 @@ async def _sync_partial_state_room(
)
for event in events:
for attempt in itertools.count():
# We try a new destination on every iteration.
try:
await self._federation_event_handler.update_state_for_partial_state_event(
destination, event
)
while True:
try:
await self._federation_event_handler.update_state_for_partial_state_event(
destination, event
)
break
except FederationPullAttemptBackoffError as e:
# We are in the backoff period for one of the event's
# prev_events. Wait it out and try again after.
logger.warning(
"%s; waiting for %d ms...", e, e.retry_after_ms
)
await self.clock.sleep(e.retry_after_ms / 1000)

# Success, no need to try the rest of the destinations.
break
except FederationPullAttemptBackoffError as exc:
# Log a warning about why we failed to process the event (the error message
# for `FederationPullAttemptBackoffError` is pretty good)
logger.warning("_sync_partial_state_room: %s", exc)
# We do not record a failed pull attempt when we backoff fetching a missing
# `prev_event` because not being able to fetch the `prev_events` just means
# we won't be able to de-outlier the pulled event. But we can still use an
# `outlier` in the state/auth chain for another event. So we shouldn't stop
# a downstream event from trying to pull it.
#
# This avoids a cascade of backoff for all events in the DAG downstream from
# one event backoff upstream.
except FederationError as e:
# TODO: We should `record_event_failed_pull_attempt` here,
# see https://github.com/matrix-org/synapse/issues/13700

if attempt == len(destinations) - 1:
# We have tried every remote server for this event. Give up.
# TODO(faster_joins) giving up isn't the right thing to do
Expand All @@ -1986,6 +1984,8 @@ async def _sync_partial_state_room(
destination,
e,
)
# TODO: We should `record_event_failed_pull_attempt` here,
# see https://github.com/matrix-org/synapse/issues/13700
raise

# Try the next remote server.
Expand Down
24 changes: 17 additions & 7 deletions synapse/handlers/federation_event.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ class FederationEventHandler:
"""

def __init__(self, hs: "HomeServer"):
self._clock = hs.get_clock()
self._store = hs.get_datastores().main
self._storage_controllers = hs.get_storage_controllers()
self._state_storage_controller = self._storage_controllers.state
Expand Down Expand Up @@ -1038,8 +1039,8 @@ async def _compute_event_context_with_maybe_missing_prevs(

Raises:
FederationPullAttemptBackoffError if we are are deliberately not attempting
to pull the given event over federation because we've already done so
recently and are backing off.
to pull one of the given event's `prev_event`s over federation because
we've already done so recently and are backing off.
FederationError if we fail to get the state from the remote server after any
missing `prev_event`s.
"""
Expand All @@ -1053,13 +1054,22 @@ async def _compute_event_context_with_maybe_missing_prevs(
# If we've already recently attempted to pull this missing event, don't
# try it again so soon. Since we have to fetch all of the prev_events, we can
# bail early here if we find any to ignore.
prevs_to_ignore = await self._store.get_event_ids_to_not_pull_from_backoff(
room_id, missing_prevs
prevs_with_pull_backoff = (
await self._store.get_event_ids_to_not_pull_from_backoff(
room_id, missing_prevs
)
)
if len(prevs_to_ignore) > 0:
if len(prevs_with_pull_backoff) > 0:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the better name 👍

raise FederationPullAttemptBackoffError(
event_ids=prevs_to_ignore,
message=f"While computing context for event={event_id}, not attempting to pull missing prev_event={prevs_to_ignore[0]} because we already tried to pull recently (backing off).",
event_ids=prevs_with_pull_backoff.keys(),
message=(
f"While computing context for event={event_id}, not attempting to "
f"pull missing prev_events={list(prevs_with_pull_backoff.keys())} "
"because we already tried to pull recently (backing off)."
),
retry_after_ms=(
max(prevs_with_pull_backoff.values()) - self._clock.time_msec()
),
)

if not missing_prevs:
Expand Down
35 changes: 21 additions & 14 deletions synapse/storage/databases/main/event_federation.py
Original file line number Diff line number Diff line change
Expand Up @@ -1544,7 +1544,7 @@ async def get_event_ids_to_not_pull_from_backoff(
self,
room_id: str,
event_ids: Collection[str],
) -> List[str]:
) -> Dict[str, int]:
"""
Filter down the events to ones that we've failed to pull before recently. Uses
exponential backoff.
Expand All @@ -1554,7 +1554,8 @@ async def get_event_ids_to_not_pull_from_backoff(
event_ids: A list of events to filter down

Returns:
List of event_ids that should not be attempted to be pulled
A dictionary of event_ids that should not be attempted to be pulled and the
next timestamp at which we may try pulling them again.
"""
event_failed_pull_attempts = await self.db_pool.simple_select_many_batch(
table="event_failed_pull_attempts",
Expand All @@ -1570,22 +1571,28 @@ async def get_event_ids_to_not_pull_from_backoff(
)

current_time = self._clock.time_msec()
return [
event_failed_pull_attempt["event_id"]
for event_failed_pull_attempt in event_failed_pull_attempts

event_ids_with_backoff = {}
for event_failed_pull_attempt in event_failed_pull_attempts:
event_id = event_failed_pull_attempt["event_id"]
# Exponential back-off (up to the upper bound) so we don't try to
# pull the same event over and over. ex. 2hr, 4hr, 8hr, 16hr, etc.
if current_time
< event_failed_pull_attempt["last_attempt_ts"]
+ (
2
** min(
event_failed_pull_attempt["num_attempts"],
BACKFILL_EVENT_EXPONENTIAL_BACKOFF_MAXIMUM_DOUBLING_STEPS,
backoff_end_time = (
event_failed_pull_attempt["last_attempt_ts"]
+ (
2
** min(
event_failed_pull_attempt["num_attempts"],
BACKFILL_EVENT_EXPONENTIAL_BACKOFF_MAXIMUM_DOUBLING_STEPS,
)
)
* BACKFILL_EVENT_EXPONENTIAL_BACKOFF_STEP_MILLISECONDS
)
* BACKFILL_EVENT_EXPONENTIAL_BACKOFF_STEP_MILLISECONDS
]

if current_time < backoff_end_time: # `backoff_end_time` is exclusive
event_ids_with_backoff[event_id] = backoff_end_time

return event_ids_with_backoff

async def get_missing_events(
self,
Expand Down
13 changes: 9 additions & 4 deletions tests/storage/test_event_federation.py
Original file line number Diff line number Diff line change
Expand Up @@ -1143,19 +1143,24 @@ def test_get_event_ids_to_not_pull_from_backoff(self) -> None:
tok = self.login("alice", "test")
room_id = self.helper.create_room_as(room_creator=user_id, tok=tok)

failure_time = self.clock.time_msec()
self.get_success(
self.store.record_event_failed_pull_attempt(
room_id, "$failed_event_id", "fake cause"
)
)

event_ids_to_backoff = self.get_success(
event_ids_with_backoff = self.get_success(
self.store.get_event_ids_to_not_pull_from_backoff(
room_id=room_id, event_ids=["$failed_event_id", "$normal_event_id"]
)
)

self.assertEqual(event_ids_to_backoff, ["$failed_event_id"])
self.assertEqual(
event_ids_with_backoff,
# We expect a 2^1 hour backoff after a single failed attempt.
{"$failed_event_id": failure_time + 2 * 60 * 60 * 1000},
)

def test_get_event_ids_to_not_pull_from_backoff_retry_after_backoff_duration(
self,
Expand All @@ -1179,14 +1184,14 @@ def test_get_event_ids_to_not_pull_from_backoff_retry_after_backoff_duration(
# attempt (2^1 hours).
self.reactor.advance(datetime.timedelta(hours=2).total_seconds())

event_ids_to_backoff = self.get_success(
event_ids_with_backoff = self.get_success(
self.store.get_event_ids_to_not_pull_from_backoff(
room_id=room_id, event_ids=["$failed_event_id", "$normal_event_id"]
)
)
# Since this function only returns events we should backoff from, time has
# elapsed past the backoff range so there is no events to backoff from.
self.assertEqual(event_ids_to_backoff, [])
self.assertEqual(event_ids_with_backoff, {})


@attr.s(auto_attribs=True)
Expand Down