Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Use the proper serialization format when bundling aggregations #12090

Merged
merged 8 commits into from
Mar 3, 2022
24 changes: 13 additions & 11 deletions synapse/appservice/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
TransactionUnusedFallbackKeys,
)
from synapse.events import EventBase
from synapse.events.utils import serialize_event
from synapse.events.utils import SerializeEventConfig, serialize_event
from synapse.http.client import SimpleHttpClient
from synapse.types import JsonDict, ThirdPartyInstanceID
from synapse.util.caches.response_cache import ResponseCache
Expand Down Expand Up @@ -321,16 +321,18 @@ def _serialize(
serialize_event(
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Part of me thinks we should be consistent and use EventSerializer here, but it doesn't really matter unless we might bundled aggregations. I dislike that this is two separate methods though.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sounds like it might be worthwhile as an extra pass if you have the time and it looks sensible as you're doing it?

e,
time_now,
as_client_event=True,
# If this is an invite or a knock membership event, and we're interested
# in this user, then include any stripped state alongside the event.
include_stripped_room_state=(
e.type == EventTypes.Member
and (
e.membership == Membership.INVITE
or e.membership == Membership.KNOCK
)
and service.is_interested_in_user(e.state_key)
config=SerializeEventConfig(
as_client_event=True,
# If this is an invite or a knock membership event, and we're interested
# in this user, then include any stripped state alongside the event.
include_stripped_room_state=(
e.type == EventTypes.Member
and (
e.membership == Membership.INVITE
or e.membership == Membership.KNOCK
)
and service.is_interested_in_user(e.state_key)
),
),
)
for e in events
Expand Down
84 changes: 58 additions & 26 deletions synapse/events/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
Union,
)

import attr
from frozendict import frozendict

from synapse.api.constants import EventContentFields, EventTypes, RelationTypes
Expand Down Expand Up @@ -303,29 +304,37 @@ def format_event_for_client_v2_without_room_id(d: JsonDict) -> JsonDict:
return d


@attr.s(slots=True, frozen=True, auto_attribs=True)
class SerializeEventConfig:
as_client_event: bool = True
# Function to convert from federation format to client format
event_format: Callable[[JsonDict], JsonDict] = format_event_for_client_v1
# ID of the user's auth token - used for namespacing of transaction IDs
token_id: Optional[int] = None
# List of event fields to include. If empty, all fields will be returned.
only_event_fields: Optional[List[str]] = None
# Some events can have stripped room state stored in the `unsigned` field.
# This is required for invite and knock functionality. If this option is
# False, that state will be removed from the event before it is returned.
# Otherwise, it will be kept.
include_stripped_room_state: bool = False


_DEFAULT_SERIALIZE_EVENT_CONFIG = SerializeEventConfig()


def serialize_event(
e: Union[JsonDict, EventBase],
time_now_ms: int,
*,
as_client_event: bool = True,
event_format: Callable[[JsonDict], JsonDict] = format_event_for_client_v1,
token_id: Optional[str] = None,
only_event_fields: Optional[List[str]] = None,
include_stripped_room_state: bool = False,
config: SerializeEventConfig = _DEFAULT_SERIALIZE_EVENT_CONFIG,
) -> JsonDict:
"""Serialize event for clients

Args:
e
time_now_ms
as_client_event
event_format
token_id
only_event_fields
include_stripped_room_state: Some events can have stripped room state
stored in the `unsigned` field. This is required for invite and knock
functionality. If this option is False, that state will be removed from the
event before it is returned. Otherwise, it will be kept.
config: Event serialization config

Returns:
The serialized event dictionary.
Expand All @@ -348,11 +357,13 @@ def serialize_event(

if "redacted_because" in e.unsigned:
d["unsigned"]["redacted_because"] = serialize_event(
e.unsigned["redacted_because"], time_now_ms, event_format=event_format
e.unsigned["redacted_because"],
time_now_ms,
config=SerializeEventConfig(event_format=config.event_format),
)

if token_id is not None:
if token_id == getattr(e.internal_metadata, "token_id", None):
if config.token_id is not None:
if config.token_id == getattr(e.internal_metadata, "token_id", None):
txn_id = getattr(e.internal_metadata, "txn_id", None)
if txn_id is not None:
d["unsigned"]["transaction_id"] = txn_id
Expand All @@ -361,13 +372,14 @@ def serialize_event(
# that are meant to provide metadata about a room to an invitee/knocker. They are
# intended to only be included in specific circumstances, such as down sync, and
# should not be included in any other case.
if not include_stripped_room_state:
if not config.include_stripped_room_state:
d["unsigned"].pop("invite_room_state", None)
d["unsigned"].pop("knock_room_state", None)

if as_client_event:
d = event_format(d)
if config.as_client_event:
d = config.event_format(d)

only_event_fields = config.only_event_fields
if only_event_fields:
if not isinstance(only_event_fields, list) or not all(
isinstance(f, str) for f in only_event_fields
Expand All @@ -390,18 +402,18 @@ def serialize_event(
event: Union[JsonDict, EventBase],
time_now: int,
*,
config: SerializeEventConfig = _DEFAULT_SERIALIZE_EVENT_CONFIG,
bundle_aggregations: Optional[Dict[str, "BundledAggregations"]] = None,
**kwargs: Any,
) -> JsonDict:
"""Serializes a single event.

Args:
event: The event being serialized.
time_now: The current time in milliseconds
config: Event serialization config
bundle_aggregations: Whether to include the bundled aggregations for this
event. Only applies to non-state events. (State events never include
bundled aggregations.)
**kwargs: Arguments to pass to `serialize_event`

Returns:
The serialized event
Expand All @@ -410,7 +422,7 @@ def serialize_event(
if not isinstance(event, EventBase):
return event

serialized_event = serialize_event(event, time_now, **kwargs)
serialized_event = serialize_event(event, time_now, config=config)

# Check if there are any bundled aggregations to include with the event.
if bundle_aggregations:
Expand All @@ -419,6 +431,7 @@ def serialize_event(
self._inject_bundled_aggregations(
event,
time_now,
config,
bundle_aggregations[event.event_id],
serialized_event,
)
Expand Down Expand Up @@ -456,6 +469,7 @@ def _inject_bundled_aggregations(
self,
event: EventBase,
time_now: int,
config: SerializeEventConfig,
aggregations: "BundledAggregations",
serialized_event: JsonDict,
) -> None:
Expand All @@ -466,6 +480,7 @@ def _inject_bundled_aggregations(
time_now: The current time in milliseconds
aggregations: The bundled aggregation to serialize.
serialized_event: The serialized event which may be modified.
config: Event serialization config

"""
serialized_aggregations = {}
Expand Down Expand Up @@ -494,7 +509,10 @@ def _inject_bundled_aggregations(

# Don't bundle aggregations as this could recurse forever.
serialized_latest_event = self.serialize_event(
thread.latest_event, time_now, bundle_aggregations=None
thread.latest_event,
time_now,
config=config,
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't know if this is correct or if we should only pass in the format, similar to what we do for redactions:

if "redacted_because" in e.unsigned:
d["unsigned"]["redacted_because"] = serialize_event(
e.unsigned["redacted_because"], time_now_ms, event_format=event_format
)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was discussed a bit in #synapse-dev:matrix.org and we agree that things should definitely be consistent. @erikjohnston thinks that:

Yeah, my natural instinct is that we should propagate the options for both. In practice I doubt it matters tooo much

I'm going to make that change here.

bundle_aggregations=None,
)
# Manually apply an edit, if one exists.
if thread.latest_edit:
Expand All @@ -515,20 +533,34 @@ def _inject_bundled_aggregations(
)

def serialize_events(
self, events: Iterable[Union[JsonDict, EventBase]], time_now: int, **kwargs: Any
self,
events: Iterable[Union[JsonDict, EventBase]],
time_now: int,
*,
config: SerializeEventConfig = _DEFAULT_SERIALIZE_EVENT_CONFIG,
bundle_aggregations: Optional[Dict[str, "BundledAggregations"]] = None,
) -> List[JsonDict]:
"""Serializes multiple events.

Args:
event
time_now: The current time in milliseconds
**kwargs: Arguments to pass to `serialize_event`
config: Event serialization config
bundle_aggregations: Whether to include the bundled aggregations for this
event. Only applies to non-state events. (State events never include
bundled aggregations.)

Returns:
The list of serialized events
"""
return [
self.serialize_event(event, time_now=time_now, **kwargs) for event in events
self.serialize_event(
event,
time_now,
config=config,
bundle_aggregations=bundle_aggregations,
)
for event in events
]


Expand Down
3 changes: 2 additions & 1 deletion synapse/handlers/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from synapse.api.constants import EduTypes, EventTypes, Membership
from synapse.api.errors import AuthError, SynapseError
from synapse.events import EventBase
from synapse.events.utils import SerializeEventConfig
from synapse.handlers.presence import format_user_presence_state
from synapse.streams.config import PaginationConfig
from synapse.types import JsonDict, UserID
Expand Down Expand Up @@ -120,7 +121,7 @@ async def get_stream(
chunks = self._event_serializer.serialize_events(
events,
time_now,
as_client_event=as_client_event,
config=SerializeEventConfig(as_client_event=as_client_event),
)

chunk = {
Expand Down
9 changes: 6 additions & 3 deletions synapse/handlers/initial_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from synapse.api.constants import EduTypes, EventTypes, Membership
from synapse.api.errors import SynapseError
from synapse.events import EventBase
from synapse.events.utils import SerializeEventConfig
from synapse.events.validator import EventValidator
from synapse.handlers.presence import format_user_presence_state
from synapse.handlers.receipts import ReceiptEventSource
Expand Down Expand Up @@ -156,6 +157,8 @@ async def _snapshot_all_rooms(
if limit is None:
limit = 10

serializer_options = SerializeEventConfig(as_client_event=as_client_event)

async def handle_room(event: RoomsForUser) -> None:
d: JsonDict = {
"room_id": event.room_id,
Expand All @@ -173,7 +176,7 @@ async def handle_room(event: RoomsForUser) -> None:
d["invite"] = self._event_serializer.serialize_event(
invite_event,
time_now,
as_client_event=as_client_event,
config=serializer_options,
)

rooms_ret.append(d)
Expand Down Expand Up @@ -225,7 +228,7 @@ async def handle_room(event: RoomsForUser) -> None:
self._event_serializer.serialize_events(
messages,
time_now=time_now,
as_client_event=as_client_event,
config=serializer_options,
)
),
"start": await start_token.to_string(self.store),
Expand All @@ -235,7 +238,7 @@ async def handle_room(event: RoomsForUser) -> None:
d["state"] = self._event_serializer.serialize_events(
current_state.values(),
time_now=time_now,
as_client_event=as_client_event,
config=serializer_options,
)

account_data_events = []
Expand Down
7 changes: 5 additions & 2 deletions synapse/handlers/pagination.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
from synapse.api.constants import EventTypes, Membership
from synapse.api.errors import SynapseError
from synapse.api.filtering import Filter
from synapse.events.utils import SerializeEventConfig
from synapse.handlers.room import ShutdownRoomResponse
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.storage.state import StateFilter
Expand Down Expand Up @@ -541,13 +542,15 @@ async def get_messages(

time_now = self.clock.time_msec()

serialize_options = SerializeEventConfig(as_client_event=as_client_event)

chunk = {
"chunk": (
self._event_serializer.serialize_events(
events,
time_now,
config=serialize_options,
bundle_aggregations=aggregations,
as_client_event=as_client_event,
)
),
"start": await from_token.to_string(self.store),
Expand All @@ -556,7 +559,7 @@ async def get_messages(

if state:
chunk["state"] = self._event_serializer.serialize_events(
state, time_now, as_client_event=as_client_event
state, time_now, config=serialize_options
)

return chunk
Expand Down
9 changes: 7 additions & 2 deletions synapse/rest/client/notifications.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,10 @@
from typing import TYPE_CHECKING, Tuple

from synapse.api.constants import ReceiptTypes
from synapse.events.utils import format_event_for_client_v2_without_room_id
from synapse.events.utils import (
SerializeEventConfig,
format_event_for_client_v2_without_room_id,
)
from synapse.http.server import HttpServer
from synapse.http.servlet import RestServlet, parse_integer, parse_string
from synapse.http.site import SynapseRequest
Expand Down Expand Up @@ -75,7 +78,9 @@ async def on_GET(self, request: SynapseRequest) -> Tuple[int, JsonDict]:
self._event_serializer.serialize_event(
notif_events[pa.event_id],
self.clock.time_msec(),
event_format=format_event_for_client_v2_without_room_id,
config=SerializeEventConfig(
event_format=format_event_for_client_v2_without_room_id
),
)
),
}
Expand Down
Loading