This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Send to-device messages to application services #11215

Merged on Feb 1, 2022 (49 commits)
Commits (49)
7fbfedb
Add experimental config option to send to-device messages to AS's
anoadragon453 Nov 4, 2021
b7a44d4
Add a new ephemeral AS handler for to_device message edus
anoadragon453 Nov 5, 2021
78bd5ea
Allow setting/getting stream id per appservice for to-device messages
anoadragon453 Nov 5, 2021
7899f82
Add database method to fetch to-device messages by user_ids from db
anoadragon453 Nov 5, 2021
103f410
Add a to_device_stream_id column to the application_services_state table
anoadragon453 Nov 5, 2021
e914f1d
Add tests
anoadragon453 Nov 12, 2021
2930fe6
Changelog
anoadragon453 Nov 5, 2021
f65846b
Make msc2409_to_device_messages_enabled private; remove unnecessary c…
anoadragon453 Nov 19, 2021
ce020c3
Move stream filter back into AppserviceHandler
anoadragon453 Nov 19, 2021
8f1183c
Broaden type hints; update comment
anoadragon453 Nov 19, 2021
401cb2b
Deduplicate ephemeral events to send conditional
anoadragon453 Nov 19, 2021
179dd5a
_handle_to_device -> _get_to_device_messages
anoadragon453 Nov 19, 2021
8b0bbc1
Rename ApplicationServiceEphemeralEventsTestCase
anoadragon453 Nov 19, 2021
31c4b40
Rename user1, user2 in tests to something more useful
anoadragon453 Nov 19, 2021
bd9d963
Simplify registration of appservices in tests
anoadragon453 Nov 19, 2021
8f8226a
Fix existing unit tests
anoadragon453 Nov 22, 2021
b4a4b45
rename set_type_stream_id_for_appservice -> set_appservice_stream_typ…
anoadragon453 Nov 24, 2021
c691ef0
Add some FIXME comments
anoadragon453 Nov 24, 2021
7cf6ad9
Add comment on why we don't NOT NULL to_device_stream_id
anoadragon453 Nov 24, 2021
6d68b8a
Refactor and generalise the sending of arbitrary fields over AS trans…
anoadragon453 Dec 3, 2021
13b25cf
Fix tests to mock _TransactionController.send of ApplicationServiceSc…
anoadragon453 Dec 3, 2021
275e1e0
Add to-device messages as their own special section in AS txns
anoadragon453 Dec 3, 2021
403490d
Insert to-device messages into the new to-device part of AS txns
anoadragon453 Dec 3, 2021
385b3bf
Modify tests to handle new location of to-device messages in AS txns
anoadragon453 Dec 3, 2021
c0b157d
Merge branch 'develop' of github.com:matrix-org/synapse into anoa/e2e…
anoadragon453 Dec 7, 2021
ba91438
Fix calls to create_appservice_txn in tests
anoadragon453 Dec 7, 2021
0685021
Update synapse/storage/schema/main/delta/65/06_msc2409_add_device_id_…
anoadragon453 Jan 11, 2022
e7f6732
Merge branch 'develop' of github.com:matrix-org/synapse into anoa/e2e…
anoadragon453 Jan 11, 2022
3d1661f
rename recipient_user_id_device_id_to_messages -> recipient_device_to…
anoadragon453 Jan 11, 2022
0ac079b
lint
anoadragon453 Jan 11, 2022
822e92a
Refactor storage methods to retrieve to-device messages
anoadragon453 Jan 25, 2022
25488fa
Update old references to get_new_messages_for_device
anoadragon453 Jan 25, 2022
026cb8a
Don't query for to-device messages without a device
anoadragon453 Jan 26, 2022
1121674
Merge branch 'develop' of github.com:matrix-org/synapse into anoa/e2e…
anoadragon453 Jan 26, 2022
8129657
Fix shape of simple_insert_many argument
anoadragon453 Jan 26, 2022
de48ab4
Only import MemoryReactor if type-checking in test_scheduler; fix old…
anoadragon453 Jan 26, 2022
d8b8f74
Update _txn function to match outer method name
anoadragon453 Jan 26, 2022
ced1314
Clarify users arg in _get_to_device_messages docstring
anoadragon453 Jan 28, 2022
c749fcb
assert stream_id returned by get_device_messages is as expected
anoadragon453 Jan 28, 2022
24512fb
Fix get_messages_for_device return type documentation
anoadragon453 Jan 28, 2022
24bc3c5
Clean up limit checking
anoadragon453 Jan 28, 2022
30b74a5
Move DB migration to schema v68
anoadragon453 Jan 28, 2022
80c3721
Accept iterables in enqueue_for_appservice
anoadragon453 Jan 28, 2022
3d8e50d
wording fixes
anoadragon453 Jan 28, 2022
50ebcb9
Apply suggestions from code review
anoadragon453 Jan 28, 2022
089e041
Only allow querying either a single device ID, or all device IDs of u…
anoadragon453 Jan 31, 2022
d06781e
Apply suggestions from code review
anoadragon453 Feb 1, 2022
c334eef
Require a user id/device id pair if passing 'limit'
anoadragon453 Feb 1, 2022
bf93ec4
Merge branch 'develop' of github.com:matrix-org/synapse into anoa/e2e…
anoadragon453 Feb 1, 2022
1 change: 1 addition & 0 deletions changelog.d/11215.feature
@@ -0,0 +1 @@
Add experimental support for sending to-device messages to application services, as specified by [MSC2409](https://github.com/matrix-org/matrix-doc/pull/2409). Disabled by default.
Member Author

Should we note the experimental configuration flag to enable this?

22 changes: 18 additions & 4 deletions synapse/appservice/scheduler.py
@@ -48,7 +48,7 @@
components.
"""
import logging
from typing import List, Optional
from typing import Iterable, List, Optional

from synapse.appservice import ApplicationService, ApplicationServiceState
from synapse.events import EventBase
@@ -95,8 +95,20 @@ def submit_event_for_as(self, service: ApplicationService, event: EventBase):
self.queuer.enqueue_event(service, event)

def submit_ephemeral_events_for_as(
self, service: ApplicationService, events: List[JsonDict]
):
self, service: ApplicationService, events: Iterable[JsonDict]
) -> None:
"""
Send ephemeral events to application services, and schedule a new
outgoing AS transaction.

Args:
service: The service to send ephemeral events to.
events: The ephemeral events to send.
"""
# Ensure we have some events to send
if not events:
return

self.queuer.enqueue_ephemeral(service, events)


@@ -130,7 +142,9 @@ def enqueue_event(self, service: ApplicationService, event: EventBase):
self.queued_events.setdefault(service.id, []).append(event)
self._start_background_request(service)

def enqueue_ephemeral(self, service: ApplicationService, events: List[JsonDict]):
def enqueue_ephemeral(
self, service: ApplicationService, events: Iterable[JsonDict]
) -> None:
self.queued_ephemeral.setdefault(service.id, []).extend(events)
self._start_background_request(service)
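The queueing half of this change is small enough to sketch standalone. The class and method names below are illustrative stand-ins, not Synapse's actual scheduler classes; the point is the `setdefault`/`extend` idiom that lets `enqueue_ephemeral` accept any iterable, matching the widened `Iterable[JsonDict]` hint in the diff:

```python
from typing import Dict, Iterable, List

JsonDict = dict


class EphemeralQueuer:
    """Minimal sketch of a per-service ephemeral event queue.

    Events accumulate in a list keyed by service ID; a background
    request would normally drain the list into an AS transaction.
    """

    def __init__(self) -> None:
        self.queued_ephemeral: Dict[str, List[JsonDict]] = {}

    def enqueue_ephemeral(self, service_id: str, events: Iterable[JsonDict]) -> None:
        # setdefault creates the list on first use; extend accepts
        # any iterable, not just a List.
        self.queued_ephemeral.setdefault(service_id, []).extend(events)

    def drain(self, service_id: str) -> List[JsonDict]:
        # Pop everything queued for this service for the next transaction.
        return self.queued_ephemeral.pop(service_id, [])


queuer = EphemeralQueuer()
queuer.enqueue_ephemeral("as1", [{"type": "m.typing"}])
queuer.enqueue_ephemeral("as1", ({"type": "m.receipt"},))  # a tuple works too
batch = queuer.drain("as1")
```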

8 changes: 8 additions & 0 deletions synapse/config/experimental.py
@@ -46,3 +46,11 @@ def read_config(self, config: JsonDict, **kwargs):

# MSC3266 (room summary api)
self.msc3266_enabled: bool = experimental.get("msc3266_enabled", False)

# MSC2409 (this setting only relates to optionally sending to-device messages).
# Presence, typing and read receipt EDUs are already sent to application services that
# have opted in to receive them. This setting, if enabled, adds to-device messages
# to that list.
self.msc2409_to_device_messages_enabled: bool = experimental.get(
"msc2409_to_device_messages_enabled", False
)
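A minimal sketch of how this flag read behaves in isolation; the `experimental_features` section name and the fall-back-to-`False` semantics are inferred from the snippet above, and the helper name is hypothetical:

```python
def read_msc2409_flag(config: dict) -> bool:
    # The flag lives under an "experimental_features" section of the
    # homeserver config; a missing section or key means disabled.
    experimental = config.get("experimental_features") or {}
    return experimental.get("msc2409_to_device_messages_enabled", False)
```

So an operator opting in would set `experimental_features.msc2409_to_device_messages_enabled: true`; everyone else keeps the old behaviour with no config change.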
120 changes: 108 additions & 12 deletions synapse/handlers/appservice.py
@@ -55,6 +55,9 @@ def __init__(self, hs: "HomeServer"):
self.clock = hs.get_clock()
self.notify_appservices = hs.config.appservice.notify_appservices
self.event_sources = hs.get_event_sources()
self._msc2409_to_device_messages_enabled = (
hs.config.experimental.msc2409_to_device_messages_enabled
)

self.current_max = 0
self.is_processing = False
@@ -199,8 +202,9 @@ def notify_interested_services_ephemeral(
Args:
stream_key: The stream the event came from.

`stream_key` can be "typing_key", "receipt_key" or "presence_key". Any other
value for `stream_key` will cause this function to return early.
`stream_key` can be "typing_key", "receipt_key", "presence_key" or
"to_device_key". Any other value for `stream_key` will cause this function
to return early.

Ephemeral events will only be pushed to appservices that have opted into
receiving them by setting `push_ephemeral` to true in their registration
Expand All @@ -216,8 +220,14 @@ def notify_interested_services_ephemeral(
if not self.notify_appservices:
return

# Ignore any unsupported streams
if stream_key not in ("typing_key", "receipt_key", "presence_key"):
# Notify appservices of updates in ephemeral event streams.
# Only the following streams are currently supported.
if stream_key not in (
"typing_key",
"receipt_key",
"presence_key",
"to_device_key",
):
return

# Assert that new_token is an integer (and not a RoomStreamToken).
@@ -233,6 +243,13 @@
# Additional context: https://github.com/matrix-org/synapse/pull/11137
assert isinstance(new_token, int)

# Ignore to-device messages if the feature flag is not enabled
if (
stream_key == "to_device_key"
and not self._msc2409_to_device_messages_enabled
):
return
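Taken together, the two guards above form a simple gate: unsupported streams return early, and `to_device_key` is additionally gated behind the MSC2409 flag. A standalone sketch (function name hypothetical):

```python
SUPPORTED_STREAMS = ("typing_key", "receipt_key", "presence_key", "to_device_key")


def should_notify(stream_key: str, to_device_enabled: bool) -> bool:
    # Only a fixed set of ephemeral streams is forwarded to appservices.
    if stream_key not in SUPPORTED_STREAMS:
        return False
    # To-device messages are additionally gated behind MSC2409's
    # experimental config flag.
    if stream_key == "to_device_key" and not to_device_enabled:
        return False
    return True
```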

# Check whether there are any appservices which have registered to receive
# ephemeral events.
#
@@ -285,10 +302,7 @@ async def _notify_interested_services_ephemeral(
):
if stream_key == "receipt_key":
events = await self._handle_receipts(service, new_token)
if events:
self.scheduler.submit_ephemeral_events_for_as(
service, events
)
self.scheduler.submit_ephemeral_events_for_as(service, events)

# Persist the latest handled stream token for this appservice
await self.store.set_type_stream_id_for_appservice(
@@ -297,16 +311,28 @@

elif stream_key == "presence_key":
events = await self._handle_presence(service, users, new_token)
if events:
self.scheduler.submit_ephemeral_events_for_as(
service, events
)
self.scheduler.submit_ephemeral_events_for_as(service, events)

# Persist the latest handled stream token for this appservice
await self.store.set_type_stream_id_for_appservice(
service, "presence", new_token
)

elif stream_key == "to_device_key":
# Retrieve a list of to-device message events, as well as the
# maximum stream token of the messages we were able to retrieve.
to_device_messages = await self._get_to_device_messages(
service, new_token, users
)
self.scheduler.submit_ephemeral_events_for_as(
service, to_device_messages
)

# Persist the latest handled stream token for this appservice
await self.store.set_type_stream_id_for_appservice(
service, "to_device", new_token
)

async def _handle_typing(
self, service: ApplicationService, new_token: int
) -> List[JsonDict]:
@@ -440,6 +466,76 @@ async def _handle_presence(

return events

async def _get_to_device_messages(
self,
service: ApplicationService,
new_token: int,
users: Collection[Union[str, UserID]],
) -> List[JsonDict]:
"""
Given an application service, determine which to-device messages it should
receive: those between the last-recorded to-device stream token for this
appservice and the given stream token.

Args:
service: The application service to check for which events it should receive.
new_token: The latest to-device event stream token.
users: The users that should receive new to-device messages.

Returns:
A list of JSON dictionaries containing to-device messages that should be
sent to the given application service.
"""
# Get the stream token that this application service has processed up until
from_key = await self.store.get_type_stream_id_for_appservice(
service, "to_device"
)

# Filter out users that this appservice is not interested in
users_appservice_is_interested_in: List[str] = []
for user in users:
if isinstance(user, UserID):
user = user.to_string()

if service.is_interested_in_user(user):
users_appservice_is_interested_in.append(user)

if not users_appservice_is_interested_in:
# Return early if the AS was not interested in any of these users
return []

# Retrieve the to-device messages for each user
recipient_user_id_device_id_to_messages = await self.store.get_new_messages(
users_appservice_is_interested_in,
from_key,
new_token,
)

# According to MSC2409, we'll need to add 'to_user_id' and 'to_device_id' fields
# to the event JSON so that the application service will know which user/device
# combination this message was intended for.
#
# So we mangle this dict into a flat list of to-device messages with the relevant
# user ID and device ID embedded inside each message dict.
message_payload: List[JsonDict] = []
for (
user_id,
device_id,
), messages in recipient_user_id_device_id_to_messages.items():
for message_json in messages:
# Remove 'message_id' from the to-device message, as it's an internal ID
message_json.pop("message_id", None)

message_payload.append(
{
"to_user_id": user_id,
"to_device_id": device_id,
**message_json,
}
)

return message_payload
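The flattening described above — stripping the internal `message_id` and stamping each message with `to_user_id`/`to_device_id` per MSC2409 — can be sketched in isolation (function name hypothetical):

```python
from typing import Dict, List, Tuple

JsonDict = dict


def flatten_to_device_messages(
    by_recipient: Dict[Tuple[str, str], List[JsonDict]],
) -> List[JsonDict]:
    """Turn a (user_id, device_id) -> messages map into a flat list,
    embedding the recipient in each message as MSC2409 requires."""
    payload: List[JsonDict] = []
    for (user_id, device_id), messages in by_recipient.items():
        for message_json in messages:
            # message_id is a Synapse-internal identifier; don't leak it.
            message_json.pop("message_id", None)
            payload.append(
                {"to_user_id": user_id, "to_device_id": device_id, **message_json}
            )
    return payload


flat = flatten_to_device_messages(
    {("@alice:example.org", "DEV1"): [{"type": "m.room_key", "message_id": "x"}]}
)
```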

async def query_user_exists(self, user_id: str) -> bool:
"""Check if any application service knows this user_id exists.

4 changes: 3 additions & 1 deletion synapse/notifier.py
@@ -452,7 +452,9 @@ def on_new_event(
users,
)
except Exception:
logger.exception("Error notifying application services of event")
logger.exception(
"Error notifying application services of ephemeral event"
)

def on_new_replication_data(self) -> None:
"""Used to inform replication listeners that something has happened
4 changes: 2 additions & 2 deletions synapse/storage/databases/main/appservice.py
@@ -387,7 +387,7 @@ def get_new_events_for_appservice_txn(txn):
async def get_type_stream_id_for_appservice(
self, service: ApplicationService, type: str
) -> int:
if type not in ("read_receipt", "presence"):
if type not in ("read_receipt", "presence", "to_device"):
raise ValueError(
"Expected type to be a valid application stream id type, got %s"
% (type,)
@@ -414,7 +414,7 @@ def get_type_stream_id_for_appservice_txn(txn):
async def set_type_stream_id_for_appservice(
self, service: ApplicationService, stream_type: str, pos: Optional[int]
) -> None:
if stream_type not in ("read_receipt", "presence"):
if stream_type not in ("read_receipt", "presence", "to_device"):
raise ValueError(
"Expected type to be a valid application stream id type, got %s"
% (stream_type,)
76 changes: 75 additions & 1 deletion synapse/storage/databases/main/deviceinbox.py
@@ -14,7 +14,7 @@
# limitations under the License.

import logging
from typing import TYPE_CHECKING, List, Optional, Tuple
from typing import TYPE_CHECKING, Collection, Dict, List, Optional, Tuple

from synapse.logging import issue9533_logger
from synapse.logging.opentracing import log_kv, set_tag, trace
@@ -24,6 +24,7 @@
DatabasePool,
LoggingDatabaseConnection,
LoggingTransaction,
make_in_list_sql_clause,
)
from synapse.storage.engines import PostgresEngine
from synapse.storage.util.id_generators import (
@@ -136,6 +137,79 @@ def process_replication_rows(self, stream_name, instance_name, token, rows):
def get_to_device_stream_token(self):
return self._device_inbox_id_gen.get_current_token()

async def get_new_messages(
self,
user_ids: Collection[str],
from_stream_id: int,
to_stream_id: int,
) -> Dict[Tuple[str, str], List[JsonDict]]:
"""
Retrieve to-device messages for a given set of user IDs.

Only to-device messages with stream ids between the given boundaries
(from < X <= to) are returned.

Note that a stream ID can be shared by multiple copies of the same message with
different recipient devices. Each (device, message_content) pair has its own
row in the device_inbox table.

Args:
user_ids: The users to retrieve to-device messages for.
from_stream_id: The lower boundary of stream id to filter with (exclusive).
to_stream_id: The upper boundary of stream id to filter with (inclusive).

Returns:
A dictionary mapping (user ID, device ID) tuples to lists of to-device messages.
"""
# Bail out if none of these users have any messages
for user_id in user_ids:
if self._device_inbox_stream_cache.has_entity_changed(
user_id, from_stream_id
):
break
else:
return {}
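The bail-out above relies on Python's `for`/`else`: the `else` arm runs only when the loop finishes without `break`, i.e. when no user has activity past `from_stream_id`. A sketch with a plain dict standing in for Synapse's stream change cache (which may also report false positives for old positions; that only costs a wasted query):

```python
from typing import Dict, Iterable


def any_user_changed(
    changed_since: Dict[str, int], user_ids: Iterable[str], from_stream_id: int
) -> bool:
    """changed_since maps user_id -> last stream_id with activity
    (a stand-in for the real StreamChangeCache)."""
    for user_id in user_ids:
        if changed_since.get(user_id, 0) > from_stream_id:
            break  # at least one user may have new messages
    else:
        # No break happened: nobody changed; callers can skip the DB query.
        return False
    return True
```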

def get_new_messages_txn(txn: LoggingTransaction):
# Build a query to select messages from any of the given users that are between
# the given stream id bounds

# Scope to only the given users. We need to use this method as doing so is
# different across database engines.
many_clause_sql, many_clause_args = make_in_list_sql_clause(
self.database_engine, "user_id", user_ids
)

sql = f"""
SELECT user_id, device_id, message_json FROM device_inbox
WHERE {many_clause_sql}
AND ? < stream_id AND stream_id <= ?
ORDER BY stream_id ASC
"""

txn.execute(sql, (*many_clause_args, from_stream_id, to_stream_id))

# Create a dictionary of (user ID, device ID) -> list of messages that
# the device is meant to receive.
recipient_user_id_device_id_to_messages: Dict[
Tuple[str, str], List[JsonDict]
] = {}

for row in txn:
recipient_user_id = row[0]
recipient_device_id = row[1]
message_dict = db_to_json(row[2])

recipient_user_id_device_id_to_messages.setdefault(
(recipient_user_id, recipient_device_id), []
).append(message_dict)

return recipient_user_id_device_id_to_messages

return await self.db_pool.runInteraction(
"get_new_messages", get_new_messages_txn
)
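The query shape can be exercised against SQLite. `make_in_list_clause` below is a simplified stand-in for Synapse's `make_in_list_sql_clause` (which also handles Postgres's `= ANY(?)` form); the schema is trimmed to the queried columns:

```python
import sqlite3


def make_in_list_clause(column: str, values):
    # Portable sketch: one "?" placeholder per value.
    values = list(values)
    return f"{column} IN ({', '.join('?' for _ in values)})", values


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE device_inbox "
    "(user_id TEXT, device_id TEXT, stream_id INTEGER, message_json TEXT)"
)
conn.executemany(
    "INSERT INTO device_inbox VALUES (?, ?, ?, ?)",
    [
        ("@alice:hs", "DEV1", 1, "{}"),
        ("@alice:hs", "DEV1", 2, "{}"),
        ("@bob:hs", "DEV2", 3, "{}"),
        ("@carol:hs", "DEV3", 2, "{}"),  # not in the queried user list
    ],
)
clause, args = make_in_list_clause("user_id", ["@alice:hs", "@bob:hs"])
# from < stream_id <= to, exactly as the docstring above specifies.
rows = conn.execute(
    f"SELECT user_id, device_id, message_json FROM device_inbox "
    f"WHERE {clause} AND ? < stream_id AND stream_id <= ? ORDER BY stream_id ASC",
    (*args, 1, 3),
).fetchall()
```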

Member

I'm rather uncomfortable that we're duplicating get_new_messages_for_device. Can't we figure out the device IDs and use get_new_messages_for_device?

Member Author

We could, though querying by devices rather than user IDs can result in a larger query.

Though, Travis tells me that it's likely that AS users will typically only have 1 device per user, maybe 2. But more would mean sending more data to the db server.

Member Author

I'm also happy to just add device_ids to the new method and eliminate get_new_messages_for_device.

Member Author (anoadragon453, Nov 19, 2021)

Given the above thread on how querying with device IDs is more efficient, combining these two methods makes sense. However, there's an annoying subtlety here which makes it difficult to support a limit argument when dealing with multiple users/devices.

Stream IDs for to-device messages are only unique to (user_id, device_id) tuples. They are not unique to rows in the device_inbox table. As an example, you can have a device_inbox table that looks like:

     user_id      | device_id  | stream_id |   message_json   | instance_name
------------------+------------+-----------+------------------+---------------
 @alice:localhost | HAKDUDVZXZ |         1 | {"content": "A"} | master
 @alice:localhost | HAKDUDVZXZ |         2 | {"content": "B"} | master
 @bob:localhost   | TYCGLKNPVJ |         2 | {"content": "B"} | master
(3 rows)

Note mainly that there are multiple rows with the same stream_id - but a given (user_id, device_id) combination only ever appears once per stream ID.

The limit parameter of get_new_messages_for_device means the maximum number of messages to return. This can easily be implemented via a LIMIT ? in SQL when dealing with a single (user_id, device_id) combination - as get_new_messages_for_device currently does. But this becomes more complicated when dealing with multiple (user_id, device_id) combinations, as the AS API would like to do.

This is why I dropped limit from get_new_messages. It would instead need to be defined as a limit on the number of stream IDs - meaning that a limit of N could return up to N x M to-device messages, where M is the number of unique (user_id, device_id) combinations.


All that being said though, we could just have public methods which only accept:

  • multiple (user_id, device_id) pairs, no limit
  • a single (user_id, device_id) pair, limit

and both call to a unified, private method. How does that sound?

Member

But this becomes more complicated when dealing with multiple (user_id, device_id) combinations, as the AS API would like to do.

I'm not sure I entirely follow this. As long as you ORDER BY stream_id, you will know that you have complete results for all but the last returned stream_id - so you just need to drop the last, potentially incomplete, stream_id from the results before returning them to the caller.

Member Author

What will the caller then call the method with next time though? If they try again with the last returned stream_id, at best they'll get some duplicated messages, and at worst, if the number of messages belonging to a single stream ID exceeds the limit, we'll get into a never-ending loop. stream_id - 1 has the same problem, and stream_id + 1 will cause messages to be missed.

An alternative approach:

The caller provides from and to tokens, as well as an offset value. The storage function returns the latest processed stream_id as well as an offset, which the caller then hands right back on the next call, and so on.

This way, if we end up hitting our limit in the middle of a stream_id's group of messages, we can identify how much of that group was returned on the last call. We then just offset through that same group on the next call.

We'd need to ensure a stream_id's group of messages is always consistently ordered (ORDER BY stream_id, device_id). We'd also have to store this offset in the database, though that's just another column.

I think that should work, though there's likely a simpler way to do it...

Member

Following on from your example: suppose we call get_new_messages_for_devices with current_stream_id=0, limit=2. We add one to the limit, and make the DB query, which returns the following:

     user_id      | device_id  | stream_id |   message_json   | instance_name
------------------+------------+-----------+------------------+---------------
 @alice:localhost | HAKDUDVZXZ |         1 | {"content": "A"} | master
 @alice:localhost | HAKDUDVZXZ |         2 | {"content": "B"} | master
 @bob:localhost   | TYCGLKNPVJ |         2 | {"content": "B"} | master

We can now tell that, at the limit (2), we would have incomplete results for stream_id==2. So, get_new_messages_for_devices drops all the messages with stream_id==2, and returns the remainder, with stream_pos=1.

Next time round, the caller passes the returned stream_pos (ie, 1) back as current_stream_id (noting that current_stream_id is exclusive) and gets all the messages with stream_id=2.

(You might remember this algorithm from https://github.com/matrix-org/synapse/pull/5156/files#diff-d9219a3564a55443579532e051d6abc32f47e8f4ba44c64459cd4f9d378e22bdR90-R121).

You're right that it fails if there are more messages with a given stream_id for the given devices than there are at the limit. But assuming we're talking about a limit of say 100, that would mean that the AS would have to be managing at least 100 devices... how likely a scenario is this?
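The fetch-limit-plus-one scheme described in this thread can be sketched as follows (hypothetical helper, not the code the PR finally landed): fetch `limit + 1` rows ordered by `stream_id`; if the extra row appears, results at its `stream_id` may be incomplete, so drop that whole group and resume just before it.

```python
from typing import List, Tuple

Row = Tuple[str, str, int]  # (user_id, device_id, stream_id)


def paginate_messages(rows: List[Row], limit: int) -> Tuple[List[Row], int]:
    """rows must already be ordered by stream_id ASC and fetched with
    LIMIT limit + 1. Returns (complete rows, next stream position)."""
    if len(rows) <= limit:
        # Everything fit: return it all; resume from the last stream_id.
        return rows, (rows[-1][2] if rows else 0)
    # The extra row means results at its stream_id may be incomplete:
    # drop that whole group and resume from just before it. (As noted in
    # this thread, this fails if a single stream_id group alone exceeds
    # the limit.)
    cutoff = rows[limit][2]
    kept = [r for r in rows[:limit] if r[2] < cutoff]
    return kept, cutoff - 1


sample = [("@alice:hs", "D1", 1), ("@alice:hs", "D1", 2), ("@bob:hs", "D2", 2)]
kept, next_pos = paginate_messages(sample, 2)
```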

Member Author

You're right that it fails if there are more messages with a given stream_id for the given devices than there are at the limit. But assuming we're talking about a limit of say 100, that would mean that the AS would have to be managing at least 100 devices... how likely a scenario is this?

Pretty likely I'm afraid - current end-to-end encryption bridge architectures require at least one device per bridged user.

Member

bleh. In which case, maybe we're better off with your earlier idea of

we could just have public methods which only accept:

* multiple `(user_id, device_id)` pairs, no limit
* a single `(user_id, device_id)` pair, limit

and both call to a unified, private method.

having to store an offset into a given stream_id feels pretty icky to me.

Member Author

822e92a refactors the two previous methods into two public methods (get_messages_for_device and get_messages_for_user_devices) which both call the private method _get_device_messages.

A limit option is only allowed for get_messages_for_device, which takes a single user ID / device ID pair.

async def get_new_messages_for_device(
self,
user_id: str,
@@ -0,0 +1,18 @@
/* Copyright 2021 The Matrix.org Foundation C.I.C
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

-- Add a column to track the to_device stream id that this application
-- service has been caught up to.
ALTER TABLE application_services_state ADD COLUMN to_device_stream_id BIGINT;