diff --git a/CHANGES.md b/CHANGES.md index 8e4e45dad80..cef8ca5967e 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -148,6 +148,10 @@ improvements. using the same name for a platform and a platform group. Which one it should pick is ambiguous, and is a setup error. +[#4465](https://github.com/cylc/cylc-flow/pull/4465) - +Fix a `ValueError` that could occasionally occur during remote tidy on +workflow shutdown. + ------------------------------------------------------------------------------- ## __cylc-8.0b2 (Released 2021-07-28)__ diff --git a/cylc/flow/exceptions.py b/cylc/flow/exceptions.py index b33784b7ec1..fb2d7a5f8da 100644 --- a/cylc/flow/exceptions.py +++ b/cylc/flow/exceptions.py @@ -152,7 +152,7 @@ def __init__( if isinstance(cmd, list): self.cmd = " ".join(cmd) - def __str__(self): + def __str__(self) -> str: ret = (f"{self.platform_n}: {self.msg}:\n" f"COMMAND FAILED ({self.ret_code}): {self.cmd}\n") for label, item in ('STDOUT', self.out), ('STDERR', self.err): diff --git a/cylc/flow/task_remote_mgr.py b/cylc/flow/task_remote_mgr.py index 708b700e3f5..05512821498 100644 --- a/cylc/flow/task_remote_mgr.py +++ b/cylc/flow/task_remote_mgr.py @@ -22,6 +22,7 @@ - Implement basic host select functionality. """ +from collections import deque from contextlib import suppress from cylc.flow.option_parsers import verbosity_to_opts import os @@ -29,8 +30,8 @@ import re from subprocess import Popen, PIPE, DEVNULL import tarfile -from time import time -from typing import Any, Dict, TYPE_CHECKING +from time import sleep, time +from typing import Any, Deque, Dict, TYPE_CHECKING, List, NamedTuple, Tuple from cylc.flow import LOG, RSYNC_LOG from cylc.flow.exceptions import TaskRemoteMgmtError @@ -52,9 +53,11 @@ get_contact_file) from cylc.flow.platforms import ( get_host_from_platform, + get_install_target_from_platform, get_localhost_install_target, get_random_platform_for_install_target, - NoHostsError + NoHostsError, + PlatformLookupError ) if TYPE_CHECKING: @@ -71,6 +74,12 @@ REMOTE_FILE_INSTALL_255 = 'REMOTE FILE INSTALL 255' +class RemoteTidyQueueTuple(NamedTuple): + platform: Dict[str, Any] + host: str + proc: 'Popen[str]' + + class TaskRemoteMgr: """Manage task job remote initialisation, tidy, selection.""" @@ -245,114 +254,109 @@ def remote_init( callback_255_args=[platform] ) - def remote_tidy(self): + def remote_tidy(self) -> None: """Remove workflow contact files and keys from initialised remotes. Call "cylc remote-tidy". This method is called on workflow shutdown, so we want nothing to hang. Timeout any incomplete commands after 10 seconds. """ - from cylc.flow.platforms import PlatformLookupError # Issue all SSH commands in parallel - def construct_remote_tidy_ssh_cmd(install_target, platform): + def construct_remote_tidy_ssh_cmd( + platform: Dict[str, Any] + ) -> Tuple[List[str], str]: cmd = ['remote-tidy'] cmd.extend(verbosity_to_opts(cylc.flow.flags.verbosity)) - cmd.append(install_target) + cmd.append(get_install_target_from_platform(platform)) cmd.append(get_remote_workflow_run_dir(self.workflow)) host = get_host_from_platform( platform, bad_hosts=self.bad_hosts ) - cmd = construct_ssh_cmd( - cmd, platform, host, timeout='10s' - ) + cmd = construct_ssh_cmd(cmd, platform, host, timeout='10s') return cmd, host - procs = {} + queue: Deque[RemoteTidyQueueTuple] = deque() for install_target, message in self.remote_init_map.items(): if message != REMOTE_FILE_INSTALL_DONE: continue if install_target == get_localhost_install_target(): continue - platform = get_random_platform_for_install_target(install_target) - platform_n = platform['name'] try: - cmd, host = construct_remote_tidy_ssh_cmd( - install_target, platform) - except (NoHostsError, PlatformLookupError): - LOG.warning(TaskRemoteMgmtError( - TaskRemoteMgmtError.MSG_TIDY, - platform_n, 1, '', '', 'remote tidy' - )) + platform = get_random_platform_for_install_target( + install_target + ) + cmd, host = construct_remote_tidy_ssh_cmd(platform) + except (NoHostsError, PlatformLookupError) as exc: + LOG.warning(f"{exc}; {TaskRemoteMgmtError.MSG_TIDY}") else: LOG.debug( "Removing authentication keys and contact file " f"from remote: \"{install_target}\"") - procs[platform_n] = ( - cmd, - host, - Popen( # nosec - cmd, - stdout=PIPE, - stderr=PIPE, - stdin=DEVNULL, + queue.append( + RemoteTidyQueueTuple( + platform, + host, + Popen( # nosec + cmd, stdout=PIPE, stderr=PIPE, stdin=DEVNULL, + text=True + ) # * command constructed by internal interface ) - # * command constructed by internal interface ) # Wait for commands to complete for a max of 10 seconds timeout = time() + 10.0 - while procs and time() < timeout: - for platform_n, (cmd, host, proc) in procs.copy().items(): - if proc.poll() is None: - continue - del procs[platform_n] - out, err = (f.decode() for f in proc.communicate()) - # 255 error has to be handled here becuase remote tidy doesn't - # use SubProcPool. - if proc.returncode == 255: - timeout = time() + 10.0 - self.bad_hosts.add(host) - LOG.warning( - f'Tried to tidy remote platform: \'{platform_n}\' ' - f'using host \'{host}\' but failed; ' - 'trying a different host' + while queue and time() < timeout: + item = queue.popleft() + if item.proc.poll() is None: # proc still running + queue.append(item) + continue + out, err = item.proc.communicate() + # 255 error has to be handled here because remote tidy doesn't + # use SubProcPool. + if item.proc.returncode == 255: + timeout = time() + 10.0 + self.bad_hosts.add(item.host) + try: + retry_cmd, retry_host = construct_remote_tidy_ssh_cmd( + item.platform ) - try: - retry_cmd, host = construct_remote_tidy_ssh_cmd( - install_target, platform - ) - except (NoHostsError, PlatformLookupError): - LOG.warning(TaskRemoteMgmtError( - TaskRemoteMgmtError.MSG_TIDY, platform_n, '', - '', '', '' - )) - else: - procs[platform_n] = ( - retry_cmd, - host, - Popen( # nosec - retry_cmd, - stdout=PIPE, - stderr=PIPE, - stdin=DEVNULL, - ) - # * command constructed by internal interface + except (NoHostsError, PlatformLookupError) as exc: + LOG.warning(f"{exc}; {TaskRemoteMgmtError.MSG_TIDY}") + else: + LOG.debug( + "Failed to tidy remote platform " + f"'{item.platform['name']}' using host '{item.host}'; " + f"trying new host '{retry_host}'" + ) + queue.append( + item._replace( + host=retry_host, + proc=Popen( # nosec + retry_cmd, stdout=PIPE, stderr=PIPE, + stdin=DEVNULL, text=True + ) # * command constructed by internal interface ) - if proc.wait() and proc.returncode != 255: - LOG.warning(TaskRemoteMgmtError( - TaskRemoteMgmtError.MSG_TIDY, - platform_n, ' '.join(quote(item) for item in cmd), - proc.returncode, out, err)) + ) + elif item.proc.returncode: + LOG.warning( + TaskRemoteMgmtError( + TaskRemoteMgmtError.MSG_TIDY, item.platform['name'], + item.proc.args, item.proc.returncode, out, err + ) + ) + sleep(0.1) # Terminate any remaining commands - for platform_n, (cmd, proc) in procs.items(): + for item in queue: with suppress(OSError): - proc.terminate() - out, err = (f.decode() for f in proc.communicate()) - if proc.wait(): - LOG.warning(TaskRemoteMgmtError( - TaskRemoteMgmtError.MSG_TIDY, - platform_n, ' '.join(quote(item) for item in cmd), - proc.returncode, out, err)) + item.proc.terminate() + out, err = item.proc.communicate() + if item.proc.wait(): + LOG.warning( + TaskRemoteMgmtError( + TaskRemoteMgmtError.MSG_TIDY, item.platform['name'], + item.proc.args, item.proc.returncode, out, err + ) + ) def _subshell_eval_callback(self, proc_ctx, cmd_str): """Callback when subshell eval command exits""" diff --git a/tests/functional/intelligent-host-selection/01-periodic-clear-badhosts.t b/tests/functional/intelligent-host-selection/01-periodic-clear-badhosts.t index 30a759fdb25..72a41d4c07d 100644 --- a/tests/functional/intelligent-host-selection/01-periodic-clear-badhosts.t +++ b/tests/functional/intelligent-host-selection/01-periodic-clear-badhosts.t @@ -82,7 +82,7 @@ __HERE__ # remote tidy fails definition order time round" named_grep_ok "definition order remote tidy fails" \ - "Tried to tidy remote platform: 'mixedhostplatform' using host 'unreachable_host' but failed; trying a different host" \ + "Failed to tidy remote platform 'mixedhostplatform' using host 'unreachable_host'; trying new host '${CYLC_TEST_HOST}'" \ "${WORKFLOW_RUN_DIR}/log/workflow/log" purge "${WORKFLOW_NAME}" "mixedhostplatform"