
Clarifications and small fixes to to-device related code #11247

Merged: 11 commits, Nov 9, 2021
1 change: 1 addition & 0 deletions changelog.d/11247.misc
@@ -0,0 +1 @@
Clean up code relating to to-device messages and sending ephemeral events to application services.
24 changes: 20 additions & 4 deletions synapse/handlers/appservice.py
@@ -188,7 +188,7 @@ def notify_interested_services_ephemeral(
self,
stream_key: str,
new_token: Union[int, RoomStreamToken],
users: Optional[Collection[Union[str, UserID]]] = None,
users: Collection[Union[str, UserID]],
) -> None:
"""
This is called by the notifier in the background when an ephemeral event is handled
@@ -203,7 +203,9 @@ def notify_interested_services_ephemeral(
value for `stream_key` will cause this function to return early.

Ephemeral events will only be pushed to appservices that have opted into
them.
receiving them by setting `push_ephemeral` to true in their registration
file. Note that while MSC2409 is experimental, this option is called
`de.sorunome.msc2409.push_ephemeral`.

Appservices will only receive ephemeral events that fall within their
registered user and room namespaces.
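
As an aside (not part of this diff), here is a minimal sketch of a registration that opts in to ephemeral events, written as a Python dict. All field values are made-up examples; only the opt-in key, which uses the experimental MSC2409 name mentioned above, is taken from this change.

# Illustrative appservice registration that opts in to ephemeral events.
# All values are placeholders.
registration = {
    "id": "example-appservice",
    "url": "http://localhost:9000",
    "as_token": "<as_token>",
    "hs_token": "<hs_token>",
    "sender_localpart": "example-bot",
    "namespaces": {"users": [{"regex": "@example_.*", "exclusive": True}]},
    # Stable name once MSC2409 is accepted: push_ephemeral
    "de.sorunome.msc2409.push_ephemeral": True,
}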
@@ -214,6 +216,7 @@ def notify_interested_services_ephemeral(
if not self.notify_appservices:
return

# Ignore any unsupported streams
if stream_key not in ("typing_key", "receipt_key", "presence_key"):
return

@@ -230,18 +233,25 @@ def notify_interested_services_ephemeral(
# Additional context: https://github.com/matrix-org/synapse/pull/11137
assert isinstance(new_token, int)

# Check whether there are any appservices which have registered to receive
# ephemeral events.
#
# Note that whether these events are actually relevant to these appservices
# is decided later on.
services = [
service
for service in self.store.get_app_services()
if service.supports_ephemeral
]
if not services:
# Bail out early if none of the target appservices have explicitly registered
# to receive these ephemeral events.
return

# We only start a new background process if necessary rather than
# optimistically (to cut down on overhead).
self._notify_interested_services_ephemeral(
services, stream_key, new_token, users or []
services, stream_key, new_token, users
)

@wrap_as_background_process("notify_interested_services_ephemeral")
@@ -252,7 +262,7 @@ async def _notify_interested_services_ephemeral(
new_token: int,
users: Collection[Union[str, UserID]],
) -> None:
logger.debug("Checking interested services for %s" % (stream_key))
logger.debug("Checking interested services for %s", stream_key)
with Measure(self.clock, "notify_interested_services_ephemeral"):
for service in services:
if stream_key == "typing_key":
@@ -345,6 +355,9 @@ async def _handle_receipts(

Args:
service: The application service to check for which events it should receive.
new_token: A receipts event stream token. Purely used to double-check that the
from_token we pull from the database isn't greater than or equal to this
token. Prevents accidentally duplicating work.

Returns:
A list of JSON dictionaries containing data derived from the read receipts that
@@ -382,6 +395,9 @@ async def _handle_presence(
Args:
service: The application service that ephemeral events are being sent to.
users: The users that should receive the presence update.
new_token: A presence update stream token. Purely used to double-check that the
from_token we pull from the database isn't greater than or equal to this
token. Prevents accidentally duplicating work.

Returns:
A list of json dictionaries containing data derived from the presence events
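
The two `new_token` docstring additions above describe the same duplicate-work guard. A minimal sketch of that guard follows (illustrative, not part of this diff); it assumes the store exposes `get_type_stream_id_for_appservice(service, stream_type)`, and the comparison mirrors the "greater than or equal" wording in the docstrings.

async def handle_receipts_sketch(store, service, new_token):
    # Simplified sketch of the duplicate-work guard described above.
    from_key = await store.get_type_stream_id_for_appservice(service, "read_receipt")
    if new_token is not None and new_token <= from_key:
        # Everything up to and including new_token was already processed for
        # this appservice, so avoid duplicating the work.
        return []
    # ...otherwise fetch receipts newer than from_key and push them out.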
31 changes: 27 additions & 4 deletions synapse/handlers/devicemessage.py
@@ -89,6 +89,13 @@ def __init__(self, hs: "HomeServer"):
)

async def on_direct_to_device_edu(self, origin: str, content: JsonDict) -> None:
"""
Handle receiving to-device messages from remote homeservers.

Args:
origin: The remote homeserver.
content: The JSON dictionary containing the to-device messages.
"""
local_messages = {}
sender_user_id = content["sender"]
if origin != get_domain_from_id(sender_user_id):
@@ -135,12 +142,16 @@ async def on_direct_to_device_edu(self, origin: str, content: JsonDict) -> None:
message_type, sender_user_id, by_device
)

stream_id = await self.store.add_messages_from_remote_to_device_inbox(
# Add messages to the database.
# Retrieve the stream id of the last-processed to-device message.
last_stream_id = await self.store.add_messages_from_remote_to_device_inbox(
origin, message_id, local_messages
)

# Notify listeners that there are new to-device messages to process,
# handing them the latest stream id.
self.notifier.on_new_event(
"to_device_key", stream_id, users=local_messages.keys()
"to_device_key", last_stream_id, users=local_messages.keys()
)

async def _check_for_unknown_devices(
@@ -195,6 +206,14 @@ async def send_device_message(
message_type: str,
messages: Dict[str, Dict[str, JsonDict]],
) -> None:
"""
Handle a request from a user to send to-device message(s).

Args:
requester: The user that is sending the to-device messages.
message_type: The type of to-device messages that are being sent.
messages: A dictionary containing recipients mapped to messages intended for them.
"""
sender_user_id = requester.user.to_string()

message_id = random_string(16)
@@ -257,12 +276,16 @@ async def send_device_message(
"org.matrix.opentracing_context": json_encoder.encode(context),
}

stream_id = await self.store.add_messages_to_device_inbox(
# Add messages to the database.
# Retrieve the stream id of the last-processed to-device message.
last_stream_id = await self.store.add_messages_to_device_inbox(
local_messages, remote_edu_contents
)

# Notify listeners that there are new to-device messages to process,
# handing them the latest stream id.
self.notifier.on_new_event(
"to_device_key", stream_id, users=local_messages.keys()
"to_device_key", last_stream_id, users=local_messages.keys()
)

if self.federation_sender:
8 changes: 4 additions & 4 deletions synapse/storage/databases/main/appservice.py
@@ -412,16 +412,16 @@ def get_type_stream_id_for_appservice_txn(txn):
)

async def set_type_stream_id_for_appservice(
self, service: ApplicationService, type: str, pos: Optional[int]
self, service: ApplicationService, stream_type: str, pos: Optional[int]
) -> None:
if type not in ("read_receipt", "presence"):
if stream_type not in ("read_receipt", "presence"):
raise ValueError(
"Expected type to be a valid application stream id type, got %s"
% (type,)
% (stream_type,)
)

def set_type_stream_id_for_appservice_txn(txn):
stream_id_type = "%s_stream_id" % type
stream_id_type = "%s_stream_id" % stream_type
txn.execute(
"UPDATE application_services_state SET %s = ? WHERE as_id=?"
% stream_id_type,
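
A hedged usage sketch of the renamed parameter (illustrative only; the real call sites live in the appservice handler and are not shown in this diff):

async def record_appservice_progress_sketch(store, service, new_token):
    # Illustrative: after pushing read receipts to an appservice, persist how
    # far we got so the next pass only fetches newer events. Per the validation
    # above, stream_type must be "read_receipt" or "presence".
    await store.set_type_stream_id_for_appservice(service, "read_receipt", new_token)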
23 changes: 20 additions & 3 deletions synapse/storage/databases/main/deviceinbox.py
@@ -134,7 +134,10 @@ async def get_new_messages_for_device(
limit: The maximum number of messages to retrieve.

Returns:
A list of messages for the device and where in the stream the messages got to.
A tuple containing:
* A list of messages for the device.
* The max stream token of these messages. There may be more to retrieve
if the given limit was reached.
"""
has_changed = self._device_inbox_stream_cache.has_entity_changed(
user_id, last_stream_id
@@ -153,12 +156,19 @@ def get_new_messages_for_device_txn(txn):
txn.execute(
sql, (user_id, device_id, last_stream_id, current_stream_id, limit)
)

messages = []
stream_pos = current_stream_id

for row in txn:
stream_pos = row[0]
messages.append(db_to_json(row[1]))

# If the limit was not reached we know that there's no more data for this
# user/device pair up to current_stream_id.
if len(messages) < limit:
stream_pos = current_stream_id

return messages, stream_pos

return await self.db_pool.runInteraction(
@@ -260,13 +270,20 @@ def get_new_messages_for_remote_destination_txn(txn):
" LIMIT ?"
)
txn.execute(sql, (destination, last_stream_id, current_stream_id, limit))

messages = []
stream_pos = current_stream_id

for row in txn:
stream_pos = row[0]
messages.append(db_to_json(row[1]))

# If the limit was not reached we know that there's no more data for this
# destination up to current_stream_id.
if len(messages) < limit:
log_kv({"message": "Set stream position to current position"})
stream_pos = current_stream_id

return messages, stream_pos

return await self.db_pool.runInteraction(
@@ -372,8 +389,8 @@ async def add_messages_to_device_inbox(
"""Used to send messages from this server.

Args:
local_messages_by_user_and_device:
Dictionary of user_id to device_id to message.
local_messages_by_user_then_device:
Dictionary of recipient user_id to recipient device_id to message.
remote_messages_by_destination:
Dictionary of destination server_name to the EDU JSON to send.

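
An illustrative sketch of the two arguments documented above; the inner message fields are assumptions for illustration, not taken from this diff.

# Illustrative shapes for add_messages_to_device_inbox's arguments.
local_messages_by_user_then_device = {
    # recipient user_id -> recipient device_id -> message JSON
    "@alice:example.com": {
        "ALICESDEVICE": {"sender": "@bob:example.com", "type": "m.example", "content": {}},
    },
}
remote_messages_by_destination = {
    # destination server_name -> the m.direct_to_device EDU content to send
    "remote.example.org": {
        "sender": "@bob:example.com",
        "type": "m.example",
        "message_id": "aUniqueMessageId",
        "messages": {"@charlie:remote.example.org": {"CHARLIESDEVICE": {"example_key": "example_value"}}},
    },
}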
8 changes: 6 additions & 2 deletions tests/handlers/test_appservice.py
@@ -272,7 +272,9 @@ def test_notify_interested_services_ephemeral(self):
make_awaitable(([event], None))
)

self.handler.notify_interested_services_ephemeral("receipt_key", 580)
self.handler.notify_interested_services_ephemeral(
"receipt_key", 580, ["@fakerecipient:example.com"]
)
self.mock_scheduler.submit_ephemeral_events_for_as.assert_called_once_with(
interested_service, [event]
)
@@ -300,7 +302,9 @@ def test_notify_interested_services_ephemeral_out_of_order(self):
make_awaitable(([event], None))
)

self.handler.notify_interested_services_ephemeral("receipt_key", 579)
self.handler.notify_interested_services_ephemeral(
"receipt_key", 580, ["@fakerecipient:example.com"]
)
self.mock_scheduler.submit_ephemeral_events_for_as.assert_not_called()

def _mkservice(self, is_interested, protocols=None):