Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Debug for device lists updates #11760

Merged
merged 4 commits into from
Jan 20, 2022
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/11760.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add optional debugging to investigate [issue 8631](https://github.com/matrix-org/synapse/issues/8631).
12 changes: 12 additions & 0 deletions synapse/federation/sender/transaction_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import synapse.server

logger = logging.getLogger(__name__)
issue_8631_logger = logging.getLogger("synapse.8631_debug")

last_pdu_ts_metric = Gauge(
"synapse_federation_last_sent_pdu_time",
Expand Down Expand Up @@ -124,6 +125,17 @@ async def send_new_transaction(
len(pdus),
len(edus),
)
if issue_8631_logger.isEnabledFor(logging.DEBUG):
DEVICE_UPDATE_EDUS = {"m.device_list_update", "m.signing_key_update"}
device_list_updates = [
edu.content for edu in edus if edu.edu_type in DEVICE_UPDATE_EDUS
]
if device_list_updates:
clokep marked this conversation as resolved.
Show resolved Hide resolved
issue_8631_logger.debug(
"transaction [%s] includes device list updates: %s",
transaction.transaction_id,
device_list_updates,
)

# Actually send the transaction

Expand Down
15 changes: 15 additions & 0 deletions synapse/federation/transport/server/federation.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
from synapse.util.versionstring import get_version_string

logger = logging.getLogger(__name__)
issue_8631_logger = logging.getLogger("synapse.8631_debug")


class BaseFederationServerServlet(BaseFederationServlet):
Expand Down Expand Up @@ -95,6 +96,20 @@ async def on_PUT(
len(transaction_data.get("edus", [])),
)

if issue_8631_logger.isEnabledFor(logging.DEBUG):
DEVICE_UPDATE_EDUS = {"m.device_list_update", "m.signing_key_update"}
device_list_updates = [
edu.content
for edu in transaction_data.get("edus", [])
if edu.edu_type in DEVICE_UPDATE_EDUS
]
if device_list_updates:
issue_8631_logger.debug(
"transaction [%s] includes device list updates: %s",
DMRobertson marked this conversation as resolved.
Show resolved Hide resolved
transaction_id,
device_list_updates,
)

except Exception as e:
logger.exception(e)
return 400, {"error": "Invalid transaction"}
Expand Down
21 changes: 19 additions & 2 deletions synapse/storage/databases/main/devices.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
from synapse.server import HomeServer

logger = logging.getLogger(__name__)
issue_8631_logger = logging.getLogger("synapse.8631_debug")

DROP_DEVICE_LIST_STREAMS_NON_UNIQUE_INDEXES = (
"drop_device_list_streams_non_unique_indexes"
Expand Down Expand Up @@ -222,13 +223,18 @@ async def get_device_updates_by_remote(
limit,
)

# We need to ensure `updates` doesn't grow too big.
# Currently: `len(updates) <= limit`.
# Note for later that `len(updates) <= limit`.
DMRobertson marked this conversation as resolved.
Show resolved Hide resolved

# Return an empty list if there are no updates
if not updates:
return now_stream_id, []

if issue_8631_logger.isEnabledFor(logging.DEBUG):
data = {(user, device): stream_id for user, device, stream_id, _ in updates}
issue_8631_logger.debug(
"device updates need to be sent to %s: %s", destination, data
)

# get the cross-signing keys of the users in the list, so that we can
# determine which of the device changes were cross-signing keys
users = {r[0] for r in updates}
Expand Down Expand Up @@ -365,6 +371,17 @@ async def get_device_updates_by_remote(
# and remove the length budgeting above.
results.append(("org.matrix.signing_key_update", result))

if issue_8631_logger.isEnabledFor(logging.DEBUG):
for (user_id, edu) in results:
issue_8631_logger.debug(
"device update to %s for %s from %s to %s: %s",
destination,
user_id,
from_stream_id,
last_processed_stream_id,
edu,
)

return last_processed_stream_id, results

def _get_device_updates_by_remote_txn(
Expand Down