Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Increase DB/CPU perf of _is_server_still_joined check. #6936

Merged
merged 7 commits into from
Feb 19, 2020
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/6936.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Increase DB/CPU perf of `_is_server_still_joined` check.
31 changes: 31 additions & 0 deletions synapse/storage/data_stores/main/roommember.py
Original file line number Diff line number Diff line change
Expand Up @@ -868,6 +868,37 @@ def get_membership_from_event_ids(
desc="get_membership_from_event_ids",
)

async def is_local_host_in_room_ignoring_users(
self, room_id: str, ignore_users: Collection[str]
) -> bool:
"""Check if there are any local users, excluding those in the given
list, in the room.
"""

clause, args = make_in_list_sql_clause(
self.database_engine, "user_id", ignore_users
)

sql = """
SELECT 1 FROM local_current_membership
WHERE
room_id = ? AND membership = ?
AND NOT (%s)
LIMIT 1
""" % (
clause,
)
richvdh marked this conversation as resolved.
Show resolved Hide resolved

def _is_local_host_in_room_ignoring_users_txn(txn):
txn.execute(sql, (room_id, Membership.JOIN, *args))

return bool(txn.fetchone())
richvdh marked this conversation as resolved.
Show resolved Hide resolved

return await self.db.runInteraction(
"is_local_host_in_room_ignoring_users",
_is_local_host_in_room_ignoring_users_txn,
)


class RoomMemberBackgroundUpdateStore(SQLBaseStore):
def __init__(self, database: Database, db_conn, hs):
Expand Down
44 changes: 29 additions & 15 deletions synapse/storage/persist_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -727,6 +727,7 @@ async def _is_server_still_joined(

# Check if any of the given events are a local join that appear in the
# current state
events_to_check = [] # Event IDs that aren't an event we're persisting
for (typ, state_key), event_id in delta.to_insert.items():
if typ != EventTypes.Member or not self.is_mine_id(state_key):
continue
Expand All @@ -735,9 +736,35 @@ async def _is_server_still_joined(
if event_id == event.event_id:
if event.membership == Membership.JOIN:
return True
else:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the obscure else: here is redundant.

# The event is not in `ev_ctx_rm`, so we need to pull it out of
# the DB.
events_to_check.append(event_id)

# Check if any of the changes that we don't have events for are joins.
if events_to_check:
rows = await self.main_store.get_membership_from_event_ids(events_to_check)
is_still_joined = any(row["membership"] == Membership.JOIN for row in rows)
if is_still_joined:
return True

# None of the changes to state are joins, so we fall back to checking
# the current state of the room to see if any of our users are joined
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this needs to die

# None of the new state events are joins, so now we check the current
# room state to see if there are any other users in the room.
users_to_ignore = [
state_key
for _, state_key in itertools.chain(delta.to_insert, delta.to_delete)
if self.is_mine_id(state_key)
]

if await self.main_store.is_local_host_in_room_ignoring_users(
room_id, users_to_ignore
):
return True

# There's been a change of membership but we don't have a local join
# event in the new events, so we need to check the full state.
# The server will leave the room, so we go and find out which remote
# users will still be joined when we leave.
if current_state is None:
current_state = await self.main_store.get_current_state_ids(room_id)
current_state = dict(current_state)
Expand All @@ -746,19 +773,6 @@ async def _is_server_still_joined(

current_state.update(delta.to_insert)

event_ids = [
event_id
for (typ, state_key,), event_id in current_state.items()
if typ == EventTypes.Member and self.is_mine_id(state_key)
]

rows = await self.main_store.get_membership_from_event_ids(event_ids)
is_still_joined = any(row["membership"] == Membership.JOIN for row in rows)
if is_still_joined:
return True

# The server will leave the room, so we go and find out which remote
# users will still be joined when we leave.
remote_event_ids = [
event_id
for (typ, state_key,), event_id in current_state.items()
Expand Down