Skip to content

Commit

Permalink
expiry: prevent expired tasks from retrying automatically
Browse files Browse the repository at this point in the history
* Closes #6284
  • Loading branch information
oliver-sanders committed Sep 3, 2024
1 parent 8bb1f44 commit 547da8f
Show file tree
Hide file tree
Showing 3 changed files with 105 additions and 0 deletions.
1 change: 1 addition & 0 deletions cylc/flow/task_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -2239,6 +2239,7 @@ def clock_expire_tasks(self):
# check if this task is clock expired
and itask.clock_expire()
):
self.task_queue_mgr.remove_task(itask)
self.task_events_mgr.process_message(
itask,
logging.WARNING,
Expand Down
56 changes: 56 additions & 0 deletions tests/integration/scripts/conftest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
#!/usr/bin/env python3

# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE.
# Copyright (C) NIWA & British Crown (Met Office) & Contributors.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

from types import SimpleNamespace
from uuid import uuid1

import pytest

from cylc.flow.workflow_files import WorkflowFiles

from ..utils.flow_writer import flow_config_str


@pytest.fixture
def one_src(tmp_path, one_conf):
src_dir = tmp_path
(src_dir / 'flow.cylc').write_text(flow_config_str(one_conf))
(src_dir / 'rose-suite.conf').touch()
return SimpleNamespace(path=src_dir)


@pytest.fixture
def one_run(one_src, test_dir, run_dir):
w_run_dir = test_dir / str(uuid1())
w_run_dir.mkdir()
(w_run_dir / 'flow.cylc').write_text(
(one_src.path / 'flow.cylc').read_text()
)
(w_run_dir / 'rose-suite.conf').write_text(
(one_src.path / 'rose-suite.conf').read_text()
)
install_dir = (w_run_dir / WorkflowFiles.Install.DIRNAME)
install_dir.mkdir(parents=True)
(install_dir / WorkflowFiles.Install.SOURCE).symlink_to(
one_src.path,
target_is_directory=True,
)
return SimpleNamespace(
path=w_run_dir,
id=str(w_run_dir.relative_to(run_dir)),
)
48 changes: 48 additions & 0 deletions tests/integration/test_task_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -2150,3 +2150,51 @@ async def test_trigger_unqueued(flow, scheduler, start):
assert not schd.pool.task_queue_mgr.force_released, (
"Triggering an unqueued task should not affect the force_released list"
)


@pytest.mark.parametrize('expire_type', ['clock-expire', 'manual'])
async def test_expire_dequeue_with_retries(flow, scheduler, start, expire_type):
"""An expired waiting task should be removed from any queues.
See https://github.com/cylc/cylc-flow/issues/6284
"""
conf = {
'scheduling': {
'initial cycle point': '2000',

'graph': {
'R1': 'foo'
},
},
'runtime': {
'foo': {
'execution retry delays': 'PT0S'
}
}
}

if expire_type == 'clock-expire':
conf['scheduling']['special tasks'] = {'clock-expire': 'foo(PT0S)'}
method = lambda schd: schd.pool.clock_expire_tasks()
else:
method = lambda schd: schd.pool.set_prereqs_and_outputs(
['2000/foo'], prereqs=[], outputs=['expired'], flow=['1']
)

id_ = flow(conf)
schd = scheduler(id_)
schd: Scheduler
async with start(schd):
itask = schd.pool.get_tasks()[0]

# the task should start as "waiting(queued)"
assert itask.state(TASK_STATUS_WAITING, is_queued=True)

# expire the task via whichever method we are testing
method(schd)

# the task should enter the "expired" state
assert itask.state(TASK_STATUS_EXPIRED, is_queued=False)

# the task should also have been removed from the queue
assert not schd.pool.task_queue_mgr.remove_task(itask)

0 comments on commit 547da8f

Please sign in to comment.