diff --git a/cylc/flow/cfgspec/globalcfg.py b/cylc/flow/cfgspec/globalcfg.py index 0d3260c7590..5c95b3ea9e3 100644 --- a/cylc/flow/cfgspec/globalcfg.py +++ b/cylc/flow/cfgspec/globalcfg.py @@ -263,6 +263,13 @@ .. versionchanged:: 8.0.0 {REPLACES}``abort on inactivity``. + ''', + 'restart timeout': ''' + How long to wait for intervention on restarting a completed workflow. + The timer stops if any task is triggered. + + .. versionadded:: 8.1.0 + ''' } @@ -833,6 +840,8 @@ def default_for( vdr_type = VDR.V_INTERVAL if item == "stall timeout": default = DurationFloat(3600) + elif item == "restart timeout": + default = DurationFloat(120) else: default = None Conf(item, vdr_type, default, desc=desc) diff --git a/cylc/flow/scheduler.py b/cylc/flow/scheduler.py index e32277f6c7a..d87746449b6 100644 --- a/cylc/flow/scheduler.py +++ b/cylc/flow/scheduler.py @@ -30,7 +30,6 @@ from time import sleep, time import traceback from typing import ( - TYPE_CHECKING, Callable, Iterable, NoReturn, @@ -138,9 +137,6 @@ get_utc_mode) from cylc.flow.xtrigger_mgr import XtriggerManager -if TYPE_CHECKING: - from cylc.flow.task_proxy import TaskProxy - class SchedulerStop(CylcError): """Scheduler normal stop.""" @@ -158,9 +154,10 @@ class Scheduler: EVENT_SHUTDOWN = WorkflowEventHandler.EVENT_SHUTDOWN EVENT_ABORTED = WorkflowEventHandler.EVENT_ABORTED EVENT_WORKFLOW_TIMEOUT = WorkflowEventHandler.EVENT_WORKFLOW_TIMEOUT + EVENT_STALL = WorkflowEventHandler.EVENT_STALL EVENT_STALL_TIMEOUT = WorkflowEventHandler.EVENT_STALL_TIMEOUT + EVENT_RESTART_TIMEOUT = WorkflowEventHandler.EVENT_RESTART_TIMEOUT EVENT_INACTIVITY_TIMEOUT = WorkflowEventHandler.EVENT_INACTIVITY_TIMEOUT - EVENT_STALL = WorkflowEventHandler.EVENT_STALL # Intervals in seconds INTERVAL_MAIN_LOOP = 1.0 @@ -240,6 +237,7 @@ class Scheduler: is_paused = False is_updated = False is_stalled = False + is_restart_timeout_wait = False is_reloaded = False # main loop @@ -483,7 +481,8 @@ async def configure(self): for event, start_now, log_reset_func in [ (self.EVENT_INACTIVITY_TIMEOUT, True, LOG.debug), (self.EVENT_WORKFLOW_TIMEOUT, True, None), - (self.EVENT_STALL_TIMEOUT, False, None) + (self.EVENT_STALL_TIMEOUT, False, None), + (self.EVENT_RESTART_TIMEOUT, False, None) ]: interval = self._get_events_conf(event) if interval is not None: @@ -492,6 +491,16 @@ async def configure(self): timer.reset() self.timers[event] = timer + if self.is_restart and not self.pool.get_all_tasks(): + # This workflow completed before restart; wait for intervention. + with suppress(KeyError): + self.timers[self.EVENT_RESTART_TIMEOUT].reset() + self.is_restart_timeout_wait = True + LOG.warning( + "This workflow already completed.\nIt will shut down" + " again unless you trigger tasks before it times out." + ) + # Main loop plugins self.main_loop_plugins = main_loop.load( self.cylc_config.get('main loop', {}), @@ -511,7 +520,7 @@ async def configure(self): self.pause_workflow() self.profiler.log_memory("scheduler.py: begin run while loop") - self.is_updated = True + # self.is_updated = True # NOT NEEDED? if self.options.profile_mode: self.previous_profile_point = 0 self.count = 0 @@ -1366,7 +1375,7 @@ async def workflow_shutdown(self): # Is the workflow ready to shut down now? if self.pool.can_stop(self.stop_mode): - await self.update_data_structure() + await self.update_data_structure(self.is_reloaded) self.proc_pool.close() if self.stop_mode != StopMode.REQUEST_NOW_NOW: # Wait for process pool to complete, @@ -1587,11 +1596,33 @@ async def main_loop(self) -> None: # Update state summary, database, and uifeed self.workflow_db_mgr.put_task_event_timers(self.task_events_mgr) - has_updated = await self.update_data_structure() - if has_updated and not self.is_stalled: - # Stop the stalled timer. + + updated_task_list = [ + t for t in self.pool.get_tasks() if t.state.is_updated] + has_updated = updated_task_list or self.is_updated + + if updated_task_list and self.is_restart_timeout_wait: with suppress(KeyError): - self.timers[self.EVENT_STALL_TIMEOUT].stop() + self.timers[self.EVENT_RESTART_TIMEOUT].stop() + self.is_restart_timeout_wait = False + + if has_updated: + await self.update_data_structure(self.is_reloaded) + # TODO what's this about? + if not self.is_reloaded: + # (A reload cannot unstall workflow by itself) + self.is_stalled = False + self.is_reloaded = False + + # Reset workflow and task updated flags. + self.is_updated = False + for itask in updated_task_list: + itask.state.is_updated = False + + if not self.is_stalled: + # Stop the stalled timer. + with suppress(KeyError): + self.timers[self.EVENT_STALL_TIMEOUT].stop() self.process_workflow_db_queue() @@ -1640,17 +1671,12 @@ async def main_loop(self) -> None: self.main_loop_intervals.append(time() - tinit) # END MAIN LOOP - async def update_data_structure(self) -> Union[bool, List['TaskProxy']]: + async def update_data_structure(self, reloaded: bool = False): """Update DB, UIS, Summary data elements""" - updated_tasks = [ - t for t in self.pool.get_tasks() if t.state.is_updated] - has_updated = self.is_updated or updated_tasks - reloaded = self.is_reloaded - # Add tasks that have moved moved from runahead to live pool. - if has_updated or self.data_store_mgr.updates_pending: + # Add tasks that have moved from runahead to live pool. + if self.data_store_mgr.updates_pending: # Collect/apply data store updates/deltas self.data_store_mgr.update_data_structure(reloaded=reloaded) - self.is_reloaded = False # Publish updates: if self.data_store_mgr.publish_pending: self.data_store_mgr.publish_pending = False @@ -1659,17 +1685,9 @@ async def update_data_structure(self) -> Union[bool, List['TaskProxy']]: # Non-async sleep - yield to other threads rather # than event loop sleep(0) - if has_updated: - # Database update - self.workflow_db_mgr.put_task_pool(self.pool) - # Reset workflow and task updated flags. - self.is_updated = False - if not reloaded: # (A reload cannot unstall workflow by itself) - self.is_stalled = False - for itask in updated_tasks: - itask.state.is_updated = False - self.update_data_store() - return has_updated + # Database update + self.workflow_db_mgr.put_task_pool(self.pool) + self.update_data_store() def check_workflow_timers(self): """Check timers, and abort or run event handlers as configured.""" @@ -1682,6 +1700,10 @@ def check_workflow_timers(self): raise SchedulerError(f'"{abort_conf}" is set') if self._get_events_conf(f"{event} handlers") is not None: self.run_event_handlers(event) + if event == self.EVENT_RESTART_TIMEOUT: + # Unset wait flag to allow normal shutdown. + with suppress(KeyError): + self.is_restart_timeout_wait = False def check_workflow_stalled(self) -> bool: """Check if workflow is stalled or not.""" @@ -1850,27 +1872,25 @@ def stop_clock_done(self): def check_auto_shutdown(self): """Check if we should shut down now.""" - if self.is_paused: - # Don't if paused. - return False - - if self.check_workflow_stalled(): - return False - - if any( - itask for itask in self.pool.get_tasks() - if itask.state( - TASK_STATUS_PREPARING, - TASK_STATUS_SUBMITTED, - TASK_STATUS_RUNNING - ) - or ( - itask.state(TASK_STATUS_WAITING) - and not itask.state.is_runahead + if ( + self.is_paused or + self.is_restart_timeout_wait or + self.check_workflow_stalled() or + # if more tasks to run (if waiting and not + # runahead, then held, queued, or xtriggered). + any( + itask for itask in self.pool.get_tasks() + if itask.state( + TASK_STATUS_PREPARING, + TASK_STATUS_SUBMITTED, + TASK_STATUS_RUNNING + ) + or ( + itask.state(TASK_STATUS_WAITING) + and not itask.state.is_runahead + ) ) ): - # Don't if there are more tasks to run (if waiting and not - # runahead, then held, queued, or xtriggered). return False # Can shut down. diff --git a/cylc/flow/workflow_events.py b/cylc/flow/workflow_events.py index adf2abfed8c..b7631e1fd9f 100644 --- a/cylc/flow/workflow_events.py +++ b/cylc/flow/workflow_events.py @@ -202,6 +202,7 @@ class WorkflowEventHandler(): EVENT_INACTIVITY_TIMEOUT = 'inactivity timeout' EVENT_STALL = 'stall' EVENT_STALL_TIMEOUT = 'stall timeout' + EVENT_RESTART_TIMEOUT = 'restart timeout' WORKFLOW_EVENT_HANDLER = 'workflow-event-handler' WORKFLOW_EVENT_MAIL = 'workflow-event-mail'