Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix initial runahead limit, for offset recurrence start #5708

Merged
merged 5 commits into from
Aug 29, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changes.d/5708.fix.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix runahead limit at start-up, with recurrences that start beyond the limit.
27 changes: 24 additions & 3 deletions cylc/flow/cycling/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -320,8 +320,8 @@ class SequenceBase(metaclass=ABCMeta):
They should also provide get_async_expr, get_interval,
get_offset & set_offset (deprecated), is_on_sequence,
get_nearest_prev_point, get_next_point,
get_next_point_on_sequence, get_first_point, and
get_stop_point.
get_next_point_on_sequence, get_first_point
get_start_point, and get_stop_point.

They should also provide a self.__eq__ implementation
which should return whether a SequenceBase-derived object
Expand Down Expand Up @@ -405,11 +405,32 @@ def get_first_point(self, point):
"""Return the first point >= to point, or None if out of bounds."""
pass

@abstractmethod
def get_start_point(self):
"""Return the first point of this sequence."""
pass

@abstractmethod
def get_stop_point(self):
"""Return the last point in this sequence, or None if unbounded."""
"""Return the last point of this sequence, or None if unbounded."""
pass

def get_first_n_points(self, n, point=None):
"""Return a list of first n points of this sequence."""
if point is None:
p1 = self.get_start_point()
else:
p1 = self.get_first_point(point)
if p1 is None:
return []
result = [p1]
for _ in range(1, n):
p1 = self.get_next_point_on_sequence(p1)
if p1 is None:
break
result.append(p1)
return result

@abstractmethod
def __eq__(self, other) -> bool:
# Return True if other (sequence) is equal to self.
Expand Down
78 changes: 54 additions & 24 deletions cylc/flow/task_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -309,18 +309,54 @@ def compute_runahead(self, force=False) -> bool:
With force=True we recompute the limit even if the base point has not
changed (needed if max_future_offset changed, or on reload).
"""

limit = self.config.runahead_limit # e.g. P2 or P2D
count_cycles = False
with suppress(TypeError):
# Count cycles (integer cycling, and optional for datetime too).
ilimit = int(limit) # type: ignore
count_cycles = True

base_point: 'PointBase'
points: List['PointBase'] = []
sequence_points: Set['PointBase']

if not self.main_pool:
# Start at first point in each sequence, after the initial point.
points = [
point
for point in {
seq.get_first_point(self.config.start_point)
for seq in self.config.sequences
}
if point is not None
]
# No tasks yet, just consider sequence points.
if count_cycles:
# Get the first ilimit points in each sequence.
# (After workflow start point - sequence may begin earlier).
points = [
point
for plist in [
seq.get_first_n_points(
ilimit, self.config.start_point)
for seq in self.config.sequences
]
for point in plist
]
# Drop points beyond the limit.
points = sorted(points)[:ilimit + 1]
base_point = min(points)

else:
# Start at first point in each sequence.
# (After workflow start point - sequence may begin earlier).
points = [
point
for point in {
seq.get_first_point(self.config.start_point)
for seq in self.config.sequences
}
if point is not None
]
base_point = min(points)
# Drop points beyond the limit.
points = [
point
for point in points
if point <= base_point + limit
]

else:
# Find the earliest point with unfinished tasks.
for point, itasks in sorted(self.get_tasks_by_point().items()):
Expand All @@ -344,9 +380,10 @@ def compute_runahead(self, force=False) -> bool:
)
):
points.append(point)
if not points:
return False
base_point = min(points)

if not points:
return False
base_point = min(points)

if self._prev_runahead_base_point is None:
self._prev_runahead_base_point = base_point
Expand All @@ -363,15 +400,8 @@ def compute_runahead(self, force=False) -> bool:
# change or the runahead limit is already at stop point.
return False

try:
limit = int(self.config.runahead_limit) # type: ignore
except TypeError:
count_cycles = False
limit = self.config.runahead_limit
else:
count_cycles = True

# Get all cycle points possible after the runahead base point.
# Get all cycle points possible after the base point.
sequence_points: Set['PointBase']
if (
not force
and self._prev_runahead_sequence_points
Expand All @@ -388,7 +418,7 @@ def compute_runahead(self, force=False) -> bool:
while seq_point is not None:
if count_cycles:
# P0 allows only the base cycle point to run.
if count > 1 + limit:
if count > 1 + ilimit:
break
else:
# PT0H allows only the base cycle point to run.
Expand All @@ -404,7 +434,7 @@ def compute_runahead(self, force=False) -> bool:

if count_cycles:
# Some sequences may have different intervals.
limit_point = sorted(points)[:(limit + 1)][-1]
limit_point = sorted(points)[:(ilimit + 1)][-1]
else:
# We already stopped at the runahead limit.
limit_point = sorted(points)[-1]
Expand Down
44 changes: 44 additions & 0 deletions tests/integration/test_task_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
from cylc.flow import CYLC_LOG
from cylc.flow.cycling import PointBase
from cylc.flow.cycling.integer import IntegerPoint
from cylc.flow.cycling.iso8601 import ISO8601Point
from cylc.flow.data_store_mgr import TASK_PROXIES
from cylc.flow.task_outputs import TASK_OUTPUT_SUCCEEDED
from cylc.flow.scheduler import Scheduler
Expand Down Expand Up @@ -62,6 +63,22 @@
}


EXAMPLE_FLOW_2_CFG = {
'scheduler': {
'allow implicit tasks': True,
'UTC mode': True
},
'scheduling': {
'initial cycle point': '2001',
'runahead limit': 'P3Y',
'graph': {
'P1Y': 'foo',
'R/2025/P1Y': 'foo => bar',
}
},
}


def get_task_ids(
name_point_list: Iterable[Tuple[str, Union[PointBase, str, int]]]
) -> List[str]:
Expand Down Expand Up @@ -129,6 +146,22 @@ async def example_flow(
yield schd


@pytest.fixture(scope='module')
async def mod_example_flow_2(
mod_flow: Callable, mod_scheduler: Callable, mod_run: Callable
) -> Scheduler:
"""Return a scheduler for interrogating its task pool.

This is module-scoped so faster than example_flow, but should only be used
where the test does not mutate the state of the scheduler or task pool.
"""
id_ = mod_flow(EXAMPLE_FLOW_2_CFG)
schd: Scheduler = mod_scheduler(id_, paused_start=True)
async with mod_run(schd):
pass
return schd


@pytest.mark.parametrize(
'items, expected_task_ids, expected_bad_items, expected_warnings',
[
Expand Down Expand Up @@ -1157,3 +1190,14 @@ async def test_task_proxy_remove_from_queues(

assert queues_after['default'] == ['1/hidden_control']
assert queues_after['queue_two'] == ['1/control']


async def test_runahead_offset_start(
mod_example_flow_2: Scheduler
) -> None:
"""Late-start recurrences should not break the runahead limit at start-up.

See GitHub #5708
"""
task_pool = mod_example_flow_2.pool
assert task_pool.runahead_limit_point == ISO8601Point('2004')
98 changes: 98 additions & 0 deletions tests/unit/cycling/test_cycling.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,21 @@
parse_exclusion,
)

from cylc.flow.cycling.integer import (
IntegerPoint,
IntegerSequence,
)

from cylc.flow.cycling.iso8601 import (
ISO8601Point,
ISO8601Sequence,
)

from cylc.flow.cycling.loader import (
INTEGER_CYCLING_TYPE,
ISO8601_CYCLING_TYPE,
)


def test_simple_abstract_class_test():
"""Cannot instantiate abstract classes, they must be defined in
Expand Down Expand Up @@ -73,3 +88,86 @@ def test_parse_bad_exclusion(expression):
"""Tests incorrectly formatted exclusions"""
with pytest.raises(Exception):
parse_exclusion(expression)


@pytest.mark.parametrize(
'sequence, wf_start_point, expected',
(
(
('R/2/P2', 1),
None,
[2,4,6,8,10]
),
(
('R/2/P2', 1),
3,
[4,6,8,10,12]
),
),
)
def test_get_first_n_points_integer(
set_cycling_type,
sequence, wf_start_point, expected
):
"""Test sequence get_first_n_points method.

(The method is implemented in the base class).
"""
set_cycling_type(INTEGER_CYCLING_TYPE)
sequence = IntegerSequence(*sequence)
if wf_start_point is not None:
wf_start_point = IntegerPoint(wf_start_point)
expected = [
IntegerPoint(p)
for p in expected
]
assert (
expected == (
sequence.get_first_n_points(
len(expected),
wf_start_point
)
)
)


@pytest.mark.parametrize(
'sequence, wf_start_point, expected',
(
(
('R/2008/P2Y', '2001'),
None,
['2008', '2010', '2012', '2014', '2016']
),
(
('R/2008/P2Y', '2001'),
'2009',
['2010', '2012', '2014', '2016', '2018']
),
),
)
def test_get_first_n_points_iso8601(
set_cycling_type,
sequence, wf_start_point, expected
):
"""Test sequence get_first_n_points method.

(The method is implemented in the base class).
"""
set_cycling_type(ISO8601_CYCLING_TYPE, 'Z')
sequence = ISO8601Sequence(*sequence)
if wf_start_point is not None:
wf_start_point = ISO8601Point(wf_start_point)
expected = [
ISO8601Point(p)
for p in expected
]

assert (
expected == (
sequence.get_first_n_points(
len(expected),
wf_start_point
)
)
)