Skip to content

Commit

Permalink
clock-expire: exclude active tasks
Browse files Browse the repository at this point in the history
* Active tasks should not be considered for clock-expiry.
  Closes cylc#6025
* Manually triggered tasks should not be considered for clock-expiry.
  Implements proposal point 10
  https://cylc.github.io/cylc-admin/proposal-optional-output-extension.html#proposal
  • Loading branch information
oliver-sanders committed Apr 3, 2024
1 parent 2bd254b commit 505b7d4
Show file tree
Hide file tree
Showing 2 changed files with 82 additions and 4 deletions.
23 changes: 19 additions & 4 deletions cylc/flow/task_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -2161,10 +2161,25 @@ def spawn_parentless_sequential_xtriggers(self):
def clock_expire_tasks(self):
"""Expire any tasks past their clock-expiry time."""
for itask in self.get_tasks():
if not itask.clock_expire():
continue
self.task_events_mgr.process_message(
itask, logging.WARNING, TASK_OUTPUT_EXPIRED)
if (
# force triggered tasks can not clock-expire
# see proposal point 10:
# https://cylc.github.io/cylc-admin/proposal-optional-output-extension.html#proposal
not itask.is_manual_submit

# only waiting tasks can clock-expire
# see https://github.com/cylc/cylc-flow/issues/6025
# (note retrying tasks will be in the waiting state)
and itask.state(TASK_STATUS_WAITING)

# check if this task is clock expired
and itask.clock_expire()
):
self.task_events_mgr.process_message(
itask,
logging.WARNING,
TASK_OUTPUT_EXPIRED,
)

def task_succeeded(self, id_):
"""Return True if task with id_ is in the succeeded state."""
Expand Down
63 changes: 63 additions & 0 deletions tests/integration/test_optional_outputs.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
TASK_STATUS_EXPIRED,
TASK_STATUS_PREPARING,
TASK_STATUS_WAITING,
TASK_STATUSES_ACTIVE,
)

if TYPE_CHECKING:
Expand Down Expand Up @@ -378,3 +379,65 @@ async def test_clock_expire_partially_satisfied_task(

# the task should now be in the expired state
assert e.state(TASK_STATUS_EXPIRED)


async def test_clock_expiry(
flow,
scheduler,
start,
):
"""Waiting tasks should be considered for clock-expiry.
Tests two things:
* Manually triggered tasks should not be considered for clock-expiry.
Tests proposal point 10:
https://cylc.github.io/cylc-admin/proposal-optional-output-extension.html#proposal
* Active tasks should not be considered for clock-expiry.
Closes https://github.com/cylc/cylc-flow/issues/6025
"""
id_ = flow({
'scheduling': {
'initial cycle point': '2000',
'runahead limit': 'P1',
'special tasks': {
'clock-expire': 'x'
},
'graph': {
'P1Y': 'x'
},
},
})
schd = scheduler(id_)
async with start(schd):
# the first task (waiting)
one = schd.pool.get_task(ISO8601Point('20000101T0000Z'), 'x')
assert one

# the second task (preparing)
two = schd.pool.get_task(ISO8601Point('20010101T0000Z'), 'x')
assert two
two.state_reset(TASK_STATUS_PREPARING)

# the third task (force-triggered)
schd.pool.force_trigger_tasks(['20100101T0000Z/x'], ['1'])
three = schd.pool.get_task(ISO8601Point('20100101T0000Z'), 'x')
assert three

# check for expiry
schd.pool.clock_expire_tasks()

# the first task should be expired (it was waiting)
assert one.state(TASK_STATUS_EXPIRED)
assert one.state.outputs.is_message_complete(TASK_OUTPUT_EXPIRED)

# the second task should *not* be expired (it was active)
assert not two.state(TASK_STATUS_EXPIRED)
assert not two.state.outputs.is_message_complete(TASK_OUTPUT_EXPIRED)

# the third task should *not* be expired (it was a manual submit)
assert not three.state(TASK_STATUS_EXPIRED)
assert not three.state.outputs.is_message_complete(TASK_OUTPUT_EXPIRED)

0 comments on commit 505b7d4

Please sign in to comment.