Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Bundle aggregations outside of the serialization method #11612

Merged
merged 4 commits into from
Jan 7, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
115 changes: 33 additions & 82 deletions synapse/events/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,7 @@
# limitations under the License.
import collections.abc
import re
from typing import (
TYPE_CHECKING,
Any,
Callable,
Dict,
Iterable,
List,
Mapping,
Optional,
Union,
)
from typing import Any, Callable, Dict, Iterable, List, Mapping, Optional, Union

from frozendict import frozendict

Expand All @@ -37,9 +27,6 @@

from . import EventBase

if TYPE_CHECKING:
from synapse.server import HomeServer

# Split strings on "." but not "\." This uses a negative lookbehind assertion for '\'
# (?<!stuff) matches if the current position in the string is not preceded
# by a match for 'stuff'.
Expand Down Expand Up @@ -385,17 +372,12 @@ class EventClientSerializer:
clients.
"""

def __init__(self, hs: "HomeServer"):
self.store = hs.get_datastore()
self._msc1849_enabled = hs.config.experimental.msc1849_enabled
self._msc3440_enabled = hs.config.experimental.msc3440_enabled

async def serialize_event(
self,
event: Union[JsonDict, EventBase],
time_now: int,
*,
bundle_aggregations: bool = False,
bundle_aggregations: Optional[Dict[str, JsonDict]] = None,
**kwargs: Any,
) -> JsonDict:
"""Serializes a single event.
Expand All @@ -418,66 +400,41 @@ async def serialize_event(
serialized_event = serialize_event(event, time_now, **kwargs)

# Check if there are any bundled aggregations to include with the event.
#
# Do not bundle aggregations if any of the following are true:
#
# * Support is disabled via the configuration or the caller.
# * The event is a state event.
# * The event has been redacted.
if (
self._msc1849_enabled
and bundle_aggregations
and not event.is_state()
and not event.internal_metadata.is_redacted()
Comment on lines -430 to -431
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This bit moved to the top of the helper function in the previous commit.

):
await self._injected_bundled_aggregations(event, time_now, serialized_event)
if bundle_aggregations:
event_aggregations = bundle_aggregations.get(event.event_id)
if event_aggregations:
await self._injected_bundled_aggregations(
event,
time_now,
bundle_aggregations[event.event_id],
serialized_event,
)

return serialized_event

async def _injected_bundled_aggregations(
self, event: EventBase, time_now: int, serialized_event: JsonDict
self,
event: EventBase,
time_now: int,
aggregations: JsonDict,
serialized_event: JsonDict,
) -> None:
"""Potentially injects bundled aggregations into the unsigned portion of the serialized event.

Args:
event: The event being serialized.
time_now: The current time in milliseconds
aggregations: The bundled aggregations to serialize.
serialized_event: The serialized event which may be modified.

"""
# Do not bundle aggregations for an event which represents an edit or an
# annotation. It does not make sense for them to have related events.
relates_to = event.content.get("m.relates_to")
if isinstance(relates_to, (dict, frozendict)):
relation_type = relates_to.get("rel_type")
if relation_type in (RelationTypes.ANNOTATION, RelationTypes.REPLACE):
return

event_id = event.event_id
room_id = event.room_id

# The bundled aggregations to include.
aggregations = {}

annotations = await self.store.get_aggregation_groups_for_event(
event_id, room_id
)
if annotations.chunk:
aggregations[RelationTypes.ANNOTATION] = annotations.to_dict()

references = await self.store.get_relations_for_event(
event_id, room_id, RelationTypes.REFERENCE, direction="f"
)
if references.chunk:
aggregations[RelationTypes.REFERENCE] = references.to_dict()
# Make a copy in case the object is cached.
aggregations = aggregations.copy()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The changes in this function seem to be:

  • the logic which builds aggregations is pulled out to the helper in the previous commit
  • aggregations are precalculated and passed into this function
  • any side effects from the old code are maintained: read the relevant data from aggregations and kick off those same side effects

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My hope is that it is identical logic, yep! This still does the serialization of the aggregations, but that's perfectly reasonable I think.

Comment on lines +430 to +431
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need this? Ahh yes: there's a mutation on +469.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wondered if it would make more sense to make the input to this an attrs class so that the types are much clearer, which would mean we would build the dictionary here instead. Would that be clearer? (I think I'd rather do that as a follow-up since it is a decent amount of changes, but could do it here if you'd like!)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that'd be nice, but not crucial. Sounds like it'd be best as a separate change to me.


edit = None
if event.type == EventTypes.Message:
edit = await self.store.get_applicable_edit(event_id, room_id)

if edit:
if RelationTypes.REPLACE in aggregations:
# If there is an edit replace the content, preserving existing
# relations.
edit = aggregations[RelationTypes.REPLACE]

# Ensure we take copies of the edit content, otherwise we risk modifying
# the original event.
Expand All @@ -502,26 +459,20 @@ async def _injected_bundled_aggregations(
}

# If this event is the start of a thread, include a summary of the replies.
if self._msc3440_enabled:
(
thread_count,
latest_thread_event,
) = await self.store.get_thread_summary(event_id, room_id)
if latest_thread_event:
aggregations[RelationTypes.THREAD] = {
# Don't bundle aggregations as this could recurse forever.
"latest_event": await self.serialize_event(
latest_thread_event, time_now, bundle_aggregations=False
),
"count": thread_count,
}

# If any bundled aggregations were found, include them.
if aggregations:
serialized_event["unsigned"].setdefault("m.relations", {}).update(
aggregations
if RelationTypes.THREAD in aggregations:
# Serialize the latest thread event.
latest_thread_event = aggregations[RelationTypes.THREAD]["latest_event"]

# Don't bundle aggregations as this could recurse forever.
aggregations[RelationTypes.THREAD][
"latest_event"
] = await self.serialize_event(
latest_thread_event, time_now, bundle_aggregations=None
)

# Include the bundled aggregations in the event.
serialized_event["unsigned"].setdefault("m.relations", {}).update(aggregations)

async def serialize_events(
self, events: Iterable[Union[JsonDict, EventBase]], time_now: int, **kwargs: Any
) -> List[JsonDict]:
Expand Down
4 changes: 3 additions & 1 deletion synapse/handlers/pagination.py
Original file line number Diff line number Diff line change
Expand Up @@ -537,14 +537,16 @@ async def get_messages(
state_dict = await self.store.get_events(list(state_ids.values()))
state = state_dict.values()

aggregations = await self.store.get_bundled_aggregations(events)

time_now = self.clock.time_msec()

chunk = {
"chunk": (
await self._event_serializer.serialize_events(
events,
time_now,
bundle_aggregations=True,
bundle_aggregations=aggregations,
as_client_event=as_client_event,
)
),
Expand Down
10 changes: 10 additions & 0 deletions synapse/handlers/room.py
Original file line number Diff line number Diff line change
Expand Up @@ -1177,6 +1177,16 @@ async def filter_evts(events: List[EventBase]) -> List[EventBase]:
# `filtered` rather than the event we retrieved from the datastore.
results["event"] = filtered[0]

# Fetch the aggregations.
aggregations = await self.store.get_bundled_aggregations([results["event"]])
aggregations.update(
await self.store.get_bundled_aggregations(results["events_before"])
)
aggregations.update(
await self.store.get_bundled_aggregations(results["events_after"])
)
results["aggregations"] = aggregations

if results["events_after"]:
last_event_id = results["events_after"][-1].event_id
else:
Expand Down
6 changes: 3 additions & 3 deletions synapse/rest/admin/rooms.py
Original file line number Diff line number Diff line change
Expand Up @@ -747,17 +747,17 @@ async def on_GET(
results["events_before"] = await self._event_serializer.serialize_events(
results["events_before"],
time_now,
bundle_aggregations=True,
bundle_aggregations=results["aggregations"],
)
results["event"] = await self._event_serializer.serialize_event(
results["event"],
time_now,
bundle_aggregations=True,
bundle_aggregations=results["aggregations"],
)
results["events_after"] = await self._event_serializer.serialize_events(
results["events_after"],
time_now,
bundle_aggregations=True,
bundle_aggregations=results["aggregations"],
)
results["state"] = await self._event_serializer.serialize_events(
results["state"], time_now
Expand Down
5 changes: 3 additions & 2 deletions synapse/rest/client/relations.py
Original file line number Diff line number Diff line change
Expand Up @@ -228,12 +228,13 @@ async def on_GET(
# Do not bundle aggregations when retrieving the original event because
# we want the content before relations are applied to it.
original_event = await self._event_serializer.serialize_event(
event, now, bundle_aggregations=False
event, now, bundle_aggregations=None
)
# The relations returned for the requested event do include their
# bundled aggregations.
aggregations = await self.store.get_bundled_aggregations(events)
serialized_events = await self._event_serializer.serialize_events(
events, now, bundle_aggregations=True
events, now, bundle_aggregations=aggregations
)

return_value = pagination_chunk.to_dict()
Expand Down
18 changes: 13 additions & 5 deletions synapse/rest/client/room.py
Original file line number Diff line number Diff line change
Expand Up @@ -642,6 +642,7 @@ class RoomEventServlet(RestServlet):
def __init__(self, hs: "HomeServer"):
super().__init__()
self.clock = hs.get_clock()
self._store = hs.get_datastore()
self.event_handler = hs.get_event_handler()
self._event_serializer = hs.get_event_client_serializer()
self.auth = hs.get_auth()
Expand All @@ -660,10 +661,13 @@ async def on_GET(
# https://matrix.org/docs/spec/client_server/r0.5.0#get-matrix-client-r0-rooms-roomid-event-eventid
raise SynapseError(404, "Event not found.", errcode=Codes.NOT_FOUND)

time_now = self.clock.time_msec()
if event:
# Ensure there are bundled aggregations available.
aggregations = await self._store.get_bundled_aggregations([event])

time_now = self.clock.time_msec()
event_dict = await self._event_serializer.serialize_event(
event, time_now, bundle_aggregations=True
event, time_now, bundle_aggregations=aggregations
)
return 200, event_dict

Expand Down Expand Up @@ -709,13 +713,17 @@ async def on_GET(

time_now = self.clock.time_msec()
results["events_before"] = await self._event_serializer.serialize_events(
results["events_before"], time_now, bundle_aggregations=True
results["events_before"],
time_now,
bundle_aggregations=results["aggregations"],
)
results["event"] = await self._event_serializer.serialize_event(
results["event"], time_now, bundle_aggregations=True
results["event"], time_now, bundle_aggregations=results["aggregations"]
)
results["events_after"] = await self._event_serializer.serialize_events(
results["events_after"], time_now, bundle_aggregations=True
results["events_after"],
time_now,
bundle_aggregations=results["aggregations"],
)
results["state"] = await self._event_serializer.serialize_events(
results["state"], time_now
Expand Down
32 changes: 19 additions & 13 deletions synapse/rest/client/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -525,21 +525,14 @@ async def encode_room(
The room, encoded in our response format
"""

def serialize(events: Iterable[EventBase]) -> Awaitable[List[JsonDict]]:
def serialize(
events: Iterable[EventBase],
aggregations: Optional[Dict[str, Dict[str, Any]]] = None,
) -> Awaitable[List[JsonDict]]:
return self._event_serializer.serialize_events(
events,
time_now=time_now,
# Don't bother to bundle aggregations if the timeline is unlimited,
# as clients will have all the necessary information.
# bundle_aggregations=room.timeline.limited,
#
# richvdh 2021-12-15: disable this temporarily as it has too high an
# overhead for initialsyncs. We need to figure out a way that the
# bundling can be done *before* the events are stored in the
# SyncResponseCache so that this part can be synchronous.
#
# Ensure to re-enable the test at tests/rest/client/test_relations.py::RelationsTestCase.test_bundled_aggregations.
bundle_aggregations=False,
bundle_aggregations=aggregations,
token_id=token_id,
event_format=event_formatter,
only_event_fields=only_fields,
Expand All @@ -562,7 +555,20 @@ def serialize(events: Iterable[EventBase]) -> Awaitable[List[JsonDict]]:
)

serialized_state = await serialize(state_events)
serialized_timeline = await serialize(timeline_events)
# Don't bother to bundle aggregations if the timeline is unlimited,
# as clients will have all the necessary information.
# bundle_aggregations=room.timeline.limited,
#
# richvdh 2021-12-15: disable this temporarily as it has too high an
# overhead for initialsyncs. We need to figure out a way that the
# bundling can be done *before* the events are stored in the
# SyncResponseCache so that this part can be synchronous.
#
# Ensure to re-enable the test at tests/rest/client/test_relations.py::RelationsTestCase.test_bundled_aggregations.
# if room.timeline.limited:
# aggregations = await self.store.get_bundled_aggregations(timeline_events)
aggregations = None
serialized_timeline = await serialize(timeline_events, aggregations)

account_data = room.account_data

Expand Down
2 changes: 1 addition & 1 deletion synapse/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -759,7 +759,7 @@ def get_oidc_handler(self) -> "OidcHandler":

@cache_in_self
def get_event_client_serializer(self) -> EventClientSerializer:
return EventClientSerializer(self)
return EventClientSerializer()

@cache_in_self
def get_password_policy_handler(self) -> PasswordPolicyHandler:
Expand Down