Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Reduce federation replication traffic #2115

Merged
merged 9 commits into from
Apr 12, 2017
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion synapse/app/federation_sender.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
from synapse.replication.slave.storage.events import SlavedEventStore
from synapse.replication.slave.storage.receipts import SlavedReceiptsStore
from synapse.replication.slave.storage.registration import SlavedRegistrationStore
from synapse.replication.slave.storage.presence import SlavedPresenceStore
from synapse.replication.slave.storage.transactions import TransactionStore
from synapse.replication.slave.storage.devices import SlavedDeviceStore
from synapse.replication.tcp.client import ReplicationClientHandler
Expand Down Expand Up @@ -55,7 +56,7 @@

class FederationSenderSlaveStore(
SlavedDeviceInboxStore, TransactionStore, SlavedReceiptsStore, SlavedEventStore,
SlavedRegistrationStore, SlavedDeviceStore,
SlavedRegistrationStore, SlavedDeviceStore, SlavedPresenceStore,
):
def __init__(self, db_conn, hs):
super(FederationSenderSlaveStore, self).__init__(db_conn, hs)
Expand Down
21 changes: 4 additions & 17 deletions synapse/app/synchrotron.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
from synapse.config._base import ConfigError
from synapse.config.homeserver import HomeServerConfig
from synapse.config.logger import setup_logging
from synapse.handlers.presence import PresenceHandler
from synapse.handlers.presence import PresenceHandler, get_interested_parties
from synapse.http.site import SynapseSite
from synapse.http.server import JsonResource
from synapse.metrics.resource import MetricsResource, METRICS_PREFIX
Expand All @@ -44,7 +44,7 @@
from synapse.server import HomeServer
from synapse.storage.client_ips import ClientIpStore
from synapse.storage.engines import create_engine
from synapse.storage.presence import PresenceStore, UserPresenceState
from synapse.storage.presence import UserPresenceState
from synapse.storage.roommember import RoomMemberStore
from synapse.util.httpresourcetree import create_resource_tree
from synapse.util.logcontext import LoggingContext, PreserveLoggingContext, preserve_fn
Expand Down Expand Up @@ -89,16 +89,6 @@ class SynchrotronSlavedStore(
RoomMemberStore.__dict__["did_forget"]
)

# XXX: This is a bit broken because we don't persist the accepted list in a
# way that can be replicated. This means that we don't have a way to
# invalidate the cache correctly.
get_presence_list_accepted = PresenceStore.__dict__[
"get_presence_list_accepted"
]
get_presence_list_observers_accepted = PresenceStore.__dict__[
"get_presence_list_observers_accepted"
]


UPDATE_SYNCING_USERS_MS = 10 * 1000

Expand Down Expand Up @@ -172,7 +162,6 @@ def set_state(self, user, state, ignore_status_msg=False):

get_states = PresenceHandler.get_states.__func__
get_state = PresenceHandler.get_state.__func__
_get_interested_parties = PresenceHandler._get_interested_parties.__func__
current_state_for_users = PresenceHandler.current_state_for_users.__func__

def user_syncing(self, user_id, affect_presence):
Expand Down Expand Up @@ -206,10 +195,8 @@ def _user_syncing():

@defer.inlineCallbacks
def notify_from_replication(self, states, stream_id):
parties = yield self._get_interested_parties(
states, calculate_remote_hosts=False
)
room_ids_to_states, users_to_states, _ = parties
parties = yield get_interested_parties(self.store, states)
room_ids_to_states, users_to_states = parties

self.notifier.on_new_event(
"presence_key", stream_id, rooms=room_ids_to_states.keys(),
Expand Down
69 changes: 34 additions & 35 deletions synapse/federation/send_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,18 +53,19 @@ def __init__(self, hs):
self.server_name = hs.hostname
self.clock = hs.get_clock()
self.notifier = hs.get_notifier()
self.is_mine_id = hs.is_mine_id

self.presence_map = {}
self.presence_changed = sorteddict()
self.presence_map = {} # Pending presence map user_id -> UserPresenceState
self.presence_changed = sorteddict() # Stream position -> user_id

self.keyed_edu = {}
self.keyed_edu_changed = sorteddict()
self.keyed_edu = {} # (destination, key) -> EDU
self.keyed_edu_changed = sorteddict() # stream position -> (destination, key)

self.edus = sorteddict()
self.edus = sorteddict() # stream position -> Edu

self.failures = sorteddict()
self.failures = sorteddict() # stream position -> (destination, Failure)

self.device_messages = sorteddict()
self.device_messages = sorteddict() # stream position -> destination

self.pos = 1
self.pos_time = sorteddict()
Expand Down Expand Up @@ -120,7 +121,9 @@ def _clear_queue_before_pos(self, position_to_delete):
del self.presence_changed[key]

user_ids = set(
user_id for uids in self.presence_changed.values() for _, user_id in uids
user_id
for uids in self.presence_changed.itervalues()
for user_id in uids
)

to_del = [
Expand Down Expand Up @@ -187,18 +190,20 @@ def send_edu(self, destination, edu_type, content, key=None):

self.notifier.on_new_replication_data()

def send_presence(self, destination, states):
"""As per TransactionQueue"""
def send_presence(self, states):
"""As per TransactionQueue

Args:
states (list(UserPresenceState))
"""
pos = self._next_pos()

self.presence_map.update({
state.user_id: state
for state in states
})
# We only want to send presence for our own users, so lets always just
# filter here just in case.
local_states = filter(lambda s: self.is_mine_id(s.user_id), states)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you comment to explain why we are filtering here. I guess it's so that we only send out presence updates for our own users, but also: why would we get as far as here with an update for someone else?


self.presence_changed[pos] = [
(destination, state.user_id) for state in states
]
self.presence_map.update({state.user_id: state for state in local_states})
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you document what presence_map and presence_changed mean in the constructor so I can understand if this is a sane thing?

self.presence_changed[pos] = [state.user_id for state in local_states]

self.notifier.on_new_replication_data()

Expand Down Expand Up @@ -251,15 +256,14 @@ def get_replication_rows(self, from_token, to_token, limit, federation_ack=None)
keys = self.presence_changed.keys()
i = keys.bisect_right(from_token)
j = keys.bisect_right(to_token) + 1
dest_user_ids = set(
(pos, dest_user_id)
dest_user_ids = [
(pos, user_id)
for pos in keys[i:j]
for dest_user_id in self.presence_changed[pos]
)
for user_id in self.presence_changed[pos]
]

for (key, (dest, user_id)) in dest_user_ids:
for (key, user_id) in dest_user_ids:
rows.append((key, PresenceRow(
destination=dest,
state=self.presence_map[user_id],
)))

Expand Down Expand Up @@ -354,26 +358,21 @@ def add_to_buffer(self, buff):


class PresenceRow(BaseFederationRow, namedtuple("PresenceRow", (
"destination", # str
"state", # UserPresenceState
))):
TypeId = "p"

@staticmethod
def from_data(data):
return PresenceRow(
destination=data["destination"],
state=UserPresenceState.from_dict(data["state"])
state=UserPresenceState.from_dict(data)
)

def to_data(self):
return {
"destination": self.destination,
"state": self.state.as_dict()
}
return self.state.as_dict()

def add_to_buffer(self, buff):
buff.presence.setdefault(self.destination, []).append(self.state)
buff.presence.append(self.state)


class KeyedEduRow(BaseFederationRow, namedtuple("KeyedEduRow", (
Expand Down Expand Up @@ -469,7 +468,7 @@ def add_to_buffer(self, buff):


ParsedFederationStreamData = namedtuple("ParsedFederationStreamData", (
"presence", # dict of destination -> [UserPresenceState]
"presence", # list(UserPresenceState)
"keyed_edus", # dict of destination -> { key -> Edu }
"edus", # dict of destination -> [Edu]
"failures", # dict of destination -> [failures]
Expand All @@ -491,7 +490,7 @@ def process_rows_for_federation(transaction_queue, rows):
# them into the appropriate collection and then send them off.

buff = ParsedFederationStreamData(
presence={},
presence=[],
keyed_edus={},
edus={},
failures={},
Expand All @@ -508,8 +507,8 @@ def process_rows_for_federation(transaction_queue, rows):
parsed_row = RowType.from_data(row.data)
parsed_row.add_to_buffer(buff)

for destination, states in buff.presence.iteritems():
transaction_queue.send_presence(destination, states)
if buff.presence:
transaction_queue.send_presence(buff.presence)

for destination, edu_map in buff.keyed_edus.iteritems():
for key, edu in edu_map.items():
Expand Down
85 changes: 76 additions & 9 deletions synapse/federation/transaction_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,11 @@

from synapse.api.errors import HttpResponseException
from synapse.util.async import run_on_reactor
from synapse.util.logcontext import preserve_context_over_fn
from synapse.util.logcontext import preserve_context_over_fn, preserve_fn
from synapse.util.retryutils import NotRetryingDestination, get_retry_limiter
from synapse.util.metrics import measure_func
from synapse.types import get_domain_from_id
from synapse.handlers.presence import format_user_presence_state
from synapse.handlers.presence import format_user_presence_state, get_interested_remotes
import synapse.metrics

import logging
Expand Down Expand Up @@ -78,7 +78,18 @@ def __init__(self, hs):
self.pending_edus_by_dest = edus = {}

# Presence needs to be separate as we send single aggragate EDUs
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not quite sure what this comment means, any more


# Map of user_id -> UserPresenceState for all the pending presence
# to be sent out by user_id. Entries here get processed and put in
# pending_presence_by_dest
self.pending_presence = {}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could you document what all three of these fields mean and their types?

# Map of destination -> user_id -> UserPresenceState of pending presence
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could do with a blank line here

# to be sent to each destinations
self.pending_presence_by_dest = presence = {}

# Pending EDUs by their "key". Keyed EDUs are EDUs that get clobbered
# based on their key (e.g. typing events by room_id)
# Map of destination -> (edu_type, key) -> Edu
self.pending_edus_keyed_by_dest = edus_keyed = {}

metrics.register_callback(
Expand Down Expand Up @@ -113,6 +124,8 @@ def __init__(self, hs):
self._is_processing = False
self._last_poked_id = -1

self._processing_pending_presence = False

def can_send_to(self, destination):
"""Can we send messages to the given server?

Expand Down Expand Up @@ -224,17 +237,71 @@ def _send_pdu(self, pdu, destinations):
self._attempt_new_transaction, destination
)

def send_presence(self, destination, states):
if not self.can_send_to(destination):
return
@preserve_fn # the caller should not yield on this
@defer.inlineCallbacks
def send_presence(self, states):
"""Send the new presence states to the appropriate destinations.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"Start sending" ... "if we are not already sending updates" or something

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you didn't like my suggestion?


self.pending_presence_by_dest.setdefault(destination, {}).update({
This actually queues up the presence states ready for sending and
triggers a background task to process them and send out the transactions.

Args:
states (list(UserPresenceState))
"""

# First we queue up the new presence by user ID, so multiple presence
# updates in quick successtion are correctly handled
# We only want to send presence for our own users, so lets always just
# filter here just in case.
self.pending_presence.update({
state.user_id: state for state in states
if self.is_mine_id(state.user_id)
})

preserve_context_over_fn(
self._attempt_new_transaction, destination
)
# We then handle the new pending presence in batches, first figuring
# out the destinations we need to send each state to and then poking it
# to attempt a new transaction. We linearize this so that we don't
# accidentally mess up the ordering and send multiple presence updates
# in the wrong order
if self._processing_pending_presence:
return

self._processing_pending_presence = True
try:
while True:
states = self.pending_presence
self.pending_presence = {}

if not states:
break

yield self._process_presence_inner(states)
finally:
self._processing_pending_presence = False

@measure_func("txnqueue._process_presence")
@defer.inlineCallbacks
def _process_presence_inner(self, states):
"""Given a list of states populate self.pending_presence_by_dest and
poke to send a new transaction to each destination

Args:
states (list(UserPresenceState))
"""
hosts_and_states = yield get_interested_remotes(self.store, states)

for destinations, states in hosts_and_states:
for destination in destinations:
if not self.can_send_to(destination):
continue

self.pending_presence_by_dest.setdefault(
destination, {}
).update({
state.user_id: state for state in states
})

preserve_fn(self._attempt_new_transaction)(destination)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The above is copied verbatim from the old send_presence function


def send_edu(self, destination, edu_type, content, key=None):
edu = Edu(
Expand Down
Loading