Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Set our own stream position from the current sequence value on startup #17309

Merged
merged 9 commits into from
Jun 17, 2024
1 change: 1 addition & 0 deletions changelog.d/17309.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
When rolling back to a previous Synapse version and then forwards again to this release, don't require server operators to manually run SQL.
23 changes: 20 additions & 3 deletions synapse/storage/util/id_generators.py
Original file line number Diff line number Diff line change
Expand Up @@ -276,9 +276,6 @@ def __init__(
# no active writes in progress.
self._max_position_of_local_instance = self._max_seen_allocated_stream_id

# This goes and fills out the above state from the database.
self._load_current_ids(db_conn, tables)

self._sequence_gen = build_sequence_generator(
db_conn=db_conn,
database_engine=db.engine,
Expand All @@ -303,6 +300,13 @@ def __init__(
positive=positive,
)

# This goes and fills out the above state from the database.
# This may read on the PostgreSQL sequence, and
# SequenceGenerator.check_consistency might have fixed up the sequence, which
# means the SequenceGenerator needs to be setup before we read the value from
# the sequence.
self._load_current_ids(db_conn, tables, sequence_name)

self._max_seen_allocated_stream_id = max(
self._current_positions.values(), default=1
)
Expand All @@ -327,6 +331,7 @@ def _load_current_ids(
self,
db_conn: LoggingDatabaseConnection,
tables: List[Tuple[str, str, str]],
sequence_name: str,
) -> None:
cur = db_conn.cursor(txn_name="_load_current_ids")

Expand Down Expand Up @@ -360,6 +365,18 @@ def _load_current_ids(
if instance in self._writers
}

# If we're a writer, we can assume we're at the end of the stream
# Usually, we would get that from the stream_positions, but in some cases,
# like if we rolled back Synapse, the stream_positions table might not be up to
# date. If we're using Postgres for the sequences, we can just use the current
# sequence value as our own position.
if self._instance_name in self._writers:
if isinstance(self._db.engine, PostgresEngine):
cur.execute(f"SELECT last_value FROM {sequence_name}")
row = cur.fetchone()
assert row is not None
self._current_positions[self._instance_name] = row[0]

# We set the `_persisted_upto_position` to be the minimum of all current
# positions. If empty we use the max stream ID from the DB table.
min_stream_id = min(self._current_positions.values(), default=None)
Expand Down
Loading
Loading