From d7b8d87dadb7a1a6cd80b9b93d7fcef931f928ac Mon Sep 17 00:00:00 2001 From: Olivier 'reivilibre Date: Sat, 18 May 2024 20:41:38 +0100 Subject: [PATCH 1/3] Log task usage upon finish and every 5 minutes otherwise --- synapse/util/task_scheduler.py | 69 +++++++++++++++++++++++++++++++++- 1 file changed, 67 insertions(+), 2 deletions(-) diff --git a/synapse/util/task_scheduler.py b/synapse/util/task_scheduler.py index 01d05c9ed60..31922b1ce1e 100644 --- a/synapse/util/task_scheduler.py +++ b/synapse/util/task_scheduler.py @@ -24,7 +24,12 @@ from twisted.python.failure import Failure -from synapse.logging.context import nested_logging_context +from synapse.logging.context import ( + ContextResourceUsage, + LoggingContext, + nested_logging_context, + set_current_context, +) from synapse.metrics import LaterGauge from synapse.metrics.background_process_metrics import ( run_as_background_process, @@ -81,6 +86,8 @@ class TaskScheduler: MAX_CONCURRENT_RUNNING_TASKS = 5 # Time from the last task update after which we will log a warning LAST_UPDATE_BEFORE_WARNING_MS = 24 * 60 * 60 * 1000 # 24hrs + # Report a running task's status and usage every so often. + OCCASIONAL_REPORT_INTERVAL_MS = 5 * 60 * 1000 # 5 minutes def __init__(self, hs: "HomeServer"): self._hs = hs @@ -346,6 +353,33 @@ async def _clean_scheduled_tasks(self) -> None: assert task.id not in self._running_tasks await self._store.delete_scheduled_task(task.id) + @staticmethod + def _log_task_usage( + state: str, task: ScheduledTask, usage: ContextResourceUsage, active_time: float + ) -> None: + """ + Log a line describing the state and usage of a task. + The log line is inspired by / a copy of the request log line format, + but with irrelevant fields removed. + + active_time: Time that the task has been running for, in seconds. + """ + + logger.info( + "Task %s: %.3fsec (%.3fsec, %.3fsec) (%.3fsec/%.3fsec/%d)" + " [%d dbevts] %r, %r", + state, + active_time, + usage.ru_utime, + usage.ru_stime, + usage.db_sched_duration_sec, + usage.db_txn_duration_sec, + int(usage.db_txn_count), + usage.evt_db_fetch_count, + task.resource_id, + task.params, + ) + async def _launch_task(self, task: ScheduledTask) -> None: """Launch a scheduled task now. @@ -360,8 +394,32 @@ async def _launch_task(self, task: ScheduledTask) -> None: ) function = self._actions[task.action] + def _occasional_report( + task_log_context: LoggingContext, start_time: int + ) -> None: + """ + Helper to log a 'Task continuing' line every so often. + """ + + current_time = int(self._clock.time()) + calling_context = set_current_context(task_log_context) + try: + usage = task_log_context.get_resource_usage() + TaskScheduler._log_task_usage( + "continuing", task, usage, (current_time - start_time) * 0.001 + ) + finally: + set_current_context(calling_context) + async def wrapper() -> None: - with nested_logging_context(task.id): + with nested_logging_context(task.id) as log_context: + start_time = int(self._clock.time()) + occasional_status_call = self._clock.looping_call( + _occasional_report, + TaskScheduler.OCCASIONAL_REPORT_INTERVAL_MS, + log_context, + start_time, + ) try: (status, result, error) = await function(task) except Exception: @@ -383,6 +441,13 @@ async def wrapper() -> None: ) self._running_tasks.remove(task.id) + current_time = int(self._clock.time()) + usage = log_context.get_resource_usage() + TaskScheduler._log_task_usage( + status.value, task, usage, (current_time - start_time) * 0.001 + ) + occasional_status_call.stop() + # Try launch a new task since we've finished with this one. self._clock.call_later(0.1, self._launch_scheduled_tasks) From 3958fdab777c2e929acb13c90797240557fda699 Mon Sep 17 00:00:00 2001 From: Olivier 'reivilibre Date: Sat, 18 May 2024 20:44:13 +0100 Subject: [PATCH 2/3] Newsfile Signed-off-by: Olivier 'reivilibre --- changelog.d/17219.feature | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/17219.feature diff --git a/changelog.d/17219.feature b/changelog.d/17219.feature new file mode 100644 index 00000000000..f8277a89d85 --- /dev/null +++ b/changelog.d/17219.feature @@ -0,0 +1 @@ +Add logging to tasks managed by the task scheduler, showing CPU and database usage. \ No newline at end of file From effebb3e88eaa30987a32d8d9a2331023fc5f12e Mon Sep 17 00:00:00 2001 From: Olivier 'reivilibre Date: Sat, 18 May 2024 21:11:34 +0100 Subject: [PATCH 3/3] Oops! `Clock.time` gives seconds --- synapse/util/task_scheduler.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/synapse/util/task_scheduler.py b/synapse/util/task_scheduler.py index 31922b1ce1e..448960b2978 100644 --- a/synapse/util/task_scheduler.py +++ b/synapse/util/task_scheduler.py @@ -395,25 +395,25 @@ async def _launch_task(self, task: ScheduledTask) -> None: function = self._actions[task.action] def _occasional_report( - task_log_context: LoggingContext, start_time: int + task_log_context: LoggingContext, start_time: float ) -> None: """ Helper to log a 'Task continuing' line every so often. """ - current_time = int(self._clock.time()) + current_time = self._clock.time() calling_context = set_current_context(task_log_context) try: usage = task_log_context.get_resource_usage() TaskScheduler._log_task_usage( - "continuing", task, usage, (current_time - start_time) * 0.001 + "continuing", task, usage, current_time - start_time ) finally: set_current_context(calling_context) async def wrapper() -> None: with nested_logging_context(task.id) as log_context: - start_time = int(self._clock.time()) + start_time = self._clock.time() occasional_status_call = self._clock.looping_call( _occasional_report, TaskScheduler.OCCASIONAL_REPORT_INTERVAL_MS, @@ -441,10 +441,10 @@ async def wrapper() -> None: ) self._running_tasks.remove(task.id) - current_time = int(self._clock.time()) + current_time = self._clock.time() usage = log_context.get_resource_usage() TaskScheduler._log_task_usage( - status.value, task, usage, (current_time - start_time) * 0.001 + status.value, task, usage, current_time - start_time ) occasional_status_call.stop()