Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix.platform is regex remote tidy fail #5445

Merged
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ ones in. -->

### Fixes

[5445](https://github.com/cylc/cylc-flow/pull/5445) - Fix remote tidy
bug where install target is not explicit in platform definition.

[5398](https://github.com/cylc/cylc-flow/pull/5398) - Fix platform from
group selection order bug.

Expand Down
25 changes: 21 additions & 4 deletions cylc/flow/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -436,12 +436,29 @@ def __str__(self):


class NoPlatformsError(CylcError):
wxtim marked this conversation as resolved.
Show resolved Hide resolved
"""None of the platforms of a given group were reachable."""
def __init__(self, platform_group):
self.platform_group = platform_group
"""None of the platforms of a given set were reachable.

Args:
identity: The name of the platform group or install target
set_type: Whether the set of platforms is a platform group or an
install target
place: Where the attempt to get the platform failed.
"""
def __init__(
self, identity: str, set_type: str = 'group', place: str = ''
):
self.identity = identity
self.type = set_type
if place:
self.place = f' during {place}.'
else:
self.place = '.'

def __str__(self):
return f'Unable to find a platform from group {self.platform_group}.'
return (
f'Unable to find a platform from {self.type} {self.identity}'
f'{self.place}'
)


class CylcVersionError(CylcError):
Expand Down
46 changes: 15 additions & 31 deletions cylc/flow/platforms.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@
import re
from copy import deepcopy
from typing import (
TYPE_CHECKING, Any, Dict, Iterable, List, Optional, Set, Union, overload
TYPE_CHECKING, Any, Dict, Iterable,
List, Optional, Set, Union, overload
)

from cylc.flow import LOG
Expand Down Expand Up @@ -612,18 +613,29 @@ def get_install_target_from_platform(platform: Dict[str, Any]) -> str:


def get_install_target_to_platforms_map(
platform_names: Iterable[str]
platform_names: Iterable[str],
quiet: bool = False
) -> Dict[str, List[Dict[str, Any]]]:
"""Get a dictionary of unique install targets and the platforms which use
them.

Args:
platform_names: List of platform names to look up in the global config.
quiet: Supress PlatformNotFound Errors

Return {install_target_1: [platform_1_dict, platform_2_dict, ...], ...}
"""
platform_names = set(platform_names)
platforms = [platform_from_name(p_name) for p_name in platform_names]
platforms: List[Dict[str, Any]] = []
for p_name in platform_names:
try:
platform = platform_from_name(p_name)
except PlatformLookupError as exc:
if not quiet:
raise exc
else:
platforms.append(platform)

install_targets = {
get_install_target_from_platform(platform)
for platform in platforms
Expand All @@ -649,34 +661,6 @@ def is_platform_with_target_in_list(
)


def get_all_platforms_for_install_target(
install_target: str
) -> List[Dict[str, Any]]:
"""Return list of platform dictionaries for given install target."""
platforms: List[Dict[str, Any]] = []
all_platforms = glbl_cfg(cached=True).get(['platforms'], sparse=False)
for k, v in all_platforms.iteritems(): # noqa: B301 (iteritems valid here)
if (v.get('install target', k) == install_target):
v_copy = deepcopy(v)
v_copy['name'] = k
platforms.append(v_copy)
return platforms


def get_random_platform_for_install_target(
install_target: str
) -> Dict[str, Any]:
"""Return a randomly selected platform (dict) for given install target."""
platforms = get_all_platforms_for_install_target(install_target)
try:
return random.choice(platforms) # nosec (not crypto related)
except IndexError:
# No platforms to choose from
raise PlatformLookupError(
f'Could not select platform for install target: {install_target}'
)


def get_localhost_install_target() -> str:
"""Returns the install target of localhost platform"""
localhost = get_platform()
Expand Down
2 changes: 1 addition & 1 deletion cylc/flow/task_job_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ def __init__(self, workflow, proc_pool, workflow_db_mgr,
self.bad_hosts = bad_hosts
self.bad_hosts_to_clear = set()
self.task_remote_mgr = TaskRemoteMgr(
workflow, proc_pool, self.bad_hosts)
workflow, proc_pool, self.bad_hosts, self.workflow_db_mgr)

def check_task_jobs(self, workflow, task_pool):
"""Check submission and execution timeout and polling timers.
Expand Down
111 changes: 80 additions & 31 deletions cylc/flow/task_remote_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,13 @@
from time import sleep, time
from typing import (
Any, Deque, Dict, TYPE_CHECKING, List,
NamedTuple, Optional, Tuple
NamedTuple, Optional, Set, Tuple
)

from cylc.flow import LOG
from cylc.flow.exceptions import PlatformError
from cylc.flow.exceptions import (
PlatformError, PlatformLookupError, NoHostsError, NoPlatformsError
)
import cylc.flow.flags
from cylc.flow.network.client_factory import CommsMeth
from cylc.flow.pathutil import (
Expand All @@ -50,12 +52,10 @@
from cylc.flow.platforms import (
HOST_REC_COMMAND,
PLATFORM_REC_COMMAND,
NoHostsError,
PlatformLookupError,
get_host_from_platform,
get_install_target_from_platform,
get_install_target_to_platforms_map,
get_localhost_install_target,
get_random_platform_for_install_target,
log_platform_event,
)
from cylc.flow.remote import construct_rsync_over_ssh_cmd, construct_ssh_cmd
Expand Down Expand Up @@ -96,7 +96,7 @@ class RemoteTidyQueueTuple(NamedTuple):
class TaskRemoteMgr:
"""Manage task remote initialisation, tidy, selection."""

def __init__(self, workflow, proc_pool, bad_hosts):
def __init__(self, workflow, proc_pool, bad_hosts, db_mgr):
self.workflow = workflow
self.proc_pool = proc_pool
# self.remote_command_map = {command: host|PlatformError|None}
Expand All @@ -110,6 +110,7 @@ def __init__(self, workflow, proc_pool, bad_hosts):
self.bad_hosts = bad_hosts
self.is_reload = False
self.is_restart = False
self.db_mgr = db_mgr

def _subshell_eval(
self, eval_str: str, command_pattern: re.Pattern
Expand Down Expand Up @@ -300,44 +301,92 @@ def construct_remote_tidy_ssh_cmd(
cmd = construct_ssh_cmd(cmd, platform, host, timeout='10s')
return cmd, host

@staticmethod
def _get_remote_tidy_targets(
platform_names: List[str],
wxtim marked this conversation as resolved.
Show resolved Hide resolved
install_targets: Set[str]
) -> Dict[str, List[Dict[str, Any]]]:
"""Finds valid platforms for install targets, warns about in invalid
install targets.

logs:
A list of install targets where no platform can be found.

returns:
A mapping of install targets to valid platforms only where
platforms are available.
"""
if install_targets and not platform_names:
install_targets_map: Dict[str, List[Dict[str, Any]]] = {
t: [] for t in install_targets}
unreachable_targets = install_targets
else:
install_targets_map = get_install_target_to_platforms_map(
platform_names, quiet=True)
# If we couldn't find a platform for a target, we cannot tidy it -
# raise an Error:
unreachable_targets = install_targets.difference(
install_targets_map)

if unreachable_targets:
msg = 'No platforms available to remote tidy install targets:'
for unreachable_target in unreachable_targets:
msg += f'\n * {unreachable_target}'
LOG.error(msg)

return install_targets_map

def remote_tidy(self) -> None:
"""Remove workflow contact files and keys from initialised remotes.

Call "cylc remote-tidy".
This method is called on workflow shutdown, so we want nothing to hang.
Timeout any incomplete commands after 10 seconds.
"""
# Get a list of all platforms used from workflow database:
platforms_used = (
self.db_mgr.get_pri_dao().select_task_job_platforms())
Comment on lines +347 to +348
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
platforms_used = (
self.db_mgr.get_pri_dao().select_task_job_platforms())
with self.db_mgr.get_pri_dao() as dao:
platforms_used = dao.select_task_job_platforms())

# For each install target compile a list of platforms:
install_targets = {
target for target, msg
in self.remote_init_map.items()
if msg == REMOTE_FILE_INSTALL_DONE
}
install_targets_map = self._get_remote_tidy_targets(
platforms_used, install_targets)

# Issue all SSH commands in parallel
queue: Deque[RemoteTidyQueueTuple] = deque()
for install_target, message in self.remote_init_map.items():
if message != REMOTE_FILE_INSTALL_DONE:
continue
for install_target, platforms in install_targets_map.items():
if install_target == get_localhost_install_target():
continue
try:
platform = get_random_platform_for_install_target(
install_target
)
cmd, host = self.construct_remote_tidy_ssh_cmd(platform)
except (NoHostsError, PlatformLookupError) as exc:
LOG.warning(
PlatformError(
f'{PlatformError.MSG_TIDY}\n{exc}',
platform['name'],
for platform in platforms:
try:
cmd, host = self.construct_remote_tidy_ssh_cmd(platform)
except NoHostsError as exc:
LOG.warning(
PlatformError(
f'{PlatformError.MSG_TIDY}\n{exc}',
platform['name'],
)
)
)
else:
log_platform_event('remote tidy', platform, host)
queue.append(
RemoteTidyQueueTuple(
platform,
host,
Popen( # nosec
cmd, stdout=PIPE, stderr=PIPE, stdin=DEVNULL,
text=True
) # * command constructed by internal interface
else:
log_platform_event('remote tidy', platform, host)
queue.append(
RemoteTidyQueueTuple(
platform,
host,
Popen( # nosec
cmd, stdout=PIPE, stderr=PIPE, stdin=DEVNULL,
text=True
) # * command constructed by internal interface
)
)
)
break
else:
LOG.error(
NoPlatformsError(
install_target, 'install target', 'remote tidy'))
# Wait for commands to complete for a max of 10 seconds
timeout = time() + 10.0
while queue and time() < timeout:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ platform: fake-platform - initialisation did not complete
platform: fake-platform - remote init (on localhost)
platform: fake-platform - remote file install (on localhost)
platform: fake-platform - initialisation did not complete
platform: fake-platform - remote tidy (on localhost)
__HERE__

purge
Expand Down
Loading