Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Include whether the requesting user has participated in a thread. #11577

Merged
merged 6 commits into from
Jan 18, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/11577.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Include whether the requesting user has participated in a thread when generating a summary for [MSC3440](https://github.com/matrix-org/matrix-doc/pull/3440).
2 changes: 1 addition & 1 deletion synapse/handlers/pagination.py
Original file line number Diff line number Diff line change
Expand Up @@ -537,7 +537,7 @@ async def get_messages(
state_dict = await self.store.get_events(list(state_ids.values()))
state = state_dict.values()

aggregations = await self.store.get_bundled_aggregations(events)
aggregations = await self.store.get_bundled_aggregations(events, user_id)

time_now = self.clock.time_msec()

Expand Down
12 changes: 9 additions & 3 deletions synapse/handlers/room.py
Original file line number Diff line number Diff line change
Expand Up @@ -1182,12 +1182,18 @@ async def filter_evts(events: List[EventBase]) -> List[EventBase]:
results["event"] = filtered[0]

# Fetch the aggregations.
aggregations = await self.store.get_bundled_aggregations([results["event"]])
aggregations = await self.store.get_bundled_aggregations(
[results["event"]], user.to_string()
)
aggregations.update(
await self.store.get_bundled_aggregations(results["events_before"])
await self.store.get_bundled_aggregations(
results["events_before"], user.to_string()
)
)
aggregations.update(
await self.store.get_bundled_aggregations(results["events_after"])
await self.store.get_bundled_aggregations(
results["events_after"], user.to_string()
)
)
results["aggregations"] = aggregations

Expand Down
4 changes: 3 additions & 1 deletion synapse/handlers/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -637,7 +637,9 @@ async def _load_filtered_recents(
# as clients will have all the necessary information.
bundled_aggregations = None
if limited or newly_joined_room:
bundled_aggregations = await self.store.get_bundled_aggregations(recents)
bundled_aggregations = await self.store.get_bundled_aggregations(
recents, sync_config.user.to_string()
)

return TimelineBatch(
events=recents,
Expand Down
4 changes: 3 additions & 1 deletion synapse/rest/client/relations.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,9 @@ async def on_GET(
)
# The relations returned for the requested event do include their
# bundled aggregations.
aggregations = await self.store.get_bundled_aggregations(events)
aggregations = await self.store.get_bundled_aggregations(
events, requester.user.to_string()
)
serialized_events = self._event_serializer.serialize_events(
events, now, bundle_aggregations=aggregations
)
Expand Down
4 changes: 3 additions & 1 deletion synapse/rest/client/room.py
Original file line number Diff line number Diff line change
Expand Up @@ -663,7 +663,9 @@ async def on_GET(

if event:
# Ensure there are bundled aggregations available.
aggregations = await self._store.get_bundled_aggregations([event])
aggregations = await self._store.get_bundled_aggregations(
[event], requester.user.to_string()
)

time_now = self.clock.time_msec()
event_dict = self._event_serializer.serialize_event(
Expand Down
7 changes: 7 additions & 0 deletions synapse/storage/databases/main/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -1795,6 +1795,13 @@ def _handle_event_relations(
txn.call_after(
self.store.get_thread_summary.invalidate, (parent_id, event.room_id)
)
# It should be safe to only invalidate the cache if the user has not
# previously participated in the thread, but that's difficult (and
# potentially error-prone) so it is always invalidated.
txn.call_after(
self.store.get_thread_participated.invalidate,
(parent_id, event.room_id, event.sender),
)

def _handle_insertion_event(self, txn: LoggingTransaction, event: EventBase):
"""Handles keeping track of insertion events and edges/connections.
Expand Down
66 changes: 55 additions & 11 deletions synapse/storage/databases/main/relations.py
Original file line number Diff line number Diff line change
Expand Up @@ -384,8 +384,7 @@ def _get_applicable_edit_txn(txn: LoggingTransaction) -> Optional[str]:
async def get_thread_summary(
self, event_id: str, room_id: str
) -> Tuple[int, Optional[EventBase]]:
"""Get the number of threaded replies, the senders of those replies, and
the latest reply (if any) for the given event.
"""Get the number of threaded replies and the latest reply (if any) for the given event.

Args:
event_id: Summarize the thread related to this event ID.
Expand All @@ -398,7 +397,7 @@ async def get_thread_summary(
def _get_thread_summary_txn(
txn: LoggingTransaction,
) -> Tuple[int, Optional[str]]:
# Fetch the count of threaded events and the latest event ID.
# Fetch the latest event ID in the thread.
# TODO Should this only allow m.room.message events.
sql = """
SELECT event_id
Expand All @@ -419,6 +418,7 @@ def _get_thread_summary_txn(

latest_event_id = row[0]

# Fetch the number of threaded replies.
sql = """
SELECT COUNT(event_id)
FROM event_relations
Expand All @@ -443,6 +443,44 @@ def _get_thread_summary_txn(

return count, latest_event

@cached()
async def get_thread_participated(
self, event_id: str, room_id: str, user_id: str
) -> bool:
"""Get whether the requesting user participated in a thread.

This is separate from get_thread_summary since that can be cached across
all users while this value is specific to the requeser.

Args:
event_id: The thread related to this event ID.
room_id: The room the event belongs to.
user_id: The user requesting the summary.

Returns:
True if the requesting user participated in the thread, otherwise false.
"""

def _get_thread_summary_txn(txn: LoggingTransaction) -> bool:
# Fetch whether the requester has participated or not.
sql = """
SELECT 1
FROM event_relations
INNER JOIN events USING (event_id)
WHERE
relates_to_id = ?
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any idea how well this performs? E.g. are there relevant indices for the query planner to make use of?

I see

    "event_relations_relates" btree (relates_to_id, relation_type, aggregation_key)
    "events_order_room2" btree (room_id, topological_ordering, stream_ordering)

I'd guess this means it has to scan the entire room or thread to find the sender of interest. Maybe that's fine---we're assuming most threads aren't that long?

For what it's worth:

matrix=> EXPLAIN
SELECT 1
FROM event_relations
INNER JOIN events USING (event_id)
WHERE
    relates_to_id='a'
    AND room_id = 'b'
    AND relation_type = 'c'
    AND sender = 'd'
;
                                              QUERY PLAN                                              
══════════════════════════════════════════════════════════════════════════════════════════════════════
 Nested Loop  (cost=1.40..12.12 rows=1 width=4)
   ->  Index Scan using event_relations_relates on event_relations  (cost=0.69..4.71 rows=1 width=42)
         Index Cond: ((relates_to_id = 'a'::text) AND (relation_type = 'c'::text))
   ->  Index Scan using events_event_id_key on events  (cost=0.70..4.73 rows=1 width=38)
         Index Cond: (event_id = event_relations.event_id)
         Filter: ((room_id = 'b'::text) AND (sender = 'd'::text))
(6 rows)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure, but it is pretty much the same as the other queries we're doing on relations, so I don't think it will be a problem?

I wonder if these queries (in general) would benefit from an index of relates_to_id, relation_type. The current one seems to include aggregation_key.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good to me.

As for the indices, I think the existing one is sufficient, because a btree index on (X, Y, Z) can be used as an index on (X, Y). But I can't find a reference for this. Maybe @reivilibre knows one?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But I can't find a reference for this.

Searching for "Index" and "prefix" gets you stuff related to WHERE strcol LIKE 'foo%'. What I wanted to find was

https://use-the-index-luke.com/sql/where-clause/the-equals-operator/concatenated-keys and https://www.postgresql.org/docs/10/indexes-multicolumn.html . To quote from postgres docs specifically

A multicolumn B-tree index can be used with query conditions that involve any subset of the index's columns, but the index is most efficient when there are constraints on the leading (leftmost) columns.

AND room_id = ?
AND relation_type = ?
AND sender = ?
"""

txn.execute(sql, (event_id, room_id, RelationTypes.THREAD, user_id))
return bool(txn.fetchone())

return await self.db_pool.runInteraction(
"get_thread_summary", _get_thread_summary_txn
)

async def events_have_relations(
self,
parent_ids: List[str],
Expand Down Expand Up @@ -546,14 +584,15 @@ def _get_if_user_has_annotated_event(txn: LoggingTransaction) -> bool:
)

async def _get_bundled_aggregation_for_event(
self, event: EventBase
self, event: EventBase, user_id: str
) -> Optional[Dict[str, Any]]:
"""Generate bundled aggregations for an event.

Note that this does not use a cache, but depends on cached methods.

Args:
event: The event to calculate bundled aggregations for.
user_id: The user requesting the bundled aggregations.

Returns:
The bundled aggregations for an event, if bundled aggregations are
Expand Down Expand Up @@ -598,27 +637,32 @@ async def _get_bundled_aggregation_for_event(

# If this event is the start of a thread, include a summary of the replies.
if self._msc3440_enabled:
(
thread_count,
latest_thread_event,
) = await self.get_thread_summary(event_id, room_id)
thread_count, latest_thread_event = await self.get_thread_summary(
event_id, room_id
)
participated = await self.get_thread_participated(
event_id, room_id, user_id
)
if latest_thread_event:
aggregations[RelationTypes.THREAD] = {
# Don't bundle aggregations as this could recurse forever.
"latest_event": latest_thread_event,
"count": thread_count,
"current_user_participated": participated,
}

# Store the bundled aggregations in the event metadata for later use.
return aggregations

async def get_bundled_aggregations(
self, events: Iterable[EventBase]
self,
events: Iterable[EventBase],
user_id: str,
) -> Dict[str, Dict[str, Any]]:
"""Generate bundled aggregations for events.

Args:
events: The iterable of events to calculate bundled aggregations for.
user_id: The user requesting the bundled aggregations.

Returns:
A map of event ID to the bundled aggregation for the event. Not all
Expand All @@ -631,7 +675,7 @@ async def get_bundled_aggregations(
# TODO Parallelize.
results = {}
for event in events:
event_result = await self._get_bundled_aggregation_for_event(event)
event_result = await self._get_bundled_aggregation_for_event(event, user_id)
if event_result is not None:
results[event.event_id] = event_result

Expand Down
3 changes: 3 additions & 0 deletions tests/rest/client/test_relations.py
Original file line number Diff line number Diff line change
Expand Up @@ -515,6 +515,9 @@ def assert_bundle(actual):
2,
actual[RelationTypes.THREAD].get("count"),
)
self.assertTrue(
actual[RelationTypes.THREAD].get("current_user_participated")
)
# The latest thread event has some fields that don't matter.
self.assert_dict(
{
Expand Down