Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move towards using MultiWriterIdGenerator everywhere #17226

Merged
merged 6 commits into from
May 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/17226.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Move towards using `MultiWriterIdGenerator` everywhere.
21 changes: 18 additions & 3 deletions synapse/storage/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -2461,7 +2461,11 @@ def simple_select_list_paginate_txn(


def make_in_list_sql_clause(
database_engine: BaseDatabaseEngine, column: str, iterable: Collection[Any]
database_engine: BaseDatabaseEngine,
column: str,
iterable: Collection[Any],
*,
negative: bool = False,
) -> Tuple[str, list]:
"""Returns an SQL clause that checks the given column is in the iterable.

Expand All @@ -2474,6 +2478,7 @@ def make_in_list_sql_clause(
database_engine
column: Name of the column
iterable: The values to check the column against.
negative: Whether we should check for inequality, i.e. `NOT IN`

Returns:
A tuple of SQL query and the args
Expand All @@ -2482,9 +2487,19 @@ def make_in_list_sql_clause(
if database_engine.supports_using_any_list:
# This should hopefully be faster, but also makes postgres query
# stats easier to understand.
return "%s = ANY(?)" % (column,), [list(iterable)]
if not negative:
clause = f"{column} = ANY(?)"
else:
clause = f"{column} != ALL(?)"

return clause, [list(iterable)]
else:
return "%s IN (%s)" % (column, ",".join("?" for _ in iterable)), list(iterable)
params = ",".join("?" for _ in iterable)
if not negative:
clause = f"{column} IN ({params})"
else:
clause = f"{column} NOT IN ({params})"
return clause, list(iterable)


# These overloads ensure that `columns` and `iterable` values have the same length.
Expand Down
47 changes: 14 additions & 33 deletions synapse/storage/databases/main/account_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,9 @@
)
from synapse.storage.databases.main.cache import CacheInvalidationWorkerStore
from synapse.storage.databases.main.push_rule import PushRulesWorkerStore
from synapse.storage.engines import PostgresEngine
from synapse.storage.util.id_generators import (
AbstractStreamIdGenerator,
MultiWriterIdGenerator,
StreamIdGenerator,
)
from synapse.types import JsonDict, JsonMapping
from synapse.util import json_encoder
Expand Down Expand Up @@ -75,37 +73,20 @@ def __init__(

self._account_data_id_gen: AbstractStreamIdGenerator

if isinstance(database.engine, PostgresEngine):
self._account_data_id_gen = MultiWriterIdGenerator(
db_conn=db_conn,
db=database,
notifier=hs.get_replication_notifier(),
stream_name="account_data",
instance_name=self._instance_name,
tables=[
("room_account_data", "instance_name", "stream_id"),
("room_tags_revisions", "instance_name", "stream_id"),
("account_data", "instance_name", "stream_id"),
],
sequence_name="account_data_sequence",
writers=hs.config.worker.writers.account_data,
)
else:
# Multiple writers are not supported for SQLite.
#
# We shouldn't be running in worker mode with SQLite, but its useful
# to support it for unit tests.
self._account_data_id_gen = StreamIdGenerator(
db_conn,
hs.get_replication_notifier(),
"room_account_data",
"stream_id",
extra_tables=[
("account_data", "stream_id"),
("room_tags_revisions", "stream_id"),
],
is_writer=self._instance_name in hs.config.worker.writers.account_data,
)
self._account_data_id_gen = MultiWriterIdGenerator(
db_conn=db_conn,
db=database,
notifier=hs.get_replication_notifier(),
stream_name="account_data",
instance_name=self._instance_name,
tables=[
("room_account_data", "instance_name", "stream_id"),
("room_tags_revisions", "instance_name", "stream_id"),
("account_data", "instance_name", "stream_id"),
],
sequence_name="account_data_sequence",
writers=hs.config.worker.writers.account_data,
)

account_max = self.get_max_account_data_stream_id()
self._account_data_stream_cache = StreamChangeCache(
Expand Down
46 changes: 16 additions & 30 deletions synapse/storage/databases/main/deviceinbox.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,9 @@
LoggingTransaction,
make_in_list_sql_clause,
)
from synapse.storage.engines import PostgresEngine
from synapse.storage.util.id_generators import (
AbstractStreamIdGenerator,
MultiWriterIdGenerator,
StreamIdGenerator,
)
from synapse.types import JsonDict
from synapse.util import json_encoder
Expand Down Expand Up @@ -89,35 +87,23 @@ def __init__(
expiry_ms=30 * 60 * 1000,
)

if isinstance(database.engine, PostgresEngine):
self._can_write_to_device = (
self._instance_name in hs.config.worker.writers.to_device
)
self._can_write_to_device = (
self._instance_name in hs.config.worker.writers.to_device
)

self._to_device_msg_id_gen: AbstractStreamIdGenerator = (
MultiWriterIdGenerator(
db_conn=db_conn,
db=database,
notifier=hs.get_replication_notifier(),
stream_name="to_device",
instance_name=self._instance_name,
tables=[
("device_inbox", "instance_name", "stream_id"),
("device_federation_outbox", "instance_name", "stream_id"),
],
sequence_name="device_inbox_sequence",
writers=hs.config.worker.writers.to_device,
)
)
else:
self._can_write_to_device = True
self._to_device_msg_id_gen = StreamIdGenerator(
db_conn,
hs.get_replication_notifier(),
"device_inbox",
"stream_id",
extra_tables=[("device_federation_outbox", "stream_id")],
)
self._to_device_msg_id_gen: AbstractStreamIdGenerator = MultiWriterIdGenerator(
db_conn=db_conn,
db=database,
notifier=hs.get_replication_notifier(),
stream_name="to_device",
instance_name=self._instance_name,
tables=[
("device_inbox", "instance_name", "stream_id"),
("device_federation_outbox", "instance_name", "stream_id"),
],
sequence_name="device_inbox_sequence",
writers=hs.config.worker.writers.to_device,
)

max_device_inbox_id = self._to_device_msg_id_gen.get_current_token()
device_inbox_prefill, min_device_inbox_id = self.db_pool.get_cache_dict(
Expand Down
101 changes: 33 additions & 68 deletions synapse/storage/databases/main/events_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,12 +75,10 @@
LoggingDatabaseConnection,
LoggingTransaction,
)
from synapse.storage.engines import PostgresEngine
from synapse.storage.types import Cursor
from synapse.storage.util.id_generators import (
AbstractStreamIdGenerator,
MultiWriterIdGenerator,
StreamIdGenerator,
)
from synapse.storage.util.sequence import build_sequence_generator
from synapse.types import JsonDict, get_domain_from_id
Expand Down Expand Up @@ -195,51 +193,28 @@ def __init__(

self._stream_id_gen: AbstractStreamIdGenerator
self._backfill_id_gen: AbstractStreamIdGenerator
Comment on lines 194 to 195
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can probably let these just be MultiWriterIdGenerator now instead of defining them as AbstractStreamIdGenerator? (Applies throughout the diff.)

(Overall love the simplification! 🎉 )

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah I tried to remove these and mypy went 💥 I'm sure we can get rid of them, but I didn't want to go down that rabbithole right now

if isinstance(database.engine, PostgresEngine):
# If we're using Postgres than we can use `MultiWriterIdGenerator`
# regardless of whether this process writes to the streams or not.
self._stream_id_gen = MultiWriterIdGenerator(
db_conn=db_conn,
db=database,
notifier=hs.get_replication_notifier(),
stream_name="events",
instance_name=hs.get_instance_name(),
tables=[("events", "instance_name", "stream_ordering")],
sequence_name="events_stream_seq",
writers=hs.config.worker.writers.events,
)
self._backfill_id_gen = MultiWriterIdGenerator(
db_conn=db_conn,
db=database,
notifier=hs.get_replication_notifier(),
stream_name="backfill",
instance_name=hs.get_instance_name(),
tables=[("events", "instance_name", "stream_ordering")],
sequence_name="events_backfill_stream_seq",
positive=False,
writers=hs.config.worker.writers.events,
)
else:
# Multiple writers are not supported for SQLite.
#
# We shouldn't be running in worker mode with SQLite, but its useful
# to support it for unit tests.
self._stream_id_gen = StreamIdGenerator(
db_conn,
hs.get_replication_notifier(),
"events",
"stream_ordering",
is_writer=hs.get_instance_name() in hs.config.worker.writers.events,
)
self._backfill_id_gen = StreamIdGenerator(
db_conn,
hs.get_replication_notifier(),
"events",
"stream_ordering",
step=-1,
extra_tables=[("ex_outlier_stream", "event_stream_ordering")],
is_writer=hs.get_instance_name() in hs.config.worker.writers.events,
)

self._stream_id_gen = MultiWriterIdGenerator(
db_conn=db_conn,
db=database,
notifier=hs.get_replication_notifier(),
stream_name="events",
instance_name=hs.get_instance_name(),
tables=[("events", "instance_name", "stream_ordering")],
sequence_name="events_stream_seq",
writers=hs.config.worker.writers.events,
)
self._backfill_id_gen = MultiWriterIdGenerator(
db_conn=db_conn,
db=database,
notifier=hs.get_replication_notifier(),
stream_name="backfill",
instance_name=hs.get_instance_name(),
tables=[("events", "instance_name", "stream_ordering")],
sequence_name="events_backfill_stream_seq",
positive=False,
writers=hs.config.worker.writers.events,
)

events_max = self._stream_id_gen.get_current_token()
curr_state_delta_prefill, min_curr_state_delta_id = self.db_pool.get_cache_dict(
Expand Down Expand Up @@ -309,27 +284,17 @@ def get_chain_id_txn(txn: Cursor) -> int:

self._un_partial_stated_events_stream_id_gen: AbstractStreamIdGenerator

if isinstance(database.engine, PostgresEngine):
self._un_partial_stated_events_stream_id_gen = MultiWriterIdGenerator(
db_conn=db_conn,
db=database,
notifier=hs.get_replication_notifier(),
stream_name="un_partial_stated_event_stream",
instance_name=hs.get_instance_name(),
tables=[
("un_partial_stated_event_stream", "instance_name", "stream_id")
],
sequence_name="un_partial_stated_event_stream_sequence",
# TODO(faster_joins, multiple writers) Support multiple writers.
writers=["master"],
)
else:
self._un_partial_stated_events_stream_id_gen = StreamIdGenerator(
db_conn,
hs.get_replication_notifier(),
"un_partial_stated_event_stream",
"stream_id",
)
self._un_partial_stated_events_stream_id_gen = MultiWriterIdGenerator(
db_conn=db_conn,
db=database,
notifier=hs.get_replication_notifier(),
stream_name="un_partial_stated_event_stream",
instance_name=hs.get_instance_name(),
tables=[("un_partial_stated_event_stream", "instance_name", "stream_id")],
sequence_name="un_partial_stated_event_stream_sequence",
# TODO(faster_joins, multiple writers) Support multiple writers.
writers=["master"],
)

def get_un_partial_stated_events_token(self, instance_name: str) -> int:
return (
Expand Down
27 changes: 10 additions & 17 deletions synapse/storage/databases/main/presence.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,11 @@
LoggingTransaction,
)
from synapse.storage.databases.main.cache import CacheInvalidationWorkerStore
from synapse.storage.engines import PostgresEngine
from synapse.storage.engines._base import IsolationLevel
from synapse.storage.types import Connection
from synapse.storage.util.id_generators import (
AbstractStreamIdGenerator,
MultiWriterIdGenerator,
StreamIdGenerator,
)
from synapse.util.caches.descriptors import cached, cachedList
from synapse.util.caches.stream_change_cache import StreamChangeCache
Expand Down Expand Up @@ -91,21 +89,16 @@ def __init__(
self._instance_name in hs.config.worker.writers.presence
)

if isinstance(database.engine, PostgresEngine):
self._presence_id_gen = MultiWriterIdGenerator(
db_conn=db_conn,
db=database,
notifier=hs.get_replication_notifier(),
stream_name="presence_stream",
instance_name=self._instance_name,
tables=[("presence_stream", "instance_name", "stream_id")],
sequence_name="presence_stream_sequence",
writers=hs.config.worker.writers.presence,
)
else:
self._presence_id_gen = StreamIdGenerator(
db_conn, hs.get_replication_notifier(), "presence_stream", "stream_id"
)
self._presence_id_gen = MultiWriterIdGenerator(
db_conn=db_conn,
db=database,
notifier=hs.get_replication_notifier(),
stream_name="presence_stream",
instance_name=self._instance_name,
tables=[("presence_stream", "instance_name", "stream_id")],
sequence_name="presence_stream_sequence",
writers=hs.config.worker.writers.presence,
)

self.hs = hs
self._presence_on_startup = self._get_active_presence(db_conn)
Expand Down
Loading
Loading