Skip to content

Commit

Permalink
Create a mapping between platforms and install_targets for use by rem…
Browse files Browse the repository at this point in the history
…ote tidy

Remove broken unit test

tests for map_platforms_used_for_install_targets

unit test select_platforms_used

Test remote_tidy

Changelog
  • Loading branch information
wxtim committed Mar 31, 2023
1 parent 65683fa commit adfa520
Show file tree
Hide file tree
Showing 8 changed files with 308 additions and 79 deletions.
3 changes: 3 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ ones in. -->

### Fixes

[5445](https://github.com/cylc/cylc-flow/pull/5445) - Fix remote tidy
bug where install target is not explicit in platform definition.

[5398](https://github.com/cylc/cylc-flow/pull/5398) - Fix platform from
group selection order bug.

Expand Down
72 changes: 47 additions & 25 deletions cylc/flow/platforms.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@
import re
from copy import deepcopy
from typing import (
TYPE_CHECKING, Any, Dict, Iterable, List, Optional, Set, Union, overload
TYPE_CHECKING, Any, Dict, Iterable,
List, Optional, Set, Tuple, Union, overload
)

from cylc.flow import LOG
Expand Down Expand Up @@ -649,32 +650,53 @@ def is_platform_with_target_in_list(
)


def get_all_platforms_for_install_target(
    install_target: str
) -> List[Dict[str, Any]]:
    """Return list of platform dictionaries for given install target.

    A platform whose ``install target`` setting is unset or empty is treated
    as having an install target equal to its own name.

    Args:
        install_target: Name of the install target to match.

    Returns:
        Deep copies of the matching platform definitions from the global
        config, each with an added ``name`` key holding the platform's
        config section name. Empty if nothing matches.
    """
    platforms: List[Dict[str, Any]] = []
    all_platforms = glbl_cfg(cached=True).get(['platforms'], sparse=False)
    for k, v in all_platforms.iteritems():  # noqa: B301 (iteritems valid here)
        # NOTE(review): the config is fetched with sparse=False, so the
        # 'install target' key is presumably present even when unset. In
        # that case `.get(key, k)` returns the unset value rather than the
        # platform name, so fall back on falsiness instead of on absence.
        if (v.get('install target') or k) == install_target:
            # Copy so that adding 'name' does not mutate the cached config.
            v_copy = deepcopy(v)
            v_copy['name'] = k
            platforms.append(v_copy)
    return platforms
def map_platforms_used_for_install_targets(
    platform_names: Optional[Set[str]],
    install_targets: Set[str]
) -> Tuple[Dict[str, List[Dict[Any, Any]]], Set[str]]:
    """Get a mapping of install targets to platforms.

    Each platform name is resolved with `get_platform`. A platform whose
    ``install target`` is unset or empty is filed under its own name.
    Platforms whose install target is not one of ``install_targets`` are
    ignored.

    Args:
        platform_names: Names of the platforms to look up. ``None`` is
            treated the same as an empty set.
        install_targets: Install targets that platforms are wanted for.

    Returns:
        install_target_map: {
            'install target': [
                {...platform1...},
                {...platform2...}
            ]
        }
            Only targets with at least one platform are included.
        unreachable_targets:
            A set of install_targets which we cannot get platforms for.

    Note:
        PlatformLookupError raised by `get_platform` is caught and the
        platform skipped; any other exception propagates to the caller.
    """
    install_targets_map: Dict[str, List] = {
        target: [] for target in install_targets}
    for platform_name in platform_names or ():
        try:
            platform = get_platform(platform_name)
        except PlatformLookupError:
            # We only need one working platform per install target:
            continue
        # Implicit install target: fall back to the platform name.
        target = platform.get('install target') or platform['name']
        # A used platform may belong to a target we were not asked about;
        # skip it rather than fail with a KeyError.
        if target in install_targets_map:
            install_targets_map[target].append(platform)

    # Look for unreachable install targets and report them:
    unreachable_targets = {
        target for target, platforms
        in install_targets_map.items()
        if not platforms
    }
    # Remove unreachable targets from the map:
    for target in unreachable_targets:
        del install_targets_map[target]

    return install_targets_map, unreachable_targets


def get_random_platform_for_install_target(
    install_target: str
) -> Dict[str, Any]:
    """Return a randomly selected platform (dict) for given install target.

    Args:
        install_target: Name of the install target to select a platform for.

    Raises:
        PlatformLookupError: If no platform matches the install target.
    """
    platforms = get_all_platforms_for_install_target(install_target)
    try:
        return random.choice(platforms)  # nosec (not crypto related)
    except IndexError:
        # No platforms to choose from
        raise PlatformLookupError(
            f'Could not select platform for install target: {install_target}'
        )


def get_localhost_install_target() -> str:
Expand Down
8 changes: 8 additions & 0 deletions cylc/flow/rundb.py
Original file line number Diff line number Diff line change
Expand Up @@ -684,6 +684,14 @@ def select_task_job(self, cycle, name, submit_num=None):
except sqlite3.DatabaseError:
return None

def select_platforms_used(self):
    """Get the set of platform names used by task jobs.

    Returns:
        set: The distinct ``platform_name`` values found in the
        ``task_jobs`` table, or None if the query raises
        sqlite3.DatabaseError.
    """
    # DISTINCT lets SQLite collapse duplicates so we do not fetch one row
    # per job only to discard the repeats when building the set.
    stmt = r'SELECT DISTINCT platform_name FROM task_jobs'
    try:
        return {row[0] for row in self.connect().execute(stmt)}
    except sqlite3.DatabaseError:
        return None

def select_jobs_for_restart(self, callback):
"""Select from task_pool+task_states+task_jobs for restart.
Expand Down
2 changes: 1 addition & 1 deletion cylc/flow/task_job_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ def __init__(self, workflow, proc_pool, workflow_db_mgr,
self.bad_hosts = bad_hosts
self.bad_hosts_to_clear = set()
self.task_remote_mgr = TaskRemoteMgr(
workflow, proc_pool, self.bad_hosts)
workflow, proc_pool, self.bad_hosts, self.workflow_db_mgr)

def check_task_jobs(self, workflow, task_pool):
"""Check submission and execution timeout and polling timers.
Expand Down
42 changes: 32 additions & 10 deletions cylc/flow/task_remote_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
from pathlib import Path
from cylc.flow.option_parsers import verbosity_to_opts
import os
import random
from shlex import quote
import re
from subprocess import Popen, PIPE, DEVNULL
Expand All @@ -35,7 +36,7 @@
from typing import Any, Deque, Dict, TYPE_CHECKING, List, NamedTuple, Tuple

from cylc.flow import LOG
from cylc.flow.exceptions import PlatformError
from cylc.flow.exceptions import PlatformError, PlatformLookupError
import cylc.flow.flags
from cylc.flow.hostuserutil import is_remote_host
from cylc.flow.network.client_factory import CommsMeth
Expand All @@ -51,8 +52,8 @@
get_host_from_platform,
get_install_target_from_platform,
get_localhost_install_target,
get_random_platform_for_install_target,
log_platform_event,
map_platforms_used_for_install_targets
)
from cylc.flow.remote import construct_rsync_over_ssh_cmd, construct_ssh_cmd
from cylc.flow.subprocctx import SubProcContext
Expand Down Expand Up @@ -91,7 +92,7 @@ class RemoteTidyQueueTuple(NamedTuple):
class TaskRemoteMgr:
"""Manage task remote initialisation, tidy, selection."""

def __init__(self, workflow, proc_pool, bad_hosts):
def __init__(self, workflow, proc_pool, bad_hosts, db_mgr):
self.workflow = workflow
self.proc_pool = proc_pool
# self.remote_command_map = {command: host|PlatformError|None}
Expand All @@ -105,6 +106,7 @@ def __init__(self, workflow, proc_pool, bad_hosts):
self.bad_hosts = bad_hosts
self.is_reload = False
self.is_restart = False
self.db_mgr = db_mgr

def subshell_eval(self, command, command_pattern, host_check=True):
"""Evaluate a task platform from a subshell string.
Expand Down Expand Up @@ -293,19 +295,39 @@ def remote_tidy(self) -> None:
This method is called on workflow shutdown, so we want nothing to hang.
Timeout any incomplete commands after 10 seconds.
"""
# Get a list of all platforms used from workflow database:
platforms_used = (
self.db_mgr.get_pri_dao().select_platforms_used())
# For each install target compile a list of platforms:
install_targets = {
target for target, msg
in self.remote_init_map.items()
if msg == REMOTE_FILE_INSTALL_DONE
}
install_targets_map, unreachable_targets = (
map_platforms_used_for_install_targets(
platforms_used, install_targets))
# If we couldn't find a platform for a target, we cannot tidy it -
# log an error:
if unreachable_targets:
msg = 'No platforms available to remote tidy install targets:'
for unreachable_target in unreachable_targets:
msg = (
'Unable to tidy the following install targets'
' because no matching platform was found:'
)
msg += f'\n * {unreachable_target}'
LOG.error(msg)

# Issue all SSH commands in parallel
queue: Deque[RemoteTidyQueueTuple] = deque()
for install_target, message in self.remote_init_map.items():
if message != REMOTE_FILE_INSTALL_DONE:
continue
for install_target in install_targets_map.keys():
if install_target == get_localhost_install_target():
continue
platform = random.choice(list(install_targets_map[install_target]))
try:
platform = get_random_platform_for_install_target(
install_target
)
cmd, host = self.construct_remote_tidy_ssh_cmd(platform)
except (NoHostsError, PlatformLookupError) as exc:
except NoHostsError as exc:
LOG.warning(
PlatformError(
f'{PlatformError.MSG_TIDY}\n{exc}',
Expand Down
104 changes: 104 additions & 0 deletions tests/integration/test_task_remote_mgr.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE.
# Copyright (C) NIWA & British Crown (Met Office) & Contributors.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

from cylc.flow.task_remote_mgr import (
REMOTE_FILE_INSTALL_DONE,
REMOTE_FILE_INSTALL_FAILED
)


async def test_remote_tidy(
flow,
scheduler,
start,
mock_glbl_cfg,
one_conf
):
"""Remote tidy gets platforms for install targets.
In particular, referencing https://github.com/cylc/cylc-flow/issues/5429,
ensure that install targets defined implicitly by platform name are found.
Mock remote init map:
- Include an install target (quiz) with
message != REMOTE_FILE_INSTALL_DONE to ensure that
it is ignored.
- Install targets where we can get a platform:
- foo - The install target is implicitly the platform name.
- bar9 - The install target is implicitly the platform name,
and the platform name matches a platform regex.
- baz - The install target is set explicitly.
- An install target (qux) where we cannot get a platform: ensure
that we get the desired error.
"""
mock_glbl_cfg(
'cylc.flow.platforms.glbl_cfg',
'''
[platforms]
[[foo]]
# install target = foo (implicit)
# hosts = foo (implicit)
[[bar.]]
# install target = bar1 to bar9 (implicit)
# hosts = bar1 to bar9 (implicit)
[[baz]]
install target = baz
hosts = baz
''',
)

# Get a scheduler:
id_ = flow(one_conf)
schd = scheduler(id_)
async with start(schd) as log:
# Write 3 task job rows to the database, one per platform:
platforms = ['baz', 'bar9', 'foo']
line = r"('', '', {}, 0, 1, '', '', 0,'', '', '', 0, '', '{}', 4, '')"
stmt = r"INSERT INTO task_jobs VALUES" + r','.join([
line.format(i, platform) for i, platform in enumerate(platforms)
])
schd.workflow_db_mgr.pri_dao.connect().execute(stmt)
schd.workflow_db_mgr.pri_dao.connect().commit()

# Mock a remote init map.
schd.task_job_mgr.task_remote_mgr.remote_init_map = {
'baz': REMOTE_FILE_INSTALL_DONE, # Should match platform baz
'bar9': REMOTE_FILE_INSTALL_DONE, # Should match platform bar.
'foo': REMOTE_FILE_INSTALL_DONE, # Should match platform foo
'qux': REMOTE_FILE_INSTALL_DONE, # Should not match a platform
'quiz': REMOTE_FILE_INSTALL_FAILED, # Should not be considered
}

# Clear the log, run the test:
log.clear()
schd.task_job_mgr.task_remote_mgr.remote_tidy()
records = [str(r.msg) for r in log.records]

# We can't get qux, no defined platform has a matching install target:
qux_msg = (
'Unable to tidy the following install targets because no'
' matching platform was found:\n * qux')
assert qux_msg in records

# We can get foo, bar9 and baz, and we try to remote tidy them.
# (This will ultimately fail, but past the point we are testing).
for target in ['foo', 'bar9', 'baz']:
msg = f'platform: {target} - remote tidy (on {target})'
assert msg in records

# We haven't done anything with quiz because we're only looking
# at install targets whose message == REMOTE_FILE_INSTALL_DONE
assert not [r for r in records if 'quiz' in r]
Loading

0 comments on commit adfa520

Please sign in to comment.