Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix error after remote tidy timeout #4465

Merged
merged 6 commits into from
Oct 15, 2021
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,10 @@ improvements.
using the same name for a platform and a platform group. Which one it should
pick is ambiguous, and is a setup error.

[#4465](https://github.com/cylc/cylc-flow/pull/4465) -
Fix a `ValueError` that could occasionally occur during remote tidy on
workflow shutdown.

-------------------------------------------------------------------------------
## __cylc-8.0b2 (<span actions:bind='release-date'>Released 2021-07-28</span>)__

Expand Down
2 changes: 1 addition & 1 deletion cylc/flow/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ def __init__(
if isinstance(cmd, list):
self.cmd = " ".join(cmd)

def __str__(self):
def __str__(self) -> str:
ret = (f"{self.platform_n}: {self.msg}:\n"
f"COMMAND FAILED ({self.ret_code}): {self.cmd}\n")
for label, item in ('STDOUT', self.out), ('STDERR', self.err):
Expand Down
155 changes: 81 additions & 74 deletions cylc/flow/task_remote_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
- Implement basic host select functionality.
"""

from collections import deque
from contextlib import suppress
from cylc.flow.option_parsers import verbosity_to_opts
import os
Expand All @@ -30,7 +31,7 @@
from subprocess import Popen, PIPE, DEVNULL
import tarfile
from time import time
from typing import Any, Dict, TYPE_CHECKING
from typing import Any, Deque, Dict, TYPE_CHECKING, List, NamedTuple, Tuple

from cylc.flow import LOG, RSYNC_LOG
from cylc.flow.exceptions import TaskRemoteMgmtError
Expand All @@ -52,9 +53,11 @@
get_contact_file)
from cylc.flow.platforms import (
get_host_from_platform,
get_install_target_from_platform,
get_localhost_install_target,
get_random_platform_for_install_target,
NoHostsError
NoHostsError,
PlatformLookupError
)

if TYPE_CHECKING:
Expand All @@ -71,6 +74,12 @@
REMOTE_FILE_INSTALL_255 = 'REMOTE FILE INSTALL 255'


class RemoteTidyQueueTuple(NamedTuple):
platform: Dict[str, Any]
host: str
proc: 'Popen[str]'
oliver-sanders marked this conversation as resolved.
Show resolved Hide resolved


class TaskRemoteMgr:
"""Manage task job remote initialisation, tidy, selection."""

Expand Down Expand Up @@ -245,114 +254,112 @@ def remote_init(
callback_255_args=[platform]
)

def remote_tidy(self):
def remote_tidy(self) -> None:
"""Remove workflow contact files and keys from initialised remotes.

Call "cylc remote-tidy".
This method is called on workflow shutdown, so we want nothing to hang.
Timeout any incomplete commands after 10 seconds.
"""
from cylc.flow.platforms import PlatformLookupError
# Issue all SSH commands in parallel

def construct_remote_tidy_ssh_cmd(install_target, platform):
def construct_remote_tidy_ssh_cmd(
platform: Dict[str, Any]
) -> Tuple[List[str], str]:
cmd = ['remote-tidy']
cmd.extend(verbosity_to_opts(cylc.flow.flags.verbosity))
cmd.append(install_target)
cmd.append(get_install_target_from_platform(platform))
cmd.append(get_remote_workflow_run_dir(self.workflow))
host = get_host_from_platform(
platform, bad_hosts=self.bad_hosts
)
cmd = construct_ssh_cmd(
cmd, platform, host, timeout='10s'
)
cmd = construct_ssh_cmd(cmd, platform, host, timeout='10s')
return cmd, host

procs = {}
queue: Deque[RemoteTidyQueueTuple] = deque()
for install_target, message in self.remote_init_map.items():
if message != REMOTE_FILE_INSTALL_DONE:
continue
if install_target == get_localhost_install_target():
continue
platform = get_random_platform_for_install_target(install_target)
platform_n = platform['name']
try:
cmd, host = construct_remote_tidy_ssh_cmd(
install_target, platform)
except (NoHostsError, PlatformLookupError):
LOG.warning(TaskRemoteMgmtError(
TaskRemoteMgmtError.MSG_TIDY,
platform_n, 1, '', '', 'remote tidy'
))
cmd, host = construct_remote_tidy_ssh_cmd(platform)
except (NoHostsError, PlatformLookupError) as exc:
LOG.warning(
f"remote tidy on {platform['name']}: "
f"{TaskRemoteMgmtError.MSG_TIDY}\n{exc}"
)
else:
LOG.debug(
"Removing authentication keys and contact file "
f"from remote: \"{install_target}\"")
procs[platform_n] = (
cmd,
host,
Popen( # nosec
cmd,
stdout=PIPE,
stderr=PIPE,
stdin=DEVNULL,
queue.append(
RemoteTidyQueueTuple(
platform,
host,
Popen( # nosec
cmd, stdout=PIPE, stderr=PIPE, stdin=DEVNULL,
text=True
) # * command constructed by internal interface
)
# * command constructed by internal interface
)
# Wait for commands to complete for a max of 10 seconds
timeout = time() + 10.0
while procs and time() < timeout:
for platform_n, (cmd, host, proc) in procs.copy().items():
if proc.poll() is None:
continue
del procs[platform_n]
out, err = (f.decode() for f in proc.communicate())
# 255 error has to be handled here becuase remote tidy doesn't
# use SubProcPool.
if proc.returncode == 255:
timeout = time() + 10.0
self.bad_hosts.add(host)
while queue and time() < timeout:
MetRonnie marked this conversation as resolved.
Show resolved Hide resolved
item = queue.popleft()
if item.proc.poll() is None: # proc still running
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could use proc.wait(timeout=10) to avoid this polling loop and the subsequent termination loop e.g:

for proc in procs:
    try:
        if proc.wait(10):
            ...
        else:
            ...
    except subprocess.TimeoutExpired:
        ...

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that would negate the concurrency in the way the queue is currently handled?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The processes would still be concurrent, however, the result retrieval would be sequential:

from subprocess import *
from time import time

procs = [
    Popen(['sleep', '2'])
    for _ in range(5)
]

start = time()

for proc in procs:
    proc.wait()

print(f'{time() - start}')
$ python test.py
2.007575035095215

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see, but considering that retries for SSH 255 failures are queued after result retrieval, it would still slow it down in those cases, I'd have thought

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, damn, adding to the list whilst processing it makes a mess of things.

queue.append(item)
continue
out, err = item.proc.communicate()
# 255 error has to be handled here because remote tidy doesn't
# use SubProcPool.
if item.proc.returncode == 255:
timeout = time() + 10.0
self.bad_hosts.add(item.host)
LOG.warning(
f"Failed to tidy remote platform: "
f"'{item.platform['name']}' using host '{item.host}'; "
"trying a different host"
)
try:
retry_cmd, retry_host = construct_remote_tidy_ssh_cmd(
item.platform
)
except (NoHostsError, PlatformLookupError) as exc:
LOG.warning(
f'Tried to tidy remote platform: \'{platform_n}\' '
f'using host \'{host}\' but failed; '
'trying a different host'
f"remote tidy on {item.platform['name']}: "
MetRonnie marked this conversation as resolved.
Show resolved Hide resolved
f"{TaskRemoteMgmtError.MSG_TIDY}\n{exc}"
)
try:
retry_cmd, host = construct_remote_tidy_ssh_cmd(
install_target, platform
)
except (NoHostsError, PlatformLookupError):
LOG.warning(TaskRemoteMgmtError(
TaskRemoteMgmtError.MSG_TIDY, platform_n, '',
'', '', ''
))
else:
procs[platform_n] = (
retry_cmd,
host,
Popen( # nosec
retry_cmd,
stdout=PIPE,
stderr=PIPE,
stdin=DEVNULL,
)
# * command constructed by internal interface
else:
queue.append(
item._replace(
host=retry_host,
proc=Popen( # nosec
retry_cmd, stdout=PIPE, stderr=PIPE,
stdin=DEVNULL, text=True
) # * command constructed by internal interface
)
if proc.wait() and proc.returncode != 255:
LOG.warning(TaskRemoteMgmtError(
TaskRemoteMgmtError.MSG_TIDY,
platform_n, ' '.join(quote(item) for item in cmd),
proc.returncode, out, err))
)
else:
LOG.warning(
TaskRemoteMgmtError(
TaskRemoteMgmtError.MSG_TIDY, item.platform['name'],
item.proc.args, item.proc.returncode, out, err
)
)
# Terminate any remaining commands
for platform_n, (cmd, proc) in procs.items():
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was the offending line

for item in queue:
with suppress(OSError):
proc.terminate()
out, err = (f.decode() for f in proc.communicate())
if proc.wait():
LOG.warning(TaskRemoteMgmtError(
TaskRemoteMgmtError.MSG_TIDY,
platform_n, ' '.join(quote(item) for item in cmd),
proc.returncode, out, err))
item.proc.terminate()
out, err = item.proc.communicate()
if item.proc.wait():
LOG.warning(
TaskRemoteMgmtError(
TaskRemoteMgmtError.MSG_TIDY, item.platform['name'],
item.proc.args, item.proc.returncode, out, err
)
)

def _subshell_eval_callback(self, proc_ctx, cmd_str):
"""Callback when subshell eval command exits"""
Expand Down