Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ihs: catch NoHostsError #5195

Merged
merged 2 commits into from
Dec 7, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,10 @@ with warning, for scan errors where workflow is stopped.
[#5199](https://github.com/cylc/cylc-flow/pull/5199) - Fix a problem with
the consolidation tutorial.

[#5195](https://github.com/cylc/cylc-flow/pull/5195) -
Fix issue where workflows can fail to shutdown due to unavailable remote
platforms and make job log retrieval more robust.

-------------------------------------------------------------------------------
## __cylc-8.0.3 (<span actions:bind='release-date'>Released 2022-10-17</span>)__

Expand Down
12 changes: 10 additions & 2 deletions cylc/flow/host_select.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,11 @@

from cylc.flow import LOG
from cylc.flow.cfgspec.glbl_cfg import glbl_cfg
from cylc.flow.exceptions import GlobalConfigError, HostSelectException
from cylc.flow.exceptions import (
GlobalConfigError,
HostSelectException,
NoHostsError,
)
from cylc.flow.hostuserutil import get_fqdn_by_host, is_remote_host
from cylc.flow.remote import run_cmd, cylc_server_cmd
from cylc.flow.terminal import parse_dirty_json
Expand Down Expand Up @@ -553,7 +557,11 @@ def _get_metrics(hosts, metrics, data=None):
}
for host in hosts:
if is_remote_host(host):
proc_map[host] = cylc_server_cmd(cmd, host=host, **kwargs)
try:
proc_map[host] = cylc_server_cmd(cmd, host=host, **kwargs)
except NoHostsError:
LOG.warning(f'Could not contact {host}')
continue
else:
proc_map[host] = run_cmd(['cylc'] + cmd, **kwargs)

Expand Down
2 changes: 2 additions & 0 deletions cylc/flow/network/ssh_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ async def async_request(self, command, args=None, timeout=None):
'cylc path': cylc_path,
'use login shell': login_sh,
}
# NOTE: this can not raise NoHostsError
# because we have provided the host
proc = remote_cylc_cmd(
cmd,
platform,
Expand Down
12 changes: 12 additions & 0 deletions cylc/flow/remote.py
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,10 @@ def construct_rsync_over_ssh_cmd(
platform: contains info relating to platform
rsync_includes: files and directories to be included in the rsync

Raises:
NoHostsError:
If there are no hosts available for the requested platform.

Developer Warning:
The Cylc Subprocess Pool method ``rsync_255_fail`` relies on
``rsync_cmd[0] == 'rsync'``. Please check that changes to this function
Expand Down Expand Up @@ -369,6 +373,10 @@ def remote_cylc_cmd(
Uses the provided platform configuration to construct the command.

For arguments and returns see construct_ssh_cmd and run_cmd.

Raises:
NoHostsError: If the platform is not contactable.

"""
if not host:
# no host selected => perform host selection from platform config
Expand Down Expand Up @@ -405,6 +413,10 @@ def cylc_server_cmd(cmd, host=None, **kwargs):
with the localhost platform.

For arguments and returns see construct_ssh_cmd and run_cmd.

Raises:
NoHostsError: If the platform is not contactable.

"""
return remote_cylc_cmd(
cmd,
Expand Down
2 changes: 2 additions & 0 deletions cylc/flow/scheduler_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -393,6 +393,8 @@ def _distribute(host, workflow_id_raw, workflow_id):
cmd.append("--host=localhost")

# Re-invoke the command
# NOTE: has the potential to raise NoHostsError, however, this will
# most likely have been raised during host-selection
cylc_server_cmd(cmd, host=host)
sys.exit(0)

Expand Down
2 changes: 2 additions & 0 deletions cylc/flow/scripts/cat_log.py
Original file line number Diff line number Diff line change
Expand Up @@ -405,6 +405,8 @@ def main(
# TODO: Add Intelligent Host selection to this
with suppress(KeyboardInterrupt):
# (Ctrl-C while tailing)
# NOTE: This will raise NoHostsError if the platform is not
# contactable
remote_cylc_cmd(
cmd,
platform,
Expand Down
25 changes: 20 additions & 5 deletions cylc/flow/scripts/check_versions.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
from cylc.flow.cylc_subproc import procopen, PIPE, DEVNULL
from cylc.flow import __version__ as CYLC_VERSION
from cylc.flow.config import WorkflowConfig
from cylc.flow.exceptions import NoHostsError
from cylc.flow.id_cli import parse_id
from cylc.flow.platforms import get_platform, get_host_from_platform
from cylc.flow.remote import construct_ssh_cmd
Expand Down Expand Up @@ -101,15 +102,26 @@ def main(_, options: 'Values', *ids) -> None:
sys.exit(0)

verbose = cylc.flow.flags.verbosity > 0
versions = check_versions(platforms, verbose)
report_results(platforms, versions, options.error)


def check_versions(platforms, verbose):
# get the cylc version on each platform
versions = {}
for platform_name in sorted(platforms):
platform = get_platform(platform_name)
host = get_host_from_platform(
platform,
bad_hosts=None
)
try:
host = get_host_from_platform(
platform,
bad_hosts=None
)
except NoHostsError:
print(
f'Could not connect to {platform["name"]}',
file=sys.stderr
)
continue
cmd = construct_ssh_cmd(
['version'],
platform,
Expand All @@ -127,7 +139,10 @@ def main(_, options: 'Values', *ids) -> None:
versions[platform_name] = out.strip()
else:
versions[platform_name] = f'ERROR: {err.strip()}'
return versions


def report_results(platforms, versions, exit_error):
# report results
max_len = max((len(platform_name) for platform_name in platforms))
print(f'{"platform".rjust(max_len)}: cylc version')
Expand All @@ -136,7 +151,7 @@ def main(_, options: 'Values', *ids) -> None:
print(f'{platform_name.rjust(max_len)}: {result}')
if all((version == CYLC_VERSION for version in versions.values())):
ret_code = 0
elif options.error:
elif exit_error:
ret_code = 1
else:
ret_code = 0
Expand Down
26 changes: 25 additions & 1 deletion cylc/flow/task_events_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@

from cylc.flow import LOG, LOG_LEVELS
from cylc.flow.cfgspec.glbl_cfg import glbl_cfg
from cylc.flow.exceptions import NoHostsError
from cylc.flow.hostuserutil import get_host, get_user, is_remote_platform
from cylc.flow.pathutil import (
get_remote_workflow_run_job_dir,
Expand Down Expand Up @@ -935,8 +936,29 @@ def _get_events_conf(self, itask, key, default=None):

def _process_job_logs_retrieval(self, schd, ctx, id_keys):
"""Process retrieval of task job logs from remote user@host."""
# get a host to run retrieval on
platform = get_platform(ctx.platform_name)
host = get_host_from_platform(platform, bad_hosts=self.bad_hosts)
try:
host = get_host_from_platform(platform, bad_hosts=self.bad_hosts)
except NoHostsError:
# All of the platforms hosts have been found to be uncontactable.
# Reset the bad hosts to allow retrieval retry to take place.
self.bad_hosts -= set(platform['hosts'])
oliver-sanders marked this conversation as resolved.
Show resolved Hide resolved
try:
# Get a new host and try again.
host = get_host_from_platform(
platform,
bad_hosts=self.bad_hosts
)
except NoHostsError:
# We really can't get a host to try on e.g. no hosts
# configured (shouldn't happen). Nothing more we can do here,
# move onto the next submission retry.
for id_key in id_keys:
self.unset_waiting_event_timer(id_key)
return

# construct the retrieval command
ssh_str = str(platform["ssh command"])
rsync_str = str(platform["retrieve job logs command"])
cmd = shlex.split(rsync_str) + ["--rsh=" + ssh_str]
Expand All @@ -962,6 +984,8 @@ def _process_job_logs_retrieval(self, schd, ctx, id_keys):
)
# Local target
cmd.append(get_workflow_run_job_dir(schd.workflow) + "/")

# schedule command
self.proc_pool.put_command(
SubProcContext(
ctx, cmd, env=dict(os.environ), id_keys=id_keys, host=host
Expand Down
29 changes: 17 additions & 12 deletions cylc/flow/task_job_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -522,9 +522,6 @@ def submit_task_jobs(self, workflow, itasks, curve_auth,
cmd, [len(b) for b in itasks_batches])

if remote_mode:
host = get_host_from_platform(
platform, bad_hosts=self.task_remote_mgr.bad_hosts
)
hjoliver marked this conversation as resolved.
Show resolved Hide resolved
cmd = construct_ssh_cmd(
cmd, platform, host
)
Expand Down Expand Up @@ -944,13 +941,23 @@ def _run_job_cmd(
cmd.append(get_remote_workflow_run_job_dir(workflow))
job_log_dirs = []
host = 'localhost'

ctx = SubProcContext(cmd_key, cmd, host=host)
if remote_mode:
host = get_host_from_platform(
platform, bad_hosts=self.task_remote_mgr.bad_hosts
)
cmd = construct_ssh_cmd(
cmd, platform, host
)
try:
host = get_host_from_platform(
platform, bad_hosts=self.task_remote_mgr.bad_hosts
)
cmd = construct_ssh_cmd(
cmd, platform, host
)
except NoHostsError:
ctx.err = f'No available hosts for {platform["name"]}'
callback_255(ctx, workflow, itasks)
continue
else:
ctx = SubProcContext(cmd_key, cmd, host=host)

for itask in sorted(itasks, key=lambda task: task.identity):
job_log_dirs.append(
itask.tokens.duplicate(
Expand All @@ -960,9 +967,7 @@ def _run_job_cmd(
cmd += job_log_dirs
LOG.debug(f'{cmd_key} for {platform["name"]} on {host}')
self.proc_pool.put_command(
SubProcContext(
cmd_key, cmd, host=host
),
ctx,
bad_hosts=self.task_remote_mgr.bad_hosts,
callback=callback,
callback_args=[workflow, itasks],
Expand Down
37 changes: 21 additions & 16 deletions cylc/flow/task_remote_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,7 @@ def remote_init(
)
self.remote_init_map[
platform['install target']] = REMOTE_INIT_FAILED
# reset the bad hosts to allow remote-init to retry
self.bad_hosts -= set(platform['hosts'])
self.ready = True
else:
Expand All @@ -266,6 +267,24 @@ def remote_init(
callback_255_args=[platform]
)

def construct_remote_tidy_ssh_cmd(
self, platform: Dict[str, Any]
) -> Tuple[List[str], str]:
"""Return a remote-tidy SSH command.

Rasies:
NoHostsError: If the platform is not contactable.
"""
cmd = ['remote-tidy']
cmd.extend(verbosity_to_opts(cylc.flow.flags.verbosity))
cmd.append(get_install_target_from_platform(platform))
cmd.append(get_remote_workflow_run_dir(self.workflow))
host = get_host_from_platform(
platform, bad_hosts=self.bad_hosts
)
cmd = construct_ssh_cmd(cmd, platform, host, timeout='10s')
return cmd, host

def remote_tidy(self) -> None:
"""Remove workflow contact files and keys from initialised remotes.

Expand All @@ -274,20 +293,6 @@ def remote_tidy(self) -> None:
Timeout any incomplete commands after 10 seconds.
"""
# Issue all SSH commands in parallel

def construct_remote_tidy_ssh_cmd(
platform: Dict[str, Any]
) -> Tuple[List[str], str]:
cmd = ['remote-tidy']
hjoliver marked this conversation as resolved.
Show resolved Hide resolved
cmd.extend(verbosity_to_opts(cylc.flow.flags.verbosity))
cmd.append(get_install_target_from_platform(platform))
cmd.append(get_remote_workflow_run_dir(self.workflow))
host = get_host_from_platform(
platform, bad_hosts=self.bad_hosts
)
cmd = construct_ssh_cmd(cmd, platform, host, timeout='10s')
return cmd, host

queue: Deque[RemoteTidyQueueTuple] = deque()
for install_target, message in self.remote_init_map.items():
if message != REMOTE_FILE_INSTALL_DONE:
Expand All @@ -298,7 +303,7 @@ def construct_remote_tidy_ssh_cmd(
platform = get_random_platform_for_install_target(
install_target
)
cmd, host = construct_remote_tidy_ssh_cmd(platform)
cmd, host = self.construct_remote_tidy_ssh_cmd(platform)
except (NoHostsError, PlatformLookupError) as exc:
LOG.warning(
PlatformError(
Expand Down Expand Up @@ -332,7 +337,7 @@ def construct_remote_tidy_ssh_cmd(
timeout = time() + 10.0
self.bad_hosts.add(item.host)
try:
retry_cmd, retry_host = construct_remote_tidy_ssh_cmd(
retry_cmd, retry_host = self.construct_remote_tidy_ssh_cmd(
item.platform
)
except (NoHostsError, PlatformLookupError) as exc:
Expand Down
4 changes: 4 additions & 0 deletions cylc/flow/workflow_files.py
Original file line number Diff line number Diff line change
Expand Up @@ -1077,6 +1077,10 @@ def _remote_clean_cmd(
platform: Config for the platform on which to remove the workflow.
rm_dirs: Sub dirs to remove instead of the whole run dir.
timeout: Number of seconds to wait before cancelling the command.

Raises:
NoHostsError: If the platform is not contactable.

"""
LOG.debug(
f"Cleaning {reg} on install target: {platform['install target']} "
Expand Down
Loading