Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Scan workflow name during install. #5184

Merged
merged 11 commits into from
Oct 25, 2022
3 changes: 3 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ ones in. -->

### Enhancements

[#5184](https://github.com/cylc/cylc-flow/pull/5184) - scan for active
runs of the same workflow at install time.

[#5032](https://github.com/cylc/cylc-flow/pull/5032) - set a default limit of
100 for the "default" queue.

Expand Down
119 changes: 106 additions & 13 deletions cylc/flow/scripts/install.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,21 +80,37 @@
multiple workflow run directories that link to the same workflow definition.
"""

from ansimarkup import ansiprint as cprint
import asyncio
from optparse import Values
from pathlib import Path
from typing import Optional, TYPE_CHECKING, Dict, Any
from typing import Optional, Dict, Any

from cylc.flow.scripts.scan import (
get_pipe,
_format_plain,
FLOW_STATE_SYMBOLS,
FLOW_STATE_CMAP
)
from cylc.flow import iter_entry_points
from cylc.flow.exceptions import PluginError, InputError
from cylc.flow.option_parsers import CylcOptionParser as COP
from cylc.flow.pathutil import EXPLICIT_RELATIVE_PATH_REGEX, expand_path
from cylc.flow.loggingutil import CylcLogFormatter
from cylc.flow.option_parsers import (
CylcOptionParser as COP,
Options
)
from cylc.flow.pathutil import (
EXPLICIT_RELATIVE_PATH_REGEX,
expand_path,
get_workflow_run_dir
)
from cylc.flow.workflow_files import (
install_workflow, search_install_source_dirs, parse_cli_sym_dirs
install_workflow,
parse_cli_sym_dirs,
search_install_source_dirs
)
from cylc.flow.terminal import cli_function

if TYPE_CHECKING:
from optparse import Values


def get_option_parser() -> COP:
parser = COP(
Expand Down Expand Up @@ -150,6 +166,16 @@ def get_option_parser() -> COP:
default=False,
dest="no_run_name")

parser.add_option(
"--no-ping",
help=(
"When scanning for active instances of the workflow, "
"do not attempt to contact the schedulers to get status."
),
action="store_true",
default=False,
dest="no_ping")

parser.add_cylc_rose_options()

return parser
Expand All @@ -162,7 +188,7 @@ def get_source_location(path: Optional[str]) -> Path:
"""
if path is None:
return Path.cwd()
path = path.strip()
path = str(path).strip()
expanded_path = Path(expand_path(path))
if expanded_path.is_absolute():
return expanded_path
Expand All @@ -171,14 +197,79 @@ def get_source_location(path: Optional[str]) -> Path:
return search_install_source_dirs(expanded_path)


async def scan(wf_name: str, ping: bool = True) -> None:
"""Print any instances of wf_name that are already active."""
opts = Values({
'name': [f'{wf_name}/*'],
hjoliver marked this conversation as resolved.
Show resolved Hide resolved
'states': {'running', 'paused', 'stopping'},
'source': False,
'ping': ping, # get status of scanned workflows
})
active = [
item async for item in get_pipe(
opts, None,
scan_dir=get_workflow_run_dir(wf_name) # restricted scan
)
]
if active:
n = len(active)
grammar = (
["s", "are", "them all"]
if n > 1 else
["", "is", "it"]
)
print(
CylcLogFormatter.COLORS['WARNING'].format(
f'NOTE: {n} run%s of "{wf_name}"'
' %s already active:' % tuple(grammar[:2])
)
)
for item in active:
if opts.ping:
status = item['status']
tag = FLOW_STATE_CMAP[status]
symbol = f" <{tag}>{FLOW_STATE_SYMBOLS[status]}</{tag}>"
else:
symbol = " "
cprint(symbol, _format_plain(item, opts))
pattern = (
f"'{wf_name}/*'"
if n > 1 else
f"{item['name']}"
)
print(
f'You can stop %s with:\n cylc stop {pattern}'
'\nSee "cylc stop --help" for options.' % grammar[-1]
)


InstallOptions = Options(get_option_parser())


@cli_function(get_option_parser)
def main(parser, opts, reg=None):
install(parser, opts, reg)
def main(
_parser: COP,
opts: 'Values',
reg: Optional[str] = None
) -> None:
"""CLI wrapper."""
install_cli(opts, reg)


def install(
parser: COP, opts: 'Values', reg: Optional[str] = None
def install_cli(
opts: 'Values',
reg: Optional[str] = None
) -> None:
"""Install workflow and scan for already-running instances."""
wf_name = install(opts, reg)
asyncio.run(
scan(wf_name, not opts.no_ping)
)


def install(
opts: 'Values', reg: Optional[str] = None
) -> str:
if opts.no_run_name and opts.run_name:
raise InputError(
"options --no-run-name and --run-name are mutually exclusive."
Expand All @@ -204,7 +295,7 @@ def install(
elif opts.symlink_dirs:
cli_symdirs = parse_cli_sym_dirs(opts.symlink_dirs)

source_dir, rundir, _workflow_name = install_workflow(
source_dir, rundir, workflow_name = install_workflow(
source=source,
workflow_name=opts.workflow_name,
run_name=opts.run_name,
Expand All @@ -229,3 +320,5 @@ def install(
entry_point.name,
exc
) from None

return workflow_name
150 changes: 150 additions & 0 deletions tests/integration/test_install.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE.
# Copyright (C) NIWA & British Crown (Met Office) & Contributors.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

"""Test cylc install."""

import pytest
from pathlib import Path

from .test_scan import init_flows

from cylc.flow.async_util import pipe
from cylc.flow.scripts import scan
from cylc.flow.workflow_files import WorkflowFiles
from cylc.flow.scripts.install import (
InstallOptions,
install_cli
)

from typing import Callable, Tuple

SRV_DIR = Path(WorkflowFiles.Service.DIRNAME)
CONTACT = Path(WorkflowFiles.Service.CONTACT)
RUN_N = Path(WorkflowFiles.RUN_N)
INSTALL = Path(WorkflowFiles.Install.DIRNAME)

INSTALLED_MSG = "INSTALLED {wfrun} from"
WF_ACTIVE_MSG = '1 run of "{wf}" is already active:'
BAD_CONTACT_MSG = "Bad contact file:"


@pytest.fixture()
def patch_graphql_query(
monkeypatch: pytest.MonkeyPatch
):
# Define a mocked graphql_query pipe function.
@pipe
async def _graphql_query(flow, fields, filters=None):
flow.update({"status": "running"})
return flow

# Swap out the function that cylc.flow.scripts.scan.
monkeypatch.setattr(
'cylc.flow.scripts.scan.graphql_query',
_graphql_query,
)


@pytest.fixture()
def src_run_dirs(
mock_glbl_cfg: Callable,
monkeypatch: pytest.MonkeyPatch,
tmp_path: Path
) -> Tuple[Path, Path]:
"""Create some workflow source and run dirs for testing.

Source dirs:
<tmp-src>/w1
<tmp-src>/w2

Run dir:
<tmp-run>/w1/run1

"""
tmp_src_path = tmp_path / 'cylc-src'
tmp_run_path = tmp_path / 'cylc-run'
tmp_src_path.mkdir()
tmp_run_path.mkdir()

init_flows(
tmp_run_path=tmp_run_path,
running=('w1/run1',),
tmp_src_path=tmp_src_path,
src=('w1', 'w2')
)
mock_glbl_cfg(
'cylc.flow.workflow_files.glbl_cfg',
f'''
[install]
source dirs = {tmp_src_path}
'''
)
monkeypatch.setattr('cylc.flow.pathutil._CYLC_RUN_DIR', tmp_run_path)

return tmp_src_path, tmp_run_path


def test_install_scan_no_ping(
src_run_dirs: Callable,
capsys: pytest.CaptureFixture,
caplog: pytest.LogCaptureFixture
) -> None:
"""At install, running intances should be reported.

Ping = False case: don't query schedulers.
"""

opts = InstallOptions()
opts.no_ping = True

install_cli(opts, reg='w1')
out = capsys.readouterr().out
assert INSTALLED_MSG.format(wfrun='w1/run2') in out
assert WF_ACTIVE_MSG.format(wf='w1') in out
# Empty contact file faked with "touch":
assert f"{BAD_CONTACT_MSG} w1/run1" in caplog.text
hjoliver marked this conversation as resolved.
Show resolved Hide resolved

install_cli(opts, reg='w2')
out = capsys.readouterr().out
assert WF_ACTIVE_MSG.format(wf='w2') not in out
assert INSTALLED_MSG.format(wfrun='w2/run1') in out


def test_install_scan_ping(
src_run_dirs: Callable,
capsys: pytest.CaptureFixture,
caplog: pytest.LogCaptureFixture,
patch_graphql_query: Callable
) -> None:
"""At install, running intances should be reported.

Ping = True case: but mock scan's scheduler query method.
"""
opts = InstallOptions()
opts.no_ping = False

install_cli(opts, reg='w1')
out = capsys.readouterr().out
assert INSTALLED_MSG.format(wfrun='w1/run2') in out
assert WF_ACTIVE_MSG.format(wf='w1') in out
assert scan.FLOW_STATE_SYMBOLS["running"] in out
# Empty contact file faked with "touch":
assert f"{BAD_CONTACT_MSG} w1/run1" in caplog.text

install_cli(opts, reg='w2')
out = capsys.readouterr().out
assert INSTALLED_MSG.format(wfrun='w2/run1') in out
assert WF_ACTIVE_MSG.format(wf='w2') not in out
26 changes: 18 additions & 8 deletions tests/integration/test_scan.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,17 +42,20 @@
INSTALL = Path(WorkflowFiles.Install.DIRNAME)


def init_flows(tmp_path, running=None, registered=None, un_registered=None):
def init_flows(tmp_run_path=None, running=None, registered=None,
un_registered=None, tmp_src_path=None, src=None):
"""Create some dummy workflows for scan to discover.

Assume "run1, run2, ..., runN" structure if flow name constains "run".
Optionally create workflow source dirs in a give location too.

"""
def make_registered(name, running=False):
run_d = Path(tmp_path, name)
run_d = Path(tmp_run_path, name)
run_d.mkdir(parents=True, exist_ok=True)
(run_d / "flow.cylc").touch()
if "run" in name:
root = Path(tmp_path, name).parent
root = Path(tmp_run_path, name).parent
with suppress(FileExistsError):
(root / "runN").symlink_to(run_d, target_is_directory=True)
else:
Expand All @@ -63,12 +66,19 @@ def make_registered(name, running=False):
if running:
(srv_d / CONTACT).touch()

def make_src(name):
src_d = Path(tmp_src_path, name)
src_d.mkdir(parents=True, exist_ok=True)
(src_d / "flow.cylc").touch()

for name in (running or []):
make_registered(name, running=True)
for name in (registered or []):
make_registered(name)
for name in (un_registered or []):
Path(tmp_path, name).mkdir(parents=True, exist_ok=True)
Path(tmp_run_path, name).mkdir(parents=True, exist_ok=True)
for name in (src or []):
make_src(name)


@pytest.fixture(scope='session')
Expand Down Expand Up @@ -157,14 +167,14 @@ def source_dirs(mock_glbl_cfg):
src1 = src / '1'
src1.mkdir()
init_flows(
src1,
registered=('a', 'b/c')
tmp_src_path=src1,
src=('a', 'b/c')
)
src2 = src / '2'
src2.mkdir()
init_flows(
src2,
registered=('d', 'e/f')
tmp_src_path=src2,
src=('d', 'e/f')
)
mock_glbl_cfg(
'cylc.flow.scripts.scan.glbl_cfg',
Expand Down