Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Handle half-created indices in receipts index background update #14650

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions changelog.d/14650.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
Fix a bug introduced in Synapse 1.72.0 where the background updates to add non-thread unique indexes on receipts would fail if they were previously interrupted.

55 changes: 46 additions & 9 deletions synapse/storage/background_updates.py
Original file line number Diff line number Diff line change
Expand Up @@ -544,6 +544,48 @@ def register_background_index_update(
The named index will be dropped upon completion of the new index.
"""

async def updater(progress: JsonDict, batch_size: int) -> int:
await self.create_index_in_background(
index_name=index_name,
table=table,
columns=columns,
where_clause=where_clause,
unique=unique,
psql_only=psql_only,
replaces_index=replaces_index,
)
await self._end_background_update(update_name)
return 1

self._background_update_handlers[update_name] = _BackgroundUpdateHandler(
updater, oneshot=True
)

async def create_index_in_background(
self,
index_name: str,
table: str,
columns: Iterable[str],
where_clause: Optional[str] = None,
unique: bool = False,
psql_only: bool = False,
replaces_index: Optional[str] = None,
) -> None:
"""Add an index in the background.

Args:
update_name: update_name to register for
index_name: name of index to add
table: table to add index to
columns: columns/expressions to include in index
where_clause: A WHERE clause to specify a partial unique index.
unique: true to make a UNIQUE index
psql_only: true to only create this index on psql databases (useful
for virtual sqlite tables)
replaces_index: The name of an index that this index replaces.
The named index will be dropped upon completion of the new index.
"""

def create_index_psql(conn: Connection) -> None:
conn.rollback()
# postgres insists on autocommit for the index
Expand Down Expand Up @@ -618,16 +660,11 @@ def create_index_sqlite(conn: Connection) -> None:
else:
runner = create_index_sqlite

async def updater(progress: JsonDict, batch_size: int) -> int:
if runner is not None:
logger.info("Adding index %s to %s", index_name, table)
await self.db_pool.runWithConnection(runner)
await self._end_background_update(update_name)
return 1
if runner is None:
return

self._background_update_handlers[update_name] = _BackgroundUpdateHandler(
updater, oneshot=True
)
logger.info("Adding index %s to %s", index_name, table)
await self.db_pool.runWithConnection(runner)

async def _end_background_update(self, update_name: str) -> None:
"""Removes a completed background update task from the queue.
Expand Down
51 changes: 12 additions & 39 deletions synapse/storage/databases/main/receipts.py
Original file line number Diff line number Diff line change
Expand Up @@ -924,39 +924,6 @@ def _populate_receipt_event_stream_ordering_txn(

return batch_size

async def _create_receipts_index(self, index_name: str, table: str) -> None:
"""Adds a unique index on `(room_id, receipt_type, user_id)` to the given
receipts table, for non-thread receipts."""

def _create_index(conn: LoggingDatabaseConnection) -> None:
conn.rollback()

# we have to set autocommit, because postgres refuses to
# CREATE INDEX CONCURRENTLY without it.
if isinstance(self.database_engine, PostgresEngine):
conn.set_session(autocommit=True)

try:
c = conn.cursor()

# Now that the duplicates are gone, we can create the index.
concurrently = (
"CONCURRENTLY"
if isinstance(self.database_engine, PostgresEngine)
else ""
)
sql = f"""
CREATE UNIQUE INDEX {concurrently} {index_name}
ON {table}(room_id, receipt_type, user_id)
WHERE thread_id IS NULL
"""
c.execute(sql)
finally:
if isinstance(self.database_engine, PostgresEngine):
conn.set_session(autocommit=False)

await self.db_pool.runWithConnection(_create_index)

async def _background_receipts_linearized_unique_index(
self, progress: dict, batch_size: int
) -> int:
Expand Down Expand Up @@ -999,9 +966,12 @@ def _remote_duplicate_receipts_txn(txn: LoggingTransaction) -> None:
_remote_duplicate_receipts_txn,
)

await self._create_receipts_index(
"receipts_linearized_unique_index",
"receipts_linearized",
await self.db_pool.updates.create_index_in_background(
index_name="receipts_linearized_unique_index",
table="receipts_linearized",
columns=["room_id", "receipt_type", "user_id"],
where_clause="thread_id IS NULL",
unique=True,
)

await self.db_pool.updates._end_background_update(
Expand Down Expand Up @@ -1050,9 +1020,12 @@ def _remote_duplicate_receipts_txn(txn: LoggingTransaction) -> None:
_remote_duplicate_receipts_txn,
)

await self._create_receipts_index(
"receipts_graph_unique_index",
"receipts_graph",
Comment on lines -1053 to -1055
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is _create_receipts_index now dead code?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good spot, thanks.

await self.db_pool.updates.create_index_in_background(
index_name="receipts_graph_unique_index",
table="receipts_graph",
columns=["room_id", "receipt_type", "user_id"],
where_clause="thread_id IS NULL",
unique=True,
)

await self.db_pool.updates._end_background_update(
Expand Down