Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Fix media repo admin APIs when using a media worker. #6664

Merged
merged 6 commits into from
Jan 8, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/6664.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix media repo admin APIs when using a media worker.
2 changes: 2 additions & 0 deletions synapse/app/media_repository.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
from synapse.replication.slave.storage.client_ips import SlavedClientIpStore
from synapse.replication.slave.storage.registration import SlavedRegistrationStore
from synapse.replication.slave.storage.room import RoomStore
from synapse.replication.slave.storage.transactions import SlavedTransactionStore
from synapse.replication.tcp.client import ReplicationClientHandler
from synapse.rest.admin import register_servlets_for_media_repo
Expand All @@ -47,6 +48,7 @@


class MediaRepositorySlavedStore(
RoomStore,
SlavedApplicationServiceStore,
SlavedRegistrationStore,
SlavedClientIpStore,
Expand Down
248 changes: 128 additions & 120 deletions synapse/storage/data_stores/main/room.py
Original file line number Diff line number Diff line change
Expand Up @@ -366,6 +366,134 @@ def get_retention_policy_for_room_txn(txn):

defer.returnValue(row)

def get_media_mxcs_in_room(self, room_id):
"""Retrieves all the local and remote media MXC URIs in a given room

Args:
room_id (str)

Returns:
The local and remote media as a lists of tuples where the key is
the hostname and the value is the media ID.
"""

def _get_media_mxcs_in_room_txn(txn):
local_mxcs, remote_mxcs = self._get_media_mxcs_in_room_txn(txn, room_id)
local_media_mxcs = []
remote_media_mxcs = []

# Convert the IDs to MXC URIs
for media_id in local_mxcs:
local_media_mxcs.append("mxc://%s/%s" % (self.hs.hostname, media_id))
for hostname, media_id in remote_mxcs:
remote_media_mxcs.append("mxc://%s/%s" % (hostname, media_id))

return local_media_mxcs, remote_media_mxcs

return self.db.runInteraction(
"get_media_ids_in_room", _get_media_mxcs_in_room_txn
)

def quarantine_media_ids_in_room(self, room_id, quarantined_by):
"""For a room loops through all events with media and quarantines
the associated media
"""

def _quarantine_media_in_room_txn(txn):
local_mxcs, remote_mxcs = self._get_media_mxcs_in_room_txn(txn, room_id)
total_media_quarantined = 0

# Now update all the tables to set the quarantined_by flag

txn.executemany(
"""
UPDATE local_media_repository
SET quarantined_by = ?
WHERE media_id = ?
""",
((quarantined_by, media_id) for media_id in local_mxcs),
)

txn.executemany(
"""
UPDATE remote_media_cache
SET quarantined_by = ?
WHERE media_origin = ? AND media_id = ?
""",
(
(quarantined_by, origin, media_id)
for origin, media_id in remote_mxcs
),
)

total_media_quarantined += len(local_mxcs)
total_media_quarantined += len(remote_mxcs)

return total_media_quarantined

return self.db.runInteraction(
"quarantine_media_in_room", _quarantine_media_in_room_txn
)

def _get_media_mxcs_in_room_txn(self, txn, room_id):
"""Retrieves all the local and remote media MXC URIs in a given room

Args:
txn (cursor)
room_id (str)

Returns:
The local and remote media as a lists of tuples where the key is
the hostname and the value is the media ID.
"""
mxc_re = re.compile("^mxc://([^/]+)/([^/#?]+)")

sql = """
SELECT stream_ordering, json FROM events
JOIN event_json USING (room_id, event_id)
WHERE room_id = ?
%(where_clause)s
AND contains_url = ? AND outlier = ?
ORDER BY stream_ordering DESC
LIMIT ?
"""
txn.execute(sql % {"where_clause": ""}, (room_id, True, False, 100))

local_media_mxcs = []
remote_media_mxcs = []

while True:
next_token = None
for stream_ordering, content_json in txn:
next_token = stream_ordering
event_json = json.loads(content_json)
content = event_json["content"]
content_url = content.get("url")
thumbnail_url = content.get("info", {}).get("thumbnail_url")

for url in (content_url, thumbnail_url):
if not url:
continue
matches = mxc_re.match(url)
if matches:
hostname = matches.group(1)
media_id = matches.group(2)
if hostname == self.hs.hostname:
local_media_mxcs.append(media_id)
else:
remote_media_mxcs.append((hostname, media_id))

if next_token is None:
# We've gone through the whole room, so we're finished.
break

txn.execute(
sql % {"where_clause": "AND stream_ordering < ?"},
(room_id, next_token, True, False, 100),
)

return local_media_mxcs, remote_media_mxcs


class RoomBackgroundUpdateStore(SQLBaseStore):
REMOVE_TOMESTONED_ROOMS_BG_UPDATE = "remove_tombstoned_rooms_from_directory"
Expand Down Expand Up @@ -810,126 +938,6 @@ def block_room(self, room_id, user_id):
(room_id,),
)

def get_media_mxcs_in_room(self, room_id):
"""Retrieves all the local and remote media MXC URIs in a given room

Args:
room_id (str)

Returns:
The local and remote media as a lists of tuples where the key is
the hostname and the value is the media ID.
"""

def _get_media_mxcs_in_room_txn(txn):
local_mxcs, remote_mxcs = self._get_media_mxcs_in_room_txn(txn, room_id)
local_media_mxcs = []
remote_media_mxcs = []

# Convert the IDs to MXC URIs
for media_id in local_mxcs:
local_media_mxcs.append("mxc://%s/%s" % (self.hs.hostname, media_id))
for hostname, media_id in remote_mxcs:
remote_media_mxcs.append("mxc://%s/%s" % (hostname, media_id))

return local_media_mxcs, remote_media_mxcs

return self.db.runInteraction(
"get_media_ids_in_room", _get_media_mxcs_in_room_txn
)

def quarantine_media_ids_in_room(self, room_id, quarantined_by):
"""For a room loops through all events with media and quarantines
the associated media
"""

def _quarantine_media_in_room_txn(txn):
local_mxcs, remote_mxcs = self._get_media_mxcs_in_room_txn(txn, room_id)
total_media_quarantined = 0

# Now update all the tables to set the quarantined_by flag

txn.executemany(
"""
UPDATE local_media_repository
SET quarantined_by = ?
WHERE media_id = ?
""",
((quarantined_by, media_id) for media_id in local_mxcs),
)

txn.executemany(
"""
UPDATE remote_media_cache
SET quarantined_by = ?
WHERE media_origin = ? AND media_id = ?
""",
(
(quarantined_by, origin, media_id)
for origin, media_id in remote_mxcs
),
)

total_media_quarantined += len(local_mxcs)
total_media_quarantined += len(remote_mxcs)

return total_media_quarantined

return self.db.runInteraction(
"quarantine_media_in_room", _quarantine_media_in_room_txn
)

def _get_media_mxcs_in_room_txn(self, txn, room_id):
"""Retrieves all the local and remote media MXC URIs in a given room

Args:
txn (cursor)
room_id (str)

Returns:
The local and remote media as a lists of tuples where the key is
the hostname and the value is the media ID.
"""
mxc_re = re.compile("^mxc://([^/]+)/([^/#?]+)")

next_token = self.get_current_events_token() + 1
local_media_mxcs = []
remote_media_mxcs = []

while next_token:
sql = """
SELECT stream_ordering, json FROM events
JOIN event_json USING (room_id, event_id)
WHERE room_id = ?
AND stream_ordering < ?
AND contains_url = ? AND outlier = ?
ORDER BY stream_ordering DESC
LIMIT ?
"""
txn.execute(sql, (room_id, next_token, True, False, 100))

next_token = None
for stream_ordering, content_json in txn:
next_token = stream_ordering
event_json = json.loads(content_json)
content = event_json["content"]
content_url = content.get("url")
thumbnail_url = content.get("info", {}).get("thumbnail_url")

for url in (content_url, thumbnail_url):
if not url:
continue
matches = mxc_re.match(url)
if matches:
hostname = matches.group(1)
media_id = matches.group(2)
if hostname == self.hs.hostname:
local_media_mxcs.append(media_id)
else:
remote_media_mxcs.append((hostname, media_id))

return local_media_mxcs, remote_media_mxcs

@defer.inlineCallbacks
def get_rooms_for_retention_period_in_range(
self, min_ms, max_ms, include_null=False
Expand Down