This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Update get_pdu to return the original, pristine EventBase #13320

Merged
merged 22 commits into from
Jul 20, 2022
Changes from 7 commits
Commits
ee236ca
Update get_pdu to return original, pristine EventBase
MadLittleMods Jul 18, 2022
79a1b72
Add changelog
MadLittleMods Jul 18, 2022
bfd35fd
Internal change, no specific bugfix
MadLittleMods Jul 18, 2022
e0e20a5
Explain why not
MadLittleMods Jul 18, 2022
22410f2
Add tests
MadLittleMods Jul 19, 2022
09c411b
Some more clarity
MadLittleMods Jul 19, 2022
6029b42
Re-use room ID
MadLittleMods Jul 19, 2022
09167b1
Better still actionable no-fluff assertion message
MadLittleMods Jul 19, 2022
eb6a291
Describe why we use a cache here
MadLittleMods Jul 19, 2022
1c4e57c
Remove direct access to internal property
MadLittleMods Jul 19, 2022
488f5ed
Make it obvious that we're pulling and using a different cache
MadLittleMods Jul 19, 2022
29a5269
Remove assumption/speculation
MadLittleMods Jul 19, 2022
2688e44
Default is already no metadata
MadLittleMods Jul 19, 2022
24913e7
Refactor structure to avoid duplicating the event copy logic
MadLittleMods Jul 19, 2022
0e6dd5a
Pluralization typo
MadLittleMods Jul 19, 2022
5bc75ed
Explain that we return a copy that is safe to modify
MadLittleMods Jul 19, 2022
dea7669
Fix lints
MadLittleMods Jul 19, 2022
72e65a5
Fix description typo
MadLittleMods Jul 19, 2022
86fe0dc
Share event throughout
MadLittleMods Jul 20, 2022
fd879bb
Different comment
MadLittleMods Jul 20, 2022
354678f
Merge branch 'madlittlemods/pristine-get_pdu' of github.com:matrix-or…
MadLittleMods Jul 20, 2022
233077c
Merge branch 'develop' into madlittlemods/pristine-get_pdu
MadLittleMods Jul 20, 2022
1 change: 1 addition & 0 deletions changelog.d/13320.misc
@@ -0,0 +1 @@
Fix `FederationClient.get_pdu()` returning events from the cache as `outliers` instead of original events we saw over federation.
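
The failure mode this changelog entry describes can be reproduced outside Synapse with a toy cache. The sketch below is only an illustration under stated assumptions: `ToyEvent`, `ToyMetadata`, `get_shared` and `get_copy` are hypothetical names invented here, and `copy.deepcopy` stands in for the PR's rebuild via `make_event_from_dict(...)`.

```python
import copy
from dataclasses import dataclass, field


@dataclass
class ToyMetadata:
    # Hypothetical stand-in for an event's mutable internal metadata.
    outlier: bool = False


@dataclass
class ToyEvent:
    event_id: str
    internal_metadata: ToyMetadata = field(default_factory=ToyMetadata)


cache = {"$a": ToyEvent("$a")}


def get_shared(event_id):
    # Returns the cached object itself, so callers can mutate the cache entry.
    return cache.get(event_id)


def get_copy(event_id):
    # Returns an independent copy, so caller mutations stay local.
    cached = cache.get(event_id)
    return copy.deepcopy(cached) if cached is not None else None


# With copies, the cache entry stays pristine after the caller flags its copy.
ev = get_copy("$a")
ev.internal_metadata.outlier = True
copy_leaked = cache["$a"].internal_metadata.outlier

# With shared references, the caller's change leaks back into the cache.
ev = get_shared("$a")
ev.internal_metadata.outlier = True
shared_leaked = cache["$a"].internal_metadata.outlier
```

Here `copy_leaked` stays `False` while `shared_leaked` becomes `True`, which mirrors the "returned from the cache as `outliers`" symptom.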
58 changes: 51 additions & 7 deletions synapse/federation/federation_client.py
@@ -53,7 +53,7 @@
RoomVersion,
RoomVersions,
)
from synapse.events import EventBase, builder
from synapse.events import EventBase, builder, make_event_from_dict
from synapse.federation.federation_base import (
FederationBase,
InvalidEventSignatureError,
@@ -309,7 +309,7 @@ async def get_pdu_from_destination_raw(
)

logger.debug(
"retrieved event id %s from %s: %r",
"get_pdu_from_destination_raw: retrieved event id %s from %s: %r",
event_id,
destination,
transaction_data,
@@ -358,11 +358,33 @@ async def get_pdu(
The requested PDU, or None if we were unable to find it.
"""
Contributor Author

> I have a suggestion for a somewhat different approach. It's arguably cleaner, but also probably a bunch more work; I'd be interested in your thoughts.
>
> In my ideal world, _EventInternalMetadata.outlier should be a private, immutable property. Which is to say, you have to know whether you are dealing with an outlier at the time you construct an _EventInternalMetadata and hence when you call make_event_from_dict. We're some way off that goal in the codebase as a whole, but that doesn't stop us thinking about how to move in that direction for this part of the code.
>
> -- #13320 (comment)

I like the idea of having it be immutable 👍

> It would mean passing an outlier flag into get_pdu telling it whether we're going to persist the result as an outlier or not. (That's fairly easy, because get_pdu is called in two places: FederationEventHandler.get_event, which only deals in outliers, and FederationClient._check_sigs_and_hash_and_fetch_one, where we can infer the outlieryness from the input pdu).
>
> That then raises a few possibilities: we could continue to construct a new EventBase on each call to get_pdu as proposed here. Or we could use a different cache key depending on the outlier flag and have (up to) two cache entries for each event.

But needing to pass in outlier or the metadata (when we need to add more than just outlier in the future) makes the function signature surface a bit icky. I'd rather just force downstream consumers to create a copy with the metadata they need/expect, or want to add.

We can tackle this in another potential PR though.

Member

yes, there's definitely a balance of two evils here. I'm happy to punt this for now at least.
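
For reference, the alternative discussed in this thread (an immutable `outlier` flag fixed at construction, plus a cache keyed on that flag) could look roughly like the sketch below. This is not what the PR implements, and every name in it (`FrozenMetadata`, `FrozenEvent`, `KeyedPduCache`, the `fetch` callable) is hypothetical.

```python
from dataclasses import dataclass
from typing import Callable, Dict, Tuple


@dataclass(frozen=True)
class FrozenMetadata:
    # The flag is fixed when the metadata is built and cannot be changed later.
    outlier: bool


@dataclass(frozen=True)
class FrozenEvent:
    event_id: str
    internal_metadata: FrozenMetadata


class KeyedPduCache:
    def __init__(self, fetch: Callable[[str], dict]) -> None:
        # `fetch` is an assumed callable returning the raw event dict.
        self._fetch = fetch
        self._entries: Dict[Tuple[str, bool], FrozenEvent] = {}

    def get_pdu(self, event_id: str, outlier: bool) -> FrozenEvent:
        # Up to two entries per event: one per value of the outlier flag.
        key = (event_id, outlier)
        if key not in self._entries:
            raw = self._fetch(event_id)
            self._entries[key] = FrozenEvent(
                raw["event_id"], FrozenMetadata(outlier)
            )
        return self._entries[key]


keyed_cache = KeyedPduCache(lambda event_id: {"event_id": event_id})
as_outlier = keyed_cache.get_pdu("$a", outlier=True)
as_normal = keyed_cache.get_pdu("$a", outlier=False)
```

Because the objects are frozen, sharing them between callers is safe, but the `outlier` argument shows up in the function signature, which is the trade-off raised above.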


logger.debug(
"get_pdu: event_id=%s from destinations=%s", event_id, destinations
)

# TODO: Rate limit the number of times we try and get the same event.

ev = self._get_pdu_cache.get(event_id)
if ev:
return ev
event_copy = None
event_from_cache = self._get_pdu_cache.get(event_id)
if event_from_cache:
logger.debug("get_pdu: found event_from_cache=%s", event_from_cache)
assert not event_from_cache.internal_metadata.outlier, (
"Event from cache unexpectedly an `outlier` when it should be pristine and untouched without metadata set. "
"We are probably not returning a copy of the event because downstream callers are modifying the event reference we have in the cache."
)

# Make sure to return a copy because downstream callers will use
# this event reference directly and change our original, pristine,
# untouched PDU. For example when people mark the event as an
# `outlier` (`event.internal_metadata.outlier = true`), we don't
# want that to propagate back into the cache.
event_copy = make_event_from_dict(
Member

if we're going to build a new EventBase on each call, why not just cache the raw json?

Contributor Author
MadLittleMods, Jul 19, 2022

It would work. But we would have to convert the EventBase back to JSON from get_pdu_from_destination_raw since it needs one for _check_sigs_and_hash(room_version, pdu) anyway (whether that be in get_pdu_from_destination_raw or in get_pdu). I'm inclined to leave it as-is. Feel free to push again
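
To make the suggestion concrete, here is a rough sketch of caching the raw JSON and building a new object on every read. It assumes a toy event class; `ToyEvent` and `RawJsonCache` are hypothetical names, and the real `EventBase` and signature checks are not modelled.

```python
import copy
from typing import Dict, Optional


class ToyEvent:
    # Hypothetical stand-in for an event object built from a JSON dict.
    def __init__(self, pdu_json: dict) -> None:
        self._json = pdu_json
        self.event_id = pdu_json["event_id"]
        self.outlier = False

    def get_pdu_json(self) -> dict:
        return copy.deepcopy(self._json)


class RawJsonCache:
    def __init__(self) -> None:
        self._raw: Dict[str, dict] = {}

    def store(self, event: ToyEvent) -> None:
        # The event already exists as an object here (it was needed for
        # verification), so it is converted back to JSON for storage.
        self._raw[event.event_id] = event.get_pdu_json()

    def get(self, event_id: str) -> Optional[ToyEvent]:
        raw = self._raw.get(event_id)
        if raw is None:
            return None
        # Every read builds a fresh object from a private copy of the JSON.
        return ToyEvent(copy.deepcopy(raw))


raw_cache = RawJsonCache()
raw_cache.store(ToyEvent({"event_id": "$a", "content": {}}))
first = raw_cache.get("$a")
first.outlier = True
second = raw_cache.get("$a")
```

The second read is unaffected by the change made to the first, which is the same guarantee the PR gets by caching the event and rebuilding it on the way out.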

event_from_cache.get_pdu_json(),
event_from_cache.room_version,
internal_metadata_dict=None,
)

return event_copy

pdu_attempts = self.pdu_destination_tried.setdefault(event_id, {})

@@ -371,6 +393,13 @@ async def get_pdu(
now = self._clock.time_msec()
Member

wait... does this loop always try all the destinations, even if the first one works? that would be a substantial bug, if it was ever called with more than one destination, which I don't think it is...

Contributor Author

Seems like it is bugged. I can add a break or refactor to do the return inside the for-loop in a separate PR after this merges ⏩

Contributor Author
MadLittleMods, Jul 19, 2022

I've added a FIXME comment in the code for now so we don't forget or at least will know to tackle it at some point. I plan to tackle in a fast-follow-up after this PR since it's a separate bug. Feel free to push to include it here.

Contributor Author

Followed up in #13346
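
The behaviour this thread asks for (stop at the first destination that returns the event) can be sketched in isolation as follows. This is an illustrative guess at the shape of the fix, not the change made in #13346: `fetch_first`, `RETRY_MS`, the `fetch` callable and the bookkeeping of `last_attempts` are all assumptions made for the example.

```python
from typing import Callable, Dict, Iterable, Optional

RETRY_MS = 60_000  # hypothetical retry window for this sketch


def fetch_first(
    destinations: Iterable[str],
    fetch: Callable[[str], Optional[dict]],
    last_attempts: Dict[str, int],
    now_ms: int,
) -> Optional[dict]:
    for destination in destinations:
        # Skip destinations that were tried within the retry window.
        if last_attempts.get(destination, 0) + RETRY_MS > now_ms:
            continue
        last_attempts[destination] = now_ms
        try:
            pdu = fetch(destination)
        except Exception:
            # Simplification: any failure means "try the next destination".
            continue
        if pdu is not None:
            # Return immediately instead of contacting the remaining servers.
            return pdu
    return None


calls = []


def fake_fetch(destination):
    calls.append(destination)
    return {"event_id": "$a"} if destination == "b.example" else None


result = fetch_first(
    ["a.example", "b.example", "c.example"], fake_fetch, {}, now_ms=1_000_000_000
)
skipped = fetch_first(["a.example"], fake_fetch, {}, now_ms=0)
```

In this run `c.example` is never contacted. The `skipped` call shows the other edge: with `now_ms` below the retry window and a default last attempt of 0, every destination is skipped, which is the situation the test setup in this PR avoids by advancing the clock.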

last_attempt = pdu_attempts.get(destination, 0)
if last_attempt + PDU_RETRY_TIME_MS > now:
logger.debug(
"get_pdu: skipping destination=%s because we tried it recently last_attempt=%s and we only check every %s (now=%s)",
destination,
last_attempt,
PDU_RETRY_TIME_MS,
now,
)
continue

try:
@@ -403,9 +432,24 @@ async def get_pdu(
continue

if signed_pdu:
self._get_pdu_cache[event_id] = signed_pdu
self._get_pdu_cache[signed_pdu.event_id] = signed_pdu

# Make sure to return a copy because downstream callers will use this
# event reference directly and change our original, pristine, untouched
# PDU. For example when people mark the event as an `outlier`
# (`event.internal_metadata.outlier = true`), we don't want that to
# propagate back into the cache.
#
# We could get away with only making a new copy of the event when
# pulling from cache but it's probably better to have good hygiene and
# not dirty the cache in the first place as well.
event_copy = make_event_from_dict(
signed_pdu.get_pdu_json(),
signed_pdu.room_version,
internal_metadata_dict=None,
)

return signed_pdu
return event_copy

async def get_room_state_ids(
self, destination: str, room_id: str, event_id: str
22 changes: 18 additions & 4 deletions synapse/handlers/federation_event.py
@@ -766,10 +766,24 @@ async def _process_pulled_event(
"""
logger.info("Processing pulled event %s", event)

# these should not be outliers.
assert (
not event.internal_metadata.is_outlier()
), "pulled event unexpectedly flagged as outlier"
# This function should not be used to persist outliers (use something
# else) because this does a bunch of operations that aren't necessary
# (extra work; in particular, it makes sure we have all the prev_events
# and resolves the state across those prev events). If you happen to run
# into a situation where the event you're trying to process/backfill is
# marked as an `outlier`, then you should update that spot to return an
# `EventBase` copy that doesn't have `outlier` flag set.
#
# `EventBase` is used to represent both an event we have not yet
# persisted, and one that we have persisted and now keep in the cache.
# In an ideal world this method would only be called with the first type
# of event, but it turns out that's not actually the case and for
# example, you could get an event from cache that is marked as an
# `outlier` (fix up that spot though).
assert not event.internal_metadata.is_outlier(), (
"This is a safe-guard to make sure we're not trying to persist an outlier using this function (use something else). "
"If you're trying to process/backfill an event, this is the right method but you need to pass in an event copy that doesn't have `event.internal_metadata.outlier = true`."
)

event_id = event.event_id

23 changes: 20 additions & 3 deletions synapse/storage/databases/main/events.py
@@ -1346,9 +1346,24 @@ def _update_outliers_txn(
event_id: outlier for event_id, outlier in txn
}

logger.debug(
"_update_outliers_txn: events=%s have_persisted=%s",
[ev.event_id for ev, _ in events_and_contexts],
have_persisted,
)

to_remove = set()
for event, context in events_and_contexts:
if event.event_id not in have_persisted:
outlier_persisted = have_persisted.get(event.event_id)
logger.debug(
"_update_outliers_txn: event=%s outlier=%s outlier_persisted=%s",
event.event_id,
event.internal_metadata.is_outlier(),
outlier_persisted,
)

# Ignore events which we haven't persisted at all
if outlier_persisted is None:
continue

to_remove.add(event)
@@ -1358,7 +1373,6 @@ def _update_outliers_txn(
# was an outlier or not - what we have is at least as good.
continue

outlier_persisted = have_persisted[event.event_id]
if not event.internal_metadata.is_outlier() and outlier_persisted:
# We received a copy of an event that we had already stored as
# an outlier in the database. We now have some state at that event
@@ -1369,7 +1383,10 @@ def _update_outliers_txn(
# events down /sync. In general they will be historical events, so that
# doesn't matter too much, but that is not always the case.

logger.info("Updating state for ex-outlier event %s", event.event_id)
logger.info(
"_update_outliers_txn: Updating state for ex-outlier event %s",
event.event_id,
)

# insert into event_to_state_groups.
try:
125 changes: 113 additions & 12 deletions tests/federation/test_federation_client.py
@@ -22,6 +22,7 @@
from twisted.test.proto_helpers import MemoryReactor

from synapse.api.room_versions import RoomVersions
from synapse.events import EventBase
from synapse.server import HomeServer
from synapse.types import JsonDict
from synapse.util import Clock
@@ -38,31 +39,35 @@ def prepare(self, reactor: MemoryReactor, clock: Clock, homeserver: HomeServer):
self._mock_agent = mock.create_autospec(twisted.web.client.Agent, spec_set=True)
homeserver.get_federation_http_client().agent = self._mock_agent

def test_get_room_state(self):
creator = f"@creator:{self.OTHER_SERVER_NAME}"
test_room_id = "!room_id"
# Move clock up to somewhat realistic time so the PDU destination retry
# works (`now` needs to be larger than `0 + PDU_RETRY_TIME_MS`).
self.reactor.advance(1000000000)

self.creator = f"@creator:{self.OTHER_SERVER_NAME}"
self.test_room_id = "!room_id"

def test_get_room_state(self):
# mock up some events to use in the response.
# In real life, these would have things in `prev_events` and `auth_events`, but that's
# a bit annoying to mock up, and the code under test doesn't care, so we don't bother.
create_event_dict = self.add_hashes_and_signatures_from_other_server(
{
"room_id": test_room_id,
"room_id": self.test_room_id,
"type": "m.room.create",
"state_key": "",
"sender": creator,
"content": {"creator": creator},
"sender": self.creator,
"content": {"creator": self.creator},
"prev_events": [],
"auth_events": [],
"origin_server_ts": 500,
}
)
member_event_dict = self.add_hashes_and_signatures_from_other_server(
{
"room_id": test_room_id,
"room_id": self.test_room_id,
"type": "m.room.member",
"sender": creator,
"state_key": creator,
"sender": self.creator,
"state_key": self.creator,
"content": {"membership": "join"},
"prev_events": [],
"auth_events": [],
@@ -71,9 +76,9 @@ def test_get_room_state(self):
)
pl_event_dict = self.add_hashes_and_signatures_from_other_server(
{
"room_id": test_room_id,
"room_id": self.test_room_id,
"type": "m.room.power_levels",
"sender": creator,
"sender": self.creator,
"state_key": "",
"content": {},
"prev_events": [],
@@ -103,7 +108,7 @@ def test_get_room_state(self):
state_resp, auth_resp = self.get_success(
self.hs.get_federation_client().get_room_state(
"yet_another_server",
test_room_id,
self.test_room_id,
"event_id",
RoomVersions.V9,
)
@@ -130,6 +135,102 @@ def test_get_room_state(self):
["m.room.create", "m.room.member", "m.room.power_levels"],
)

def test_get_pdu_returns_nothing_when_event_does_not_exist(self):
"""No event should be returned when the event does not exist"""
remote_pdu = self.get_success(
self.hs.get_federation_client().get_pdu(
["yet_another_server"],
"event_should_not_exist",
RoomVersions.V9,
)
)
self.assertEqual(remote_pdu, None)

def test_get_pdu(self):
"""Test to make sure an event is returned by `get_pdu()`"""
self._get_pdu_once()

def test_get_pdu_event_from_cache_is_pristine(self):
"""Test that modifications made to events returned by `get_pdu()`
do not propagate back to the internal cache (events returned should
be a copy).
"""

# Get the PDU in the cache
remote_pdu = self._get_pdu_once()

# Modify the event reference.
# This change should not make it back to the `_get_pdu_cache`.
remote_pdu.internal_metadata.outlier = True

# Get the event again. This time it should read it from cache.
remote_pdu2 = self.get_success(
self.hs.get_federation_client().get_pdu(
["yet_another_server"],
remote_pdu.event_id,
RoomVersions.V9,
)
)

# Sanity check that we are working against the same event
self.assertEqual(remote_pdu.event_id, remote_pdu2.event_id)

# Make sure the event does not include modification from earlier
self.assertIsNotNone(remote_pdu2)
self.assertEqual(remote_pdu2.internal_metadata.outlier, False)

def _get_pdu_once(self) -> EventBase:
"""Retrieve an event via `get_pdu()` and assert that an event was returned.
Also used to prime the cache for subsequent test logic.
"""
message_event_dict = self.add_hashes_and_signatures_from_other_server(
{
"room_id": self.test_room_id,
"type": "m.room.message",
"sender": self.creator,
"state_key": "",
"content": {},
"prev_events": [],
"auth_events": [],
"origin_server_ts": 700,
"depth": 10,
}
)

# mock up the response, and have the agent return it
self._mock_agent.request.side_effect = lambda *args, **kwargs: defer.succeed(
_mock_response(
{
"origin": "yet_another_server",
"origin_server_ts": 900,
"pdus": [
message_event_dict,
],
}
)
)

remote_pdu = self.get_success(
self.hs.get_federation_client().get_pdu(
["yet_another_server"],
"event_id",
RoomVersions.V9,
)
)

# check the right call got made to the agent
self._mock_agent.request.assert_called_once_with(
b"GET",
b"matrix://yet_another_server/_matrix/federation/v1/event/event_id",
headers=mock.ANY,
bodyProducer=None,
)

self.assertIsNotNone(remote_pdu)
self.assertEqual(remote_pdu.internal_metadata.outlier, False)

return remote_pdu


def _mock_response(resp: JsonDict):
body = json.dumps(resp).encode("utf-8")