Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Do not consider events by ignored users for bundled aggregations #12235

Merged
merged 25 commits into from
Apr 11, 2022
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
75b1b5f
Filter out events from ignored users in /relations.
clokep Mar 14, 2022
3a51cfe
Add tests for ignored users in bundled aggregations.
clokep Mar 16, 2022
5e73a5e
Filter out ignored users for aggregation groups.
clokep Mar 16, 2022
56dd70a
Rename a variable.
clokep Mar 14, 2022
3357181
Filter out ignored users for threads.
clokep Mar 16, 2022
9eef5cb
Filter out ignored users for references.
clokep Mar 16, 2022
156ef7a
Add a note about edits.
clokep Mar 16, 2022
3bb8071
Newsfragment
clokep Mar 16, 2022
89c89df
Merge remote-tracking branch 'origin/develop' into clokep/ignored-use…
clokep Mar 24, 2022
fffa6ca
Merge remote-tracking branch 'origin/develop' into clokep/ignored-use…
clokep Mar 24, 2022
887fcb0
Merge remote-tracking branch 'origin/develop' into clokep/ignored-use…
clokep Mar 30, 2022
fc7d14b
Merge remote-tracking branch 'origin/develop' into clokep/ignored-use…
clokep Mar 31, 2022
ecae2ad
Do not cache on the ignored users parameter when fetching relations.
clokep Mar 24, 2022
f03a6a8
Newsfragment
clokep Mar 31, 2022
622b621
Do not cache on the ignored users parameter when fetching annotations.
clokep Mar 31, 2022
b60fded
Merge remote-tracking branch 'origin/develop' into clokep/ignored-use…
clokep Apr 5, 2022
30ce317
Add an intermediate method for threads.
clokep Apr 1, 2022
b0d4474
Do not cache on the ignored users parameter when fetching threads.
clokep Apr 6, 2022
1b2c9a1
Add missing docstrings.
clokep Apr 6, 2022
44f6975
Merge remote-tracking branch 'origin/develop' into clokep/ignored-use…
clokep Apr 6, 2022
96d9215
Revert unused changes.
clokep Apr 8, 2022
e2910a4
Docstring.
clokep Apr 8, 2022
7d6fa1e
Add logging.
clokep Apr 8, 2022
61c7526
Merge remote-tracking branch 'origin/develop' into clokep/ignored-use…
clokep Apr 8, 2022
e95bd07
Merge remote-tracking branch 'origin/develop' into clokep/ignored-use…
clokep Apr 11, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/12227.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix a long-standing bug where events from ignored users were still considered for relations.
1 change: 0 additions & 1 deletion changelog.d/12227.misc

This file was deleted.

1 change: 1 addition & 0 deletions changelog.d/12232.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix a long-standing bug where events from ignored users were still considered for relations.
1 change: 0 additions & 1 deletion changelog.d/12232.misc

This file was deleted.

1 change: 1 addition & 0 deletions changelog.d/12235.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix a long-standing bug where events from ignored users were still considered for relations.
41 changes: 33 additions & 8 deletions synapse/handlers/relations.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from typing import TYPE_CHECKING, Dict, Iterable, Optional, cast
from typing import TYPE_CHECKING, Dict, FrozenSet, Iterable, Optional, cast

import attr
from frozendict import frozendict
Expand All @@ -21,6 +21,7 @@
from synapse.api.errors import SynapseError
from synapse.events import EventBase
from synapse.types import JsonDict, Requester, StreamToken
from synapse.visibility import filter_events_for_client

if TYPE_CHECKING:
from synapse.server import HomeServer
Expand Down Expand Up @@ -62,6 +63,7 @@ def __bool__(self) -> bool:
class RelationsHandler:
def __init__(self, hs: "HomeServer"):
self._main_store = hs.get_datastores().main
self._storage = hs.get_storage()
self._auth = hs.get_auth()
self._clock = hs.get_clock()
self._event_handler = hs.get_event_handler()
Expand Down Expand Up @@ -103,7 +105,8 @@ async def get_relations(

user_id = requester.user.to_string()

await self._auth.check_user_in_room_or_world_readable(
# TODO Properly handle a user leaving a room.
(_, member_event_id) = await self._auth.check_user_in_room_or_world_readable(
room_id, user_id, allow_departed_users=True
)

Expand All @@ -113,6 +116,9 @@ async def get_relations(
if event is None:
raise SynapseError(404, "Unknown parent event.")

# Note that ignored users are not passed into get_relations_for_event
# below. Ignored users are handled in filter_events_for_client (and by
# not passing them in here we should get a better cache hit rate).
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this sound reasonable or do we think we should do the initial filtering by ignoring users too? (I think we would essentially do it twice then -- once in get_relations_for_event, and then again in filter_events_for_client.)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense so long as we have this comment here.

pagination_chunk = await self._main_store.get_relations_for_event(
event_id=event_id,
event=event,
Expand All @@ -130,6 +136,10 @@ async def get_relations(
[c["event_id"] for c in pagination_chunk.chunk]
)

events = await filter_events_for_client(
self._storage, user_id, events, is_peeking=(member_event_id is None)
)

now = self._clock.time_msec()
# Do not bundle aggregations when retrieving the original event because
# we want the content before relations are applied to it.
Expand All @@ -152,15 +162,15 @@ async def get_relations(
return return_value

async def _get_bundled_aggregation_for_event(
self, event: EventBase, user_id: str
self, event: EventBase, ignored_users: FrozenSet[str]
) -> Optional[BundledAggregations]:
"""Generate bundled aggregations for an event.

Note that this does not use a cache, but depends on cached methods.

Args:
event: The event to calculate bundled aggregations for.
user_id: The user requesting the bundled aggregations.
ignored_users: The users ignored by the requesting user.

Returns:
The bundled aggregations for an event, if bundled aggregations are
Expand All @@ -184,15 +194,20 @@ async def _get_bundled_aggregation_for_event(
aggregations = BundledAggregations()

annotations = await self._main_store.get_aggregation_groups_for_event(
event_id, room_id
event_id, room_id, ignored_users=ignored_users
)
if annotations.chunk:
aggregations.annotations = await annotations.to_dict(
cast("DataStore", self)
)

references = await self._main_store.get_relations_for_event(
event_id, event, room_id, RelationTypes.REFERENCE, direction="f"
event_id,
event,
room_id,
RelationTypes.REFERENCE,
direction="f",
ignored_users=ignored_users,
)
if references.chunk:
aggregations.references = await references.to_dict(cast("DataStore", self))
Expand Down Expand Up @@ -223,13 +238,21 @@ async def get_bundled_aggregations(
# event ID -> bundled aggregation in non-serialized form.
results: Dict[str, BundledAggregations] = {}

# Fetch any ignored users of the requesting user.
ignored_users = await self._main_store.ignored_users(user_id)

# Fetch other relations per event.
for event in events_by_id.values():
event_result = await self._get_bundled_aggregation_for_event(event, user_id)
event_result = await self._get_bundled_aggregation_for_event(
event, ignored_users
)
if event_result:
results[event.event_id] = event_result

# Fetch any edits (but not for redacted events).
#
# Note that there is no use in limiting edits by ignored users since the
# parent event should be ignored in the first place if the user is ignored.
Comment on lines +446 to +447
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this sound like sound logic? I couldn't come up with a situation where we would need to check edits.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I think it's probably fine. The only thing is if we decided in future to allow e.g. moderators to edit messages.

edits = await self._main_store.get_applicable_edits(
[
event_id
Expand All @@ -241,7 +264,9 @@ async def get_bundled_aggregations(
results.setdefault(event_id, BundledAggregations()).replace = edit

# Fetch thread summaries.
summaries = await self._main_store.get_thread_summaries(events_by_id.keys())
summaries = await self._main_store.get_thread_summaries(
events_by_id.keys(), ignored_users
)
# Only fetch participated for a limited selection based on what had
# summaries.
participated = await self._main_store.get_threads_participated(
Expand Down
8 changes: 5 additions & 3 deletions synapse/rest/client/relations.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,10 +134,9 @@ async def on_GET(
) -> Tuple[int, JsonDict]:
requester = await self.auth.get_user_by_req(request, allow_guest=True)

user_id = requester.user.to_string()
await self.auth.check_user_in_room_or_world_readable(
room_id,
requester.user.to_string(),
allow_departed_users=True,
room_id, user_id, allow_departed_users=True
)

# This checks that a) the event exists and b) the user is allowed to
Expand All @@ -164,13 +163,16 @@ async def on_GET(
if to_token_str:
to_token = AggregationPaginationToken.from_string(to_token_str)

ignored_users = await self.store.ignored_users(user_id)

pagination_chunk = await self.store.get_aggregation_groups_for_event(
event_id=parent_id,
room_id=room_id,
event_type=event_type,
limit=limit,
from_token=from_token,
to_token=to_token,
ignored_users=ignored_users,
)

return 200, await pagination_chunk.to_dict(self.store)
Expand Down
20 changes: 17 additions & 3 deletions synapse/storage/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -2171,7 +2171,10 @@ def simple_search_list_txn(


def make_in_list_sql_clause(
database_engine: BaseDatabaseEngine, column: str, iterable: Collection[Any]
database_engine: BaseDatabaseEngine,
column: str,
iterable: Collection[Any],
include: bool = True,
) -> Tuple[str, list]:
"""Returns an SQL clause that checks the given column is in the iterable.

Expand All @@ -2184,6 +2187,8 @@ def make_in_list_sql_clause(
database_engine
column: Name of the column
iterable: The values to check the column against.
include: True if the resulting rows must include one of the given values,
False if it must exclude them.

Returns:
A tuple of SQL query and the args
Expand All @@ -2192,9 +2197,18 @@ def make_in_list_sql_clause(
if database_engine.supports_using_any_list:
# This should hopefully be faster, but also makes postgres query
# stats easier to understand.
return "%s = ANY(?)" % (column,), [list(iterable)]
if include:
sql = f"{column} = ANY(?)"
else:
sql = f"{column} != ANY(?)"
return sql, [list(iterable)]
else:
return "%s IN (%s)" % (column, ",".join("?" for _ in iterable)), list(iterable)
values = ",".join("?" for _ in iterable)
if include:
sql = f"{column} IN ({values})"
else:
sql = f"{column} NOT IN ({values})"
return sql, list(iterable)
clokep marked this conversation as resolved.
Show resolved Hide resolved


KV = TypeVar("KV")
Expand Down
57 changes: 50 additions & 7 deletions synapse/storage/databases/main/relations.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
TYPE_CHECKING,
Collection,
Dict,
FrozenSet,
Iterable,
List,
Optional,
Expand Down Expand Up @@ -73,6 +74,7 @@ async def get_relations_for_event(
direction: str = "b",
from_token: Optional[StreamToken] = None,
to_token: Optional[StreamToken] = None,
ignored_users: FrozenSet[str] = frozenset(),
) -> PaginationChunk:
"""Get a list of relations for an event, ordered by topological ordering.

Expand All @@ -88,6 +90,7 @@ async def get_relations_for_event(
oldest first (`"f"`).
from_token: Fetch rows from the given token, or from the start if None.
to_token: Fetch rows up to the given token, or up to the end if None.
ignored_users: The users ignored by the requesting user.

Returns:
List of event IDs that match relations requested. The rows are of
Expand All @@ -113,6 +116,16 @@ async def get_relations_for_event(
where_clause.append("aggregation_key = ?")
where_args.append(aggregation_key)

if ignored_users:
(
ignored_users_clause_sql,
ignored_users_clause_args,
) = make_in_list_sql_clause(
self.database_engine, "sender", ignored_users, include=False
)
where_clause.append(ignored_users_clause_sql)
where_args.extend(ignored_users_clause_args)

pagination_clause = generate_pagination_where_clause(
direction=direction,
column_names=("topological_ordering", "stream_ordering"),
Expand Down Expand Up @@ -151,18 +164,18 @@ def _get_recent_references_for_event_txn(

last_topo_id = None
last_stream_id = None
events = []
event_ids = []
for row in txn:
# Do not include edits for redacted events as they leak event
# content.
if not is_redacted or row[1] != RelationTypes.REPLACE:
events.append({"event_id": row[0]})
event_ids.append({"event_id": row[0]})
last_topo_id = row[2]
last_stream_id = row[3]

# If there are more events, generate the next pagination key.
next_token = None
if len(events) > limit and last_topo_id and last_stream_id:
if len(event_ids) > limit and last_topo_id and last_stream_id:
next_key = RoomStreamToken(last_topo_id, last_stream_id)
if from_token:
next_token = from_token.copy_and_replace("room_key", next_key)
Expand All @@ -180,7 +193,9 @@ def _get_recent_references_for_event_txn(
)

return PaginationChunk(
chunk=list(events[:limit]), next_batch=next_token, prev_batch=from_token
chunk=list(event_ids[:limit]),
next_batch=next_token,
prev_batch=from_token,
)

return await self.db_pool.runInteraction(
Expand Down Expand Up @@ -260,6 +275,7 @@ async def get_aggregation_groups_for_event(
direction: str = "b",
from_token: Optional[AggregationPaginationToken] = None,
to_token: Optional[AggregationPaginationToken] = None,
ignored_users: FrozenSet[str] = frozenset(),
) -> PaginationChunk:
"""Get a list of annotations on the event, grouped by event type and
aggregation key, sorted by count.
Expand All @@ -276,6 +292,7 @@ async def get_aggregation_groups_for_event(
the lowest count first (`"f"`).
from_token: Fetch rows from the given token, or from the start if None.
to_token: Fetch rows up to the given token, or up to the end if None.
ignored_users: The users ignored by the requesting user.

Returns:
List of groups of annotations that match. Each row is a dict with
Expand All @@ -293,6 +310,16 @@ async def get_aggregation_groups_for_event(
where_clause.append("type = ?")
where_args.append(event_type)

if ignored_users:
(
ignored_users_clause_sql,
ignored_users_clause_args,
) = make_in_list_sql_clause(
self.database_engine, "sender", ignored_users, include=False
)
where_clause.append(ignored_users_clause_sql)
where_args.extend(ignored_users_clause_args)

having_clause = generate_pagination_where_clause(
direction=direction,
column_names=("COUNT(*)", "MAX(stream_ordering)"),
Expand Down Expand Up @@ -437,18 +464,21 @@ def _get_applicable_edits_txn(txn: LoggingTransaction) -> Dict[str, str]:
for original_event_id in event_ids
}

@cached()
def get_thread_summary(self, event_id: str) -> Optional[Tuple[int, EventBase]]:
@cached(tree=True)
def get_thread_summary(
self, event_id: str, ignored_users: FrozenSet[str]
) -> Optional[Tuple[int, EventBase]]:
raise NotImplementedError()

@cachedList(cached_method_name="get_thread_summary", list_name="event_ids")
async def get_thread_summaries(
self, event_ids: Collection[str]
self, event_ids: Collection[str], ignored_users: FrozenSet[str]
) -> Dict[str, Optional[Tuple[int, EventBase, Optional[EventBase]]]]:
"""Get the number of threaded replies, the latest reply (if any), and the latest edit for that reply for the given event.

Args:
event_ids: Summarize the thread related to this event ID.
ignored_users: The users ignored by the requesting user.

Returns:
A map of the thread summary each event. A missing event implies there
Expand Down Expand Up @@ -500,6 +530,16 @@ def _get_thread_summaries_txn(
txn.database_engine, "relates_to_id", event_ids
)

if ignored_users:
(
ignored_users_clause_sql,
ignored_users_clause_args,
) = make_in_list_sql_clause(
self.database_engine, "child.sender", ignored_users, include=False
)
clause += " AND " + ignored_users_clause_sql
args.extend(ignored_users_clause_args)

if self._msc3440_enabled:
relations_clause = "(relation_type = ? OR relation_type = ?)"
args.extend((RelationTypes.THREAD, RelationTypes.UNSTABLE_THREAD))
Expand Down Expand Up @@ -536,6 +576,9 @@ def _get_thread_summaries_txn(
clause, args = make_in_list_sql_clause(
txn.database_engine, "relates_to_id", latest_event_ids.keys()
)
if ignored_users:
clause += " AND " + ignored_users_clause_sql
args.extend(ignored_users_clause_args)

if self._msc3440_enabled:
relations_clause = "(relation_type = ? OR relation_type = ?)"
Expand Down
Loading