This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Reduce federation replication traffic #2115

Merged · 9 commits · Apr 12, 2017 · Changes from 6 commits
15 changes: 2 additions & 13 deletions synapse/app/federation_sender.py
@@ -28,11 +28,11 @@
from synapse.replication.slave.storage.events import SlavedEventStore
from synapse.replication.slave.storage.receipts import SlavedReceiptsStore
from synapse.replication.slave.storage.registration import SlavedRegistrationStore
from synapse.replication.slave.storage.presence import SlavedPresenceStore
from synapse.replication.slave.storage.transactions import TransactionStore
from synapse.replication.slave.storage.devices import SlavedDeviceStore
from synapse.replication.tcp.client import ReplicationClientHandler
from synapse.storage.engines import create_engine
from synapse.storage.presence import PresenceStore
from synapse.util.async import Linearizer
from synapse.util.httpresourcetree import create_resource_tree
from synapse.util.logcontext import LoggingContext, PreserveLoggingContext, preserve_fn
@@ -56,7 +56,7 @@

class FederationSenderSlaveStore(
SlavedDeviceInboxStore, TransactionStore, SlavedReceiptsStore, SlavedEventStore,
SlavedRegistrationStore, SlavedDeviceStore,
SlavedRegistrationStore, SlavedDeviceStore, SlavedPresenceStore,
):
def __init__(self, db_conn, hs):
super(FederationSenderSlaveStore, self).__init__(db_conn, hs)
@@ -81,17 +81,6 @@ def _get_federation_out_pos(self, db_conn):

return rows[0][0] if rows else -1

# XXX: This is a bit broken because we don't persist the accepted list in a
# way that can be replicated. This means that we don't have a way to
# invalidate the cache correctly.
# This is fine since in practice nobody uses the presence list stuff...
get_presence_list_accepted = PresenceStore.__dict__[
"get_presence_list_accepted"
]
get_presence_list_observers_accepted = PresenceStore.__dict__[
"get_presence_list_observers_accepted"
]


class FederationSenderServer(HomeServer):
def get_db_conn(self, run_new_connection=True):
17 changes: 3 additions & 14 deletions synapse/app/synchrotron.py
@@ -20,7 +20,7 @@
from synapse.config._base import ConfigError
from synapse.config.homeserver import HomeServerConfig
from synapse.config.logger import setup_logging
from synapse.handlers.presence import PresenceHandler
from synapse.handlers.presence import PresenceHandler, get_interested_parties
from synapse.http.site import SynapseSite
from synapse.http.server import JsonResource
from synapse.metrics.resource import MetricsResource, METRICS_PREFIX
@@ -44,7 +44,7 @@
from synapse.server import HomeServer
from synapse.storage.client_ips import ClientIpStore
from synapse.storage.engines import create_engine
from synapse.storage.presence import PresenceStore, UserPresenceState
from synapse.storage.presence import UserPresenceState
from synapse.storage.roommember import RoomMemberStore
from synapse.util.httpresourcetree import create_resource_tree
from synapse.util.logcontext import LoggingContext, PreserveLoggingContext, preserve_fn
@@ -89,16 +89,6 @@ class SynchrotronSlavedStore(
RoomMemberStore.__dict__["did_forget"]
)

# XXX: This is a bit broken because we don't persist the accepted list in a
# way that can be replicated. This means that we don't have a way to
# invalidate the cache correctly.
get_presence_list_accepted = PresenceStore.__dict__[
"get_presence_list_accepted"
]
get_presence_list_observers_accepted = PresenceStore.__dict__[
"get_presence_list_observers_accepted"
]


UPDATE_SYNCING_USERS_MS = 10 * 1000

@@ -172,7 +162,6 @@ def set_state(self, user, state, ignore_status_msg=False):

get_states = PresenceHandler.get_states.__func__
get_state = PresenceHandler.get_state.__func__
_get_interested_parties = PresenceHandler._get_interested_parties.__func__
current_state_for_users = PresenceHandler.current_state_for_users.__func__

def user_syncing(self, user_id, affect_presence):
@@ -206,7 +195,7 @@ def _user_syncing():

@defer.inlineCallbacks
def notify_from_replication(self, states, stream_id):
parties = yield self._get_interested_parties(states)
parties = yield get_interested_parties(self.store, states)
room_ids_to_states, users_to_states = parties

self.notifier.on_new_event(
22 changes: 14 additions & 8 deletions synapse/federation/send_queue.py
@@ -55,17 +55,17 @@ def __init__(self, hs):
self.notifier = hs.get_notifier()
self.is_mine_id = hs.is_mine_id

self.presence_map = {}
self.presence_changed = sorteddict()
self.presence_map = {} # Pending presence map user_id -> UserPresenceState
self.presence_changed = sorteddict() # Stream position -> user_id

self.keyed_edu = {}
self.keyed_edu_changed = sorteddict()
self.keyed_edu = {} # (destination, key) -> EDU
self.keyed_edu_changed = sorteddict() # stream position -> (destination, key)

self.edus = sorteddict()
self.edus = sorteddict() # stream position -> Edu

self.failures = sorteddict()
self.failures = sorteddict() # stream position -> (destination, Failure)

self.device_messages = sorteddict()
self.device_messages = sorteddict() # stream position -> destination

self.pos = 1
self.pos_time = sorteddict()
@@ -191,9 +191,15 @@ def send_edu(self, destination, edu_type, content, key=None):
self.notifier.on_new_replication_data()

def send_presence(self, states):
"""As per TransactionQueue"""
"""As per TransactionQueue

Args:
states (list(UserPresenceState))
"""
pos = self._next_pos()

# We only want to send presence for our own users, so let's always just
# filter here just in case.
local_states = filter(lambda s: self.is_mine_id(s.user_id), states)
Review comment (Member):
can you comment to explain why we are filtering here. I guess it's so that we only send out presence updates for our own users, but also: why would we get as far as here with an update for someone else?


self.presence_map.update({state.user_id: state for state in local_states})
Review comment (Member):
can you document what presence_map and presence_changed mean in the constructor so I can understand if this is a sane thing?

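To make the question about presence_map and presence_changed concrete, here is a minimal sketch (plain dicts standing in for sorteddict and for UserPresenceState; not the real FederationRemoteSendQueue API) of how the two structures cooperate: presence_map keeps only the latest pending state per user, while presence_changed records which users changed at each stream position, so replication catch-up can replay just the newest state for each user that changed since a given token.

```python
# Illustrative sketch only.
class MiniSendQueue(object):
    def __init__(self):
        self.presence_map = {}       # user_id -> latest pending presence state
        self.presence_changed = {}   # stream position -> [user_id, ...]
        self.pos = 1

    def _next_pos(self):
        pos, self.pos = self.pos, self.pos + 1
        return pos

    def send_presence(self, states):
        pos = self._next_pos()
        self.presence_map.update({s["user_id"]: s for s in states})
        self.presence_changed[pos] = [s["user_id"] for s in states]

    def rows_since(self, token):
        # Replay only the newest state for each user that changed after `token`.
        changed = {
            user_id
            for pos, user_ids in self.presence_changed.items()
            if pos > token
            for user_id in user_ids
        }
        return [self.presence_map[user_id] for user_id in changed]


q = MiniSendQueue()
q.send_presence([{"user_id": "@alice:example.com", "presence": "online"}])
q.send_presence([{"user_id": "@alice:example.com", "presence": "unavailable"}])
print(q.rows_since(0))  # a single row with @alice's latest state
```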
58 changes: 22 additions & 36 deletions synapse/federation/transaction_queue.py
@@ -25,7 +25,7 @@
from synapse.util.retryutils import NotRetryingDestination, get_retry_limiter
from synapse.util.metrics import measure_func
from synapse.types import get_domain_from_id
from synapse.handlers.presence import format_user_presence_state
from synapse.handlers.presence import format_user_presence_state, get_interested_remotes
import synapse.metrics

import logging
@@ -78,8 +78,18 @@ def __init__(self, hs):
self.pending_edus_by_dest = edus = {}

# Presence needs to be separate as we send single aggregate EDUs
Review comment (Member):
not quite sure what this comment means, any more


# Map of user_id -> UserPresenceState for all the pending presence
# to be sent out by user_id. Entries here get processed and put in
# pending_presence_by_dest
self.pending_presence = {}
Review comment (Member):
could you document what all three of these fields mean and their types?

# Map of destination -> user_id -> UserPresenceState of pending presence
Review comment (Member):
could do with a blank line here

# to be sent to each destination
self.pending_presence_by_dest = presence = {}

# Pending EDUs by their "key". Keyed EDUs are EDUs that get clobbered
# based on their key (e.g. typing events by room_id)
# Map of destination -> (edu_type, key) -> Edu
self.pending_edus_keyed_by_dest = edus_keyed = {}

metrics.register_callback(
@@ -227,18 +237,26 @@ def _send_pdu(self, pdu, destinations):
self._attempt_new_transaction, destination
)

@preserve_fn
@preserve_fn # the caller should not yield on this
@defer.inlineCallbacks
def send_presence(self, states):
"""Send the new presence states to the appropriate destinations.
Review comment (Member):
"Start sending" ... "if we are not already sending updates" or something

Review comment (Member):
you didn't like my suggestion?


This actually queues up the presence states ready for sending and
triggers a background task to process them and send out the transactions.

Args:
states (list(UserPresenceState))
"""

# First we queue up the new presence by user ID, so multiple presence
# updates in quick succession are correctly handled
self.pending_presence.update({state.user_id: state for state in states})
# We only want to send presence for our own users, so let's always just
# filter here just in case.
self.pending_presence.update({
state.user_id: state for state in states
if self.is_mine_id(state.user_id)
})

# We then handle the new pending presence in batches, first figuring
# out the destinations we need to send each state to and then poking it
@@ -270,40 +288,8 @@ def _process_presence_inner(self, states):
Args:
states (list(UserPresenceState))
"""
# First we look up the rooms each user is in (as well as any explicit
# subscriptions), then for each distinct room we look up the remote
# hosts in those rooms.
room_ids_to_states = {}
users_to_states = {}
for state in states.itervalues():
room_ids = yield self.store.get_rooms_for_user(state.user_id)
for room_id in room_ids:
room_ids_to_states.setdefault(room_id, []).append(state)

plist = yield self.store.get_presence_list_observers_accepted(
state.user_id,
)
for u in plist:
users_to_states.setdefault(u, []).append(state)

hosts_and_states = []
for room_id, states in room_ids_to_states.items():
local_states = filter(lambda s: self.is_mine_id(s.user_id), states)
if not local_states:
continue

hosts = yield self.store.get_hosts_in_room(room_id)
hosts_and_states.append((hosts, local_states))

for user_id, states in users_to_states.items():
local_states = filter(lambda s: self.is_mine_id(s.user_id), states)
if not local_states:
continue

host = get_domain_from_id(user_id)
hosts_and_states.append(([host], local_states))
hosts_and_states = yield get_interested_remotes(self.store, states)

# And now finally queue up new transactions
for destinations, states in hosts_and_states:
for destination in destinations:
if not self.can_send_to(destination):
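For illustration, a rough sketch (plain Python, hypothetical names, none of the Twisted/deferred machinery) of the queue-and-batch pattern that send_presence and _process_presence implement above: callers fold new states into a per-user dict, and a single draining pass resolves destinations for whatever has accumulated, so a burst of updates for one user collapses into a single outgoing state per destination. Here resolve_destinations stands in for get_interested_remotes.

```python
# Sketch only; not the real TransactionQueue.
class MiniPresenceSender(object):
    def __init__(self, is_mine_id, resolve_destinations):
        self.is_mine_id = is_mine_id
        self.resolve_destinations = resolve_destinations
        self.pending_presence = {}    # user_id -> latest state, pre-processing
        self.pending_by_dest = {}     # destination -> user_id -> state
        self.processing = False

    def send_presence(self, states):
        # Only queue presence for our own users, keeping just the newest
        # state seen for each user.
        self.pending_presence.update({
            s["user_id"]: s for s in states if self.is_mine_id(s["user_id"])
        })
        if self.processing:
            return  # an earlier call is still draining the queue
        self.processing = True
        try:
            while self.pending_presence:
                batch, self.pending_presence = self.pending_presence, {}
                for destinations, batch_states in self.resolve_destinations(batch):
                    for destination in destinations:
                        self.pending_by_dest.setdefault(destination, {}).update(
                            {s["user_id"]: s for s in batch_states}
                        )
        finally:
            self.processing = False
```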
108 changes: 77 additions & 31 deletions synapse/handlers/presence.py
@@ -610,42 +610,14 @@ def current_state_for_users(self, user_ids):

defer.returnValue(states)

@defer.inlineCallbacks
def _get_interested_parties(self, states):
"""Given a list of states return which entities (rooms, users, servers)
are interested in the given states.

Returns:
2-tuple: `(room_ids_to_states, users_to_states)`,
with each item being a dict of `entity_name` -> `[UserPresenceState]`
"""
room_ids_to_states = {}
users_to_states = {}
for state in states:
room_ids = yield self.store.get_rooms_for_user(state.user_id)
for room_id in room_ids:
room_ids_to_states.setdefault(room_id, []).append(state)

plist = yield self.store.get_presence_list_observers_accepted(state.user_id)
for u in plist:
users_to_states.setdefault(u, []).append(state)

# Always notify self
users_to_states.setdefault(state.user_id, []).append(state)

# TODO: de-dup hosts_to_states, as a single host might have multiple
# of same presence

defer.returnValue((room_ids_to_states, users_to_states))

@defer.inlineCallbacks
def _persist_and_notify(self, states):
"""Persist states in the database, poke the notifier and send to
interested remote servers
"""
stream_id, max_token = yield self.store.update_presence(states)

parties = yield self._get_interested_parties(states)
parties = yield get_interested_parties(self.store, states)
room_ids_to_states, users_to_states = parties

self.notifier.on_new_event(
@@ -657,7 +629,7 @@ def _persist_and_notify(self, states):

@defer.inlineCallbacks
def notify_for_states(self, state, stream_id):
parties = yield self._get_interested_parties([state])
parties = yield get_interested_parties(self.store, [state])
room_ids_to_states, users_to_states = parties

self.notifier.on_new_event(
@@ -669,7 +641,7 @@ def _push_to_remotes(self, states):
"""Sends state updates to remote servers.

Args:
hosts_to_states (list): list(state)
hosts_to_states (list(UserPresenceState))
Review comment (Member):
s/hosts_to_states/states/. sorry for not spotting that one before

"""
self.federation.send_presence(states)

@@ -1318,3 +1290,77 @@ def handle_update(prev_state, new_state, is_mine, wheel_timer, now):
persist_and_notify = True

return new_state, persist_and_notify, federation_ping


@defer.inlineCallbacks
def get_interested_parties(store, states):
"""Given a list of states return which entities (rooms, users)
are interested in the given states.

Args:
states (list(UserPresenceState))

Returns:
2-tuple: `(room_ids_to_states, users_to_states)`,
with each item being a dict of `entity_name` -> `[UserPresenceState]`
"""
room_ids_to_states = {}
users_to_states = {}
for state in states:
room_ids = yield store.get_rooms_for_user(state.user_id)
for room_id in room_ids:
room_ids_to_states.setdefault(room_id, []).append(state)

plist = yield store.get_presence_list_observers_accepted(state.user_id)
for u in plist:
users_to_states.setdefault(u, []).append(state)

# Always notify self
users_to_states.setdefault(state.user_id, []).append(state)

defer.returnValue((room_ids_to_states, users_to_states))


@defer.inlineCallbacks
def get_interested_remotes(store, states):
"""Given a list of presence states figure out which remote servers
should be sent which.

All the presence states should be for local users only.

Args:
store (DataStore)
states (list(UserPresenceState))

Returns:
Deferred list of ([destinations], [UserPresenceState]), where for
each row the list of UserPresenceState should be sent to each
destination
"""
hosts_and_states = [] # Final result to return
Review comment (Member):
now that you have an (excellent) description of the return value of the function, this probably doesn't really need a comment, but it's harmless enough


# First we look up the rooms each user is in (as well as any explicit
# subscriptions), then for each distinct room we look up the remote
# hosts in those rooms.
room_ids_to_states = {}
users_to_states = {}
for state in states.itervalues():
Review comment (Member):
can we not use get_interested_parties here?

Reply (Member, author):
Heh, for some reason I got it stuck in my head that they were different

room_ids = yield store.get_rooms_for_user(state.user_id)
for room_id in room_ids:
room_ids_to_states.setdefault(room_id, []).append(state)

plist = yield store.get_presence_list_observers_accepted(
state.user_id,
)
for u in plist:
users_to_states.setdefault(u, []).append(state)

for room_id, states in room_ids_to_states.items():
hosts = yield store.get_hosts_in_room(room_id)
hosts_and_states.append((hosts, states))

for user_id, states in users_to_states.items():
host = get_domain_from_id(user_id)
hosts_and_states.append(([host], states))

defer.returnValue(hosts_and_states)
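As the review exchange above suggests, get_interested_remotes could be layered on top of get_interested_parties. The following is a hedged sketch of that refactoring (not part of this commit): it reuses defer and get_domain_from_id, which this module already has in scope, and the wrapper name is hypothetical. One behavioural wrinkle: get_interested_parties also registers each user as their own observer, so the composed result includes our own domain, which the sending side still has to skip.

```python
# Hypothetical refactoring sketch, not the code in this PR.
@defer.inlineCallbacks
def get_interested_remotes_via_parties(store, states):
    hosts_and_states = []

    room_ids_to_states, users_to_states = yield get_interested_parties(
        store, states,
    )

    for room_id, room_states in room_ids_to_states.items():
        hosts = yield store.get_hosts_in_room(room_id)
        hosts_and_states.append((hosts, room_states))

    for user_id, user_states in users_to_states.items():
        # Includes each user observing themselves, i.e. our own domain;
        # callers must still avoid sending to the local server.
        hosts_and_states.append(([get_domain_from_id(user_id)], user_states))

    defer.returnValue(hosts_and_states)
```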
10 changes: 10 additions & 0 deletions synapse/replication/slave/storage/presence.py
@@ -39,6 +39,16 @@ def __init__(self, db_conn, hs):
_get_presence_for_user = PresenceStore.__dict__["_get_presence_for_user"]
get_presence_for_users = PresenceStore.__dict__["get_presence_for_users"]

# XXX: This is a bit broken because we don't persist the accepted list in a
# way that can be replicated. This means that we don't have a way to
# invalidate the cache correctly.
get_presence_list_accepted = PresenceStore.__dict__[
"get_presence_list_accepted"
]
get_presence_list_observers_accepted = PresenceStore.__dict__[
"get_presence_list_observers_accepted"
]

def get_current_presence_token(self):
return self._presence_id_gen.get_current_token()

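For readers unfamiliar with the PresenceStore.__dict__[...] idiom used above, here is a small illustrative sketch (entirely made-up classes, not Synapse's real stores): pulling the attribute out of the class dictionary re-homes that one method, raw descriptor and all, onto the slaved store without inheriting the rest of PresenceStore.

```python
# Sketch of the Class.__dict__["name"] borrowing idiom; both classes are
# hypothetical stand-ins.
class MainStore(object):
    def get_presence_list_accepted(self, observer_user):
        # Pretend this hits the database on the main process.
        return ["@friend:example.com"]


class SlavedStore(object):
    # Borrow just this one method as defined on MainStore; SlavedStore does
    # not otherwise inherit from MainStore.
    get_presence_list_accepted = MainStore.__dict__["get_presence_list_accepted"]


print(SlavedStore().get_presence_list_accepted("@user:example.com"))
```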