Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Refactor the way we set outlier #11634

Merged
merged 13 commits into from
Jan 5, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/11634.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Refactor the way that the `outlier` flag is set on events received over federation.
7 changes: 1 addition & 6 deletions synapse/federation/federation_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -215,15 +215,12 @@ def _is_invite_via_3pid(event: EventBase) -> bool:
)


def event_from_pdu_json(
pdu_json: JsonDict, room_version: RoomVersion, outlier: bool = False
) -> EventBase:
def event_from_pdu_json(pdu_json: JsonDict, room_version: RoomVersion) -> EventBase:
"""Construct an EventBase from an event json received over federation

Args:
pdu_json: pdu as received over federation
room_version: The version of the room this event belongs to
outlier: True to mark this event as an outlier

Raises:
SynapseError: if the pdu is missing required fields or is otherwise
Expand All @@ -247,6 +244,4 @@ def event_from_pdu_json(
validate_canonicaljson(pdu_json)

event = make_event_from_dict(pdu_json, room_version)
event.internal_metadata.outlier = outlier

return event
36 changes: 5 additions & 31 deletions synapse/federation/federation_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -265,10 +265,7 @@ async def backfill(

room_version = await self.store.get_room_version(room_id)

pdus = [
event_from_pdu_json(p, room_version, outlier=False)
for p in transaction_data_pdus
]
pdus = [event_from_pdu_json(p, room_version) for p in transaction_data_pdus]

# Check signatures and hash of pdus, removing any from the list that fail checks
pdus[:] = await self._check_sigs_and_hash_and_fetch(
Expand All @@ -282,7 +279,6 @@ async def get_pdu_from_destination_raw(
destination: str,
event_id: str,
room_version: RoomVersion,
outlier: bool = False,
timeout: Optional[int] = None,
) -> Optional[EventBase]:
"""Requests the PDU with given origin and ID from the remote home
Expand All @@ -292,9 +288,6 @@ async def get_pdu_from_destination_raw(
destination: Which homeserver to query
event_id: event to fetch
room_version: version of the room
outlier: Indicates whether the PDU is an `outlier`, i.e. if
it's from an arbitrary point in the context as opposed to part
of the current block of PDUs. Defaults to `False`
timeout: How long to try (in ms) each destination for before
moving to the next destination. None indicates no timeout.

Expand All @@ -316,8 +309,7 @@ async def get_pdu_from_destination_raw(
)

pdu_list: List[EventBase] = [
event_from_pdu_json(p, room_version, outlier=outlier)
for p in transaction_data["pdus"]
event_from_pdu_json(p, room_version) for p in transaction_data["pdus"]
]

if pdu_list and pdu_list[0]:
Expand All @@ -334,7 +326,6 @@ async def get_pdu(
destinations: Iterable[str],
event_id: str,
room_version: RoomVersion,
outlier: bool = False,
timeout: Optional[int] = None,
) -> Optional[EventBase]:
"""Requests the PDU with given origin and ID from the remote home
Expand All @@ -347,9 +338,6 @@ async def get_pdu(
destinations: Which homeservers to query
event_id: event to fetch
room_version: version of the room
outlier: Indicates whether the PDU is an `outlier`, i.e. if
it's from an arbitrary point in the context as opposed to part
of the current block of PDUs. Defaults to `False`
timeout: How long to try (in ms) each destination for before
moving to the next destination. None indicates no timeout.

Expand Down Expand Up @@ -377,7 +365,6 @@ async def get_pdu(
destination=destination,
event_id=event_id,
room_version=room_version,
outlier=outlier,
timeout=timeout,
)

Expand Down Expand Up @@ -435,7 +422,6 @@ async def _check_sigs_and_hash_and_fetch(
origin: str,
pdus: Collection[EventBase],
room_version: RoomVersion,
outlier: bool = False,
) -> List[EventBase]:
"""Takes a list of PDUs and checks the signatures and hashes of each
one. If a PDU fails its signature check then we check if we have it in
Expand All @@ -451,7 +437,6 @@ async def _check_sigs_and_hash_and_fetch(
origin
pdu
room_version
outlier: Whether the events are outliers or not

Returns:
A list of PDUs that have valid signatures and hashes.
Expand All @@ -466,7 +451,6 @@ async def _execute(pdu: EventBase) -> None:
valid_pdu = await self._check_sigs_and_hash_and_fetch_one(
pdu=pdu,
origin=origin,
outlier=outlier,
room_version=room_version,
)

Expand All @@ -482,7 +466,6 @@ async def _check_sigs_and_hash_and_fetch_one(
pdu: EventBase,
origin: str,
room_version: RoomVersion,
outlier: bool = False,
) -> Optional[EventBase]:
"""Takes a PDU and checks its signatures and hashes. If the PDU fails
its signature check then we check if we have it in the database and if
Expand All @@ -494,9 +477,6 @@ async def _check_sigs_and_hash_and_fetch_one(
origin
pdu
room_version
outlier: Whether the events are outliers or not
include_none: Whether to include None in the returned list
for events that have failed their checks

Returns:
The PDU (possibly redacted) if it has valid signatures and hashes.
Expand All @@ -521,7 +501,6 @@ async def _check_sigs_and_hash_and_fetch_one(
destinations=[pdu_origin],
event_id=pdu.event_id,
room_version=room_version,
outlier=outlier,
timeout=10000,
)
except SynapseError:
Expand All @@ -541,13 +520,10 @@ async def get_event_auth(

room_version = await self.store.get_room_version(room_id)

auth_chain = [
event_from_pdu_json(p, room_version, outlier=True)
for p in res["auth_chain"]
]
auth_chain = [event_from_pdu_json(p, room_version) for p in res["auth_chain"]]

signed_auth = await self._check_sigs_and_hash_and_fetch(
destination, auth_chain, outlier=True, room_version=room_version
destination, auth_chain, room_version=room_version
)

return signed_auth
Expand Down Expand Up @@ -816,7 +792,6 @@ async def send_request(destination: str) -> SendJoinResult:
valid_pdu = await self._check_sigs_and_hash_and_fetch_one(
pdu=event,
origin=destination,
outlier=True,
room_version=room_version,
)

Expand Down Expand Up @@ -864,7 +839,6 @@ async def _execute(pdu: EventBase) -> None:
valid_pdu = await self._check_sigs_and_hash_and_fetch_one(
pdu=pdu,
origin=destination,
outlier=True,
room_version=room_version,
)

Expand Down Expand Up @@ -1235,7 +1209,7 @@ async def get_missing_events(
]

signed_events = await self._check_sigs_and_hash_and_fetch(
destination, events, outlier=False, room_version=room_version
destination, events, room_version=room_version
)
except HttpResponseException as e:
if not e.code == 400:
Expand Down
15 changes: 8 additions & 7 deletions synapse/handlers/federation_event.py
Original file line number Diff line number Diff line change
Expand Up @@ -421,9 +421,6 @@ async def process_remote_join(
Raises:
SynapseError if the response is in some way invalid.
"""
for e in itertools.chain(auth_events, state):
e.internal_metadata.outlier = True

event_map = {e.event_id: e for e in itertools.chain(auth_events, state)}

create_event = None
Expand Down Expand Up @@ -1194,7 +1191,6 @@ async def get_event(event_id: str) -> None:
[destination],
event_id,
room_version,
outlier=True,
)
if event is None:
logger.warning(
Expand Down Expand Up @@ -1223,9 +1219,10 @@ async def _auth_and_persist_outliers(
"""Persist a batch of outlier events fetched from remote servers.

We first sort the events to make sure that we process each event's auth_events
before the event itself, and then auth and persist them.
before the event itself.

Notifies about the events where appropriate.
We then mark the events as outliers, persist them to the database, and, where
appropriate (eg, an invite), awake the notifier.

Params:
room_id: the room that the events are meant to be in (though this has
Expand Down Expand Up @@ -1276,7 +1273,8 @@ async def _auth_and_persist_outliers_inner(
Persists a batch of events where we have (theoretically) already persisted all
of their auth events.

Notifies about the events where appropriate.
Marks the events as outliers, auths them, persists them to the database, and,
where appropriate (eg, an invite), awakes the notifier.

Params:
origin: where the events came from
Expand Down Expand Up @@ -1314,6 +1312,9 @@ def prep(event: EventBase) -> Optional[Tuple[EventBase, EventContext]]:
return None
auth.append(ae)

# we're not bothering about room state, so flag the event as an outlier.
event.internal_metadata.outlier = True
Comment on lines +1315 to +1316
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should the docstring of _auth_and_persist_outliers (or _auth_and_persist_outliers_inner) be updated to note that it will mark events as outliers?


context = EventContext.for_outlier()
try:
validate_event_for_room_version(room_version_obj, event)
Expand Down
4 changes: 1 addition & 3 deletions tests/handlers/test_federation.py
Original file line number Diff line number Diff line change
Expand Up @@ -373,9 +373,7 @@ async def get_event_auth(
destination: str, room_id: str, event_id: str
) -> List[EventBase]:
return [
event_from_pdu_json(
ae.get_pdu_json(), room_version=room_version, outlier=True
)
event_from_pdu_json(ae.get_pdu_json(), room_version=room_version)
for ae in auth_events
]

Expand Down