Skip to content

Commit

Permalink
Fix spawning of parent/no-parent tasks.
Browse files Browse the repository at this point in the history
(Those that have parents in some cycles but not others).
Also tidy up task spawning and runahead computation.

fixes...
  • Loading branch information
hjoliver committed Jun 14, 2022
1 parent 055a62b commit 4c98f6a
Show file tree
Hide file tree
Showing 24 changed files with 394 additions and 377 deletions.
21 changes: 6 additions & 15 deletions cylc/flow/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -219,8 +219,7 @@ def __init__(
self._start_point_for_actual_first_point: Optional['PointBase'] = None

self.task_param_vars = {} # type: ignore # TODO figure out type
self.custom_runahead_limit: Optional['IntervalBase'] = None
self.max_num_active_cycle_points = None
self.runahead_limit: Optional['IntervalBase'] = None

# runtime hierarchy dicts keyed by namespace name:
self.runtime: Dict[str, dict] = { # TODO figure out type
Expand Down Expand Up @@ -1188,7 +1187,7 @@ def compute_inheritance(self):
# LOG.info(log_msg)

def process_runahead_limit(self):
"""Extract the runahead limits information."""
"""Extract runahead limit information."""
limit = self.cfg['scheduling']['runahead limit']
if limit.isdigit():
limit = f'PT{limit}H'
Expand All @@ -1200,28 +1199,20 @@ def process_runahead_limit(self):
time_limit_regexes = DurationParser.DURATION_REGEXES

if number_limit_regex.fullmatch(limit):
self.custom_runahead_limit = IntegerInterval(limit)
self.runahead_limit = IntegerInterval(limit)
# Handle "runahead limit = P0":
if self.custom_runahead_limit.is_null():
self.custom_runahead_limit = IntegerInterval('P1')
if self.runahead_limit.is_null():
self.runahead_limit = IntegerInterval('P1')
elif ( # noqa: SIM106
self.cycling_type == ISO8601_CYCLING_TYPE
and any(tlr.fullmatch(limit) for tlr in time_limit_regexes)
):
self.custom_runahead_limit = ISO8601Interval(limit)
self.runahead_limit = ISO8601Interval(limit)
else:
raise WorkflowConfigError(
f'bad runahead limit "{limit}" for {self.cycling_type} '
'cycling type')

def get_custom_runahead_limit(self):
"""Return the custom runahead limit (may be None)."""
return self.custom_runahead_limit

def get_max_num_active_cycle_points(self):
"""Return the maximum allowed number of pool cycle points."""
return self.max_num_active_cycle_points

def get_config(self, args, sparse=False):
return self.pcfg.get(args, sparse)

Expand Down
8 changes: 4 additions & 4 deletions cylc/flow/id_match.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ def filter_ids(
if tokens.get(lowest_token.value):
break

cycles = []
cycles = set()
tasks = []

# filter by cycle
Expand All @@ -159,11 +159,11 @@ def filter_ids(
if not point_match(icycle, cycle, pattern_match):
continue
if cycle_sel == '*':
cycles.append(icycle)
cycles.add(icycle)
continue
for itask in itasks.values():
if match(itask.state.status, cycle_sel):
cycles.append(icycle)
cycles.add(icycle)
break

# filter by task
Expand Down Expand Up @@ -213,7 +213,7 @@ def filter_ids(
if warn:
LOG.warning(f"No active tasks matching: {id_}")
else:
_cycles.extend(cycles)
_cycles.extend(list(cycles))
_tasks.extend(tasks)

ret: List[Any] = []
Expand Down
3 changes: 2 additions & 1 deletion cylc/flow/rundb.py
Original file line number Diff line number Diff line change
Expand Up @@ -820,7 +820,8 @@ def select_task_pool_for_restart(self, callback):
LEFT OUTER JOIN
%(task_outputs)s
ON %(task_pool)s.cycle == %(task_outputs)s.cycle AND
%(task_pool)s.name == %(task_outputs)s.name
%(task_pool)s.name == %(task_outputs)s.name AND
%(task_pool)s.flow_nums == %(task_outputs)s.flow_nums
"""
form_data = {
"task_pool": self.TABLE_TASK_POOL,
Expand Down
51 changes: 10 additions & 41 deletions cylc/flow/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -701,27 +701,7 @@ def _load_pool_from_point(self):
else "Cold"
)
LOG.info(f"{start_type} start from {self.config.start_point}")

flow_num = self.flow_mgr.get_new_flow(
f"original flow from {self.config.start_point}"
)
for name in self.config.get_task_name_list():
tdef = self.config.get_taskdef(name)
try:
point = sorted([
point for point in
(seq.get_first_point(self.config.start_point)
for seq in tdef.sequences) if point
])[0]
except IndexError:
# No points
continue
parent_points = tdef.get_parent_points(point)
if not parent_points or all(
x < self.config.start_point for x in parent_points):
self.pool.add_to_pool(
TaskProxy(tdef, point, {flow_num})
)
self.pool.load_from_point()

def _load_pool_from_db(self):
"""Load task pool from DB, for a restart."""
Expand Down Expand Up @@ -909,23 +889,20 @@ def command_stop(
self.pool.stop_flow(flow_num)
return

if cycle_point:
if cycle_point is not None:
# schedule shutdown after tasks pass provided cycle point
point = TaskID.get_standardised_point(cycle_point)
if self.pool.set_stop_point(point):
if point is not None and self.pool.set_stop_point(point):
self.options.stopcp = str(point)
self.workflow_db_mgr.put_workflow_stop_cycle_point(
self.options.stopcp)
else:
# TODO: yield warning
pass
elif clock_time:
elif clock_time is not None:
# schedule shutdown after wallclock time passes provided time
parser = TimePointParser()
self.set_stop_clock(
int(parser.parse(clock_time).seconds_since_unix_epoch)
)
elif task:
elif task is not None:
# schedule shutdown after task succeeds
task_id = TaskID.get_standardised_taskid(task)
self.pool.set_stop_task(task_id)
Expand Down Expand Up @@ -1498,11 +1475,6 @@ async def main_loop(self):
self.is_updated = True

self.process_command_queue()

if self.pool.release_runahead_tasks():
self.is_updated = True
self.reset_inactivity_timer()

self.proc_pool.process()

# Tasks in the main pool that are waiting but not queued must be
Expand Down Expand Up @@ -1557,7 +1529,6 @@ async def main_loop(self):
self.xtrigger_mgr.housekeep(self.pool.get_tasks())

self.pool.set_expired_tasks()

self.release_queued_tasks()

if self.pool.sim_time_check(self.message_queue):
Expand Down Expand Up @@ -1834,8 +1805,6 @@ def check_auto_shutdown(self):
# Don't if paused.
return False

self.pool.release_runahead_tasks()

if self.check_workflow_stalled():
return False

Expand All @@ -1846,19 +1815,19 @@ def check_auto_shutdown(self):
TASK_STATUS_SUBMITTED,
TASK_STATUS_RUNNING
)
or (itask.state(TASK_STATUS_WAITING)
and not itask.state.is_runahead)
or (
itask.state(TASK_STATUS_WAITING)
and not itask.state.is_runahead
)
):
# Don't if there are more tasks to run (if waiting and not
# runahead, then held, queued, or xtriggered).
return False

# Can shut down.
if self.pool.stop_point:
self.options.stopcp = None
self.pool.stop_point = None
# Forget early stop point in case of a restart.
self.workflow_db_mgr.delete_workflow_stop_cycle_point()
self.update_data_store()

return True

Expand Down
Loading

0 comments on commit 4c98f6a

Please sign in to comment.