Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Fix presence timeouts when synchrotron restarts. #6212

Merged
merged 3 commits into from
Oct 18, 2019
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/6212.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix bug where presence would not get timed out correctly if a synchrotron worker is used and restarted.
13 changes: 9 additions & 4 deletions synapse/handlers/presence.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

import logging
from contextlib import contextmanager
from typing import Dict, Set

from six import iteritems, itervalues

Expand Down Expand Up @@ -179,8 +180,9 @@ def __init__(self, hs):
# we assume that all the sync requests on that process have stopped.
# Stored as a dict from process_id to set of user_id, and a dict of
# process_id to millisecond timestamp last updated.
self.external_process_to_current_syncs = {}
self.external_process_last_updated_ms = {}
self.external_process_to_current_syncs = {} # type: Dict[int, Set[str]]
self.external_process_last_updated_ms = {} # type: Dict[int, int]

self.external_sync_linearizer = Linearizer(name="external_sync_linearizer")

# Start a LoopingCall in 30s that fires every 5s.
Expand Down Expand Up @@ -349,10 +351,13 @@ def _handle_timeouts(self):
if now - last_update > EXTERNAL_PROCESS_EXPIRY
]
for process_id in expired_process_ids:
# For each expired process drop tracking info and check the users
# that were syncing on that process to see if they need to be timed
# out.
users_to_check.update(
self.external_process_last_updated_ms.pop(process_id, ())
self.external_process_to_current_syncs.pop(process_id, ())
)
self.external_process_last_update.pop(process_id)
self.external_process_last_updated_ms.pop(process_id)

states = [
self.user_to_current_state.get(user_id, UserPresenceState.default(user_id))
Expand Down
39 changes: 39 additions & 0 deletions tests/handlers/test_presence.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
from synapse.events import room_version_to_event_format
from synapse.events.builder import EventBuilder
from synapse.handlers.presence import (
EXTERNAL_PROCESS_EXPIRY,
FEDERATION_PING_INTERVAL,
FEDERATION_TIMEOUT,
IDLE_TIMER,
Expand Down Expand Up @@ -413,6 +414,44 @@ def test_last_active(self):
self.assertEquals(state, new_state)


class PresenceHandlerTestCase(unittest.HomeserverTestCase):
def prepare(self, reactor, clock, hs):
self.presence_handler = hs.get_presence_handler()
self.clock = hs.get_clock()

def test_external_process_timeout(self):
"""Test that if an external process doesn't update the records for a while
we time out their syncing users presence.
"""
process_id = 1
user_id = "@test:server"

# Notify handler that a user is now syncing.
self.get_success(
self.presence_handler.update_external_syncs_row(
process_id, user_id, True, self.clock.time_msec()
)
)

# Check that if we wait a while without telling the handler the user has
# stopped syncing that their presence state doesn't get timed out.
self.reactor.advance(EXTERNAL_PROCESS_EXPIRY / 2)

state = self.get_success(
self.presence_handler.get_state(UserID.from_string(user_id))
)
self.assertEqual(state.state, PresenceState.ONLINE)

# Check that if the external process timeout fires, then the syncing
# user gets timed out
self.reactor.advance(EXTERNAL_PROCESS_EXPIRY)

state = self.get_success(
self.presence_handler.get_state(UserID.from_string(user_id))
)
self.assertEqual(state.state, PresenceState.OFFLINE)


class PresenceJoinTestCase(unittest.HomeserverTestCase):
"""Tests remote servers get told about presence of users in the room when
they join and when new local users join.
Expand Down