Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Add local support for the new spaces summary endpoint (MSC2946) (#10549)
Browse files Browse the repository at this point in the history
This adds support for the /hierarchy endpoint, which is an
update to MSC2946. Currently this only supports rooms known
locally to the homeserver.
  • Loading branch information
clokep authored Aug 10, 2021
1 parent 691593b commit fe1d0c8
Show file tree
Hide file tree
Showing 6 changed files with 521 additions and 112 deletions.
2 changes: 1 addition & 1 deletion changelog.d/10527.misc
Original file line number Diff line number Diff line change
@@ -1 +1 @@
Prepare for the new spaces summary endpoint (updates to [MSC2946](https://github.com/matrix-org/matrix-doc/pull/2946)).
Add pagination to the spaces summary based on updates to [MSC2946](https://github.com/matrix-org/matrix-doc/pull/2946).
2 changes: 1 addition & 1 deletion changelog.d/10530.misc
Original file line number Diff line number Diff line change
@@ -1 +1 @@
Prepare for the new spaces summary endpoint (updates to [MSC2946](https://github.com/matrix-org/matrix-doc/pull/2946)).
Add pagination to the spaces summary based on updates to [MSC2946](https://github.com/matrix-org/matrix-doc/pull/2946).
1 change: 1 addition & 0 deletions changelog.d/10549.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add pagination to the spaces summary based on updates to [MSC2946](https://github.com/matrix-org/matrix-doc/pull/2946).
201 changes: 198 additions & 3 deletions synapse/handlers/space_summary.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
from collections import deque
from typing import (
TYPE_CHECKING,
Collection,
Deque,
Dict,
Iterable,
List,
Expand All @@ -38,9 +38,12 @@
Membership,
RoomTypes,
)
from synapse.api.errors import Codes, SynapseError
from synapse.events import EventBase
from synapse.events.utils import format_event_for_client_v2
from synapse.types import JsonDict
from synapse.util.caches.response_cache import ResponseCache
from synapse.util.stringutils import random_string

if TYPE_CHECKING:
from synapse.server import HomeServer
Expand All @@ -57,6 +60,29 @@
MAX_SERVERS_PER_SPACE = 3


@attr.s(slots=True, frozen=True, auto_attribs=True)
class _PaginationKey:
"""The key used to find unique pagination session."""

# The first three entries match the request parameters (and cannot change
# during a pagination session).
room_id: str
suggested_only: bool
max_depth: Optional[int]
# The randomly generated token.
token: str


@attr.s(slots=True, frozen=True, auto_attribs=True)
class _PaginationSession:
"""The information that is stored for pagination."""

# The queue of rooms which are still to process.
room_queue: Deque["_RoomQueueEntry"]
# A set of rooms which have been processed.
processed_rooms: Set[str]


class SpaceSummaryHandler:
def __init__(self, hs: "HomeServer"):
self._clock = hs.get_clock()
Expand All @@ -67,6 +93,21 @@ def __init__(self, hs: "HomeServer"):
self._server_name = hs.hostname
self._federation_client = hs.get_federation_client()

# A map of query information to the current pagination state.
#
# TODO Allow for multiple workers to share this data.
# TODO Expire pagination tokens.
self._pagination_sessions: Dict[_PaginationKey, _PaginationSession] = {}

# If a user tries to fetch the same page multiple times in quick succession,
# only process the first attempt and return its result to subsequent requests.
self._pagination_response_cache: ResponseCache[
Tuple[str, bool, Optional[int], Optional[int], Optional[str]]
] = ResponseCache(
hs.get_clock(),
"get_room_hierarchy",
)

async def get_space_summary(
self,
requester: str,
Expand Down Expand Up @@ -130,7 +171,7 @@ async def get_space_summary(
requester, None, room_id, suggested_only, max_children
)

events: Collection[JsonDict] = []
events: Sequence[JsonDict] = []
if room_entry:
rooms_result.append(room_entry.room)
events = room_entry.children
Expand Down Expand Up @@ -207,6 +248,154 @@ async def get_space_summary(

return {"rooms": rooms_result, "events": events_result}

async def get_room_hierarchy(
self,
requester: str,
requested_room_id: str,
suggested_only: bool = False,
max_depth: Optional[int] = None,
limit: Optional[int] = None,
from_token: Optional[str] = None,
) -> JsonDict:
"""
Implementation of the room hierarchy C-S API.
Args:
requester: The user ID of the user making this request.
requested_room_id: The room ID to start the hierarchy at (the "root" room).
suggested_only: Whether we should only return children with the "suggested"
flag set.
max_depth: The maximum depth in the tree to explore, must be a
non-negative integer.
0 would correspond to just the root room, 1 would include just
the root room's children, etc.
limit: An optional limit on the number of rooms to return per
page. Must be a positive integer.
from_token: An optional pagination token.
Returns:
The JSON hierarchy dictionary.
"""
# If a user tries to fetch the same page multiple times in quick succession,
# only process the first attempt and return its result to subsequent requests.
#
# This is due to the pagination process mutating internal state, attempting
# to process multiple requests for the same page will result in errors.
return await self._pagination_response_cache.wrap(
(requested_room_id, suggested_only, max_depth, limit, from_token),
self._get_room_hierarchy,
requester,
requested_room_id,
suggested_only,
max_depth,
limit,
from_token,
)

async def _get_room_hierarchy(
self,
requester: str,
requested_room_id: str,
suggested_only: bool = False,
max_depth: Optional[int] = None,
limit: Optional[int] = None,
from_token: Optional[str] = None,
) -> JsonDict:
"""See docstring for SpaceSummaryHandler.get_room_hierarchy."""

# first of all, check that the user is in the room in question (or it's
# world-readable)
await self._auth.check_user_in_room_or_world_readable(
requested_room_id, requester
)

# If this is continuing a previous session, pull the persisted data.
if from_token:
pagination_key = _PaginationKey(
requested_room_id, suggested_only, max_depth, from_token
)
if pagination_key not in self._pagination_sessions:
raise SynapseError(400, "Unknown pagination token", Codes.INVALID_PARAM)

# Load the previous state.
pagination_session = self._pagination_sessions[pagination_key]
room_queue = pagination_session.room_queue
processed_rooms = pagination_session.processed_rooms
else:
# the queue of rooms to process
room_queue = deque((_RoomQueueEntry(requested_room_id, ()),))

# Rooms we have already processed.
processed_rooms = set()

rooms_result: List[JsonDict] = []

# Cap the limit to a server-side maximum.
if limit is None:
limit = MAX_ROOMS
else:
limit = min(limit, MAX_ROOMS)

# Iterate through the queue until we reach the limit or run out of
# rooms to include.
while room_queue and len(rooms_result) < limit:
queue_entry = room_queue.popleft()
room_id = queue_entry.room_id
current_depth = queue_entry.depth
if room_id in processed_rooms:
# already done this room
continue

logger.debug("Processing room %s", room_id)

is_in_room = await self._store.is_host_joined(room_id, self._server_name)
if is_in_room:
room_entry = await self._summarize_local_room(
requester,
None,
room_id,
suggested_only,
# TODO Handle max children.
max_children=None,
)

if room_entry:
rooms_result.append(room_entry.as_json())

# Add the child to the queue. We have already validated
# that the vias are a list of server names.
#
# If the current depth is the maximum depth, do not queue
# more entries.
if max_depth is None or current_depth < max_depth:
room_queue.extendleft(
_RoomQueueEntry(
ev["state_key"], ev["content"]["via"], current_depth + 1
)
for ev in reversed(room_entry.children)
)

processed_rooms.add(room_id)
else:
# TODO Federation.
pass

result: JsonDict = {"rooms": rooms_result}

# If there's additional data, generate a pagination token (and persist state).
if room_queue:
next_token = random_string(24)
result["next_token"] = next_token
pagination_key = _PaginationKey(
requested_room_id, suggested_only, max_depth, next_token
)
self._pagination_sessions[pagination_key] = _PaginationSession(
room_queue, processed_rooms
)

return result

async def federation_space_summary(
self,
origin: str,
Expand Down Expand Up @@ -652,6 +841,7 @@ async def _get_child_events(self, room_id: str) -> Iterable[EventBase]:
class _RoomQueueEntry:
room_id: str
via: Sequence[str]
depth: int = 0


@attr.s(frozen=True, slots=True, auto_attribs=True)
Expand All @@ -662,7 +852,12 @@ class _RoomEntry:
# An iterable of the sorted, stripped children events for children of this room.
#
# This may not include all children.
children: Collection[JsonDict] = ()
children: Sequence[JsonDict] = ()

def as_json(self) -> JsonDict:
result = dict(self.room)
result["children_state"] = self.children
return result


def _has_valid_via(e: EventBase) -> bool:
Expand Down
41 changes: 41 additions & 0 deletions synapse/rest/client/v1/room.py
Original file line number Diff line number Diff line change
Expand Up @@ -1445,6 +1445,46 @@ async def on_POST(
)


class RoomHierarchyRestServlet(RestServlet):
PATTERNS = (
re.compile(
"^/_matrix/client/unstable/org.matrix.msc2946"
"/rooms/(?P<room_id>[^/]*)/hierarchy$"
),
)

def __init__(self, hs: "HomeServer"):
super().__init__()
self._auth = hs.get_auth()
self._space_summary_handler = hs.get_space_summary_handler()

async def on_GET(
self, request: SynapseRequest, room_id: str
) -> Tuple[int, JsonDict]:
requester = await self._auth.get_user_by_req(request, allow_guest=True)

max_depth = parse_integer(request, "max_depth")
if max_depth is not None and max_depth < 0:
raise SynapseError(
400, "'max_depth' must be a non-negative integer", Codes.BAD_JSON
)

limit = parse_integer(request, "limit")
if limit is not None and limit <= 0:
raise SynapseError(
400, "'limit' must be a positive integer", Codes.BAD_JSON
)

return 200, await self._space_summary_handler.get_room_hierarchy(
requester.user.to_string(),
room_id,
suggested_only=parse_boolean(request, "suggested_only", default=False),
max_depth=max_depth,
limit=limit,
from_token=parse_string(request, "from"),
)


def register_servlets(hs: "HomeServer", http_server, is_worker=False):
msc2716_enabled = hs.config.experimental.msc2716_enabled

Expand All @@ -1463,6 +1503,7 @@ def register_servlets(hs: "HomeServer", http_server, is_worker=False):
RoomTypingRestServlet(hs).register(http_server)
RoomEventContextServlet(hs).register(http_server)
RoomSpaceSummaryRestServlet(hs).register(http_server)
RoomHierarchyRestServlet(hs).register(http_server)
RoomEventServlet(hs).register(http_server)
JoinedRoomsRestServlet(hs).register(http_server)
RoomAliasListServlet(hs).register(http_server)
Expand Down
Loading

0 comments on commit fe1d0c8

Please sign in to comment.