Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Add local support for the new spaces summary endpoint #10549

Merged
merged 10 commits into from
Aug 10, 2021
123 changes: 120 additions & 3 deletions synapse/handlers/space_summary.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
from collections import deque
from typing import (
TYPE_CHECKING,
Collection,
Deque,
Dict,
Iterable,
List,
Expand All @@ -38,9 +38,11 @@
Membership,
RoomTypes,
)
from synapse.api.errors import Codes, SynapseError
from synapse.events import EventBase
from synapse.events.utils import format_event_for_client_v2
from synapse.types import JsonDict
from synapse.util.stringutils import random_string

if TYPE_CHECKING:
from synapse.server import HomeServer
Expand All @@ -67,6 +69,12 @@ def __init__(self, hs: "HomeServer"):
self._server_name = hs.hostname
self._federation_client = hs.get_federation_client()

# TODO Allow for multiple workers to share this data.
# TODO Expire pagination tokens.
self._pagination_sessions: Dict[
str, Tuple[Deque[_RoomQueueEntry], Set[str]]
] = {}
clokep marked this conversation as resolved.
Show resolved Hide resolved

async def get_space_summary(
self,
requester: str,
Expand Down Expand Up @@ -130,7 +138,7 @@ async def get_space_summary(
requester, None, room_id, suggested_only, max_children
)

events: Collection[JsonDict] = []
events: Sequence[JsonDict] = []
if room_entry:
rooms_result.append(room_entry.room)
events = room_entry.children
Expand Down Expand Up @@ -245,6 +253,110 @@ async def get_space_summary(

return {"rooms": rooms_result, "events": events_result}

async def get_room_hierarchy(
self,
requester: str,
room_id: str,
suggested_only: bool = False,
max_depth: Optional[int] = None,
limit: Optional[int] = None,
from_token: Optional[str] = None,
) -> JsonDict:
"""
Implementation of the room hierarchy C-S API.

Args:
requester: The user ID of the user making this request.
room_id: The room ID to start the summary at (the "root" room).
suggested_only: Whether we should only return children with the "suggested"
flag set.
max_depth: The maximum depth in the tree to explore, must be a
non-negative integer.

0 would correspond to just the root room, 1 would include just
the root room's children, etc.
limit: An optional limit on the number of rooms to return per
page. Must be a positive integer.
from_token: An optional pagination token.

Returns:
The JSON hierarchy dictionary.
"""
# first of all, check that the user is in the room in question (or it's
# world-readable)
await self._auth.check_user_in_room_or_world_readable(room_id, requester)

# If this is continuing a previous session, pull the persisted data.
if from_token:
if from_token not in self._pagination_sessions:
raise SynapseError(400, "Unknown pagination token", Codes.INVALID_PARAM)

# TODO Assert that other parameters have not changed.
(room_queue, processed_rooms) = self._pagination_sessions[from_token]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is going to go fairly badly wrong when we get two concurrent requests with the same from_token, and both end up popping entries from the same Deque. We might need a ResponseCache to provide locking? Or to copy the deque so that we can safely process both requests?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A response cache sounds like it would be good regardless, since this could be a somewhat heavy calculation?

else:
# the queue of rooms to process
room_queue = deque((_RoomQueueEntry(room_id, ()),))

# Rooms we have already processed.
processed_rooms = set()

rooms_result: List[JsonDict] = []

# Cap the limit to a server-side maximum.
if limit is None:
limit = MAX_ROOMS
else:
limit = min(limit, MAX_ROOMS)

# Iterate through the queue until we reach the limit or run out of
# rooms to include.
while room_queue and len(rooms_result) < limit:
queue_entry = room_queue.popleft()
room_id = queue_entry.room_id
if room_id in processed_rooms:
# already done this room
continue

logger.debug("Processing room %s", room_id)

is_in_room = await self._store.is_host_joined(room_id, self._server_name)
if is_in_room:
room_entry = await self._summarize_local_room(
requester,
None,
room_id,
suggested_only,
# TODO Handle max children.
max_children=None,
)

if room_entry:
rooms_result.append(room_entry.as_json())

# Add the child to the queue. We have already validated
# that the vias are a list of server names.
#
# TODO Handle max_depth
room_queue.extendleft(
_RoomQueueEntry(ev["state_key"], ev["content"]["via"])
for ev in reversed(room_entry.children)
)

processed_rooms.add(room_id)
else:
# TODO Federation.
pass

result: JsonDict = {"rooms": rooms_result}

# If there's additional data, generate a pagination token (and persist state).
if room_queue:
next_token = random_string(24)
result["next_token"] = next_token
self._pagination_sessions[next_token] = (room_queue, processed_rooms)

return result

async def federation_space_summary(
self,
origin: str,
Expand Down Expand Up @@ -655,7 +767,12 @@ class _RoomEntry:
# An iterable of the sorted, stripped children events for children of this room.
#
# This may not include all children.
children: Collection[JsonDict] = ()
children: Sequence[JsonDict] = ()

def as_json(self) -> JsonDict:
result = dict(self.room)
result["children_state"] = self.children
return result


def _has_valid_via(e: EventBase) -> bool:
Expand Down
41 changes: 41 additions & 0 deletions synapse/rest/client/v1/room.py
Original file line number Diff line number Diff line change
Expand Up @@ -1445,6 +1445,46 @@ async def on_POST(
)


class RoomHierarchyRestServlet(RestServlet):
PATTERNS = (
re.compile(
"^/_matrix/client/unstable/org.matrix.msc2946"
"/rooms/(?P<room_id>[^/]*)/hierarchy$"
),
)

def __init__(self, hs: "HomeServer"):
super().__init__()
self._auth = hs.get_auth()
self._space_summary_handler = hs.get_space_summary_handler()

async def on_GET(
self, request: SynapseRequest, room_id: str
) -> Tuple[int, JsonDict]:
requester = await self._auth.get_user_by_req(request, allow_guest=True)

max_depth = parse_integer(request, "max_depth")
if max_depth is not None and max_depth < 0:
raise SynapseError(
400, "'max_depth' must be a non-negative integer", Codes.BAD_JSON
)

limit = parse_integer(request, "limit")
if limit is not None and limit <= 0:
raise SynapseError(
400, "'limit' must be a positive integer", Codes.BAD_JSON
)

return 200, await self._space_summary_handler.get_room_hierarchy(
requester.user.to_string(),
room_id,
suggested_only=parse_boolean(request, "suggested_only", default=False),
max_depth=max_depth,
limit=limit,
from_token=parse_string(request, "from"),
)


def register_servlets(hs: "HomeServer", http_server, is_worker=False):
msc2716_enabled = hs.config.experimental.msc2716_enabled

Expand All @@ -1463,6 +1503,7 @@ def register_servlets(hs: "HomeServer", http_server, is_worker=False):
RoomTypingRestServlet(hs).register(http_server)
RoomEventContextServlet(hs).register(http_server)
RoomSpaceSummaryRestServlet(hs).register(http_server)
RoomHierarchyRestServlet(hs).register(http_server)
RoomEventServlet(hs).register(http_server)
JoinedRoomsRestServlet(hs).register(http_server)
RoomAliasListServlet(hs).register(http_server)
Expand Down
Loading