Skip to content

Commit

Permalink
made reinstall work on multiple workflows
Browse files Browse the repository at this point in the history
flake8 clean up

fixed unit tests

fixed cylc-combination/01-vr-reload functional test

fixed cylc-reinstall/00-simple.t functional test

updated change log

review amends
  • Loading branch information
Mark Dawson committed Nov 22, 2023
1 parent eb769d8 commit 8140c6f
Show file tree
Hide file tree
Showing 6 changed files with 91 additions and 66 deletions.
1 change: 1 addition & 0 deletions changes.d/5803.feat.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Updated 'reinstall' functionality to support multiple workflows
46 changes: 28 additions & 18 deletions cylc/flow/scheduler_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
from cylc.flow.id import upgrade_legacy_ids
from cylc.flow.host_select import select_workflow_host
from cylc.flow.hostuserutil import is_remote_host
from cylc.flow.id_cli import parse_ids
from cylc.flow.id_cli import parse_ids_async
from cylc.flow.loggingutil import (
close_log,
RotatingLogFileHandler,
Expand Down Expand Up @@ -354,7 +354,11 @@ def _open_logs(id_: str, no_detach: bool, restart_num: int) -> None:
)


def scheduler_cli(options: 'Values', workflow_id_raw: str) -> None:
async def scheduler_cli(
options: 'Values',
workflow_id_raw: str,
parse_workflow_id: bool = True
) -> None:
"""Run the workflow.
This function should contain all of the command line facing
Expand All @@ -368,15 +372,18 @@ def scheduler_cli(options: 'Values', workflow_id_raw: str) -> None:
# Parse workflow name but delay Cylc 7 suite.rc deprecation warning
# until after the start-up splash is printed.
# TODO: singleton
(workflow_id,), _ = parse_ids(
workflow_id_raw,
constraint='workflows',
max_workflows=1,
# warn_depr=False, # TODO
)
if parse_workflow_id:
(workflow_id,), _ = await parse_ids_async(
workflow_id_raw,
constraint='workflows',
max_workflows=1,
# warn_depr=False, # TODO
)
else:
workflow_id = workflow_id_raw

# resume the workflow if it is already running
_resume(workflow_id, options)
await _resume(workflow_id, options)

# check the workflow can be safely restarted with this version of Cylc
db_file = Path(get_workflow_srv_dir(workflow_id), 'db')
Expand All @@ -400,9 +407,7 @@ def scheduler_cli(options: 'Values', workflow_id_raw: str) -> None:
# NOTE: asyncio.run opens an event loop, runs your coro,
# then shutdown async generators and closes the event loop
scheduler = Scheduler(workflow_id, options)
asyncio.run(
_setup(scheduler)
)
await _setup(scheduler)

# daemonize if requested
# NOTE: asyncio event loops cannot persist across daemonization
Expand All @@ -419,9 +424,14 @@ def scheduler_cli(options: 'Values', workflow_id_raw: str) -> None:
)

# run the workflow
ret = asyncio.run(
_run(scheduler)
)
if options.no_detach:
ret = await _run(scheduler)
else:
# Note: The daemonization messes with asyncio so we have to start a
# new event loop if detaching
ret = asyncio.run(
_run(scheduler)
)

# exit
# NOTE: we must clean up all asyncio / threading stuff before exiting
Expand All @@ -432,7 +442,7 @@ def scheduler_cli(options: 'Values', workflow_id_raw: str) -> None:
sys.exit(ret)


def _resume(workflow_id, options):
async def _resume(workflow_id, options):
"""Resume the workflow if it is already running."""
try:
detect_old_contact_file(workflow_id)
Expand All @@ -448,7 +458,7 @@ def _resume(workflow_id, options):
'wFlows': [workflow_id]
}
}
pclient('graphql', mutation_kwargs)
await pclient.async_request('graphql', mutation_kwargs)
sys.exit(0)
except CylcError as exc:
LOG.error(exc)
Expand Down Expand Up @@ -651,4 +661,4 @@ def _play(parser: COP, options: 'Values', id_: str):
*options.starttask,
relative=True,
)
return scheduler_cli(options, id_)
return asyncio.run(scheduler_cli(options, id_))
35 changes: 21 additions & 14 deletions cylc/flow/scripts/reinstall.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@
from pathlib import Path
import sys
from typing import Optional, TYPE_CHECKING, List, Callable
from functools import partial

from ansimarkup import parse as cparse

Expand All @@ -83,12 +84,13 @@
from cylc.flow.install import (
reinstall_workflow,
)
from cylc.flow.id_cli import parse_id
from cylc.flow.network.multi import call_multi
from cylc.flow.option_parsers import (
CylcOptionParser as COP,
OptionSettings,
WORKFLOW_ID_ARG_DOC,
ID_MULTI_ARG_DOC
)

from cylc.flow.pathutil import get_workflow_run_dir
from cylc.flow.workflow_files import (
get_workflow_source_dir,
Expand All @@ -101,7 +103,6 @@

_input = input # to enable testing


REINSTALL_CYLC_ROSE_OPTIONS = [
OptionSettings(
['--clear-rose-install-options'],
Expand All @@ -127,7 +128,12 @@

def get_option_parser() -> COP:
parser = COP(
__doc__, comms=True, argdoc=[WORKFLOW_ID_ARG_DOC]
__doc__,
comms=True,
multiworkflow=True,
argdoc=[
ID_MULTI_ARG_DOC,
],
)

try:
Expand All @@ -149,27 +155,28 @@ def get_option_parser() -> COP:
def main(
_parser: COP,
opts: 'Values',
args: Optional[str] = None
*ids: str
) -> None:
"""CLI wrapper."""
reinstall_cli(opts, args)
call_multi(
partial(reinstall_cli, opts),
*ids,
constraint='workflows',
report=lambda x: print('Done')
)


def reinstall_cli(
async def reinstall_cli(
opts: 'Values',
args: Optional[str] = None,
print_reload_tip: bool = True,
workflow_id,
*tokens_list,
print_reload_tip: bool = True
) -> bool:
"""Implement cylc reinstall.
This is the bit which contains all the CLI logic.
"""
run_dir: Optional[Path]
workflow_id: str
workflow_id, *_ = parse_id(
args,
constraint='workflows',
)
run_dir = Path(get_workflow_run_dir(workflow_id))
if not run_dir.is_dir():
raise WorkflowFilesError(
Expand Down
25 changes: 15 additions & 10 deletions cylc/flow/scripts/validate_reinstall.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@
ContactFileExists,
CylcError,
)
from cylc.flow.id_cli import parse_id
from cylc.flow.id_cli import parse_id_async
from cylc.flow.loggingutil import set_timestamps
from cylc.flow.option_parsers import (
WORKFLOW_ID_ARG_DOC,
Expand All @@ -62,19 +62,20 @@
from cylc.flow.scheduler_cli import PLAY_OPTIONS, scheduler_cli
from cylc.flow.scripts.validate import (
VALIDATE_OPTIONS,
_main as cylc_validate
wrapped_main as cylc_validate
)
from cylc.flow.scripts.reinstall import (
REINSTALL_CYLC_ROSE_OPTIONS,
REINSTALL_OPTIONS,
reinstall_cli as cylc_reinstall,
)
from cylc.flow.scripts.reload import (
reload_cli as cylc_reload
run as cylc_reload
)
from cylc.flow.terminal import cli_function
from cylc.flow.workflow_files import detect_old_contact_file

import asyncio

CYLC_ROSE_OPTIONS = COP.get_cylc_rose_options()
VR_OPTIONS = combine_options(
Expand Down Expand Up @@ -124,16 +125,16 @@ def check_tvars_and_workflow_stopped(

@cli_function(get_option_parser)
def main(parser: COP, options: 'Values', workflow_id: str):
sys.exit(vro_cli(parser, options, workflow_id))
sys.exit(asyncio.run(vr_cli(parser, options, workflow_id)))


def vro_cli(parser: COP, options: 'Values', workflow_id: str):
async def vr_cli(parser: COP, options: 'Values', workflow_id: str):
"""Run Cylc (re)validate - reinstall - reload in sequence."""
# Attempt to work out whether the workflow is running.
# We are trying to avoid reinstalling then subsequently being
# unable to play or reload because we cannot identify workflow state.
unparsed_wid = workflow_id
workflow_id, *_ = parse_id(
workflow_id, *_ = await parse_id_async(
workflow_id,
constraint='workflows',
)
Expand Down Expand Up @@ -166,10 +167,14 @@ def vro_cli(parser: COP, options: 'Values', workflow_id: str):
# Force on the against_source option:
options.against_source = True # Make validate check against source.
log_subcommand('validate --against-source', workflow_id)
cylc_validate(parser, options, workflow_id)
await cylc_validate(parser, options, workflow_id)

log_subcommand('reinstall', workflow_id)
reinstall_ok = cylc_reinstall(options, workflow_id, print_reload_tip=False)
reinstall_ok = await cylc_reinstall(
options, workflow_id,
[],
print_reload_tip=False
)
if not reinstall_ok:
LOG.warning(
'No changes to source: No reinstall or'
Expand All @@ -180,7 +185,7 @@ def vro_cli(parser: COP, options: 'Values', workflow_id: str):
# Run reload if workflow is running or paused:
if workflow_running:
log_subcommand('reload', workflow_id)
cylc_reload(options, workflow_id)
await cylc_reload(options, workflow_id)

# run play anyway, to play a stopped workflow:
else:
Expand All @@ -197,4 +202,4 @@ def vro_cli(parser: COP, options: 'Values', workflow_id: str):
source='', # Intentionally blank
)
log_subcommand(*sys.argv[1:])
scheduler_cli(options, workflow_id)
await scheduler_cli(options, workflow_id, parse_workflow_id=False)
1 change: 1 addition & 0 deletions tests/functional/cylc-reinstall/00-simple.t
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ run_ok "${TEST_NAME}-reinstall" cylc reinstall "${RND_WORKFLOW_NAME}/run1"
cmp_ok "${TEST_NAME}-reinstall.stdout" <<__OUT__
REINSTALLED $RND_WORKFLOW_NAME/run1 from ${RND_WORKFLOW_SOURCE}
Successfully reinstalled.
Done
__OUT__
popd || exit 1
purge_rnd_workflow
Expand Down
Loading

0 comments on commit 8140c6f

Please sign in to comment.