Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Allow background tasks to be run on a separate worker. #8369

Merged
merged 20 commits into from
Oct 2, 2020
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
71ade09
Allow background tasks to be run on a separate worker.
clokep Sep 21, 2020
094b11e
Only run function on background worker.
clokep Sep 22, 2020
fd8aad3
Accept a worker name instead of using a flag.
clokep Sep 22, 2020
67c6fa4
Remove check for being the background_worker app.
clokep Sep 22, 2020
08a7d5d
Do not allow a non-existent worker app.
clokep Sep 23, 2020
0d06a87
Also run the user directory updating in the backgronud.
clokep Sep 24, 2020
2e98b78
Ensure the proper handlers are loaded during start-up.
clokep Sep 24, 2020
b77b89b
Backout some of the changes to simplify the PR.
clokep Sep 25, 2020
f26b92e
Ensure that the background tasks have access to the proper stores.
clokep Sep 25, 2020
53a4402
Add some documentation.
clokep Sep 25, 2020
82d167b
Do not require a replication endpoint for the instance map.
clokep Sep 25, 2020
0c9e970
Clarify confusing wording.
clokep Sep 25, 2020
d40aff7
Do not require the background worker instance to be in the instance map.
clokep Sep 29, 2020
c015379
Update the sample config.
clokep Sep 29, 2020
cfe28f2
The user directory background tasks are controlled by a separate flag.
clokep Sep 29, 2020
f0c83d9
Rename instance variable.
clokep Sep 30, 2020
b226c49
Allow the phone home stats to be able to run on any worker.
clokep Sep 30, 2020
cb740cf
Move around some storage classes to make the proper metrics methods a…
clokep Sep 30, 2020
179d8c3
Consolidate logic for starting metrics calls.
clokep Sep 30, 2020
9fb2c05
Merge remote-tracking branch 'origin/develop' into clokep/background-…
clokep Sep 30, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/8369.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Allow running background tasks in a separate worker process.
6 changes: 6 additions & 0 deletions docs/sample_config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2458,6 +2458,12 @@ opentracing:
# events: worker1
# typing: worker1

# The worker that is used to run background tasks (e.g. cleaning up expired
# data). This should be one of the workers from `instance_map`. If not
# provided this defaults to the main process.
#
#run_background_tasks: worker1


# Configuration for Redis when using workers. This *must* be enabled when
# using workers (unless using old style direct TCP configuration).
Expand Down
1 change: 1 addition & 0 deletions synapse/app/generic_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -885,6 +885,7 @@ def start(config_options):
# For backwards compatibility let any of the old app names.
assert config.worker_app in (
"synapse.app.appservice",
"synapse.app.background_worker",
clokep marked this conversation as resolved.
Show resolved Hide resolved
"synapse.app.client_reader",
"synapse.app.event_creator",
"synapse.app.federation_reader",
Expand Down
16 changes: 10 additions & 6 deletions synapse/app/homeserver.py
Original file line number Diff line number Diff line change
Expand Up @@ -620,16 +620,18 @@ def generate_user_daily_visit_stats():
# Rather than update on per session basis, batch up the requests.
# If you increase the loop period, the accuracy of user_daily_visits
# table will decrease
clock.looping_call(generate_user_daily_visit_stats, 5 * 60 * 1000)
if hs.config.run_background_tasks:
clock.looping_call(generate_user_daily_visit_stats, 5 * 60 * 1000)

# monthly active user limiting functionality
def reap_monthly_active_users():
return run_as_background_process(
"reap_monthly_active_users", hs.get_datastore().reap_monthly_active_users
)

clock.looping_call(reap_monthly_active_users, 1000 * 60 * 60)
reap_monthly_active_users()
if hs.config.run_background_tasks:
clock.looping_call(reap_monthly_active_users, 1000 * 60 * 60)
reap_monthly_active_users()
clokep marked this conversation as resolved.
Show resolved Hide resolved

async def generate_monthly_active_users():
current_mau_count = 0
Expand All @@ -655,12 +657,14 @@ def start_generate_monthly_active_users():
"generate_monthly_active_users", generate_monthly_active_users
)

start_generate_monthly_active_users()
if hs.config.limit_usage_by_mau or hs.config.mau_stats_only:
if hs.config.run_background_tasks and (
hs.config.limit_usage_by_mau or hs.config.mau_stats_only
):
start_generate_monthly_active_users()
clock.looping_call(start_generate_monthly_active_users, 5 * 60 * 1000)
# End of monthly active user settings

if hs.config.report_stats:
if hs.config.run_background_tasks and hs.config.report_stats:
erikjohnston marked this conversation as resolved.
Show resolved Hide resolved
clokep marked this conversation as resolved.
Show resolved Hide resolved
logger.info("Scheduling stats reporting for 3 hour intervals")
clock.looping_call(start_phone_stats_home, 3 * 60 * 60 * 1000)

Expand Down
24 changes: 24 additions & 0 deletions synapse/config/workers.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,24 @@ def read_config(self, config, **kwargs):

self.events_shard_config = ShardedWorkerHandlingConfig(self.writers.events)

# Whether this worker should run background tasks or not.
#
# As a note for developers, the background tasks guarded by this should
# be able to run on only a single instance (meaning that they don't
# depend on any in-memory state of a particular worker).
#
# Effort is not made to ensure only a single instance of these tasks is
# running.
instance = config.get("run_background_tasks_on") or "master"
clokep marked this conversation as resolved.
Show resolved Hide resolved
if instance != "master" and instance not in self.instance_map:
raise ConfigError(
"Instance %r is configured to run background tasks but does not appear in `instance_map` config."
% (instance,)
)
self.run_background_tasks = (
self.worker_name is None and instance == "master"
) or self.worker_name == instance

def generate_config_section(self, config_dir_path, server_name, **kwargs):
return """\
## Workers ##
Expand Down Expand Up @@ -167,6 +185,12 @@ def generate_config_section(self, config_dir_path, server_name, **kwargs):
#stream_writers:
# events: worker1
# typing: worker1

# The worker that is used to run background tasks (e.g. cleaning up expired
# data). This should be one of the workers from `instance_map`. If not
# provided this defaults to the main process.
#
#run_background_tasks: worker1
"""

def read_arguments(self, args):
Expand Down
1 change: 1 addition & 0 deletions synapse/handlers/account_validity.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ def __init__(self, hs):
if (
self._account_validity.enabled
and self._account_validity.renew_by_email_enabled
and hs.config.run_background_tasks
):
# Don't do email-specific configuration if renewal by email is disabled.
self._template_html = self.config.account_validity_template_html
Expand Down
2 changes: 1 addition & 1 deletion synapse/handlers/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ def __init__(self, hs):
self._clock = self.hs.get_clock()

# Expire old UI auth sessions after a period of time.
if hs.config.worker_app is None:
if hs.config.run_background_tasks:
self._clock.looping_call(
run_as_background_process,
5 * 60 * 1000,
Expand Down
2 changes: 1 addition & 1 deletion synapse/handlers/deactivate_account.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ def __init__(self, hs):

# Start the user parter loop so it can resume parting users from rooms where
# it left off (if it has work left to do).
if hs.config.worker_app is None:
if hs.config.run_background_tasks:
hs.get_reactor().callWhenRunning(self._start_user_parting)

self._account_validity_enabled = hs.config.account_validity.enabled
Expand Down
13 changes: 7 additions & 6 deletions synapse/handlers/device.py
Original file line number Diff line number Diff line change
Expand Up @@ -522,12 +522,13 @@ def __init__(self, hs, device_handler):

# Attempt to resync out of sync device lists every 30s.
self._resync_retry_in_progress = False
self.clock.looping_call(
run_as_background_process,
30 * 1000,
func=self._maybe_retry_device_resync,
desc="_maybe_retry_device_resync",
)
if hs.config.run_background_tasks:
self.clock.looping_call(
run_as_background_process,
30 * 1000,
func=self._maybe_retry_device_resync,
desc="_maybe_retry_device_resync",
)

@trace
async def incoming_device_list_update(self, origin, edu_content):
Expand Down
2 changes: 1 addition & 1 deletion synapse/handlers/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -413,7 +413,7 @@ def __init__(self, hs: "HomeServer"):
self._consent_uri_builder = ConsentURIBuilder(self.config)

if (
not self.config.worker_app
self.config.run_background_tasks
and self.config.cleanup_extremities_with_dummy_events
):
self.clock.looping_call(
Expand Down
2 changes: 1 addition & 1 deletion synapse/handlers/pagination.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ def __init__(self, hs: "HomeServer"):
self._retention_allowed_lifetime_min = hs.config.retention_allowed_lifetime_min
self._retention_allowed_lifetime_max = hs.config.retention_allowed_lifetime_max

if hs.config.retention_enabled:
if hs.config.run_background_tasks and hs.config.retention_enabled:
# Run the purge jobs described in the configuration file.
for job in hs.config.retention_purge_jobs:
logger.info("Setting up purge job with config: %s", job)
Expand Down
9 changes: 4 additions & 5 deletions synapse/handlers/profile.py
Original file line number Diff line number Diff line change
Expand Up @@ -371,11 +371,10 @@ class MasterProfileHandler(BaseProfileHandler):
def __init__(self, hs):
super().__init__(hs)

assert hs.config.worker_app is None

self.clock.looping_call(
self._start_update_remote_profile_cache, self.PROFILE_UPDATE_MS
)
if hs.config.run_background_tasks:
self.clock.looping_call(
self._start_update_remote_profile_cache, self.PROFILE_UPDATE_MS
)

def _start_update_remote_profile_cache(self):
return run_as_background_process(
Expand Down
2 changes: 1 addition & 1 deletion synapse/handlers/stats.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ def __init__(self, hs):
# Guard to ensure we only process deltas one at a time
self._is_processing = False

if hs.config.stats_enabled:
if self.stats_enabled and hs.config.run_background_tasks:
self.notifier.add_replication_callback(self.notify_new_event)

# We kick this off so that we don't have to wait for a change before
Expand Down
5 changes: 4 additions & 1 deletion synapse/storage/databases/main/censor_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,10 @@ def _censor_redactions():
"_censor_redactions", self._censor_redactions
)

if self.hs.config.redaction_retention_period is not None:
if (
self.hs.config.run_background_tasks
and self.hs.config.redaction_retention_period is not None
):
hs.get_clock().looping_call(_censor_redactions, 5 * 60 * 1000)

async def _censor_redactions(self):
Expand Down
2 changes: 1 addition & 1 deletion synapse/storage/databases/main/client_ips.py
Original file line number Diff line number Diff line change
Expand Up @@ -372,7 +372,7 @@ def __init__(self, database: DatabasePool, db_conn, hs):
"before", "shutdown", self._update_client_ips_batch
)

if self.user_ips_max_age:
if hs.config.run_background_tasks and self.user_ips_max_age:
self._clock.looping_call(self._prune_old_user_ips, 5 * 1000)

async def insert_client_ip(
Expand Down
5 changes: 4 additions & 1 deletion synapse/storage/databases/main/devices.py
Original file line number Diff line number Diff line change
Expand Up @@ -834,7 +834,10 @@ def __init__(self, database: DatabasePool, db_conn, hs):
name="device_id_exists", keylen=2, max_entries=10000
)

self._clock.looping_call(self._prune_old_outbound_device_pokes, 60 * 60 * 1000)
if hs.config.run_background_tasks:
self._clock.looping_call(
self._prune_old_outbound_device_pokes, 60 * 60 * 1000
)

async def store_device(
self, user_id: str, device_id: str, initial_device_display_name: str
Expand Down
7 changes: 4 additions & 3 deletions synapse/storage/databases/main/event_federation.py
Original file line number Diff line number Diff line change
Expand Up @@ -606,9 +606,10 @@ def __init__(self, database: DatabasePool, db_conn, hs):
self.EVENT_AUTH_STATE_ONLY, self._background_delete_non_state_event_auth
)

hs.get_clock().looping_call(
self._delete_old_forward_extrem_cache, 60 * 60 * 1000
)
if hs.config.run_background_tasks:
hs.get_clock().looping_call(
self._delete_old_forward_extrem_cache, 60 * 60 * 1000
)

def _delete_old_forward_extrem_cache(self):
def _delete_old_forward_extrem_cache_txn(txn):
Expand Down
9 changes: 5 additions & 4 deletions synapse/storage/databases/main/event_push_actions.py
Original file line number Diff line number Diff line change
Expand Up @@ -679,9 +679,10 @@ def __init__(self, database: DatabasePool, db_conn, hs):
)

self._doing_notif_rotation = False
self._rotate_notif_loop = self._clock.looping_call(
self._start_rotate_notifs, 30 * 60 * 1000
)
if hs.config.run_background_tasks:
self._rotate_notif_loop = self._clock.looping_call(
self._start_rotate_notifs, 30 * 60 * 1000
)

async def get_push_actions_for_user(
self, user_id, before=None, limit=50, only_highlight=False
Expand Down Expand Up @@ -741,7 +742,7 @@ def _remove_old_push_actions_before_txn(
users can still get a list of recent highlights.

Args:
txn: The transcation
txn: The transaction
room_id: Room ID to delete from
user_id: user ID to delete for
stream_ordering: The lowest stream ordering which will
Expand Down
3 changes: 2 additions & 1 deletion synapse/storage/databases/main/metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@ def read_forward_extremities():
"read_forward_extremities", self._read_forward_extremities
)

hs.get_clock().looping_call(read_forward_extremities, 60 * 60 * 1000)
if hs.config.run_background_tasks:
hs.get_clock().looping_call(read_forward_extremities, 60 * 60 * 1000)

async def _read_forward_extremities(self):
def fetch(txn):
Expand Down
3 changes: 2 additions & 1 deletion synapse/storage/databases/main/registration.py
Original file line number Diff line number Diff line change
Expand Up @@ -914,7 +914,8 @@ def start_cull():
self.cull_expired_threepid_validation_tokens,
)

hs.get_clock().looping_call(start_cull, THIRTY_MINUTES_IN_MS)
if hs.config.run_background_tasks:
hs.get_clock().looping_call(start_cull, THIRTY_MINUTES_IN_MS)

async def add_access_token_to_user(
self,
Expand Down
5 changes: 4 additions & 1 deletion synapse/storage/databases/main/roommember.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,10 @@ def __init__(self, database: DatabasePool, db_conn, hs):
self._check_safe_current_state_events_membership_updated_txn(txn)
txn.close()

if self.hs.config.metrics_flags.known_servers:
if (
self.hs.config.run_background_tasks
and self.hs.config.metrics_flags.known_servers
):
self._known_servers_count = 1
self.hs.get_clock().looping_call(
run_as_background_process,
Expand Down
3 changes: 2 additions & 1 deletion synapse/storage/databases/main/transactions.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,8 @@ class TransactionStore(SQLBaseStore):
def __init__(self, database: DatabasePool, db_conn, hs):
super().__init__(database, db_conn, hs)

self._clock.looping_call(self._start_cleanup_transactions, 30 * 60 * 1000)
if hs.config.run_background_tasks:
self._clock.looping_call(self._start_cleanup_transactions, 30 * 60 * 1000)

self._destination_retry_cache = ExpiringCache(
cache_name="get_destination_retry_timings",
Expand Down