Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Merge pull request #5727 from matrix-org/uhoreg/e2e_cross-signing2-part3
Browse files Browse the repository at this point in the history
Cross-signing [4/4] -- federation edition
  • Loading branch information
uhoreg authored Nov 1, 2019
2 parents 3b4216f + c3fc176 commit 53d7680
Show file tree
Hide file tree
Showing 7 changed files with 241 additions and 47 deletions.
1 change: 1 addition & 0 deletions changelog.d/5727.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add federation support for cross-signing.
8 changes: 4 additions & 4 deletions synapse/federation/sender/per_destination_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -360,20 +360,20 @@ def _get_device_update_edus(self, limit):
last_device_list = self._last_device_list_stream_id

# Retrieve list of new device updates to send to the destination
now_stream_id, results = yield self._store.get_devices_by_remote(
now_stream_id, results = yield self._store.get_device_updates_by_remote(
self._destination, last_device_list, limit=limit
)
edus = [
Edu(
origin=self._server_name,
destination=self._destination,
edu_type="m.device_list_update",
edu_type=edu_type,
content=content,
)
for content in results
for (edu_type, content) in results
]

assert len(edus) <= limit, "get_devices_by_remote returned too many EDUs"
assert len(edus) <= limit, "get_device_updates_by_remote returned too many EDUs"

return (edus, now_stream_id)

Expand Down
13 changes: 12 additions & 1 deletion synapse/handlers/device.py
Original file line number Diff line number Diff line change
Expand Up @@ -459,7 +459,18 @@ def notify_user_signature_update(self, from_user_id, user_ids):
@defer.inlineCallbacks
def on_federation_query_user_devices(self, user_id):
stream_id, devices = yield self.store.get_devices_with_keys_by_user(user_id)
return {"user_id": user_id, "stream_id": stream_id, "devices": devices}
master_key = yield self.store.get_e2e_cross_signing_key(user_id, "master")
self_signing_key = yield self.store.get_e2e_cross_signing_key(
user_id, "self_signing"
)

return {
"user_id": user_id,
"stream_id": stream_id,
"devices": devices,
"master_key": master_key,
"self_signing_key": self_signing_key,
}

@defer.inlineCallbacks
def user_left_room(self, user, room_id):
Expand Down
137 changes: 128 additions & 9 deletions synapse/handlers/e2e_keys.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@
get_verify_key_from_cross_signing_key,
)
from synapse.util import unwrapFirstError
from synapse.util.async_helpers import Linearizer
from synapse.util.caches.expiringcache import ExpiringCache
from synapse.util.retryutils import NotRetryingDestination

logger = logging.getLogger(__name__)
Expand All @@ -49,10 +51,19 @@ def __init__(self, hs):
self.is_mine = hs.is_mine
self.clock = hs.get_clock()

self._edu_updater = SigningKeyEduUpdater(hs, self)

federation_registry = hs.get_federation_registry()

# FIXME: switch to m.signing_key_update when MSC1756 is merged into the spec
federation_registry.register_edu_handler(
"org.matrix.signing_key_update",
self._edu_updater.incoming_signing_key_update,
)
# doesn't really work as part of the generic query API, because the
# query request requires an object POST, but we abuse the
# "query handler" interface.
hs.get_federation_registry().register_query_handler(
federation_registry.register_query_handler(
"client_keys", self.on_federation_query_client_keys
)

Expand Down Expand Up @@ -208,13 +219,15 @@ def do_remote_query(destination):
if user_id in destination_query:
results[user_id] = keys

for user_id, key in remote_result["master_keys"].items():
if user_id in destination_query:
cross_signing_keys["master_keys"][user_id] = key
if "master_keys" in remote_result:
for user_id, key in remote_result["master_keys"].items():
if user_id in destination_query:
cross_signing_keys["master_keys"][user_id] = key

for user_id, key in remote_result["self_signing_keys"].items():
if user_id in destination_query:
cross_signing_keys["self_signing_keys"][user_id] = key
if "self_signing_keys" in remote_result:
for user_id, key in remote_result["self_signing_keys"].items():
if user_id in destination_query:
cross_signing_keys["self_signing_keys"][user_id] = key

except Exception as e:
failure = _exception_to_failure(e)
Expand Down Expand Up @@ -252,7 +265,7 @@ def get_cross_signing_keys_from_cache(self, query, from_user_id):
Returns:
defer.Deferred[dict[str, dict[str, dict]]]: map from
(master|self_signing|user_signing) -> user_id -> key
(master_keys|self_signing_keys|user_signing_keys) -> user_id -> key
"""
master_keys = {}
self_signing_keys = {}
Expand Down Expand Up @@ -344,7 +357,16 @@ def on_federation_query_client_keys(self, query_body):
"""
device_keys_query = query_body.get("device_keys", {})
res = yield self.query_local_devices(device_keys_query)
return {"device_keys": res}
ret = {"device_keys": res}

# add in the cross-signing keys
cross_signing_keys = yield self.get_cross_signing_keys_from_cache(
device_keys_query, None
)

ret.update(cross_signing_keys)

return ret

@trace
@defer.inlineCallbacks
Expand Down Expand Up @@ -1058,3 +1080,100 @@ class SignatureListItem:
target_user_id = attr.ib()
target_device_id = attr.ib()
signature = attr.ib()


class SigningKeyEduUpdater(object):
"""Handles incoming signing key updates from federation and updates the DB"""

def __init__(self, hs, e2e_keys_handler):
self.store = hs.get_datastore()
self.federation = hs.get_federation_client()
self.clock = hs.get_clock()
self.e2e_keys_handler = e2e_keys_handler

self._remote_edu_linearizer = Linearizer(name="remote_signing_key")

# user_id -> list of updates waiting to be handled.
self._pending_updates = {}

# Recently seen stream ids. We don't bother keeping these in the DB,
# but they're useful to have them about to reduce the number of spurious
# resyncs.
self._seen_updates = ExpiringCache(
cache_name="signing_key_update_edu",
clock=self.clock,
max_len=10000,
expiry_ms=30 * 60 * 1000,
iterable=True,
)

@defer.inlineCallbacks
def incoming_signing_key_update(self, origin, edu_content):
"""Called on incoming signing key update from federation. Responsible for
parsing the EDU and adding to pending updates list.
Args:
origin (string): the server that sent the EDU
edu_content (dict): the contents of the EDU
"""

user_id = edu_content.pop("user_id")
master_key = edu_content.pop("master_key", None)
self_signing_key = edu_content.pop("self_signing_key", None)

if get_domain_from_id(user_id) != origin:
logger.warning("Got signing key update edu for %r from %r", user_id, origin)
return

room_ids = yield self.store.get_rooms_for_user(user_id)
if not room_ids:
# We don't share any rooms with this user. Ignore update, as we
# probably won't get any further updates.
return

self._pending_updates.setdefault(user_id, []).append(
(master_key, self_signing_key)
)

yield self._handle_signing_key_updates(user_id)

@defer.inlineCallbacks
def _handle_signing_key_updates(self, user_id):
"""Actually handle pending updates.
Args:
user_id (string): the user whose updates we are processing
"""

device_handler = self.e2e_keys_handler.device_handler

with (yield self._remote_edu_linearizer.queue(user_id)):
pending_updates = self._pending_updates.pop(user_id, [])
if not pending_updates:
# This can happen since we batch updates
return

device_ids = []

logger.info("pending updates: %r", pending_updates)

for master_key, self_signing_key in pending_updates:
if master_key:
yield self.store.set_e2e_cross_signing_key(
user_id, "master", master_key
)
_, verify_key = get_verify_key_from_cross_signing_key(master_key)
# verify_key is a VerifyKey from signedjson, which uses
# .version to denote the portion of the key ID after the
# algorithm and colon, which is the device ID
device_ids.append(verify_key.version)
if self_signing_key:
yield self.store.set_e2e_cross_signing_key(
user_id, "self_signing", self_signing_key
)
_, verify_key = get_verify_key_from_cross_signing_key(
self_signing_key
)
device_ids.append(verify_key.version)

yield device_handler.notify_device_update(user_id, device_ids)
Loading

0 comments on commit 53d7680

Please sign in to comment.