From 53a10edf162b1e5f680e1d13faebdecb3bf5f8c0 Mon Sep 17 00:00:00 2001 From: Tim Pillinger <26465611+wxtim@users.noreply.github.com> Date: Wed, 29 Mar 2023 15:25:11 +0100 Subject: [PATCH 01/15] Create a mapping between platforms and install_targets for use by remote tidy Remove broken unit test tests for map_platforms_used_for_install_targets unit test select_platforms_used Test remote_tidy Changelog f --- CHANGES.md | 3 + cylc/flow/platforms.py | 71 +++++++---- cylc/flow/rundb.py | 8 ++ cylc/flow/task_job_mgr.py | 2 +- cylc/flow/task_remote_mgr.py | 42 +++++-- tests/integration/test_task_remote_mgr.py | 104 +++++++++++++++++ tests/unit/test_platforms.py | 136 +++++++++++++++------- tests/unit/test_rundb.py | 20 ++++ 8 files changed, 307 insertions(+), 79 deletions(-) create mode 100644 tests/integration/test_task_remote_mgr.py diff --git a/CHANGES.md b/CHANGES.md index 5410f3c2376..2c0b30a00bf 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -14,6 +14,9 @@ ones in. --> ### Fixes +[5445](https://github.com/cylc/cylc-flow/pull/5445) - Fix remote tidy + bug where install target is not explicit in platform definition. + [5398](https://github.com/cylc/cylc-flow/pull/5398) - Fix platform from group selection order bug. diff --git a/cylc/flow/platforms.py b/cylc/flow/platforms.py index 6f1b4b1be03..cdf55ca7642 100644 --- a/cylc/flow/platforms.py +++ b/cylc/flow/platforms.py @@ -20,7 +20,8 @@ import re from copy import deepcopy from typing import ( - TYPE_CHECKING, Any, Dict, Iterable, List, Optional, Set, Union, overload + TYPE_CHECKING, Any, Dict, Iterable, + List, Optional, Set, Tuple, Union, overload ) from cylc.flow import LOG @@ -649,32 +650,52 @@ def is_platform_with_target_in_list( ) -def get_all_platforms_for_install_target( - install_target: str -) -> List[Dict[str, Any]]: - """Return list of platform dictionaries for given install target.""" - platforms: List[Dict[str, Any]] = [] - all_platforms = glbl_cfg(cached=True).get(['platforms'], sparse=False) - for k, v in all_platforms.iteritems(): # noqa: B301 (iteritems valid here) - if (v.get('install target', k) == install_target): - v_copy = deepcopy(v) - v_copy['name'] = k - platforms.append(v_copy) - return platforms +def map_platforms_used_for_install_targets( + platform_names: Set[str], + install_targets: Set[str] +) -> Tuple[Dict[str, List[Dict[Any, Any]]], Set[str]]: + """Get a mapping of install targets to platforms. + Different from get_install_target_platforms_map because it is passed + a list of install targets actually used to look for. -def get_random_platform_for_install_target( - install_target: str -) -> Dict[str, Any]: - """Return a randomly selected platform (dict) for given install target.""" - platforms = get_all_platforms_for_install_target(install_target) - try: - return random.choice(platforms) # nosec (not crypto related) - except IndexError: - # No platforms to choose from - raise PlatformLookupError( - f'Could not select platform for install target: {install_target}' - ) + Returns: + install_target_map: { + 'install target': [ + {...platform2...}, + {...platform2...} + ] + } + unreachable_targets: + A list of install_targets which we cannot get platforms for. + """ + install_targets_map: Dict[str, List] = { + target: [] for target in install_targets} + for platform_name in platform_names: + try: + platform = get_platform(platform_name) + except PlatformLookupError: + # We only need one working platform per install target: + continue + else: + # Install Target + if 'install target' in platform and platform['install target']: + install_targets_map[platform['install target']].append( + platform) + else: + install_targets_map[platform['name']].append(platform) + + # Look for unreachable install targets and report them: + unreachable_targets = { + target for target, platforms + in install_targets_map.items() + if not platforms + } + # Remove unreachable targets from list: + [install_targets_map.pop(i) for i in unreachable_targets] + # Otherwise return out targets map object. + + return install_targets_map, unreachable_targets def get_localhost_install_target() -> str: diff --git a/cylc/flow/rundb.py b/cylc/flow/rundb.py index e0a18b61c8d..4a931c87992 100644 --- a/cylc/flow/rundb.py +++ b/cylc/flow/rundb.py @@ -684,6 +684,14 @@ def select_task_job(self, cycle, name, submit_num=None): except sqlite3.DatabaseError: return None + def select_platforms_used(self): + """Get a list of platforms used by tasks.""" + stmt = r'SELECT platform_name FROM task_jobs' + try: + return {r[0] for r in self.connect().execute(stmt)} + except sqlite3.DatabaseError: + return None + def select_jobs_for_restart(self, callback): """Select from task_pool+task_states+task_jobs for restart. diff --git a/cylc/flow/task_job_mgr.py b/cylc/flow/task_job_mgr.py index 6671cbe0c93..fde5f65c8b0 100644 --- a/cylc/flow/task_job_mgr.py +++ b/cylc/flow/task_job_mgr.py @@ -157,7 +157,7 @@ def __init__(self, workflow, proc_pool, workflow_db_mgr, self.bad_hosts = bad_hosts self.bad_hosts_to_clear = set() self.task_remote_mgr = TaskRemoteMgr( - workflow, proc_pool, self.bad_hosts) + workflow, proc_pool, self.bad_hosts, self.workflow_db_mgr) def check_task_jobs(self, workflow, task_pool): """Check submission and execution timeout and polling timers. diff --git a/cylc/flow/task_remote_mgr.py b/cylc/flow/task_remote_mgr.py index 59b6e04cf42..d6e43b7b3aa 100644 --- a/cylc/flow/task_remote_mgr.py +++ b/cylc/flow/task_remote_mgr.py @@ -27,6 +27,7 @@ from pathlib import Path from cylc.flow.option_parsers import verbosity_to_opts import os +import random from shlex import quote import re from subprocess import Popen, PIPE, DEVNULL @@ -35,7 +36,7 @@ from typing import Any, Deque, Dict, TYPE_CHECKING, List, NamedTuple, Tuple from cylc.flow import LOG -from cylc.flow.exceptions import PlatformError +from cylc.flow.exceptions import PlatformError, PlatformLookupError import cylc.flow.flags from cylc.flow.hostuserutil import is_remote_host from cylc.flow.network.client_factory import CommsMeth @@ -51,8 +52,8 @@ get_host_from_platform, get_install_target_from_platform, get_localhost_install_target, - get_random_platform_for_install_target, log_platform_event, + map_platforms_used_for_install_targets ) from cylc.flow.remote import construct_rsync_over_ssh_cmd, construct_ssh_cmd from cylc.flow.subprocctx import SubProcContext @@ -91,7 +92,7 @@ class RemoteTidyQueueTuple(NamedTuple): class TaskRemoteMgr: """Manage task remote initialisation, tidy, selection.""" - def __init__(self, workflow, proc_pool, bad_hosts): + def __init__(self, workflow, proc_pool, bad_hosts, db_mgr): self.workflow = workflow self.proc_pool = proc_pool # self.remote_command_map = {command: host|PlatformError|None} @@ -105,6 +106,7 @@ def __init__(self, workflow, proc_pool, bad_hosts): self.bad_hosts = bad_hosts self.is_reload = False self.is_restart = False + self.db_mgr = db_mgr def subshell_eval(self, command, command_pattern, host_check=True): """Evaluate a task platform from a subshell string. @@ -293,19 +295,39 @@ def remote_tidy(self) -> None: This method is called on workflow shutdown, so we want nothing to hang. Timeout any incomplete commands after 10 seconds. """ + # Get a list of all platforms used from workflow database: + platforms_used = ( + self.db_mgr.get_pri_dao().select_platforms_used()) + # For each install target compile a list of platforms: + install_targets = { + target for target, msg + in self.remote_init_map.items() + if msg == REMOTE_FILE_INSTALL_DONE + } + install_targets_map, unreachable_targets = ( + map_platforms_used_for_install_targets( + platforms_used, install_targets)) + # If we couldn't find a platform for a target, we cannot tidy it - + # raise an Error: + if unreachable_targets: + msg = 'No platforms available to remote tidy install targets:' + for unreachable_target in unreachable_targets: + msg = ( + 'Unable to tidy the following install targets' + ' because no matching platform was found:' + ) + msg += f'\n * {unreachable_target}' + LOG.error(msg) + # Issue all SSH commands in parallel queue: Deque[RemoteTidyQueueTuple] = deque() - for install_target, message in self.remote_init_map.items(): - if message != REMOTE_FILE_INSTALL_DONE: - continue + for install_target in install_targets_map.keys(): if install_target == get_localhost_install_target(): continue + platform = random.choice(list(install_targets_map[install_target])) try: - platform = get_random_platform_for_install_target( - install_target - ) cmd, host = self.construct_remote_tidy_ssh_cmd(platform) - except (NoHostsError, PlatformLookupError) as exc: + except NoHostsError as exc: LOG.warning( PlatformError( f'{PlatformError.MSG_TIDY}\n{exc}', diff --git a/tests/integration/test_task_remote_mgr.py b/tests/integration/test_task_remote_mgr.py new file mode 100644 index 00000000000..35c15922c99 --- /dev/null +++ b/tests/integration/test_task_remote_mgr.py @@ -0,0 +1,104 @@ +# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE. +# Copyright (C) NIWA & British Crown (Met Office) & Contributors. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +from cylc.flow.task_remote_mgr import ( + REMOTE_FILE_INSTALL_DONE, + REMOTE_FILE_INSTALL_FAILED +) + + +async def test_remote_tidy( + flow, + scheduler, + start, + mock_glbl_cfg, + one_conf +): + """Remote tidy gets platforms for install targets. + + In particular, referencing https://github.com/cylc/cylc-flow/issues/5429, + ensure that install targets defined implicitly by platform name are found. + + Mock remote init map: + - Include an install target (quiz) with + message != REMOTE_FILE_INSTALL_DONE to ensure that + this is picked out. + - Install targets where we can get a platform + - foo - Install target is implicitly the platfrom name. + - bar9 - The install target is implicitly the plaform name, + and the platform name matches a platform regex. + - baz - Install target is set explicitly. + - An install target (qux) where we cannot get a platform: Ensure + that we get the desired error. + """ + mock_glbl_cfg( + 'cylc.flow.platforms.glbl_cfg', + ''' + [platforms] + [[foo]] + # install target = foo (implicit) + # hosts = foo (implicit) + [[bar.]] + # install target = bar1 to bar9 (implicit) + # hosts = bar1 to bar9 (implicit) + [[baz]] + install target = baz + hosts = baz + ''', + ) + + # Get a scheduler: + id_ = flow(one_conf) + schd = scheduler(id_) + async with start(schd) as log: + # Write database with 6 tasks using 3 platforms: + platforms = ['baz', 'bar9', 'foo'] + line = r"('', '', {}, 0, 1, '', '', 0,'', '', '', 0, '', '{}', 4, '')" + stmt = r"INSERT INTO task_jobs VALUES" + r','.join([ + line.format(i, platform) for i, platform in enumerate(platforms) + ]) + schd.workflow_db_mgr.pri_dao.connect().execute(stmt) + schd.workflow_db_mgr.pri_dao.connect().commit() + + # Mock a remote init map. + schd.task_job_mgr.task_remote_mgr.remote_init_map = { + 'baz': REMOTE_FILE_INSTALL_DONE, # Should match platform baz + 'bar9': REMOTE_FILE_INSTALL_DONE, # Should match platform bar. + 'foo': REMOTE_FILE_INSTALL_DONE, # Should match plaform foo + 'qux': REMOTE_FILE_INSTALL_DONE, # Should not match a plaform + 'quiz': REMOTE_FILE_INSTALL_FAILED, # Should not be considered + } + + # Clear the log, run the test: + log.clear() + schd.task_job_mgr.task_remote_mgr.remote_tidy() + records = [str(r.msg) for r in log.records] + + # We can't get qux, no defined platform has a matching install target: + qux_msg = ( + 'Unable to tidy the following install targets because no' + ' matching platform was found:\n * qux') + assert qux_msg in records + + # We can get foo bar baz, and we try to remote tidy them. + # (This will ultimately fail, but past the point we are testing). + for target in ['foo', 'bar9', 'baz']: + msg = f'platform: {target} - remote tidy (on {target})' + assert msg in records + + # We haven't done anything with Quiz because we're only looking + # at cases where platform == REMOTE_FILE_INSTALL_DONE + assert not [r for r in records if 'quiz' in r] diff --git a/tests/unit/test_platforms.py b/tests/unit/test_platforms.py index 8bbe19ebbd4..d90abd5226f 100644 --- a/tests/unit/test_platforms.py +++ b/tests/unit/test_platforms.py @@ -21,14 +21,14 @@ from cylc.flow.parsec.OrderedDict import OrderedDictWithDefaults from cylc.flow.platforms import ( - get_all_platforms_for_install_target, get_platform, get_platform_deprecated_settings, - get_random_platform_for_install_target, is_platform_definition_subshell, + is_platform_definition_subshell, platform_from_name, platform_name_from_job_info, get_install_target_from_platform, get_install_target_to_platforms_map, generic_items_match, + map_platforms_used_for_install_targets, _validate_single_host ) from cylc.flow.exceptions import ( @@ -511,47 +511,6 @@ def test_generic_items_match(platform, job, remote, expect): assert generic_items_match(platform, job, remote) == expect -def test_get_all_platforms_for_install_target(mock_glbl_cfg): - mock_glbl_cfg( - 'cylc.flow.platforms.glbl_cfg', - ''' - [platforms] - [[localhost]] - hosts = localhost - install target = localhost - [[olaf]] - hosts = snow, ice, sparkles - install target = arendelle - [[snow white]] - hosts = happy, sleepy, dopey - install target = forest - [[kristoff]] - hosts = anna, elsa, hans - install target = arendelle - [[belle]] - hosts = beast, maurice - install target = france - [[bambi]] - hosts = thumper, faline, flower - install target = forest - [[merida]] - hosts = angus, fergus - install target = forest - [[forest]] - hosts = fir, oak, elm - ''' - ) - actual = get_all_platforms_for_install_target('forest') - expected = ['snow white', 'bambi', 'merida', 'forest'] - for platform in actual: - assert platform['name'] in expected - arendelle_platforms = ['kristoff', 'olaf'] - assert get_random_platform_for_install_target( - 'arendelle')['name'] in arendelle_platforms - assert get_random_platform_for_install_target( - 'forest')['name'] not in arendelle_platforms - - @pytest.mark.parametrize( 'task_conf, expected', [ @@ -647,3 +606,94 @@ def test_get_platform_from_OrderedDictWithDefaults(mock_glbl_cfg): ]) result = get_platform(task_conf)['name'] assert result == 'skarloey' + + +@pytest.mark.parametrize( + 'platform_names, install_targets, glblcfg, expect', + [ + pytest.param( + # Two platforms share an install target. Both are reachable. + ['sir_handel', 'peter_sam'], + ['mountain_railway'], + ''' + [platforms] + [[peter_sam, sir_handel]] + install target = mountain_railway + ''', + { + 'targets': {'mountain_railway': ['sir_handel', 'peter_sam']}, + 'unreachable': set() + }, + id='basic' + ), + pytest.param( + # One of our install targets matches one of our platforms, + # but only implicitly; i.e. the platform name is the same as the + # install target name. + ['sir_handel'], + ['sir_handel'], + ''' + [platforms] + [[sir_handel]] + ''', + { + 'targets': {'sir_handel': ['sir_handel']}, + 'unreachable': set() + }, + id='implicit-target' + ), + pytest.param( + # One of our install targets matches one of our platforms, + # but only implicitly, and the platform name is defined using a + # regex. + ['sir_handel42'], + ['sir_handel42'], + ''' + [platforms] + [[sir_handel..]] + ''', + { + 'targets': {'sir_handel42': ['sir_handel42']}, + 'unreachable': set() + }, + id='implicit-target-regex' + ), + pytest.param( + # One of our install targets (rusty) has no defined platforms + # causing a PlatformLookupError. + ['duncan', 'rusty'], + ['mountain_railway', 'rusty'], + ''' + [platforms] + [[duncan]] + install target = mountain_railway + ''', + { + 'targets': {'mountain_railway': ['duncan']}, + 'unreachable': {'rusty'} + }, + id='PlatformLookupError' + ) + ] +) +def test_map_platforms_used_for_install_targets( + mock_glbl_cfg, + platform_names, install_targets, glblcfg, expect +): + def flatten_install_targets_map(itm): + result = {} + for target, platforms in itm.items(): + result[target] = [p['name'] for p in platforms] + return result + + mock_glbl_cfg('cylc.flow.platforms.glbl_cfg', glblcfg) + install_targets_map, unreachable_targets = ( + map_platforms_used_for_install_targets( + platform_names, install_targets)) + + assert ( + expect['targets'] == flatten_install_targets_map(install_targets_map)) + + assert ( + expect['unreachable'] == unreachable_targets) + diff --git a/tests/unit/test_rundb.py b/tests/unit/test_rundb.py index a0754659dc6..3bd10cddc41 100644 --- a/tests/unit/test_rundb.py +++ b/tests/unit/test_rundb.py @@ -153,3 +153,23 @@ def test_context_manager_exit( mock_close.assert_called_once() # Close connection for real: dao.close() + + +def test_select_platforms_used(tmp_path): + """It returns a set of platforms used by task_jobs. + """ + # Setup DB + db_file = tmp_path / 'db' + dao = CylcWorkflowDAO(db_file_name=db_file, create_tables=True) + + # Write database with 6 tasks using 3 platforms: + platforms = ['foo', 'bar', 'bar', 'qux', 'qux', 'qux'] + line = ( + r" ('', '', {}, 0, 1, '', '', 0, '', '', '', 0, '', '{}', 4377, '')") + stmt = r"INSERT INTO task_jobs VALUES" + r','.join([ + line.format(i, platform) for i, platform in enumerate(platforms) + ]) + dao.conn.execute(stmt) + + # Assert that we only have 3 platforms reported by function: + assert dao.select_platforms_used() == {'bar', 'qux', 'foo'} From af7d22eb83253e63d05153225b45f6ac83c4cd2a Mon Sep 17 00:00:00 2001 From: Tim Pillinger <26465611+wxtim@users.noreply.github.com> Date: Fri, 31 Mar 2023 10:58:24 +0100 Subject: [PATCH 02/15] Update tests/integration/test_task_remote_mgr.py Co-authored-by: Oliver Sanders --- tests/integration/test_task_remote_mgr.py | 28 +++++++++++------------ 1 file changed, 14 insertions(+), 14 deletions(-) diff --git a/tests/integration/test_task_remote_mgr.py b/tests/integration/test_task_remote_mgr.py index 35c15922c99..29371c0a8aa 100644 --- a/tests/integration/test_task_remote_mgr.py +++ b/tests/integration/test_task_remote_mgr.py @@ -85,20 +85,20 @@ async def test_remote_tidy( # Clear the log, run the test: log.clear() schd.task_job_mgr.task_remote_mgr.remote_tidy() - records = [str(r.msg) for r in log.records] + records = [str(r.msg) for r in log.records] - # We can't get qux, no defined platform has a matching install target: - qux_msg = ( - 'Unable to tidy the following install targets because no' - ' matching platform was found:\n * qux') - assert qux_msg in records + # We can't get qux, no defined platform has a matching install target: + qux_msg = ( + 'Unable to tidy the following install targets because no' + ' matching platform was found:\n * qux') + assert qux_msg in records - # We can get foo bar baz, and we try to remote tidy them. - # (This will ultimately fail, but past the point we are testing). - for target in ['foo', 'bar9', 'baz']: - msg = f'platform: {target} - remote tidy (on {target})' - assert msg in records + # We can get foo bar baz, and we try to remote tidy them. + # (This will ultimately fail, but past the point we are testing). + for target in ['foo', 'bar9', 'baz']: + msg = f'platform: {target} - remote tidy (on {target})' + assert msg in records - # We haven't done anything with Quiz because we're only looking - # at cases where platform == REMOTE_FILE_INSTALL_DONE - assert not [r for r in records if 'quiz' in r] + # We haven't done anything with Quiz because we're only looking + # at cases where platform == REMOTE_FILE_INSTALL_DONE + assert not [r for r in records if 'quiz' in r] From f498168be9742da94e6e7cb1d87229342600413f Mon Sep 17 00:00:00 2001 From: Tim Pillinger <26465611+wxtim@users.noreply.github.com> Date: Fri, 31 Mar 2023 12:59:23 +0100 Subject: [PATCH 03/15] Removed new platforms method map_platforms_for_install_targets and replaced it by adding a "quiet" mode to get_install_targets_platform_map. Added test for the new quiet option. Factored some logic from remote_tidy to a staticmethod of TaskRemoteMgr to allow unit testing. Added unit test. monkeypatch test to avoid actual subproc calls --- cylc/flow/platforms.py | 63 +++---------- cylc/flow/task_remote_mgr.py | 62 +++++++++---- tests/integration/test_task_remote_mgr.py | 26 +++++- tests/unit/test_platforms.py | 101 ++------------------ tests/unit/test_task_remote_mgr.py | 108 ++++++++++++++++++++++ 5 files changed, 196 insertions(+), 164 deletions(-) diff --git a/cylc/flow/platforms.py b/cylc/flow/platforms.py index cdf55ca7642..a1b6ee9f121 100644 --- a/cylc/flow/platforms.py +++ b/cylc/flow/platforms.py @@ -613,18 +613,29 @@ def get_install_target_from_platform(platform: Dict[str, Any]) -> str: def get_install_target_to_platforms_map( - platform_names: Iterable[str] + platform_names: Iterable[str], + quiet: bool = False ) -> Dict[str, List[Dict[str, Any]]]: """Get a dictionary of unique install targets and the platforms which use them. Args: platform_names: List of platform names to look up in the global config. + quiet: Supress PlatformNotFound Errors Return {install_target_1: [platform_1_dict, platform_2_dict, ...], ...} """ platform_names = set(platform_names) - platforms = [platform_from_name(p_name) for p_name in platform_names] + platforms: List[Dict[str, Any]] = [] + for p_name in platform_names: + try: + platform = platform_from_name(p_name) + except PlatformLookupError as exc: + if not quiet: + raise exc + else: + platforms.append(platform) + install_targets = { get_install_target_from_platform(platform) for platform in platforms @@ -650,54 +661,6 @@ def is_platform_with_target_in_list( ) -def map_platforms_used_for_install_targets( - platform_names: Set[str], - install_targets: Set[str] -) -> Tuple[Dict[str, List[Dict[Any, Any]]], Set[str]]: - """Get a mapping of install targets to platforms. - - Different from get_install_target_platforms_map because it is passed - a list of install targets actually used to look for. - - Returns: - install_target_map: { - 'install target': [ - {...platform2...}, - {...platform2...} - ] - } - unreachable_targets: - A list of install_targets which we cannot get platforms for. - """ - install_targets_map: Dict[str, List] = { - target: [] for target in install_targets} - for platform_name in platform_names: - try: - platform = get_platform(platform_name) - except PlatformLookupError: - # We only need one working platform per install target: - continue - else: - # Install Target - if 'install target' in platform and platform['install target']: - install_targets_map[platform['install target']].append( - platform) - else: - install_targets_map[platform['name']].append(platform) - - # Look for unreachable install targets and report them: - unreachable_targets = { - target for target, platforms - in install_targets_map.items() - if not platforms - } - # Remove unreachable targets from list: - [install_targets_map.pop(i) for i in unreachable_targets] - # Otherwise return out targets map object. - - return install_targets_map, unreachable_targets - - def get_localhost_install_target() -> str: """Returns the install target of localhost platform""" localhost = get_platform() diff --git a/cylc/flow/task_remote_mgr.py b/cylc/flow/task_remote_mgr.py index d6e43b7b3aa..a8920e6c048 100644 --- a/cylc/flow/task_remote_mgr.py +++ b/cylc/flow/task_remote_mgr.py @@ -36,7 +36,9 @@ from typing import Any, Deque, Dict, TYPE_CHECKING, List, NamedTuple, Tuple from cylc.flow import LOG -from cylc.flow.exceptions import PlatformError, PlatformLookupError +from cylc.flow.exceptions import ( + PlatformError, PlatformLookupError, NoHostsError +) import cylc.flow.flags from cylc.flow.hostuserutil import is_remote_host from cylc.flow.network.client_factory import CommsMeth @@ -47,13 +49,11 @@ get_workflow_run_dir, ) from cylc.flow.platforms import ( - NoHostsError, - PlatformLookupError, get_host_from_platform, get_install_target_from_platform, + get_install_target_to_platforms_map, get_localhost_install_target, log_platform_event, - map_platforms_used_for_install_targets ) from cylc.flow.remote import construct_rsync_over_ssh_cmd, construct_ssh_cmd from cylc.flow.subprocctx import SubProcContext @@ -288,6 +288,44 @@ def construct_remote_tidy_ssh_cmd( cmd = construct_ssh_cmd(cmd, platform, host, timeout='10s') return cmd, host + @staticmethod + def _get_remote_tidy_targets( + platform_names: List[str], + install_targets: List[str] + ) -> Dict[str, List[Dict[str, Any]]]: + """Finds valid platforms for install targets, warns about in invalid + install targets. + + logs: + A list of install targets where no platform can be found. + + returns: + A mapping of install targets to valid platforms only where + platforms are available. + """ + if platform_names is None and install_targets: + install_targets_map = {t: [] for t in install_targets} + else: + install_targets_map = get_install_target_to_platforms_map( + platform_names, quiet=True) + + # If we couldn't find a platform for a target, we cannot tidy it - + # raise an Error: + unreachable_targets = { + t for t in install_targets if not install_targets_map.get(t) + } + if unreachable_targets: + msg = 'No platforms available to remote tidy install targets:' + for unreachable_target in unreachable_targets: + msg = ( + 'Unable to tidy the following install targets' + ' because no matching platform was found:' + ) + msg += f'\n * {unreachable_target}' + LOG.error(msg) + + return install_targets_map + def remote_tidy(self) -> None: """Remove workflow contact files and keys from initialised remotes. @@ -304,20 +342,8 @@ def remote_tidy(self) -> None: in self.remote_init_map.items() if msg == REMOTE_FILE_INSTALL_DONE } - install_targets_map, unreachable_targets = ( - map_platforms_used_for_install_targets( - platforms_used, install_targets)) - # If we couldn't find a platform for a target, we cannot tidy it - - # raise an Error: - if unreachable_targets: - msg = 'No platforms available to remote tidy install targets:' - for unreachable_target in unreachable_targets: - msg = ( - 'Unable to tidy the following install targets' - ' because no matching platform was found:' - ) - msg += f'\n * {unreachable_target}' - LOG.error(msg) + install_targets_map = self._get_remote_tidy_targets( + platforms_used, install_targets) # Issue all SSH commands in parallel queue: Deque[RemoteTidyQueueTuple] = deque() diff --git a/tests/integration/test_task_remote_mgr.py b/tests/integration/test_task_remote_mgr.py index 29371c0a8aa..f9d0356eea3 100644 --- a/tests/integration/test_task_remote_mgr.py +++ b/tests/integration/test_task_remote_mgr.py @@ -14,6 +14,8 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see . +import cylc +from cylc.flow.exceptions import NoHostsError from cylc.flow.task_remote_mgr import ( REMOTE_FILE_INSTALL_DONE, REMOTE_FILE_INSTALL_FAILED @@ -25,7 +27,8 @@ async def test_remote_tidy( scheduler, start, mock_glbl_cfg, - one_conf + one_conf, + monkeypatch ): """Remote tidy gets platforms for install targets. @@ -44,6 +47,21 @@ async def test_remote_tidy( - An install target (qux) where we cannot get a platform: Ensure that we get the desired error. """ + # Monkeypatch away subprocess.Popen calls - prevent any interaction with + # remotes actually happening: + class MockProc: + def __init__(self, *args, **kwargs): + breakpoint() + self.poll = lambda: True + self.returncode = 0 + self.communicate = lambda: ('out', 'err') + + monkeypatch.setattr( + cylc.flow.task_remote_mgr, + 'Popen', + lambda *args, **kwargs: MockProc(*args, **kwargs) + ) + mock_glbl_cfg( 'cylc.flow.platforms.glbl_cfg', ''' @@ -56,7 +74,9 @@ async def test_remote_tidy( # hosts = bar1 to bar9 (implicit) [[baz]] install target = baz - hosts = baz + hosts = baum, bay, baz + [[[selection]]] + method = definition order ''', ) @@ -84,7 +104,9 @@ async def test_remote_tidy( # Clear the log, run the test: log.clear() + # schd.task_job_mgr.bad_hosts.update(['baum', 'bay']) schd.task_job_mgr.task_remote_mgr.remote_tidy() + records = [str(r.msg) for r in log.records] # We can't get qux, no defined platform has a matching install target: diff --git a/tests/unit/test_platforms.py b/tests/unit/test_platforms.py index d90abd5226f..f3094312360 100644 --- a/tests/unit/test_platforms.py +++ b/tests/unit/test_platforms.py @@ -28,7 +28,6 @@ get_install_target_from_platform, get_install_target_to_platforms_map, generic_items_match, - map_platforms_used_for_install_targets, _validate_single_host ) from cylc.flow.exceptions import ( @@ -414,6 +413,7 @@ def test_get_install_target_from_platform(platform, expected): assert get_install_target_from_platform(platform) == expected +@pytest.mark.parametrize('quiet', [True, False]) @pytest.mark.parametrize( 'platform_names, expected_map, expected_err', [ @@ -450,14 +450,18 @@ def test_get_install_target_to_platforms_map( platform_names: List[str], expected_map: Dict[str, Any], expected_err: Type[Exception], - monkeypatch: pytest.MonkeyPatch): + quiet: bool, + monkeypatch: pytest.MonkeyPatch +): """Test that get_install_target_to_platforms_map works as expected.""" monkeypatch.setattr('cylc.flow.platforms.platform_from_name', lambda x: platform_from_name(x, PLATFORMS_TREK)) - if expected_err: + if expected_err and not quiet: with pytest.raises(expected_err): get_install_target_to_platforms_map(platform_names) + elif expected_err and quiet: + result = get_install_target_to_platforms_map(platform_names, quiet) else: result = get_install_target_to_platforms_map(platform_names) # Sort the maps: @@ -606,94 +610,3 @@ def test_get_platform_from_OrderedDictWithDefaults(mock_glbl_cfg): ]) result = get_platform(task_conf)['name'] assert result == 'skarloey' - - -@pytest.mark.parametrize( - 'platform_names, install_targets, glblcfg, expect', - [ - pytest.param( - # Two platforms share an install target. Both are reachable. - ['sir_handel', 'peter_sam'], - ['mountain_railway'], - ''' - [platforms] - [[peter_sam, sir_handel]] - install target = mountain_railway - ''', - { - 'targets': {'mountain_railway': ['sir_handel', 'peter_sam']}, - 'unreachable': set() - }, - id='basic' - ), - pytest.param( - # One of our install targets matches one of our platforms, - # but only implicitly; i.e. the platform name is the same as the - # install target name. - ['sir_handel'], - ['sir_handel'], - ''' - [platforms] - [[sir_handel]] - ''', - { - 'targets': {'sir_handel': ['sir_handel']}, - 'unreachable': set() - }, - id='implicit-target' - ), - pytest.param( - # One of our install targets matches one of our platforms, - # but only implicitly, and the platform name is defined using a - # regex. - ['sir_handel42'], - ['sir_handel42'], - ''' - [platforms] - [[sir_handel..]] - ''', - { - 'targets': {'sir_handel42': ['sir_handel42']}, - 'unreachable': set() - }, - id='implicit-target-regex' - ), - pytest.param( - # One of our install targets (rusty) has no defined platforms - # causing a PlatformLookupError. - ['duncan', 'rusty'], - ['mountain_railway', 'rusty'], - ''' - [platforms] - [[duncan]] - install target = mountain_railway - ''', - { - 'targets': {'mountain_railway': ['duncan']}, - 'unreachable': {'rusty'} - }, - id='PlatformLookupError' - ) - ] -) -def test_map_platforms_used_for_install_targets( - mock_glbl_cfg, - platform_names, install_targets, glblcfg, expect -): - def flatten_install_targets_map(itm): - result = {} - for target, platforms in itm.items(): - result[target] = [p['name'] for p in platforms] - return result - - mock_glbl_cfg('cylc.flow.platforms.glbl_cfg', glblcfg) - install_targets_map, unreachable_targets = ( - map_platforms_used_for_install_targets( - platform_names, install_targets)) - - assert ( - expect['targets'] == flatten_install_targets_map(install_targets_map)) - - assert ( - expect['unreachable'] == unreachable_targets) - diff --git a/tests/unit/test_task_remote_mgr.py b/tests/unit/test_task_remote_mgr.py index bb7b8cea085..eefb1474c26 100644 --- a/tests/unit/test_task_remote_mgr.py +++ b/tests/unit/test_task_remote_mgr.py @@ -127,3 +127,111 @@ def test_get_log_file_name(tmp_path: Path, log_name = task_remote_mgr.get_log_file_name( install_target, install_log_dir=log_dir) assert log_name == expected + + +@pytest.mark.parametrize( + 'platform_names, install_targets, glblcfg, expect', + [ + pytest.param( + # Two platforms share an install target. Both are reachable. + ['sir_handel', 'peter_sam'], + ['mountain_railway'], + ''' + [platforms] + [[peter_sam, sir_handel]] + install target = mountain_railway + ''', + { + 'targets': {'mountain_railway': ['peter_sam', 'sir_handel']}, + 'unreachable': set() + }, + id='basic' + ), + pytest.param( + # Two platforms share an install target. Both are reachable. + None, + ['mountain_railway'], + ''' + [platforms] + [[peter_sam, sir_handel]] + install target = mountain_railway + ''', + { + 'targets': {'mountain_railway': []}, + 'unreachable': 'mountain_railway' + }, + id='platform_unreachable' + ), + pytest.param( + # One of our install targets matches one of our platforms, + # but only implicitly; i.e. the platform name is the same as the + # install target name. + ['sir_handel'], + ['sir_handel'], + ''' + [platforms] + [[sir_handel]] + ''', + { + 'targets': {'sir_handel': ['sir_handel']}, + 'unreachable': set() + }, + id='implicit-target' + ), + pytest.param( + # One of our install targets matches one of our platforms, + # but only implicitly, and the platform name is defined using a + # regex. + ['sir_handel42'], + ['sir_handel42'], + ''' + [platforms] + [[sir_handel..]] + ''', + { + 'targets': {'sir_handel42': ['sir_handel42']}, + 'unreachable': set() + }, + id='implicit-target-regex' + ), + pytest.param( + # One of our install targets (rusty) has no defined platforms + # causing a PlatformLookupError. + ['duncan', 'rusty'], + ['mountain_railway', 'rusty'], + ''' + [platforms] + [[duncan]] + install target = mountain_railway + ''', + { + 'targets': {'mountain_railway': ['duncan']}, + 'unreachable': 'rusty' + }, + id='PlatformLookupError' + ) + ] +) +def test_map_platforms_used_for_install_targets( + mock_glbl_cfg, + platform_names, install_targets, glblcfg, expect, caplog +): + def flatten_install_targets_map(itm): + result = {} + for target, platforms in itm.items(): + result[target] = sorted([p['name'] for p in platforms]) + return result + + mock_glbl_cfg('cylc.flow.platforms.glbl_cfg', glblcfg) + + install_targets_map = TaskRemoteMgr._get_remote_tidy_targets( + platform_names, install_targets) + + assert ( + expect['targets'] == flatten_install_targets_map(install_targets_map)) + + if expect['unreachable']: + assert ( + expect['unreachable'] in caplog.records[0].msg) + else: + assert not caplog.records From dd2a3730dd3c8b3c9c7ef4e61b55666d766728db Mon Sep 17 00:00:00 2001 From: Tim Pillinger <26465611+wxtim@users.noreply.github.com> Date: Fri, 31 Mar 2023 16:18:14 +0100 Subject: [PATCH 04/15] Response to review --- cylc/flow/exceptions.py | 16 ++++-- cylc/flow/platforms.py | 2 +- cylc/flow/task_remote_mgr.py | 57 ++++++++++---------- tests/integration/test_task_remote_mgr.py | 64 ++++++++++++++++++----- tests/unit/test_task_remote_mgr.py | 2 +- 5 files changed, 94 insertions(+), 47 deletions(-) diff --git a/cylc/flow/exceptions.py b/cylc/flow/exceptions.py index 94d90f25066..d017193b6bd 100644 --- a/cylc/flow/exceptions.py +++ b/cylc/flow/exceptions.py @@ -436,12 +436,20 @@ def __str__(self): class NoPlatformsError(CylcError): - """None of the platforms of a given group were reachable.""" - def __init__(self, platform_group): - self.platform_group = platform_group + """None of the platforms of a given set were reachable. + + Instatiation args: + identity: The name of the platform group or install target + _type: Whether the set of platforms is a platform group or an + install target + """ + def __init__(self, identity: str, set_type: str = 'group'): + self.identity = identity + self.type = set_type def __str__(self): - return f'Unable to find a platform from group {self.platform_group}.' + return ( + f'Unable to find a platform from {self.type} {self.identity}.') class CylcVersionError(CylcError): diff --git a/cylc/flow/platforms.py b/cylc/flow/platforms.py index a1b6ee9f121..211a278072a 100644 --- a/cylc/flow/platforms.py +++ b/cylc/flow/platforms.py @@ -21,7 +21,7 @@ from copy import deepcopy from typing import ( TYPE_CHECKING, Any, Dict, Iterable, - List, Optional, Set, Tuple, Union, overload + List, Optional, Set, Union, overload ) from cylc.flow import LOG diff --git a/cylc/flow/task_remote_mgr.py b/cylc/flow/task_remote_mgr.py index a8920e6c048..6875e835560 100644 --- a/cylc/flow/task_remote_mgr.py +++ b/cylc/flow/task_remote_mgr.py @@ -27,17 +27,17 @@ from pathlib import Path from cylc.flow.option_parsers import verbosity_to_opts import os -import random from shlex import quote import re from subprocess import Popen, PIPE, DEVNULL import tarfile from time import sleep, time -from typing import Any, Deque, Dict, TYPE_CHECKING, List, NamedTuple, Tuple +from typing import ( + Any, Deque, Dict, TYPE_CHECKING, List, NamedTuple, Set, Tuple) from cylc.flow import LOG from cylc.flow.exceptions import ( - PlatformError, PlatformLookupError, NoHostsError + PlatformError, PlatformLookupError, NoHostsError, NoPlatformsError ) import cylc.flow.flags from cylc.flow.hostuserutil import is_remote_host @@ -291,7 +291,7 @@ def construct_remote_tidy_ssh_cmd( @staticmethod def _get_remote_tidy_targets( platform_names: List[str], - install_targets: List[str] + install_targets: Set[str] ) -> Dict[str, List[Dict[str, Any]]]: """Finds valid platforms for install targets, warns about in invalid install targets. @@ -317,10 +317,6 @@ def _get_remote_tidy_targets( if unreachable_targets: msg = 'No platforms available to remote tidy install targets:' for unreachable_target in unreachable_targets: - msg = ( - 'Unable to tidy the following install targets' - ' because no matching platform was found:' - ) msg += f'\n * {unreachable_target}' LOG.error(msg) @@ -347,30 +343,35 @@ def remote_tidy(self) -> None: # Issue all SSH commands in parallel queue: Deque[RemoteTidyQueueTuple] = deque() - for install_target in install_targets_map.keys(): + for install_target, platforms in install_targets_map.items(): if install_target == get_localhost_install_target(): continue - platform = random.choice(list(install_targets_map[install_target])) - try: - cmd, host = self.construct_remote_tidy_ssh_cmd(platform) - except NoHostsError as exc: - LOG.warning( - PlatformError( - f'{PlatformError.MSG_TIDY}\n{exc}', - platform['name'], + for platform in platforms: + try: + cmd, host = self.construct_remote_tidy_ssh_cmd(platform) + except NoHostsError as exc: + LOG.warning( + PlatformError( + f'{PlatformError.MSG_TIDY}\n{exc}', + platform['name'], + ) ) - ) - else: - log_platform_event('remote tidy', platform, host) - queue.append( - RemoteTidyQueueTuple( - platform, - host, - Popen( # nosec - cmd, stdout=PIPE, stderr=PIPE, stdin=DEVNULL, - text=True - ) # * command constructed by internal interface + else: + log_platform_event('remote tidy', platform, host) + queue.append( + RemoteTidyQueueTuple( + platform, + host, + Popen( # nosec + cmd, stdout=PIPE, stderr=PIPE, stdin=DEVNULL, + text=True + ) # * command constructed by internal interface + ) ) + break + else: + LOG.error( + NoPlatformsError(install_target, 'install target') ) # Wait for commands to complete for a max of 10 seconds timeout = time() + 10.0 diff --git a/tests/integration/test_task_remote_mgr.py b/tests/integration/test_task_remote_mgr.py index f9d0356eea3..ea7b78ec4f5 100644 --- a/tests/integration/test_task_remote_mgr.py +++ b/tests/integration/test_task_remote_mgr.py @@ -15,7 +15,6 @@ # along with this program. If not, see . import cylc -from cylc.flow.exceptions import NoHostsError from cylc.flow.task_remote_mgr import ( REMOTE_FILE_INSTALL_DONE, REMOTE_FILE_INSTALL_FAILED @@ -46,14 +45,21 @@ async def test_remote_tidy( - baz - Install target is set explicitly. - An install target (qux) where we cannot get a platform: Ensure that we get the desired error. + + Test that platforms with no good hosts (no host not in bad hosts). """ # Monkeypatch away subprocess.Popen calls - prevent any interaction with # remotes actually happening: class MockProc: def __init__(self, *args, **kwargs): - breakpoint() self.poll = lambda: True - self.returncode = 0 + if ( + 'baum' in args[0] + or 'bay' in args[0] + ): + self.returncode = 255 + else: + self.returncode = 0 self.communicate = lambda: ('out', 'err') monkeypatch.setattr( @@ -62,6 +68,25 @@ def __init__(self, *args, **kwargs): lambda *args, **kwargs: MockProc(*args, **kwargs) ) + # Monkeypath function to add a sort order which we don't need in the + # real code but rely to prevent the test becoming flaky: + def mock_get_install_target_platforms_map(*args, **kwargs): + """Add sort to original function to ensure test consistency""" + from cylc.flow.platforms import get_install_target_to_platforms_map + result = get_install_target_to_platforms_map(*args, **kwargs) + sorted_result = {} + for key in sorted(result): + sorted_result[key] = sorted( + result[key], key=lambda x: x['name'], reverse=True) + return sorted_result + + monkeypatch.setattr( + cylc.flow.task_remote_mgr, + 'get_install_target_to_platforms_map', + mock_get_install_target_platforms_map + ) + + # Set up global config mock_glbl_cfg( 'cylc.flow.platforms.glbl_cfg', ''' @@ -74,9 +99,14 @@ def __init__(self, *args, **kwargs): # hosts = bar1 to bar9 (implicit) [[baz]] install target = baz + # baum and bay should be uncontactable: hosts = baum, bay, baz [[[selection]]] method = definition order + [[notthisone]] + install target = baz + hosts = baum, bay + [[bay]] ''', ) @@ -85,7 +115,7 @@ def __init__(self, *args, **kwargs): schd = scheduler(id_) async with start(schd) as log: # Write database with 6 tasks using 3 platforms: - platforms = ['baz', 'bar9', 'foo'] + platforms = ['baz', 'bar9', 'foo', 'notthisone', 'bay'] line = r"('', '', {}, 0, 1, '', '', 0,'', '', '', 0, '', '{}', 4, '')" stmt = r"INSERT INTO task_jobs VALUES" + r','.join([ line.format(i, platform) for i, platform in enumerate(platforms) @@ -95,24 +125,24 @@ def __init__(self, *args, **kwargs): # Mock a remote init map. schd.task_job_mgr.task_remote_mgr.remote_init_map = { - 'baz': REMOTE_FILE_INSTALL_DONE, # Should match platform baz - 'bar9': REMOTE_FILE_INSTALL_DONE, # Should match platform bar. - 'foo': REMOTE_FILE_INSTALL_DONE, # Should match plaform foo - 'qux': REMOTE_FILE_INSTALL_DONE, # Should not match a plaform - 'quiz': REMOTE_FILE_INSTALL_FAILED, # Should not be considered + 'baz': REMOTE_FILE_INSTALL_DONE, # Should match platform baz + 'bar9': REMOTE_FILE_INSTALL_DONE, # Should match platform bar. + 'foo': REMOTE_FILE_INSTALL_DONE, # Should match plaform foo + 'qux': REMOTE_FILE_INSTALL_DONE, # Should not match a plaform + 'quiz': REMOTE_FILE_INSTALL_FAILED, # Should not be considered + 'bay': REMOTE_FILE_INSTALL_DONE, # Should return NoPlatforms } # Clear the log, run the test: log.clear() - # schd.task_job_mgr.bad_hosts.update(['baum', 'bay']) + schd.task_job_mgr.task_remote_mgr.bad_hosts.update(['baum', 'bay']) schd.task_job_mgr.task_remote_mgr.remote_tidy() + pass records = [str(r.msg) for r in log.records] # We can't get qux, no defined platform has a matching install target: - qux_msg = ( - 'Unable to tidy the following install targets because no' - ' matching platform was found:\n * qux') + qux_msg = 'No platforms available to remote tidy install targets:\n * qux' assert qux_msg in records # We can get foo bar baz, and we try to remote tidy them. @@ -124,3 +154,11 @@ def __init__(self, *args, **kwargs): # We haven't done anything with Quiz because we're only looking # at cases where platform == REMOTE_FILE_INSTALL_DONE assert not [r for r in records if 'quiz' in r] + + notthisone_msg = ( + 'platform: notthisone - clean up did not complete' + '\nUnable to find valid host for notthisone' + ) + assert notthisone_msg in records + + assert 'Unable to find a platform from install target bay.' in records diff --git a/tests/unit/test_task_remote_mgr.py b/tests/unit/test_task_remote_mgr.py index eefb1474c26..67ce4c5e709 100644 --- a/tests/unit/test_task_remote_mgr.py +++ b/tests/unit/test_task_remote_mgr.py @@ -112,7 +112,7 @@ def test_get_log_file_name(tmp_path: Path, install_target: str, load_type: Optional[str], expected: str): - task_remote_mgr = TaskRemoteMgr('some_workflow', None, None) + task_remote_mgr = TaskRemoteMgr('some_workflow', None, None, None) if load_type == 'restart': task_remote_mgr.is_restart = True elif load_type == 'reload': From 75a08f3d775f15bcbdca0400cca6db89243e2000 Mon Sep 17 00:00:00 2001 From: Tim Pillinger <26465611+wxtim@users.noreply.github.com> Date: Tue, 11 Apr 2023 12:40:19 +0100 Subject: [PATCH 05/15] fix test --- .../intelligent-host-selection/01-periodic-clear-badhosts.t | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/functional/intelligent-host-selection/01-periodic-clear-badhosts.t b/tests/functional/intelligent-host-selection/01-periodic-clear-badhosts.t index 07dc1c4b487..fef25c44fe0 100644 --- a/tests/functional/intelligent-host-selection/01-periodic-clear-badhosts.t +++ b/tests/functional/intelligent-host-selection/01-periodic-clear-badhosts.t @@ -76,6 +76,7 @@ platform: fake-platform - initialisation did not complete platform: fake-platform - remote init (on localhost) platform: fake-platform - remote file install (on localhost) platform: fake-platform - initialisation did not complete +platform: fake-platform - remote tidy (on localhost) __HERE__ purge From 488bc583226b04fb8cc38084a7244b560602c596 Mon Sep 17 00:00:00 2001 From: Tim Pillinger <26465611+wxtim@users.noreply.github.com> Date: Mon, 17 Apr 2023 11:38:04 +0100 Subject: [PATCH 06/15] response to review --- cylc/flow/exceptions.py | 13 +++++++++++-- cylc/flow/rundb.py | 8 -------- cylc/flow/task_remote_mgr.py | 6 +++--- tests/integration/test_task_remote_mgr.py | 5 ++++- tests/unit/test_platforms.py | 2 +- tests/unit/test_task_remote_mgr.py | 9 +++++---- 6 files changed, 24 insertions(+), 19 deletions(-) diff --git a/cylc/flow/exceptions.py b/cylc/flow/exceptions.py index d017193b6bd..8b43a74a342 100644 --- a/cylc/flow/exceptions.py +++ b/cylc/flow/exceptions.py @@ -442,14 +442,23 @@ class NoPlatformsError(CylcError): identity: The name of the platform group or install target _type: Whether the set of platforms is a platform group or an install target + place: Where the attempt to get the platform failed. """ - def __init__(self, identity: str, set_type: str = 'group'): + def __init__( + self, identity: str, set_type: str = 'group', place: str = '' + ): self.identity = identity self.type = set_type + if place: + self.place = f' during {place}.' + else: + self.place = '.' def __str__(self): return ( - f'Unable to find a platform from {self.type} {self.identity}.') + f'Unable to find a platform from {self.type} {self.identity}' + f'{self.place}' + ) class CylcVersionError(CylcError): diff --git a/cylc/flow/rundb.py b/cylc/flow/rundb.py index 4a931c87992..e0a18b61c8d 100644 --- a/cylc/flow/rundb.py +++ b/cylc/flow/rundb.py @@ -684,14 +684,6 @@ def select_task_job(self, cycle, name, submit_num=None): except sqlite3.DatabaseError: return None - def select_platforms_used(self): - """Get a list of platforms used by tasks.""" - stmt = r'SELECT platform_name FROM task_jobs' - try: - return {r[0] for r in self.connect().execute(stmt)} - except sqlite3.DatabaseError: - return None - def select_jobs_for_restart(self, callback): """Select from task_pool+task_states+task_jobs for restart. diff --git a/cylc/flow/task_remote_mgr.py b/cylc/flow/task_remote_mgr.py index 6875e835560..9bd31a36119 100644 --- a/cylc/flow/task_remote_mgr.py +++ b/cylc/flow/task_remote_mgr.py @@ -331,7 +331,7 @@ def remote_tidy(self) -> None: """ # Get a list of all platforms used from workflow database: platforms_used = ( - self.db_mgr.get_pri_dao().select_platforms_used()) + self.db_mgr.get_pri_dao().select_task_job_platforms()) # For each install target compile a list of platforms: install_targets = { target for target, msg @@ -371,8 +371,8 @@ def remote_tidy(self) -> None: break else: LOG.error( - NoPlatformsError(install_target, 'install target') - ) + NoPlatformsError( + install_target, 'install target', 'remote tidy')) # Wait for commands to complete for a max of 10 seconds timeout = time() + 10.0 while queue and time() < timeout: diff --git a/tests/integration/test_task_remote_mgr.py b/tests/integration/test_task_remote_mgr.py index ea7b78ec4f5..0fd462115f7 100644 --- a/tests/integration/test_task_remote_mgr.py +++ b/tests/integration/test_task_remote_mgr.py @@ -161,4 +161,7 @@ def mock_get_install_target_platforms_map(*args, **kwargs): ) assert notthisone_msg in records - assert 'Unable to find a platform from install target bay.' in records + bay_msg = ( + 'Unable to find a platform from install target' + ' bay during remote tidy.') + assert bay_msg in records diff --git a/tests/unit/test_platforms.py b/tests/unit/test_platforms.py index f3094312360..5d514db4cb7 100644 --- a/tests/unit/test_platforms.py +++ b/tests/unit/test_platforms.py @@ -461,7 +461,7 @@ def test_get_install_target_to_platforms_map( with pytest.raises(expected_err): get_install_target_to_platforms_map(platform_names) elif expected_err and quiet: - result = get_install_target_to_platforms_map(platform_names, quiet) + assert get_install_target_to_platforms_map(platform_names, quiet=quiet) else: result = get_install_target_to_platforms_map(platform_names) # Sort the maps: diff --git a/tests/unit/test_task_remote_mgr.py b/tests/unit/test_task_remote_mgr.py index 67ce4c5e709..286bcaf5155 100644 --- a/tests/unit/test_task_remote_mgr.py +++ b/tests/unit/test_task_remote_mgr.py @@ -158,7 +158,7 @@ def test_get_log_file_name(tmp_path: Path, ''', { 'targets': {'mountain_railway': []}, - 'unreachable': 'mountain_railway' + 'unreachable': {'mountain_railway'} }, id='platform_unreachable' ), @@ -206,7 +206,7 @@ def test_get_log_file_name(tmp_path: Path, ''', { 'targets': {'mountain_railway': ['duncan']}, - 'unreachable': 'rusty' + 'unreachable': {'rusty'} }, id='PlatformLookupError' ) @@ -231,7 +231,8 @@ def flatten_install_targets_map(itm): expect['targets'] == flatten_install_targets_map(install_targets_map)) if expect['unreachable']: - assert ( - expect['unreachable'] in caplog.records[0].msg) + for unreachable in expect["unreachable"]: + assert ( + unreachable in caplog.records[0].msg) else: assert not caplog.records From 35efbca3cea9391d256508308d5b745e7be8285c Mon Sep 17 00:00:00 2001 From: Tim Pillinger <26465611+wxtim@users.noreply.github.com> Date: Tue, 11 Apr 2023 15:49:39 +0100 Subject: [PATCH 07/15] Update cylc/flow/exceptions.py Co-authored-by: Oliver Sanders --- cylc/flow/exceptions.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cylc/flow/exceptions.py b/cylc/flow/exceptions.py index 8b43a74a342..abb562b6e3b 100644 --- a/cylc/flow/exceptions.py +++ b/cylc/flow/exceptions.py @@ -438,7 +438,7 @@ def __str__(self): class NoPlatformsError(CylcError): """None of the platforms of a given set were reachable. - Instatiation args: + Args: identity: The name of the platform group or install target _type: Whether the set of platforms is a platform group or an install target From 7a9da9f38a8ba98b27f5505590bf864904781378 Mon Sep 17 00:00:00 2001 From: Tim Pillinger <26465611+wxtim@users.noreply.github.com> Date: Thu, 13 Apr 2023 20:49:03 +0100 Subject: [PATCH 08/15] Apply suggestions from code review Co-authored-by: Ronnie Dutta <61982285+MetRonnie@users.noreply.github.com> --- cylc/flow/exceptions.py | 2 +- cylc/flow/task_remote_mgr.py | 6 ++---- tests/unit/test_platforms.py | 1 + tests/unit/test_rundb.py | 26 +++++++++++++------------- tests/unit/test_task_remote_mgr.py | 2 +- 5 files changed, 18 insertions(+), 19 deletions(-) diff --git a/cylc/flow/exceptions.py b/cylc/flow/exceptions.py index abb562b6e3b..9db9c375bdc 100644 --- a/cylc/flow/exceptions.py +++ b/cylc/flow/exceptions.py @@ -440,7 +440,7 @@ class NoPlatformsError(CylcError): Args: identity: The name of the platform group or install target - _type: Whether the set of platforms is a platform group or an + set_type: Whether the set of platforms is a platform group or an install target place: Where the attempt to get the platform failed. """ diff --git a/cylc/flow/task_remote_mgr.py b/cylc/flow/task_remote_mgr.py index 9bd31a36119..99f9f14f081 100644 --- a/cylc/flow/task_remote_mgr.py +++ b/cylc/flow/task_remote_mgr.py @@ -290,7 +290,7 @@ def construct_remote_tidy_ssh_cmd( @staticmethod def _get_remote_tidy_targets( - platform_names: List[str], + platform_names: Optional[List[str]], install_targets: Set[str] ) -> Dict[str, List[Dict[str, Any]]]: """Finds valid platforms for install targets, warns about in invalid @@ -311,9 +311,7 @@ def _get_remote_tidy_targets( # If we couldn't find a platform for a target, we cannot tidy it - # raise an Error: - unreachable_targets = { - t for t in install_targets if not install_targets_map.get(t) - } + unreachable_targets = install_targets.difference(install_targets_map) if unreachable_targets: msg = 'No platforms available to remote tidy install targets:' for unreachable_target in unreachable_targets: diff --git a/tests/unit/test_platforms.py b/tests/unit/test_platforms.py index 5d514db4cb7..68d08cb8d16 100644 --- a/tests/unit/test_platforms.py +++ b/tests/unit/test_platforms.py @@ -461,6 +461,7 @@ def test_get_install_target_to_platforms_map( with pytest.raises(expected_err): get_install_target_to_platforms_map(platform_names) elif expected_err and quiet: + # No error should be raised in quiet mode. assert get_install_target_to_platforms_map(platform_names, quiet=quiet) else: result = get_install_target_to_platforms_map(platform_names) diff --git a/tests/unit/test_rundb.py b/tests/unit/test_rundb.py index 3bd10cddc41..b57525ff462 100644 --- a/tests/unit/test_rundb.py +++ b/tests/unit/test_rundb.py @@ -160,16 +160,16 @@ def test_select_platforms_used(tmp_path): """ # Setup DB db_file = tmp_path / 'db' - dao = CylcWorkflowDAO(db_file_name=db_file, create_tables=True) - - # Write database with 6 tasks using 3 platforms: - platforms = ['foo', 'bar', 'bar', 'qux', 'qux', 'qux'] - line = ( - r" ('', '', {}, 0, 1, '', '', 0, '', '', '', 0, '', '{}', 4377, '')") - stmt = r"INSERT INTO task_jobs VALUES" + r','.join([ - line.format(i, platform) for i, platform in enumerate(platforms) - ]) - dao.conn.execute(stmt) - - # Assert that we only have 3 platforms reported by function: - assert dao.select_platforms_used() == {'bar', 'qux', 'foo'} + with CylcWorkflowDAO(db_file_name=db_file, create_tables=True) as dao: + + # Write database with 6 tasks using 3 platforms: + platforms = ['foo', 'bar', 'bar', 'qux', 'qux', 'qux'] + line = ( + r" ('', '', {}, 0, 1, '', '', 0, '', '', '', 0, '', '{}', 4377, '')") + stmt = r"INSERT INTO task_jobs VALUES" + r','.join([ + line.format(i, platform) for i, platform in enumerate(platforms) + ]) + dao.conn.execute(stmt) + + # Assert that we only have 3 platforms reported by function: + assert dao.select_platforms_used() == {'bar', 'qux', 'foo'} diff --git a/tests/unit/test_task_remote_mgr.py b/tests/unit/test_task_remote_mgr.py index 286bcaf5155..9aa0f3bd6d9 100644 --- a/tests/unit/test_task_remote_mgr.py +++ b/tests/unit/test_task_remote_mgr.py @@ -148,7 +148,7 @@ def test_get_log_file_name(tmp_path: Path, id='basic' ), pytest.param( - # Two platforms share an install target. Both are reachable. + # Two platforms share an install target. Both are unreachable. None, ['mountain_railway'], ''' From 7629ea652ee04faaa7c49f3527d9b554deeac36a Mon Sep 17 00:00:00 2001 From: Tim Pillinger <26465611+wxtim@users.noreply.github.com> Date: Mon, 17 Apr 2023 11:52:04 +0100 Subject: [PATCH 09/15] f --- cylc/flow/task_remote_mgr.py | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/cylc/flow/task_remote_mgr.py b/cylc/flow/task_remote_mgr.py index fed99dfc0dd..7e37509e7b6 100644 --- a/cylc/flow/task_remote_mgr.py +++ b/cylc/flow/task_remote_mgr.py @@ -34,7 +34,7 @@ from time import sleep, time from typing import ( Any, Deque, Dict, TYPE_CHECKING, List, - NamedTuple, Optional, Tuple + NamedTuple, Optional, Set, Tuple ) from cylc.flow import LOG @@ -52,8 +52,6 @@ from cylc.flow.platforms import ( HOST_REC_COMMAND, PLATFORM_REC_COMMAND, - NoHostsError, - PlatformLookupError, get_host_from_platform, get_install_target_from_platform, get_install_target_to_platforms_map, @@ -305,7 +303,7 @@ def construct_remote_tidy_ssh_cmd( @staticmethod def _get_remote_tidy_targets( - platform_names: Optional[List[str]], + platform_names: List[str], install_targets: Set[str] ) -> Dict[str, List[Dict[str, Any]]]: """Finds valid platforms for install targets, warns about in invalid @@ -319,7 +317,8 @@ def _get_remote_tidy_targets( platforms are available. """ if platform_names is None and install_targets: - install_targets_map = {t: [] for t in install_targets} + install_targets_map: Dict[str, List[Dict[str, Any]]] = { + t: [] for t in install_targets} else: install_targets_map = get_install_target_to_platforms_map( platform_names, quiet=True) From 812da58fed00252ac49e1ea69f96e6edeee9583b Mon Sep 17 00:00:00 2001 From: Tim Pillinger <26465611+wxtim@users.noreply.github.com> Date: Mon, 17 Apr 2023 13:23:53 +0100 Subject: [PATCH 10/15] fixed some logic --- cylc/flow/task_remote_mgr.py | 7 +++++-- tests/unit/test_rundb.py | 20 -------------------- tests/unit/test_task_remote_mgr.py | 4 ++-- 3 files changed, 7 insertions(+), 24 deletions(-) diff --git a/cylc/flow/task_remote_mgr.py b/cylc/flow/task_remote_mgr.py index 7e37509e7b6..d32220f5377 100644 --- a/cylc/flow/task_remote_mgr.py +++ b/cylc/flow/task_remote_mgr.py @@ -316,7 +316,7 @@ def _get_remote_tidy_targets( A mapping of install targets to valid platforms only where platforms are available. """ - if platform_names is None and install_targets: + if not platform_names and install_targets: install_targets_map: Dict[str, List[Dict[str, Any]]] = { t: [] for t in install_targets} else: @@ -325,7 +325,10 @@ def _get_remote_tidy_targets( # If we couldn't find a platform for a target, we cannot tidy it - # raise an Error: - unreachable_targets = install_targets.difference(install_targets_map) + unreachable_targets = [ + t for t, v in install_targets_map.items() + if not v + ] + list(install_targets.difference(install_targets_map)) if unreachable_targets: msg = 'No platforms available to remote tidy install targets:' for unreachable_target in unreachable_targets: diff --git a/tests/unit/test_rundb.py b/tests/unit/test_rundb.py index b57525ff462..a0754659dc6 100644 --- a/tests/unit/test_rundb.py +++ b/tests/unit/test_rundb.py @@ -153,23 +153,3 @@ def test_context_manager_exit( mock_close.assert_called_once() # Close connection for real: dao.close() - - -def test_select_platforms_used(tmp_path): - """It returns a set of platforms used by task_jobs. - """ - # Setup DB - db_file = tmp_path / 'db' - with CylcWorkflowDAO(db_file_name=db_file, create_tables=True) as dao: - - # Write database with 6 tasks using 3 platforms: - platforms = ['foo', 'bar', 'bar', 'qux', 'qux', 'qux'] - line = ( - r" ('', '', {}, 0, 1, '', '', 0, '', '', '', 0, '', '{}', 4377, '')") - stmt = r"INSERT INTO task_jobs VALUES" + r','.join([ - line.format(i, platform) for i, platform in enumerate(platforms) - ]) - dao.conn.execute(stmt) - - # Assert that we only have 3 platforms reported by function: - assert dao.select_platforms_used() == {'bar', 'qux', 'foo'} diff --git a/tests/unit/test_task_remote_mgr.py b/tests/unit/test_task_remote_mgr.py index 9aa0f3bd6d9..539349da3fe 100644 --- a/tests/unit/test_task_remote_mgr.py +++ b/tests/unit/test_task_remote_mgr.py @@ -149,7 +149,7 @@ def test_get_log_file_name(tmp_path: Path, ), pytest.param( # Two platforms share an install target. Both are unreachable. - None, + set(), ['mountain_railway'], ''' [platforms] @@ -225,7 +225,7 @@ def flatten_install_targets_map(itm): mock_glbl_cfg('cylc.flow.platforms.glbl_cfg', glblcfg) install_targets_map = TaskRemoteMgr._get_remote_tidy_targets( - platform_names, install_targets) + set(platform_names), set(install_targets)) assert ( expect['targets'] == flatten_install_targets_map(install_targets_map)) From 94a61da65e847c9215b01ec41fd0dc5f4a9c8f0b Mon Sep 17 00:00:00 2001 From: Tim Pillinger <26465611+wxtim@users.noreply.github.com> Date: Tue, 18 Apr 2023 09:55:15 +0100 Subject: [PATCH 11/15] review response --- cylc/flow/task_remote_mgr.py | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/cylc/flow/task_remote_mgr.py b/cylc/flow/task_remote_mgr.py index d32220f5377..56a51fe8677 100644 --- a/cylc/flow/task_remote_mgr.py +++ b/cylc/flow/task_remote_mgr.py @@ -316,19 +316,18 @@ def _get_remote_tidy_targets( A mapping of install targets to valid platforms only where platforms are available. """ - if not platform_names and install_targets: + if install_targets and not platform_names: install_targets_map: Dict[str, List[Dict[str, Any]]] = { t: [] for t in install_targets} + unreachable_targets = install_targets else: install_targets_map = get_install_target_to_platforms_map( platform_names, quiet=True) + # If we couldn't find a platform for a target, we cannot tidy it - + # raise an Error: + unreachable_targets = install_targets.difference( + install_targets_map) - # If we couldn't find a platform for a target, we cannot tidy it - - # raise an Error: - unreachable_targets = [ - t for t, v in install_targets_map.items() - if not v - ] + list(install_targets.difference(install_targets_map)) if unreachable_targets: msg = 'No platforms available to remote tidy install targets:' for unreachable_target in unreachable_targets: From ec9a53126eb4232b67cebbc4843e8a203f32f419 Mon Sep 17 00:00:00 2001 From: Tim Pillinger <26465611+wxtim@users.noreply.github.com> Date: Wed, 19 Apr 2023 13:25:34 +0100 Subject: [PATCH 12/15] Fix flake 8 failure --- cylc/flow/exceptions.py | 1 - 1 file changed, 1 deletion(-) diff --git a/cylc/flow/exceptions.py b/cylc/flow/exceptions.py index f3b4f83547f..9db9c375bdc 100644 --- a/cylc/flow/exceptions.py +++ b/cylc/flow/exceptions.py @@ -454,7 +454,6 @@ def __init__( else: self.place = '.' - def __str__(self): return ( f'Unable to find a platform from {self.type} {self.identity}' From 7e674d005f025565ce234b5b98ec90bdd90c6626 Mon Sep 17 00:00:00 2001 From: Tim Pillinger <26465611+wxtim@users.noreply.github.com> Date: Thu, 20 Apr 2023 14:35:21 +0100 Subject: [PATCH 13/15] fixup test logic --- cylc/flow/task_job_mgr.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cylc/flow/task_job_mgr.py b/cylc/flow/task_job_mgr.py index 358f3586847..a87ed2d4d15 100644 --- a/cylc/flow/task_job_mgr.py +++ b/cylc/flow/task_job_mgr.py @@ -308,7 +308,7 @@ def submit_task_jobs(self, workflow, itasks, curve_auth, itask.tdef.rtconfig['platform'], bad_hosts=self.bad_hosts ) - except PlatformLookupError: + except (PlatformLookupError, NoPlatformsError): pass else: # If were able to select a new platform; From 4a86033d61e370a276efeab2b3a11a2671f3600b Mon Sep 17 00:00:00 2001 From: Tim Pillinger <26465611+wxtim@users.noreply.github.com> Date: Thu, 20 Apr 2023 16:00:19 +0100 Subject: [PATCH 14/15] Update cylc/flow/exceptions.py Co-authored-by: Ronnie Dutta <61982285+MetRonnie@users.noreply.github.com> --- cylc/flow/exceptions.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cylc/flow/exceptions.py b/cylc/flow/exceptions.py index 9db9c375bdc..dd59fb9eec8 100644 --- a/cylc/flow/exceptions.py +++ b/cylc/flow/exceptions.py @@ -435,7 +435,7 @@ def __str__(self): return f'Unable to find valid host for {self.platform_name}' -class NoPlatformsError(CylcError): +class NoPlatformsError(PlatformLookupError): """None of the platforms of a given set were reachable. Args: From cda7a57ef7db2eee420b7c0f3db9f6ced23d8a7e Mon Sep 17 00:00:00 2001 From: Tim Pillinger <26465611+wxtim@users.noreply.github.com> Date: Thu, 20 Apr 2023 16:00:45 +0100 Subject: [PATCH 15/15] Update cylc/flow/task_job_mgr.py Co-authored-by: Ronnie Dutta <61982285+MetRonnie@users.noreply.github.com> --- cylc/flow/task_job_mgr.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cylc/flow/task_job_mgr.py b/cylc/flow/task_job_mgr.py index a87ed2d4d15..358f3586847 100644 --- a/cylc/flow/task_job_mgr.py +++ b/cylc/flow/task_job_mgr.py @@ -308,7 +308,7 @@ def submit_task_jobs(self, workflow, itasks, curve_auth, itask.tdef.rtconfig['platform'], bad_hosts=self.bad_hosts ) - except (PlatformLookupError, NoPlatformsError): + except PlatformLookupError: pass else: # If were able to select a new platform;