diff --git a/.github/workflows/2_auto_publish_release.yml b/.github/workflows/2_auto_publish_release.yml index 6282a0f6e57..3896243511d 100644 --- a/.github/workflows/2_auto_publish_release.yml +++ b/.github/workflows/2_auto_publish_release.yml @@ -70,3 +70,10 @@ jobs: GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} with: release-url: ${{ steps.create-release.outputs.html_url }} + + - name: Bump dev version + uses: cylc/release-actions/stage-2/bump-dev-version@v1 + env: + GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} + with: + init-file: 'cylc/flow/__init__.py' diff --git a/.github/workflows/bash.yml b/.github/workflows/bash.yml index 988975f6581..cf0e49c362b 100644 --- a/.github/workflows/bash.yml +++ b/.github/workflows/bash.yml @@ -14,7 +14,9 @@ on: - '**.md' - '**/README*/**' push: - branches: [master] + branches: + - master + - '8.*.x' paths-ignore: - '.github/workflows/*.ya?ml' - '!.github/workflows/bash.yml' diff --git a/.github/workflows/test_fast.yml b/.github/workflows/test_fast.yml index 97ac3bb0fc2..192dd7d10fa 100644 --- a/.github/workflows/test_fast.yml +++ b/.github/workflows/test_fast.yml @@ -4,7 +4,9 @@ on: pull_request: workflow_dispatch: push: - branches: [master] + branches: + - master + - '8.*.x' concurrency: group: ${{ github.workflow }}-${{ github.ref }} diff --git a/.github/workflows/test_functional.yml b/.github/workflows/test_functional.yml index 1d01a420e49..7f9514164ae 100644 --- a/.github/workflows/test_functional.yml +++ b/.github/workflows/test_functional.yml @@ -13,7 +13,9 @@ on: - '**.md' - '**/README*/**' push: - branches: [master] + branches: + - master + - '8.*.x' paths-ignore: - '.github/workflows/*.ya?ml' - '!.github/workflows/test_functional.yml' diff --git a/.github/workflows/test_tutorial_workflow.yml b/.github/workflows/test_tutorial_workflow.yml index e566a3eeb1f..0db3325aec1 100644 --- a/.github/workflows/test_tutorial_workflow.yml +++ b/.github/workflows/test_tutorial_workflow.yml @@ -2,7 +2,9 @@ name: test-tutorial-workflow on: 
push: - branches: [master] + branches: + - master + - '8.*.x' pull_request: paths-ignore: - '.github/workflows/*.ya?ml' diff --git a/CHANGES.md b/CHANGES.md index d02980efc04..baf3d428a56 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -28,8 +28,34 @@ creating a new release entry be sure to copy & paste the span tag with the `actions:bind` attribute, which is used by a regex to find the text to be updated. Only the first match gets replaced, so it's fine to leave the old ones in. --> + ------------------------------------------------------------------------------- +## __cylc-8.0.3 (Pending YYYY-MM-DD)__ + +Maintenance release. + +### Fixes + +[#5023](https://github.com/cylc/cylc-flow/pull/5023) - tasks force-triggered +after a shutdown was ordered should submit to run immediately on restart. + +[#5137](https://github.com/cylc/cylc-flow/pull/5137) - +Install the `ana/` directory to remote platforms by default. + +[#5146](https://github.com/cylc/cylc-flow/pull/5146) - no-flow tasks should not +retrigger incomplete children. + +[#5104](https://github.com/cylc/cylc-flow/pull/5104) - Fix retriggering of +failed tasks after a reload. + +[#5139](https://github.com/cylc/cylc-flow/pull/5139) - Fix bug where +`cylc install` could hang if there was a large uncommitted diff in the +source dir (for git/svn repos). +[#5131](https://github.com/cylc/cylc-flow/pull/5131) - Infer workflow run number +for `workflow_state` xtrigger. + +------------------------------------------------------------------------------- ## __cylc-8.1.0 (Upcoming)__ ### Enhancements @@ -42,15 +68,21 @@ numbers of Cylc Lint's style issues and allow users to ignore Cylc Lint issues using `--ignore `. ------------------------------------------------------------------------------- -## __cylc-8.0.2 (Upcoming)__ -Maintenance release. 
+## __cylc-8.0.3 (Upcoming)__ ### Fixes [#5125](https://github.com/cylc/cylc-flow/pull/5125) - Allow rose-suite.conf changes to be considered by ``cylc reinstall``. +------------------------------------------------------------------------------- +## __cylc-8.0.2 (Released 2022-09-12)__ + +Maintenance release. + +### Fixes + [#5115](https://github.com/cylc/cylc-flow/pull/5115) - Updates rsync commands to make them compatible with latest rsync releases. diff --git a/cylc/flow/__init__.py b/cylc/flow/__init__.py index 1227bf0a407..6dd2e8936ec 100644 --- a/cylc/flow/__init__.py +++ b/cylc/flow/__init__.py @@ -46,7 +46,7 @@ def environ_init(): environ_init() -__version__ = '8.1.0.dev' +__version__ = '8.0.3.dev' def iter_entry_points(entry_point_name): diff --git a/cylc/flow/cfgspec/globalcfg.py b/cylc/flow/cfgspec/globalcfg.py index e637434e8b0..f0223d155f3 100644 --- a/cylc/flow/cfgspec/globalcfg.py +++ b/cylc/flow/cfgspec/globalcfg.py @@ -1614,11 +1614,14 @@ def default_for( with Conf('localhost', meta=Platform, desc=''' A default platform for running jobs on the the scheduler host. - .. attention:: + This platform configures the host on which + :term:`schedulers ` run. By default this is the + host where ``cylc play`` is run, however, we often configure + Cylc to start schedulers on dedicated hosts by configuring + :cylc:conf:`global.cylc[scheduler][run hosts]available`. - It is common practice to start Cylc schedulers on dedicated - hosts, in which case **"localhost" is the scheduler host and - not necessarily where you ran "cylc play"**. + This platform affects connections made to the scheduler host and + any jobs run on it. .. versionadded:: 8.0.0 '''): @@ -1626,6 +1629,11 @@ def default_for( List of hosts for the localhost platform. You are unlikely to need to change this. + The scheduler hosts are configured by + :cylc:conf:`global.cylc[scheduler][run hosts]available`. + See :ref:`Submitting Workflows To a Pool Of Hosts` for + more information. + .. 
seealso:: :cylc:conf:`global.cylc[platforms][]hosts` diff --git a/cylc/flow/cfgspec/workflow.py b/cylc/flow/cfgspec/workflow.py index f8948588d1e..10d199d6832 100644 --- a/cylc/flow/cfgspec/workflow.py +++ b/cylc/flow/cfgspec/workflow.py @@ -222,10 +222,17 @@ def get_script_common_text(this: str, example: Optional[str] = None): The following directories already get installed by default: - * ``app/`` - * ``bin/`` - * ``etc/`` - * ``lib/`` + ``ana/`` + Rose ana analysis modules + ``app/`` + Rose applications + ``bin/`` + Cylc bin directory (added to ``PATH``) + ``etc/`` + Miscellaneous resources + ``lib/`` + Cylc lib directory (``lib/python`` added to ``PYTHONPATH`` + for workflow config) These should be located in the top level of your Cylc workflow, i.e. the directory that contains your ``flow.cylc`` file. diff --git a/cylc/flow/data_store_mgr.py b/cylc/flow/data_store_mgr.py index a53e9cdca4f..d5e4bd5acef 100644 --- a/cylc/flow/data_store_mgr.py +++ b/cylc/flow/data_store_mgr.py @@ -733,11 +733,18 @@ def _expand_graph_window( and e_id not in self.added[EDGES] and edge_distance <= self.n_edge_distance ): - self.added[EDGES][e_id] = PbEdge( - id=e_id, - source=s_tokens.id, - target=t_tokens.id - ) + if is_parent: + self.added[EDGES][e_id] = PbEdge( + id=e_id, + source=t_tokens.id, + target=s_tokens.id + ) + else: + self.added[EDGES][e_id] = PbEdge( + id=e_id, + source=s_tokens.id, + target=t_tokens.id + ) # Add edge id to node field for resolver reference self.updated[TASK_PROXIES].setdefault( t_tokens.id, diff --git a/cylc/flow/host_select.py b/cylc/flow/host_select.py index 6c8676197ac..3d98b0edc13 100644 --- a/cylc/flow/host_select.py +++ b/cylc/flow/host_select.py @@ -29,7 +29,7 @@ from cylc.flow.cfgspec.glbl_cfg import glbl_cfg from cylc.flow.exceptions import CylcError, HostSelectException from cylc.flow.hostuserutil import get_fqdn_by_host, is_remote_host -from cylc.flow.remote import _remote_cylc_cmd, run_cmd +from cylc.flow.remote import run_cmd, 
cylc_server_cmd from cylc.flow.terminal import parse_dirty_json @@ -533,7 +533,7 @@ def _get_metrics(hosts, metrics, data=None): } for host in hosts: if is_remote_host(host): - proc_map[host] = _remote_cylc_cmd(cmd, host=host, **kwargs) + proc_map[host] = cylc_server_cmd(cmd, host=host, **kwargs) else: proc_map[host] = run_cmd(['cylc'] + cmd, **kwargs) diff --git a/cylc/flow/install_plugins/log_vc_info.py b/cylc/flow/install_plugins/log_vc_info.py index 9be634c5b0f..1aec3eecd5d 100644 --- a/cylc/flow/install_plugins/log_vc_info.py +++ b/cylc/flow/install_plugins/log_vc_info.py @@ -62,7 +62,9 @@ import json from pathlib import Path from subprocess import Popen, DEVNULL, PIPE -from typing import Any, Dict, Iterable, List, Optional, TYPE_CHECKING, Union +from typing import ( + Any, Dict, Iterable, List, Optional, TYPE_CHECKING, TextIO, Union, overload +) from cylc.flow import LOG from cylc.flow.exceptions import CylcError @@ -80,8 +82,6 @@ GIT: ['describe', '--always', '--dirty'] } -# git ['show', '--quiet', '--format=short'], - STATUS_COMMANDS: Dict[str, List[str]] = { SVN: ['status', '--non-interactive'], GIT: ['status', '--short'] @@ -189,13 +189,40 @@ def get_vc_info(path: Union[Path, str]) -> Optional[Dict[str, Any]]: return None -def _run_cmd(vcs: str, args: Iterable[str], cwd: Union[Path, str]) -> str: - """Run a VCS command, return stdout. +@overload +def _run_cmd( + vcs: str, args: Iterable[str], cwd: Union[Path, str], stdout: int = PIPE +) -> str: + ... + + +@overload +def _run_cmd( + vcs: str, args: Iterable[str], cwd: Union[Path, str], stdout: TextIO +) -> None: + ... + + +def _run_cmd( + vcs: str, + args: Iterable[str], + cwd: Union[Path, str], + stdout: Union[TextIO, int] = PIPE +) -> Optional[str]: + """Run a VCS command. Args: vcs: The version control system. args: The args to pass to the version control command. cwd: Directory to run the command in. + stdout: Where to redirect output (either PIPE or a + text stream/file object). 
Note: only use PIPE for + commands that will not generate a large output, otherwise + the pipe might get blocked. + + Returns: + Stdout output if stdout=PIPE, else None as the output has been + written directly to the specified file. Raises: VCSNotInstalledError: The VCS is not found. @@ -208,7 +235,7 @@ def _run_cmd(vcs: str, args: Iterable[str], cwd: Union[Path, str]) -> str: cmd, cwd=cwd, stdin=DEVNULL, - stdout=PIPE, + stdout=stdout, stderr=PIPE, text=True, ) @@ -275,41 +302,40 @@ def _parse_svn_info(info_text: str) -> Dict[str, Any]: return ret -def get_diff(vcs: str, path: Union[Path, str]) -> Optional[str]: - """Return the diff of uncommitted changes for a repository. +def write_diff( + vcs: str, repo_path: Union[Path, str], run_dir: Union[Path, str] +) -> Path: + """Get and write the diff of uncommitted changes for a repository to the + workflow's vcs log dir. Args: vcs: The version control system. - path: The path to the repo. - """ - args_ = DIFF_COMMANDS[vcs] - if Path(path).is_absolute(): - args_.append(str(path)) - else: - args_.append(str(Path().cwd() / path)) - - try: - diff = _run_cmd(vcs, args_, cwd=path) - except (VCSNotInstalledError, VCSMissingBaseError): - return None - header = ( - "# Auto-generated diff of uncommitted changes in the Cylc " - "workflow repository:\n" - f"# {path}") - return f"{header}\n{diff}" - - -def write_diff(diff: str, run_dir: Union[Path, str]) -> None: - """Write a diff to the workflow's vcs log dir. - - Args: - diff: The diff. + repo_path: The path to the repo. run_dir: The workflow run directory. + + Returns the path to diff file. 
""" + args = DIFF_COMMANDS[vcs] + args.append( + str(repo_path) if Path(repo_path).is_absolute() else + str(Path().cwd() / repo_path) + ) + diff_file = Path(run_dir, LOG_VERSION_DIR, DIFF_FILENAME) diff_file.parent.mkdir(exist_ok=True) - with open(diff_file, 'w') as f: - f.write(diff) + + with open(diff_file, 'a') as f: + f.write( + "# Auto-generated diff of uncommitted changes in the Cylc " + "workflow repository:\n" + f"# {repo_path}\n" + ) + f.flush() + try: + _run_cmd(vcs, args, repo_path, stdout=f) + except VCSMissingBaseError as exc: + f.write(f"# No diff - {exc}") + return diff_file # Entry point: @@ -331,8 +357,6 @@ def main( if vc_info is None: return False vcs = vc_info['version control system'] - diff = get_diff(vcs, srcdir) write_vc_info(vc_info, rundir) - if diff is not None: - write_diff(diff, rundir) + write_diff(vcs, srcdir, rundir) return True diff --git a/cylc/flow/network/ssh_client.py b/cylc/flow/network/ssh_client.py index 7607512d99a..3b55b2258d7 100644 --- a/cylc/flow/network/ssh_client.py +++ b/cylc/flow/network/ssh_client.py @@ -23,7 +23,7 @@ from cylc.flow.exceptions import ClientError, ClientTimeout from cylc.flow.network.client_factory import CommsMeth from cylc.flow.network import get_location -from cylc.flow.remote import _remote_cylc_cmd +from cylc.flow.remote import remote_cylc_cmd from cylc.flow.workflow_files import load_contact_file, ContactFileFields @@ -60,15 +60,20 @@ async def async_request(self, command, args=None, timeout=None): try: async with ascyncto(timeout): cmd, ssh_cmd, login_sh, cylc_path, msg = self.prepare_command( - command, args, timeout) - proc = _remote_cylc_cmd( + command, args, timeout + ) + platform = { + 'ssh command': ssh_cmd, + 'cylc path': cylc_path, + 'use login shell': login_sh, + } + proc = remote_cylc_cmd( cmd, + platform, host=self.host, stdin_str=msg, - ssh_cmd=ssh_cmd, - remote_cylc_path=cylc_path, - ssh_login_shell=login_sh, - capture_process=True) + capture_process=True + ) while True: if 
proc.poll() is not None: break diff --git a/cylc/flow/parsec/jinja2support.py b/cylc/flow/parsec/jinja2support.py index a3c6e4a20b8..35ae56bd210 100644 --- a/cylc/flow/parsec/jinja2support.py +++ b/cylc/flow/parsec/jinja2support.py @@ -169,10 +169,12 @@ def jinja2environment(dir_=None): # | return str(value).rjust( int(length), str(fillchar) ) for namespace in ['filters', 'tests', 'globals']: nspdir = 'Jinja2' + namespace.capitalize() - fdirs = [ - os.path.join(dir_, nspdir), - os.path.join(os.environ['HOME'], '.cylc', nspdir) - ] + fdirs = [os.path.join(dir_, nspdir)] + try: + fdirs.append(os.path.join(os.environ['HOME'], '.cylc', nspdir)) + except KeyError: + # (Needed for tests/f/cylc-get-site-config/04-homeless.t!) + LOG.warning(f"$HOME undefined: can't load ~/.cylc/{nspdir}") for fdir in fdirs: if os.path.isdir(fdir): sys.path.insert(1, os.path.abspath(fdir)) diff --git a/cylc/flow/remote.py b/cylc/flow/remote.py index ccbb5558000..c9851101a6b 100644 --- a/cylc/flow/remote.py +++ b/cylc/flow/remote.py @@ -31,9 +31,10 @@ from typing import Any, Dict, List, Tuple import cylc.flow.flags -from cylc.flow import __version__ as CYLC_VERSION +from cylc.flow import __version__ as CYLC_VERSION, LOG from cylc.flow.option_parsers import verbosity_to_opts from cylc.flow.platforms import get_platform, get_host_from_platform +from cylc.flow.util import format_cmd def get_proc_ancestors(): @@ -121,6 +122,7 @@ def run_cmd( stdin = read try: + LOG.debug(f'running command:\n$ {format_cmd(command)}') proc = Popen( # nosec command, stdin=stdin, @@ -172,6 +174,15 @@ def get_includes_to_rsync(rsync_includes=None): '--no-t' ] +DEFAULT_INCLUDES = [ + '/ana/***', # Rose ana analysis modules + '/app/***', # Rose applications + '/bin/***', # Cylc bin directory (added to PATH) + '/etc/***', # Miscellaneous resources + '/lib/***', # Cylc lib directory (lib/python added to PYTHONPATH for + # workflow config) +] + def construct_rsync_over_ssh_cmd( src_path: str, dst_path: str, platform: 
Dict[str, Any], @@ -209,12 +220,7 @@ def construct_rsync_over_ssh_cmd( rsync_cmd.extend(rsync_options) for exclude in ['log', 'share', 'work']: rsync_cmd.append(f"--exclude={exclude}") - default_includes = [ - '/app/***', - '/bin/***', - '/etc/***', - '/lib/***'] - for include in default_includes: + for include in DEFAULT_INCLUDES: rsync_cmd.append(f"--include={include}") for include in get_includes_to_rsync(rsync_includes): rsync_cmd.append(f"--include={include}") @@ -226,54 +232,29 @@ def construct_rsync_over_ssh_cmd( def construct_ssh_cmd( - raw_cmd, platform, host, **kwargs -): - """Build an SSH command for execution on a remote platform. - - Constructs the SSH command according to the platform configuration. - - See _construct_ssh_cmd for argument documentation. - """ - return _construct_ssh_cmd( - raw_cmd, - host=host, - ssh_cmd=platform['ssh command'], - remote_cylc_path=platform['cylc path'], - ssh_login_shell=platform['use login shell'], - **kwargs - ) - - -def _construct_ssh_cmd( - raw_cmd, - host=None, - forward_x11=False, - stdin=False, - ssh_cmd=None, - ssh_login_shell=None, - remote_cylc_path=None, - set_UTC=False, - set_verbosity=False, - timeout=None + raw_cmd, + platform, + host, + forward_x11=False, + stdin=False, + set_UTC=False, + set_verbosity=False, + timeout=None, ): """Build an SSH command for execution on a remote platform hosts. Arguments: raw_cmd (list): primitive command to run remotely. + platform (dict): + The Cylc job "platform" to run the command on. This is used + to determine the settings used e.g. "ssh command". host (string): remote host name. Use 'localhost' if not specified. forward_x11 (boolean): If True, use 'ssh -Y' to enable X11 forwarding, else just 'ssh'. stdin: If None, the `-n` option will be added to the SSH command line. - ssh_cmd (string): - ssh command to use: If unset defaults to localhost ssh cmd. - ssh_login_shell (boolean): - If True, launch remote command with `bash -l -c 'exec "$0" "$@"'`. 
- remote_cylc_path (string): - Path containing the `cylc` executable. - This is required if the remote executable is not in $PATH. set_UTC (boolean): If True, check UTC mode and specify if set to True (non-default). set_verbosity (boolean): @@ -281,28 +262,20 @@ def _construct_ssh_cmd( timeout (str): String for bash timeout command. - Return: + Returns: list - A list containing a chosen command including all arguments and options necessary to directly execute the bare command on a given host via ssh. + """ - # If ssh cmd isn't given use the default from localhost settings. - if ssh_cmd is None: - command = shlex.split(get_platform()['ssh command']) - else: - command = shlex.split(ssh_cmd) + command = shlex.split(platform['ssh command']) if forward_x11: command.append('-Y') if stdin is None: command.append('-n') - user_at_host = '' - if host: - user_at_host += host - else: - user_at_host += 'localhost' - command.append(user_at_host) + command.append(host) # Pass CYLC_VERSION and optionally, CYLC_CONF_PATH & CYLC_UTC through. command += ['env', quote(r'CYLC_VERSION=%s' % CYLC_VERSION)] @@ -323,8 +296,7 @@ def _construct_ssh_cmd( command.append(quote(r'TZ=UTC')) # Use bash -l? - if ssh_login_shell is None: - ssh_login_shell = get_platform()['use login shell'] + ssh_login_shell = platform['use login shell'] if ssh_login_shell: # A login shell will always source /etc/profile and the user's bash # profile file. To avoid having to quote the entire remote command @@ -335,14 +307,11 @@ def _construct_ssh_cmd( command += ['timeout', timeout] # 'cylc' on the remote host - if not remote_cylc_path: - remote_cylc_path = get_platform()['cylc path'] - + remote_cylc_path = platform['cylc path'] if remote_cylc_path: cylc_cmd = str(Path(remote_cylc_path) / 'cylc') else: cylc_cmd = 'cylc' - command.append(cylc_cmd) # Insert core raw command after ssh, but before its own, command options. 
@@ -354,52 +323,63 @@ return command -def remote_cylc_cmd(cmd, platform, bad_hosts=None, **kwargs): - """Execute a Cylc command on a remote platform. +def construct_cylc_server_ssh_cmd( + cmd, + host, + **kwargs, +): + """Convenience function for building SSH commands for remote Cylc servers. + + Build an SSH command that connects to the specified host using the + localhost platform config. - Uses the platform configuration to construct the command. + * To run commands on job platforms use construct_ssh_cmd. + * Use this interface to connect to: + * Cylc servers (i.e. `[scheduler][run hosts]available`). + * The host `cylc play` was run on. - See _construct_ssh_cmd for argument documentation. + This assumes the host you are connecting to shares the $HOME filesystem + with the localhost platform. + + For arguments and returns see construct_ssh_cmd. """ - return _remote_cylc_cmd( + return construct_ssh_cmd( cmd, - host=get_host_from_platform(platform, bad_hosts=bad_hosts), - ssh_cmd=platform['ssh command'], - remote_cylc_path=platform['cylc path'], - ssh_login_shell=platform['use login shell'], - **kwargs + get_platform(), # use localhost settings + host, + **kwargs, ) -def _remote_cylc_cmd( - cmd, - host=None, - stdin=None, - stdin_str=None, - ssh_login_shell=None, - ssh_cmd=None, - remote_cylc_path=None, - capture_process=False, - manage=False +def remote_cylc_cmd( + cmd, + platform, + bad_hosts=None, + host=None, + stdin=None, + stdin_str=None, + ssh_login_shell=None, + ssh_cmd=None, + remote_cylc_path=None, + capture_process=False, + manage=False ): """Execute a Cylc command on a remote platform. - See run_cmd and _construct_ssh_cmd for argument documentation. - - Returns: - subprocess.Popen or int - If capture_process=True, return the Popen - object if created successfully. Otherwise, return the exit code of the - remote command. + Uses the provided platform configuration to construct the command. 
+ For arguments and returns see construct_ssh_cmd and run_cmd. """ + if not host: + # no host selected => perform host selection from platform config + host = get_host_from_platform(platform, bad_hosts=bad_hosts) + return run_cmd( - _construct_ssh_cmd( + construct_ssh_cmd( cmd, + platform, host=host, stdin=True if stdin_str else stdin, - ssh_login_shell=ssh_login_shell, - ssh_cmd=ssh_cmd, - remote_cylc_path=remote_cylc_path ), stdin=stdin, stdin_str=stdin_str, @@ -407,3 +387,28 @@ def _remote_cylc_cmd( capture_status=True, manage=manage ) + + +def cylc_server_cmd(cmd, host=None, **kwargs): + """Convenience function for running commands on remote Cylc servers. + + Executes a Cylc command on the specified host using localhost platform + config. + + * To run commands on job platforms use remote_cylc_cmd. + * Use this interface to run commands on: + * Cylc servers (i.e. `[scheduler][run hosts]available`). + * The host `cylc play` was run on. + + Runs a command via SSH using the configuration for the localhost platform. + This assumes the host you are connecting to shares the $HOME filesystem + with the localhost platform. + + For arguments and returns see construct_ssh_cmd and run_cmd. + """ + return remote_cylc_cmd( + cmd, + get_platform(), # use localhost settings + host=host, + **kwargs, + ) diff --git a/cylc/flow/rundb.py b/cylc/flow/rundb.py index 4cb3c630638..42a4cca3e9a 100644 --- a/cylc/flow/rundb.py +++ b/cylc/flow/rundb.py @@ -297,6 +297,7 @@ class CylcWorkflowDAO: ["submit_num", {"datatype": "INTEGER"}], ["status"], ["flow_wait", {"datatype": "INTEGER"}], + ["is_manual_submit", {"datatype": "INTEGER"}], ], TABLE_TASK_TIMEOUT_TIMERS: [ ["cycle", {"is_primary_key": True}], @@ -802,14 +803,15 @@ def select_task_pool_for_restart(self, callback): """Select from task_pool+task_states+task_jobs for restart. 
Invoke callback(row_idx, row) on each row, where each row contains: - [cycle, name, is_late, status, is_held, submit_num, - try_num, platform_name, time_submit, time_run, timeout, outputs] + the fields in the SELECT statement below. """ form_stmt = r""" SELECT %(task_pool)s.cycle, %(task_pool)s.name, %(task_pool)s.flow_nums, + %(task_states)s.flow_wait, + %(task_states)s.is_manual_submit, %(task_late_flags)s.value, %(task_pool)s.status, %(task_pool)s.is_held, diff --git a/cylc/flow/scheduler_cli.py b/cylc/flow/scheduler_cli.py index 5091a1a1400..1caa0f85628 100644 --- a/cylc/flow/scheduler_cli.py +++ b/cylc/flow/scheduler_cli.py @@ -41,7 +41,7 @@ icp_option, ) from cylc.flow.pathutil import get_workflow_run_scheduler_log_path -from cylc.flow.remote import _remote_cylc_cmd +from cylc.flow.remote import cylc_server_cmd from cylc.flow.scheduler import Scheduler, SchedulerError from cylc.flow.scripts.common import cylc_header from cylc.flow.workflow_files import ( @@ -393,7 +393,7 @@ def _distribute(host, workflow_id_raw, workflow_id): cmd.append("--host=localhost") # Re-invoke the command - _remote_cylc_cmd(cmd, host=host) + cylc_server_cmd(cmd, host=host) sys.exit(0) diff --git a/cylc/flow/task_job_mgr.py b/cylc/flow/task_job_mgr.py index f5b727ebb7b..8b3acb9ce06 100644 --- a/cylc/flow/task_job_mgr.py +++ b/cylc/flow/task_job_mgr.py @@ -238,7 +238,6 @@ def prep_submit_task_jobs(self, workflow, itasks, check_syntax=True): itask.submit_num += 1 itask.state_reset(TASK_STATUS_PREPARING) self.data_store_mgr.delta_task_state(itask) - self.workflow_db_mgr.put_update_task_state(itask) prep_task = self._prep_submit_task_job( workflow, itask, check_syntax=check_syntax) if prep_task: @@ -767,7 +766,10 @@ def _manip_task_jobs_callback( tasks = {} for itask in itasks: while itask.reload_successor is not None: + # Note submit number could be incremented since reload. 
+ subnum = itask.submit_num itask = itask.reload_successor + itask.submit_num = subnum if itask.point is not None and itask.submit_num: submit_num = "%02d" % (itask.submit_num) tasks[(str(itask.point), itask.tdef.name, submit_num)] = itask diff --git a/cylc/flow/task_pool.py b/cylc/flow/task_pool.py index 33bedecedd2..2582f0a508f 100644 --- a/cylc/flow/task_pool.py +++ b/cylc/flow/task_pool.py @@ -154,14 +154,10 @@ def stop_task_done(self): def _swap_out(self, itask): """Swap old task for new, during reload.""" - if itask.point in self.hidden_pool: - if itask.identity in self.hidden_pool[itask.point]: - self.hidden_pool[itask.point][itask.identity] = itask - self.hidden_pool_changed = True - elif ( - itask.point in self.main_pool - and itask.identity in self.main_pool[itask.point] - ): + if itask.identity in self.hidden_pool.get(itask.point, set()): + self.hidden_pool[itask.point][itask.identity] = itask + self.hidden_pool_changed = True + elif itask.identity in self.main_pool.get(itask.point, set()): self.main_pool[itask.point][itask.identity] = itask self.main_pool_changed = True @@ -178,13 +174,32 @@ def load_from_point(self): point = tdef.first_point(self.config.start_point) self.spawn_to_rh_limit(tdef, point, {flow_num}) - def add_to_pool(self, itask, is_new: bool = True) -> None: + def db_add_new_flow_rows(self, itask: TaskProxy) -> None: + """Add new rows to DB task tables that record flow_nums. + + Call when a new task is spawned or a flow merge occurs. + """ + # Add row to task_states table. 
+ now = get_current_time_string() + self.workflow_db_mgr.put_insert_task_states( + itask, + { + "time_created": now, + "time_updated": now, + "status": itask.state.status, + "flow_nums": serialise(itask.flow_nums), + "flow_wait": itask.flow_wait, + "is_manual_submit": itask.is_manual_submit + } + ) + # Add row to task_outputs table: + self.workflow_db_mgr.put_insert_task_outputs(itask) + + def add_to_pool(self, itask) -> None: """Add a task to the hidden (if not satisfied) or main task pool. If the task already exists in the hidden pool and is satisfied, move it to the main pool. - - (is_new is False inidcates load from DB at restart). """ if itask.is_task_prereqs_not_done() and not itask.is_manual_submit: # Add to hidden pool if not satisfied. @@ -210,21 +225,6 @@ def add_to_pool(self, itask, is_new: bool = True) -> None: self.create_data_store_elements(itask) - if is_new: - # Add row to "task_states" table. - now = get_current_time_string() - self.workflow_db_mgr.put_insert_task_states( - itask, - { - "time_created": now, - "time_updated": now, - "status": itask.state.status, - "flow_nums": serialise(itask.flow_nums) - } - ) - # Add row to "task_outputs" table: - self.workflow_db_mgr.put_insert_task_outputs(itask) - if itask.tdef.max_future_prereq_offset is not None: # (Must do this once added to the pool). self.set_max_future_offset() @@ -421,9 +421,9 @@ def load_db_task_pool_for_restart(self, row_idx, row): if row_idx == 0: LOG.info("LOADING task proxies") # Create a task proxy corresponding to this DB entry. 
- (cycle, name, flow_nums, is_late, status, is_held, submit_num, _, - platform_name, time_submit, time_run, timeout, outputs_str) = row - + (cycle, name, flow_nums, flow_wait, is_manual_submit, is_late, status, + is_held, submit_num, _, platform_name, time_submit, time_run, timeout, + outputs_str) = row try: itask = TaskProxy( self.config.get_taskdef(name), @@ -432,7 +432,9 @@ def load_db_task_pool_for_restart(self, row_idx, row): status=status, is_held=is_held, submit_num=submit_num, - is_late=bool(is_late) + is_late=bool(is_late), + flow_wait=bool(flow_wait), + is_manual_submit=bool(is_manual_submit) ) except WorkflowConfigError: LOG.exception( @@ -496,7 +498,7 @@ def load_db_task_pool_for_restart(self, row_idx, row): if itask.state_reset(status, is_runahead=True): self.data_store_mgr.delta_task_runahead(itask) - self.add_to_pool(itask, is_new=False) + self.add_to_pool(itask) # All tasks load as runahead-limited, but finished and manually # triggered tasks (incl. --start-task's) can be released now. @@ -633,8 +635,9 @@ def _get_spawned_or_merged_task( def spawn_to_rh_limit(self, tdef, point, flow_nums) -> None: """Spawn parentless task instances from point to runahead limit.""" - if not flow_nums: - # force-triggered no-flow task. + if not flow_nums or point is None: + # Force-triggered no-flow task. + # Or called with an invalid next_point. 
return if self.runahead_limit_point is None: self.compute_runahead() @@ -1210,14 +1213,6 @@ def spawn_on_output(self, itask, output, forced=False): if c_task is not None and c_task != itask: # (Avoid self-suicide: A => !A) self.merge_flows(c_task, itask.flow_nums) - self.workflow_db_mgr.put_insert_task_states( - c_task, - { - "status": c_task.state.status, - "flow_nums": serialise(c_task.flow_nums) - } - ) - # self.workflow_db_mgr.process_queued_ops() elif ( c_task is None and (itask.flow_nums or forced) @@ -1492,6 +1487,7 @@ def spawn_task( return None LOG.info(f"[{itask}] spawned") + self.db_add_new_flow_rows(itask) return itask def force_spawn_children( @@ -1597,7 +1593,6 @@ def force_trigger_tasks( ) if itask is None: continue - self.add_to_pool(itask, is_new=True) itasks.append(itask) # Trigger matched tasks if not already active. @@ -1625,7 +1620,6 @@ def force_trigger_tasks( # De-queue it to run now. self.task_queue_mgr.force_release_task(itask) - self.workflow_db_mgr.put_update_task_state(itask) return len(unmatched) def sim_time_check(self, message_queue): @@ -1929,31 +1923,35 @@ def merge_flows(self, itask: TaskProxy, flow_nums: 'FlowNums') -> None: This also performs required spawning / state changing for edge cases. """ - if flow_nums == itask.flow_nums: - # Don't do anything if trying to spawn the same task in the same - # flow. This arises downstream of an AND trigger (if "A & B => C" + if not flow_nums or (flow_nums == itask.flow_nums): + # Don't do anything if: + # 1. merging from a no-flow task, or + # 2. trying to spawn the same task in the same flow. This arises + # downstream of an AND trigger (if "A & B => C" # and A spawns C first, B will find C is already in the pool), # and via suicide triggers ("A =>!A": A tries to spawn itself). return + merge_with_no_flow = not itask.flow_nums + + itask.merge_flows(flow_nums) + # Merged tasks get a new row in the db task_states table. 
+ self.db_add_new_flow_rows(itask) + if ( itask.state(*TASK_STATUSES_FINAL) and itask.state.outputs.get_incomplete() ): # Re-queue incomplete task to run again in the merged flow. LOG.info(f"[{itask}] incomplete task absorbed by new flow.") - itask.merge_flows(flow_nums) itask.state_reset(TASK_STATUS_WAITING) self.queue_task(itask) self.data_store_mgr.delta_task_state(itask) - elif not itask.flow_nums or itask.flow_wait: + elif merge_with_no_flow or itask.flow_wait: # 2. Retro-spawn on completed outputs and continue as merged flow. LOG.info(f"[{itask}] spawning on pre-merge outputs") - itask.merge_flows(flow_nums) itask.flow_wait = False self.spawn_on_all_outputs(itask, completed_only=True) self.spawn_to_rh_limit( itask.tdef, itask.next_point(), itask.flow_nums) - else: - itask.merge_flows(flow_nums) diff --git a/cylc/flow/workflow_db_mgr.py b/cylc/flow/workflow_db_mgr.py index e8f82a27669..18265d67b79 100644 --- a/cylc/flow/workflow_db_mgr.py +++ b/cylc/flow/workflow_db_mgr.py @@ -431,14 +431,15 @@ def put_xtriggers(self, sat_xtrig): def put_update_task_state(self, itask): """Update task_states table for current state of itask. - For final event-driven update before removing finished tasks. - No need to update task_pool table as finished tasks are immediately - removed from the pool. + NOTE the task_states table is normally updated along with the task pool + table. This method is only needed as a final update for finished tasks, + when they get removed from the task_pool. """ set_args = { "time_updated": itask.state.time_updated, "status": itask.state.status, - "flow_wait": itask.flow_wait + "flow_wait": itask.flow_wait, + "is_manual_submit": itask.is_manual_submit } where_args = { "cycle": str(itask.point), @@ -451,10 +452,15 @@ def put_update_task_state(self, itask): (set_args, where_args)) def put_task_pool(self, pool: 'TaskPool') -> None: - """Update various task tables for current pool, in runtime database. 
+ """Delete task pool table content and recreate from current task pool. - Queue delete (everything) statements to wipe the tables, and queue the - relevant insert statements for the current tasks in the pool. + Also recreate: + - prerequisites table + - timeout timers table + - action timers table + + And update: + - task states table """ self.db_deletes_map[self.TABLE_TASK_POOL].append({}) # Comment this out to retain the trigger-time prereq status of past diff --git a/cylc/flow/workflow_files.py b/cylc/flow/workflow_files.py index 2cf15996c46..e4e9acfe1a1 100644 --- a/cylc/flow/workflow_files.py +++ b/cylc/flow/workflow_files.py @@ -78,7 +78,7 @@ ) from cylc.flow.remote import ( DEFAULT_RSYNC_OPTS, - _construct_ssh_cmd, + construct_cylc_server_ssh_cmd, construct_ssh_cmd, ) from cylc.flow.terminal import parse_dirty_json @@ -406,7 +406,7 @@ def _is_process_running( metric = f'[["Process", {pid}]]' if is_remote_host(host): cmd = ['psutil'] - cmd = _construct_ssh_cmd(cmd, host) + cmd = construct_cylc_server_ssh_cmd(cmd, host) else: cmd = ['cylc', 'psutil'] proc = Popen( # nosec @@ -1087,9 +1087,11 @@ def _remote_clean_cmd( for item in rm_dirs: cmd.extend(['--rm', item]) cmd = construct_ssh_cmd( - cmd, platform, + cmd, + platform, get_host_from_platform(platform), - timeout=timeout, set_verbosity=True + timeout=timeout, + set_verbosity=True, ) LOG.debug(" ".join(cmd)) return Popen( # nosec diff --git a/cylc/flow/xtriggers/workflow_state.py b/cylc/flow/xtriggers/workflow_state.py index 1a6ca10f31b..4620c02104a 100644 --- a/cylc/flow/xtriggers/workflow_state.py +++ b/cylc/flow/xtriggers/workflow_state.py @@ -14,41 +14,52 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see . 
+from pathlib import Path import sqlite3 +from typing import Dict, Optional, Tuple + +from metomi.isodatetime.parsers import TimePointParser from cylc.flow.cycling.util import add_offset from cylc.flow.dbstatecheck import CylcWorkflowDBChecker from cylc.flow.pathutil import expand_path, get_cylc_run_dir -from metomi.isodatetime.parsers import TimePointParser +from cylc.flow.workflow_files import infer_latest_run -def workflow_state(workflow, task, point, offset=None, status='succeeded', - message=None, cylc_run_dir=None): +def workflow_state( + workflow: str, + task: str, + point: str, + offset: Optional[str] = None, + status: str = 'succeeded', + message: Optional[str] = None, + cylc_run_dir: Optional[str] = None +) -> Tuple[bool, Optional[Dict[str, Optional[str]]]]: """Connect to a workflow DB and query the requested task state. * Reports satisfied only if the remote workflow state has been achieved. * Returns all workflow state args to pass on to triggering tasks. Arguments: - workflow (str): + workflow: The workflow to interrogate. - task (str): + task: The name of the task to query. - point (str): + point: The cycle point. - offset (str): + offset: The offset between the cycle this xtrigger is used in and the one it is querying for as an ISO8601 time duration. e.g. PT1H (one hour). - status (str): + status: The task status required for this xtrigger to be satisfied. - message (str): + message: The custom task output required for this xtrigger to be satisfied. .. note:: This cannot be specified in conjunction with ``status``. - cylc_run_dir (str): + cylc_run_dir: The directory in which the workflow to interrogate. .. note:: @@ -60,9 +71,9 @@ def workflow_state(workflow, task, point, offset=None, status='succeeded', Returns: tuple: (satisfied, results) - satisfied (bool): + satisfied: True if ``satisfied`` else ``False``. - results (dict): + results: Dictionary containing the args / kwargs which were provided to this xtrigger. 
@@ -73,6 +84,7 @@ def workflow_state(workflow, task, point, offset=None, status='succeeded', cylc_run_dir = get_cylc_run_dir() if offset is not None: point = str(add_offset(point, offset)) + _, workflow = infer_latest_run(Path(cylc_run_dir, workflow)) try: checker = CylcWorkflowDBChecker(cylc_run_dir, workflow) except (OSError, sqlite3.Error): diff --git a/tests/flakyfunctional/database/00-simple/schema.out b/tests/flakyfunctional/database/00-simple/schema.out index ac62d48639d..814faac2a59 100644 --- a/tests/flakyfunctional/database/00-simple/schema.out +++ b/tests/flakyfunctional/database/00-simple/schema.out @@ -10,7 +10,7 @@ CREATE TABLE task_late_flags(cycle TEXT, name TEXT, value INTEGER, PRIMARY KEY(c CREATE TABLE task_outputs(cycle TEXT, name TEXT, flow_nums TEXT, outputs TEXT, PRIMARY KEY(cycle, name, flow_nums)); CREATE TABLE task_pool(cycle TEXT, name TEXT, flow_nums TEXT, status TEXT, is_held INTEGER, PRIMARY KEY(cycle, name, flow_nums)); CREATE TABLE task_prerequisites(cycle TEXT, name TEXT, flow_nums TEXT, prereq_name TEXT, prereq_cycle TEXT, prereq_output TEXT, satisfied TEXT, PRIMARY KEY(cycle, name, flow_nums, prereq_name, prereq_cycle, prereq_output)); -CREATE TABLE task_states(name TEXT, cycle TEXT, flow_nums TEXT, time_created TEXT, time_updated TEXT, submit_num INTEGER, status TEXT, flow_wait INTEGER, PRIMARY KEY(name, cycle, flow_nums)); +CREATE TABLE task_states(name TEXT, cycle TEXT, flow_nums TEXT, time_created TEXT, time_updated TEXT, submit_num INTEGER, status TEXT, flow_wait INTEGER, is_manual_submit INTEGER, PRIMARY KEY(name, cycle, flow_nums)); CREATE TABLE task_timeout_timers(cycle TEXT, name TEXT, timeout REAL, PRIMARY KEY(cycle, name)); CREATE TABLE tasks_to_hold(name TEXT, cycle TEXT); CREATE TABLE workflow_flows(flow_num INTEGER, start_time TEXT, description TEXT, PRIMARY KEY(flow_num)); diff --git a/tests/functional/cylc-get-site-config/04-homeless.t b/tests/functional/cylc-get-site-config/04-homeless.t index 
0afa2abe062..1ad973f8191 100644 --- a/tests/functional/cylc-get-site-config/04-homeless.t +++ b/tests/functional/cylc-get-site-config/04-homeless.t @@ -15,10 +15,12 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see . -# Test absence of HOME env var (https://github.com/cylc/cylc-flow/pull/2895) +# Check undefined $HOME does not break: +# a) use of $HOME in global config (GitHub #2895) +# b) global config Jinja2 support (GitHub #5155) . "$(dirname "$0")/test_header" -set_test_number 3 +set_test_number 5 # shellcheck disable=SC2016 create_test_global_config '' ' @@ -27,10 +29,15 @@ create_test_global_config '' ' [[[localhost]]] run = $HOME/dr-malcolm ' - run_ok "${TEST_NAME_BASE}" \ env -u HOME \ cylc config --item='[install][symlink dirs][localhost]run' + cmp_ok "${TEST_NAME_BASE}.stdout" <<<"\$HOME/dr-malcolm" -cmp_ok "${TEST_NAME_BASE}.stderr" <'/dev/null' -exit + +# The test global config is created with #!Jinja2 at the top, in case of any +# Jinja2 code in global-tests.cylc. Parsec Jinja2 support uses $HOME to find +# custom filters etc. GitHub #5155. 
+for DIR in Filters Tests Globals; do + grep_ok "\$HOME undefined: can't load ~/.cylc/Jinja2$DIR" "${TEST_NAME_BASE}.stderr" +done diff --git a/tests/functional/flow-triggers/11-wait-merge.t b/tests/functional/flow-triggers/11-wait-merge.t index 25b6443776a..cb3218ae463 100644 --- a/tests/functional/flow-triggers/11-wait-merge.t +++ b/tests/functional/flow-triggers/11-wait-merge.t @@ -34,8 +34,8 @@ cmp_ok "${TEST_NAME}.stdout" <<\__END__ 1|b|[1]|["submitted", "started", "succeeded"] 1|a|[2]|["submitted", "started", "succeeded"] 1|c|[2]|["submitted", "started", "x"] -1|x|[1, 2]|["submitted", "started", "succeeded"] 1|c|[1, 2]|["submitted", "started", "succeeded", "x"] +1|x|[1, 2]|["submitted", "started", "succeeded"] 1|d|[1, 2]|["submitted", "started", "succeeded"] 1|b|[2]|["submitted", "started", "succeeded"] __END__ diff --git a/tests/functional/flow-triggers/13-noflow-nomerge.t b/tests/functional/flow-triggers/13-noflow-nomerge.t new file mode 100644 index 00000000000..c8b4528a2f9 --- /dev/null +++ b/tests/functional/flow-triggers/13-noflow-nomerge.t @@ -0,0 +1,47 @@ +#!/usr/bin/env bash +# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE. +# Copyright (C) NIWA & British Crown (Met Office) & Contributors. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +#------------------------------------------------------------------------------- + +. 
"$(dirname "$0")/test_header" +set_test_number 7 + +install_workflow "${TEST_NAME_BASE}" "${TEST_NAME_BASE}" +DB="${WORKFLOW_RUN_DIR}/log/db" + +run_ok "${TEST_NAME_BASE}-validate" cylc validate "${WORKFLOW_NAME}" +run_ok "${TEST_NAME_BASE}-run" cylc play "${WORKFLOW_NAME}" +poll_grep_workflow_log "Workflow stalled" + +run_ok "${TEST_NAME_BASE}-trigger" cylc trigger --flow=none "${WORKFLOW_NAME}//1/a" +poll_grep_workflow_log -E "1/a running job:02 flows:none.*=> succeeded" + +cylc stop --now --now --max-polls=5 --interval=2 "$WORKFLOW_NAME" + +TEST_NAME="${TEST_NAME_BASE}-count" +QUERY="SELECT COUNT(*) FROM task_states WHERE name=='a'" +run_ok "${TEST_NAME}" sqlite3 "${DB}" "$QUERY" +cmp_ok "${TEST_NAME}.stdout" <<__END__ +2 +__END__ + +QUERY="SELECT COUNT(*) FROM task_states WHERE name=='b'" +run_ok "${TEST_NAME}" sqlite3 "${DB}" "$QUERY" +cmp_ok "${TEST_NAME}.stdout" <<__END__ +1 +__END__ + +purge diff --git a/tests/functional/flow-triggers/13-noflow-nomerge/flow.cylc b/tests/functional/flow-triggers/13-noflow-nomerge/flow.cylc new file mode 100644 index 00000000000..9b7e2b90a25 --- /dev/null +++ b/tests/functional/flow-triggers/13-noflow-nomerge/flow.cylc @@ -0,0 +1,7 @@ +[scheduling] + [[graph]] + R1 = "a => b" +[runtime] + [[a]] + [[b]] + script = false # die as incomplete diff --git a/tests/functional/job-submission/01-job-nn-localhost/db.sqlite3 b/tests/functional/job-submission/01-job-nn-localhost/db.sqlite3 index 9453b6960e3..15d9e84418f 100644 --- a/tests/functional/job-submission/01-job-nn-localhost/db.sqlite3 +++ b/tests/functional/job-submission/01-job-nn-localhost/db.sqlite3 @@ -18,8 +18,8 @@ CREATE TABLE task_late_flags(cycle TEXT, name TEXT, value INTEGER, PRIMARY KEY(c CREATE TABLE task_outputs(cycle TEXT, name TEXT, flow_nums TEXT, outputs TEXT, PRIMARY KEY(cycle, name, flow_nums)); CREATE TABLE task_pool(cycle TEXT, name TEXT, flow_nums TEXT, status TEXT, is_held INTEGER, PRIMARY KEY(cycle, name, flow_nums)); INSERT INTO task_pool 
VALUES('1','foo','["1", "2"]','waiting', 0); -CREATE TABLE task_states(name TEXT, cycle TEXT, flow_nums TEXT, time_created TEXT, time_updated TEXT, submit_num INTEGER, status TEXT, flow_wait INTEGER, PRIMARY KEY(name, cycle, flow_nums)); -INSERT INTO task_states VALUES('foo','1','["1", "2"]', '2019-06-14T11:30:16+01:00','2019-06-14T11:40:24+01:00',99,'waiting','0'); +CREATE TABLE task_states(name TEXT, cycle TEXT, flow_nums TEXT, time_created TEXT, time_updated TEXT, submit_num INTEGER, status TEXT, flow_wait INTEGER, is_manual_submit INTEGER, PRIMARY KEY(name, cycle, flow_nums)); +INSERT INTO task_states VALUES('foo','1','["1", "2"]', '2019-06-14T11:30:16+01:00','2019-06-14T11:40:24+01:00',99,'waiting','0', '0'); CREATE TABLE task_prerequisites(cycle TEXT, name TEXT, flow_nums TEXT, prereq_name TEXT, prereq_cycle TEXT, prereq_output TEXT, satisfied TEXT, PRIMARY KEY(cycle, name, flow_nums, prereq_name, prereq_cycle, prereq_output)); CREATE TABLE task_timeout_timers(cycle TEXT, name TEXT, timeout REAL, PRIMARY KEY(cycle, name)); CREATE TABLE xtriggers(signature TEXT, results TEXT, PRIMARY KEY(signature)); diff --git a/tests/functional/lib/bash/test_header b/tests/functional/lib/bash/test_header index 263545aae46..8a2b74b9d1e 100644 --- a/tests/functional/lib/bash/test_header +++ b/tests/functional/lib/bash/test_header @@ -143,8 +143,9 @@ # Expect 1 OK test. # create_test_global_config [PRE [POST]] # Create a new global config file $PWD/etc from global-tests.cylc -# with PRE and POST pre- and ap-pended (PRE for e.g. jinja2 shebang). +# with PRE and POST pre- and ap-pended. # PRE and POST are strings. +# The global config starts with #!Jinja2 in case Jinja2 is used. # localhost_fqdn # Get the FQDN of the current host using the same mechanism Cylc uses. # get_fqdn [TARGET] @@ -902,8 +903,9 @@ create_test_global_config() { # Tidy in case of previous use of this function. rm -fr 'etc' mkdir 'etc' - # Scheduler host self-identification method. 
- echo "$PRE" >'etc/global.cylc' + # Start with Jinja2 hashbang. Harmless if not needed. + echo "#!Jinja2" >'etc/global.cylc' + echo "$PRE" >>'etc/global.cylc' # add defaults cat >>'etc/global.cylc' <<__HERE__ # set a default timeout for all flow runs to avoid hanging tests diff --git a/tests/functional/reload/27-stall-retrigger.t b/tests/functional/reload/27-stall-retrigger.t new file mode 100644 index 00000000000..29771920f21 --- /dev/null +++ b/tests/functional/reload/27-stall-retrigger.t @@ -0,0 +1,26 @@ +#!/usr/bin/env bash +# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE. +# Copyright (C) NIWA & British Crown (Met Office) & Contributors. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +#------------------------------------------------------------------------------- + +# Test retriggering a failed task after fixing the bug and reloading. +# It should run correctly with the updated settings. + +# https://github.com/cylc/cylc-flow/issues/5103 + +. "$(dirname "$0")/test_header" +set_test_number 2 +reftest diff --git a/tests/functional/reload/27-stall-retrigger/bin/stall-handler.sh b/tests/functional/reload/27-stall-retrigger/bin/stall-handler.sh new file mode 100755 index 00000000000..f9c14d60db2 --- /dev/null +++ b/tests/functional/reload/27-stall-retrigger/bin/stall-handler.sh @@ -0,0 +1,14 @@ +#!/bin/bash + +# Change "script = false" -> "true" in 1/foo, then reload and retrigger it. 
+ +if grep "\[command\] reload_workflow" "${CYLC_WORKFLOW_LOG_DIR}/log" >/dev/null; then + # Abort if not the first call (avoid an endless loop if the reload does not + # have the intended effect). + >&2 echo "ERROR (stall-handler.sh): should only be called once" + cylc stop --now --now "${CYLC_WORKFLOW_ID}" + exit 1 +fi +sed -i "s/false/true/" "${CYLC_WORKFLOW_RUN_DIR}/suite.rc" +cylc reload "${CYLC_WORKFLOW_ID}" +cylc trigger "${CYLC_WORKFLOW_ID}//1/foo" diff --git a/tests/functional/reload/27-stall-retrigger/reference.log b/tests/functional/reload/27-stall-retrigger/reference.log new file mode 100644 index 00000000000..63d91f5f612 --- /dev/null +++ b/tests/functional/reload/27-stall-retrigger/reference.log @@ -0,0 +1,3 @@ +1/foo -triggered off [] in flow 1 +1/foo -triggered off [] in flow 1 +1/bar -triggered off ['1/foo'] in flow 1 diff --git a/tests/functional/reload/27-stall-retrigger/suite.rc b/tests/functional/reload/27-stall-retrigger/suite.rc new file mode 100644 index 00000000000..ac90faffa80 --- /dev/null +++ b/tests/functional/reload/27-stall-retrigger/suite.rc @@ -0,0 +1,13 @@ +# Use a stall handler to fix and reload the workflow config, then retrigger the +# failed task, which should run successfully with the new settings. 
+[scheduler] + [[events]] + stall handlers = stall-handler.sh + expected task failures = 1/foo +[scheduling] + [[graph]] + R1 = "foo => bar" +[runtime] + [[foo]] + script = false + [[bar]] diff --git a/tests/functional/remote/01-file-install.t b/tests/functional/remote/01-file-install.t index e673b7c7c9d..9f89c49f79a 100644 --- a/tests/functional/remote/01-file-install.t +++ b/tests/functional/remote/01-file-install.t @@ -23,7 +23,7 @@ set_test_number 6 create_files () { # dump some files into the run dir - for DIR in "bin" "app" "etc" "lib" "dir1" "dir2" + for DIR in "bin" "ana" "app" "etc" "lib" "dir1" "dir2" do mkdir -p "${WORKFLOW_RUN_DIR}/${DIR}" touch "${WORKFLOW_RUN_DIR}/${DIR}/moo" @@ -35,7 +35,7 @@ create_files () { } # Test configured files/directories along with default files/directories -# (app, bin, etc, lib) are correctly installed on the remote platform. +# (ana, app, bin, etc, lib) are correctly installed on the remote platform. TEST_NAME="${TEST_NAME_BASE}-default-paths" init_workflow "${TEST_NAME}" <<__FLOW_CONFIG__ [scheduling] @@ -59,8 +59,9 @@ workflow_run_ok "${TEST_NAME}-run1" cylc play "${WORKFLOW_NAME}" \ # ensure these files get installed on the remote platform SSH="$(cylc config -d -i "[platforms][$CYLC_TEST_PLATFORM]ssh command")" ${SSH} "${CYLC_TEST_HOST}" \ - find "${RUN_DIR_REL}/"{app,bin,etc,lib} -type f | sort > 'find.out' + find "${RUN_DIR_REL}/"{ana,app,bin,etc,lib} -type f | sort > 'find.out' cmp_ok 'find.out' <<__OUT__ +${RUN_DIR_REL}/ana/moo ${RUN_DIR_REL}/app/moo ${RUN_DIR_REL}/bin/moo ${RUN_DIR_REL}/etc/moo @@ -93,8 +94,9 @@ workflow_run_ok "${TEST_NAME}-run2" cylc play "${WORKFLOW_NAME}" \ -s "CYLC_TEST_PLATFORM='${CYLC_TEST_PLATFORM}'" ${SSH} "${CYLC_TEST_HOST}" \ - find "${RUN_DIR_REL}/"{app,bin,dir1,dir2,file1,file2,etc,lib} -type f | sort > 'find.out' + find "${RUN_DIR_REL}/"{ana,app,bin,dir1,dir2,file1,file2,etc,lib} -type f | sort > 'find.out' cmp_ok 'find.out' <<__OUT__ +${RUN_DIR_REL}/ana/moo ${RUN_DIR_REL}/app/moo 
${RUN_DIR_REL}/bin/moo ${RUN_DIR_REL}/dir1/moo diff --git a/tests/functional/restart/57-ghost-job/db.sqlite3 b/tests/functional/restart/57-ghost-job/db.sqlite3 index d6837d6bd0c..4230831602f 100644 --- a/tests/functional/restart/57-ghost-job/db.sqlite3 +++ b/tests/functional/restart/57-ghost-job/db.sqlite3 @@ -19,8 +19,8 @@ INSERT INTO task_outputs VALUES('1','foo','[1]','[]'); CREATE TABLE task_pool(cycle TEXT, name TEXT, flow_nums TEXT, status TEXT, is_held INTEGER, PRIMARY KEY(cycle, name, flow_nums)); INSERT INTO task_pool VALUES('1','foo','[1]','preparing',0); CREATE TABLE task_prerequisites(cycle TEXT, name TEXT, flow_nums TEXT, prereq_name TEXT, prereq_cycle TEXT, prereq_output TEXT, satisfied TEXT, PRIMARY KEY(cycle, name, flow_nums, prereq_name, prereq_cycle, prereq_output)); -CREATE TABLE task_states(name TEXT, cycle TEXT, flow_nums TEXT, time_created TEXT, time_updated TEXT, submit_num INTEGER, status TEXT, flow_wait INTEGER, PRIMARY KEY(name, cycle, flow_nums)); -INSERT INTO task_states VALUES('foo','1','[1]','2022-07-25T16:18:23+01:00','2022-07-25T16:18:23+01:00',1,'preparing',NULL); +CREATE TABLE task_states(name TEXT, cycle TEXT, flow_nums TEXT, time_created TEXT, time_updated TEXT, submit_num INTEGER, status TEXT, flow_wait INTEGER, is_manual_submit INTEGER, PRIMARY KEY(name, cycle, flow_nums)); +INSERT INTO task_states VALUES('foo','1','[1]','2022-07-25T16:18:23+01:00','2022-07-25T16:18:23+01:00',1,'preparing',NULL, '0'); CREATE TABLE task_timeout_timers(cycle TEXT, name TEXT, timeout REAL, PRIMARY KEY(cycle, name)); CREATE TABLE tasks_to_hold(name TEXT, cycle TEXT); CREATE TABLE workflow_flows(flow_num INTEGER, start_time TEXT, description TEXT, PRIMARY KEY(flow_num)); diff --git a/tests/functional/restart/58-waiting-manual-triggered.t b/tests/functional/restart/58-waiting-manual-triggered.t new file mode 100644 index 00000000000..efba9f42b70 --- /dev/null +++ b/tests/functional/restart/58-waiting-manual-triggered.t @@ -0,0 +1,47 @@ +#!/bin/bash 
+# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE. +# Copyright (C) NIWA & British Crown (Met Office) & Contributors. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +#------------------------------------------------------------------------------- +# Test that a task manually triggered just before shutdown will run on restart. + +. "$(dirname "$0")/test_header" + +set_test_number 6 + +install_workflow "${TEST_NAME_BASE}" "${TEST_NAME_BASE}" + +run_ok "${TEST_NAME_BASE}-validate" cylc validate "${WORKFLOW_NAME}" + +workflow_run_ok "${TEST_NAME_BASE}-run" cylc play --no-detach "${WORKFLOW_NAME}" + +DB_FILE="${WORKFLOW_RUN_DIR}/log/db" + +# It should have shut down with 2/foo waiting with the is_manual_submit flag on. +TEST_NAME="${TEST_NAME_BASE}-db-task-states" +QUERY='SELECT status, is_manual_submit FROM task_states WHERE cycle IS 2;' +run_ok "$TEST_NAME" sqlite3 "$DB_FILE" "$QUERY" +cmp_ok "${TEST_NAME}.stdout" << '__EOF__' +waiting|1 +__EOF__ + +# It should restart and shut down normally, not stall with 2/foo waiting on 1/foo. +workflow_run_ok "${TEST_NAME_BASE}-restart" cylc play --no-detach "${WORKFLOW_NAME}" +# Check that 2/foo job 02 did run before shutdown. 
+grep_workflow_log_ok "${TEST_NAME_BASE}-grep" "\[2\/foo running job:02 flows:1\] => succeeded" + +purge +exit diff --git a/tests/functional/restart/58-waiting-manual-triggered/flow.cylc b/tests/functional/restart/58-waiting-manual-triggered/flow.cylc new file mode 100644 index 00000000000..ea5f47c46d7 --- /dev/null +++ b/tests/functional/restart/58-waiting-manual-triggered/flow.cylc @@ -0,0 +1,22 @@ +[scheduler] + [[events]] + stall timeout = PT0S + abort on stall timeout = True +[scheduling] + cycling mode = integer + runahead limit = P1 + final cycle point = 3 + [[graph]] + P1 = foo[-P1] => foo +[runtime] + [[foo]] + script = """ + if (( CYLC_TASK_CYCLE_POINT == 3 )); then + # Order a normal shutdown: no more job submissions, and shut + # down after active jobs (i.e. this one) finish. + cylc stop "$CYLC_WORKFLOW_ID" + # Force-trigger 2/foo before shutdown. On restart it should be + # in the waiting state with the force-triggered flag set. + cylc trigger "${CYLC_WORKFLOW_ID}//2/foo" + fi + """ diff --git a/tests/unit/post_install/test_log_vc_info.py b/tests/unit/post_install/test_log_vc_info.py index cddb8012997..58e16e241b5 100644 --- a/tests/unit/post_install/test_log_vc_info.py +++ b/tests/unit/post_install/test_log_vc_info.py @@ -26,13 +26,15 @@ from cylc.flow.install_plugins.log_vc_info import ( INFO_FILENAME, LOG_VERSION_DIR, - get_diff, _get_git_commit, get_status, get_vc_info, main, + write_diff, ) +from cylc.flow.workflow_files import WorkflowFiles + Fixture = Any @@ -161,12 +163,14 @@ def test_get_vc_info_git(git_source_repo: Tuple[str, str]): @require_git -def test_get_diff_git(git_source_repo: Tuple[str, str]): - """Test get_diff() for a git repo""" - source_dir, commit_sha = git_source_repo - diff = get_diff('git', source_dir) - assert diff is not None - diff_lines = diff.splitlines() +def test_write_diff_git(git_source_repo: Tuple[str, str], tmp_path: Path): + """Test write_diff() for a git repo""" + source_dir, _ = git_source_repo + run_dir = 
tmp_path / 'run_dir' + (run_dir / WorkflowFiles.LOG_DIR).mkdir(parents=True) + diff_file = write_diff('git', source_dir, run_dir) + diff_lines = diff_file.read_text().splitlines() + assert diff_lines[0].startswith("# Auto-generated diff") for line in ("diff --git a/flow.cylc b/flow.cylc", "- R1 = foo", "+ R1 = bar"): @@ -205,12 +209,14 @@ def test_get_vc_info_svn(svn_source_repo: Tuple[str, str, str]): @require_svn -def test_get_diff_svn(svn_source_repo: Tuple[str, str, str]): - """Test get_diff() for an svn working copy""" - source_dir, uuid, repo_path = svn_source_repo - diff = get_diff('svn', source_dir) - assert diff is not None - diff_lines = diff.splitlines() +def test_write_diff_svn(svn_source_repo: Tuple[str, str, str], tmp_path: Path): + """Test write_diff() for an svn working copy""" + source_dir, _, _ = svn_source_repo + run_dir = tmp_path / 'run_dir' + (run_dir / WorkflowFiles.LOG_DIR).mkdir(parents=True) + diff_file = write_diff('svn', source_dir, run_dir) + diff_lines = diff_file.read_text().splitlines() + assert diff_lines[0].startswith("# Auto-generated diff") for line in (f"--- {source_dir}/flow.cylc (revision 1)", f"+++ {source_dir}/flow.cylc (working copy)", "- R1 = foo", @@ -239,20 +245,26 @@ def test_not_repo(tmp_path: Path, monkeypatch: MonkeyPatch): @require_git def test_no_base_commit_git(tmp_path: Path): - """Test get_vc_info() and get_diff() for a recently init'd git source dir + """Test get_vc_info() and write_diff() for a recently init'd git source dir that does not have a base commit yet.""" source_dir = Path(tmp_path, 'new_git_repo') source_dir.mkdir() subprocess.run(['git', 'init'], cwd=source_dir, check=True) flow_file = source_dir.joinpath('flow.cylc') flow_file.write_text(BASIC_FLOW_1) + run_dir = tmp_path / 'run_dir' + (run_dir / WorkflowFiles.LOG_DIR).mkdir(parents=True) vc_info = get_vc_info(source_dir) assert vc_info is not None - expected = [ + assert list(vc_info.items()) == [ ('version control system', "git"), ('working copy 
root path', str(source_dir)), ('status', ["?? flow.cylc"]) ] - assert list(vc_info.items()) == expected - assert get_diff('git', source_dir) is None + + # Diff file expected to be empty (only containing comment lines), + # but should work without raising + diff_file = write_diff('git', source_dir, run_dir) + for line in diff_file.read_text().splitlines(): + assert line.startswith('#') diff --git a/tests/unit/test_remote.py b/tests/unit/test_remote.py index 1645665a59f..a1d28cf63e0 100644 --- a/tests/unit/test_remote.py +++ b/tests/unit/test_remote.py @@ -63,11 +63,26 @@ def test_construct_rsync_over_ssh_cmd(): } ) assert host == 'miklegard' - assert ' '.join(cmd) == ( - 'rsync command --delete --rsh=strange_ssh --include=/.service/ ' - '--include=/.service/server.key -a --checksum ' - '--out-format=%o %n%L --no-t --exclude=log --exclude=share ' - '--exclude=work --include=/app/*** --include=/bin/*** ' - '--include=/etc/*** --include=/lib/*** --exclude=* ' - '/foo/ miklegard:/bar/' - ) + assert cmd == [ + 'rsync', + 'command', + '--delete', + '--rsh=strange_ssh', + '--include=/.service/', + '--include=/.service/server.key', + '-a', + '--checksum', + '--out-format=%o %n%L', + '--no-t', + '--exclude=log', + '--exclude=share', + '--exclude=work', + '--include=/ana/***', + '--include=/app/***', + '--include=/bin/***', + '--include=/etc/***', + '--include=/lib/***', + '--exclude=*', + '/foo/', + 'miklegard:/bar/', + ] diff --git a/tests/unit/xtriggers/__init__.py b/tests/unit/xtriggers/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/tests/unit/xtriggers/test_workflow_state.py b/tests/unit/xtriggers/test_workflow_state.py new file mode 100644 index 00000000000..99a8c6d5fe0 --- /dev/null +++ b/tests/unit/xtriggers/test_workflow_state.py @@ -0,0 +1,40 @@ +# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE. +# Copyright (C) NIWA & British Crown (Met Office) & Contributors. 
+# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +from typing import Callable +from unittest.mock import Mock + + +from cylc.flow.xtriggers.workflow_state import workflow_state +from ..conftest import MonkeyMock + + +def test_inferred_run(tmp_run_dir: Callable, monkeymock: MonkeyMock): + """Test that the workflow_state xtrigger infers the run number""" + reg = 'isildur' + expected_workflow_id = f'{reg}/run1' + cylc_run_dir = str(tmp_run_dir()) + tmp_run_dir(expected_workflow_id, installed=True, named=True) + mock_db_checker = monkeymock( + 'cylc.flow.xtriggers.workflow_state.CylcWorkflowDBChecker', + return_value=Mock( + get_remote_point_format=lambda: 'CCYY', + ) + ) + + _, results = workflow_state(reg, task='precious', point='3000') + mock_db_checker.assert_called_once_with(cylc_run_dir, expected_workflow_id) + assert results['workflow'] == expected_workflow_id