Merge remote-tracking branch 'upstream/master' into feature.sim_mode_at_runtime

* upstream/master:
  log_vc_info: handle long command output (cylc#5821)
  GH Actions: limit tutorial workflow to Py3.11
  remove cylc task dependencies env var (cylc#5836)
  Refactor.lint (cylc#5718)
  protobuf 4.24.4 upgrade (cylc#5828)
  made reinstall work on multiple workflows
  Fix `IndepQueueManager` test (cylc#5832)
  Lint: Add a check for indentation being 4N spaces. (cylc#5772)
wxtim committed Dec 5, 2023
2 parents 01aa4e8 + 1dace6e commit 7590351
Showing 21 changed files with 528 additions and 330 deletions.
1 change: 1 addition & 0 deletions changes.d/5772.feat.md
@@ -0,0 +1 @@
+Add a check for indentation being 4N spaces.
1 change: 1 addition & 0 deletions changes.d/5803.feat.md
@@ -0,0 +1 @@
+Updated 'reinstall' functionality to support multiple workflows
1 change: 1 addition & 0 deletions changes.d/5821.fix.md
@@ -0,0 +1 @@
+Fixed issue where large uncommitted changes could cause `cylc install` to hang.
1 change: 1 addition & 0 deletions changes.d/5836.break.md
@@ -0,0 +1 @@
+Removed the 'CYLC_TASK_DEPENDENCIES' environment variable
2 changes: 1 addition & 1 deletion conda-environment.yml
@@ -12,7 +12,7 @@ dependencies:
   - metomi-isodatetime >=1!3.0.0, <1!3.2.0
   - packaging
   # Constrain protobuf version for compatible Scheduler-UIS comms across hosts
-  - protobuf >=4.21.2,<4.22.0
+  - protobuf >=4.24.4,<4.25.0
   - psutil >=5.6.0
   - python
   - pyzmq >=22
25 changes: 13 additions & 12 deletions cylc/flow/data_messages_pb2.py
(Generated file; diff not rendered.)

29 changes: 20 additions & 9 deletions cylc/flow/install_plugins/log_vc_info.py
@@ -63,12 +63,22 @@
 from pathlib import Path
 from subprocess import Popen, DEVNULL, PIPE
 from typing import (
-    Any, Dict, Iterable, List, Optional, TYPE_CHECKING, TextIO, Union, overload
+    Any,
+    Dict,
+    Iterable,
+    List,
+    Optional,
+    TYPE_CHECKING,
+    TextIO,
+    Union,
+    overload,
 )

 from cylc.flow import LOG as _LOG, LoggerAdaptor
 from cylc.flow.exceptions import CylcError
 import cylc.flow.flags
+from cylc.flow.pipe_poller import pipe_poller
+from cylc.flow.util import format_cmd
 from cylc.flow.workflow_files import WorkflowFiles

 if TYPE_CHECKING:
@@ -171,7 +181,7 @@ def get_vc_info(path: Union[Path, str]) -> Optional[Dict[str, Any]]:
         ):
             LOG.debug(f"Source dir {path} is not a {vcs} repository")
         elif cylc.flow.flags.verbosity > -1:
-            LOG.warning(f"$ {vcs} {' '.join(args)}\n{exc}")
+            LOG.warning(f"$ {vcs} {format_cmd(args)}\n{exc}")
         continue

     info['version control system'] = vcs
@@ -217,9 +227,7 @@ def _run_cmd(
         args: The args to pass to the version control command.
         cwd: Directory to run the command in.
         stdout: Where to redirect output (either PIPE or a
-            text stream/file object). Note: only use PIPE for
-            commands that will not generate a large output, otherwise
-            the pipe might get blocked.
+            text stream/file object).

     Returns:
         Stdout output if stdout=PIPE, else None as the output has been
@@ -231,6 +239,7 @@ def _run_cmd(
         OSError: Non-zero return code for VCS command.
     """
     cmd = [vcs, *args]
+    LOG.debug(f'$ {format_cmd(cmd)}')
     try:
         proc = Popen(  # nosec
             cmd,
@@ -245,13 +254,15 @@ def _run_cmd(
         # This will only be raised if the VCS command is not installed,
         # otherwise Popen() will succeed with a non-zero return code
         raise VCSNotInstalledError(vcs, exc)
-    ret_code = proc.wait()
-    out, err = proc.communicate()
-    if ret_code:
+    if stdout == PIPE:
+        out, err = pipe_poller(proc, proc.stdout, proc.stderr)
+    else:
+        out, err = proc.communicate()
+    if proc.returncode:
         if any(err.lower().startswith(msg) for msg in NO_BASE_ERRS[vcs]):
             # No base commit in repo
             raise VCSMissingBaseError(vcs, cwd)
-        raise OSError(ret_code, err)
+        raise OSError(proc.returncode, err)
     return out
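
Context for the `_run_cmd` change above: calling `proc.wait()` before reading a PIPE can deadlock once the command's output fills the OS pipe buffer (often around 64 KiB on Linux), which is the `cylc install` hang fixed by cylc#5821. A minimal sketch of the failure mode and the standard fix, using a hypothetical long-output command rather than a real VCS call:

    from subprocess import Popen, PIPE

    # A command that writes more output than a typical pipe buffer holds.
    cmd = ['python3', '-c', 'print("x" * 200_000)']

    proc = Popen(cmd, stdout=PIPE, stderr=PIPE, text=True)

    # BAD: wait() blocks until the child exits, but the child is blocked
    # writing to the full stdout pipe, so neither side can make progress:
    #     ret_code = proc.wait()
    #     out, err = proc.communicate()

    # GOOD: communicate() (or pipe_poller, below) drains the pipes while
    # waiting, so the child can always flush its output.
    out, err = proc.communicate()
    assert len(out) > 100_000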
19 changes: 0 additions & 19 deletions cylc/flow/job_file.py
@@ -28,13 +28,6 @@
 from cylc.flow.log_level import verbosity_to_env
 from cylc.flow.config import interpolate_template, ParamExpandError

-# the maximum number of task dependencies which Cylc will list before
-# omitting the CYLC_TASK_DEPENDENCIES environment variable
-# see: https://github.com/cylc/cylc-flow/issues/5551
-# NOTE: please update `src/reference/job-script-vars/var-list.txt`
-# in cylc-doc if changing this value
-MAX_CYLC_TASK_DEPENDENCIES_LEN = 50
-

 class JobFileWriter:

@@ -227,18 +220,6 @@ def _write_task_environment(self, handle, job_conf):
         handle.write(
             '\n    export CYLC_TASK_NAMESPACE_HIERARCHY="%s"' %
             ' '.join(job_conf['namespace_hierarchy']))
-        if len(job_conf['dependencies']) <= MAX_CYLC_TASK_DEPENDENCIES_LEN:
-            handle.write(
-                '\n    export CYLC_TASK_DEPENDENCIES="%s"' %
-                ' '.join(job_conf['dependencies']))
-        else:
-            # redact the CYLC_TASK_DEPENDENCIES variable but leave a note
-            # explaining why
-            # see: https://github.com/cylc/cylc-flow/issues/5551
-            handle.write(
-                '\n    # CYLC_TASK_DEPENDENCIES=disabled'
-                f' (more than {MAX_CYLC_TASK_DEPENDENCIES_LEN} dependencies)'
-            )
         handle.write(
             '\n    export CYLC_TASK_TRY_NUMBER=%s' % job_conf['try_num'])
         handle.write(
73 changes: 73 additions & 0 deletions cylc/flow/pipe_poller.py (new file)
@@ -0,0 +1,73 @@
# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE.
# Copyright (C) NIWA & British Crown (Met Office) & Contributors.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

"""Utility for preventing pipes from getting clogged up.

If you're reading files from Popen (i.e. to extract command output) where the
command output has the potential to be long-ish, then you should use this
function to protect against the buffer filling up.

Note, there is a more advanced version of this baked into the subprocpool.

"""

from select import select


def pipe_poller(proc, *files, chunk_size=4096):
    """Read from a process without hitting buffer issues.

    Standin for subprocess.Popen.communicate.

    When PIPE'ing from subprocesses, the output goes into a buffer. If the
    buffer gets full, the subprocess will hang trying to write to it.

    This function polls the process, reading output from the buffers into
    memory to prevent them from filling up.

    Args:
        proc:
            The process to poll.
        files:
            The files you want to read from, likely anything you've
            directed to PIPE.
        chunk_size:
            The amount of text to read from the buffer on each pass.

    Returns:
        tuple - The text read from each of the files in the order they were
        specified.

    """
    _files = {
        file: b'' if 'b' in getattr(file, 'mode', 'r') else ''
        for file in files
    }

    def _read(timeout=1.0):
        # read any data from files
        nonlocal chunk_size, files
        for file in select(list(files), [], [], timeout)[0]:
            buffer = file.read(chunk_size)
            if len(buffer) > 0:
                _files[file] += buffer

    while proc.poll() is None:
        # read from the buffers
        _read()
    # double check the buffers now that the process has finished
    _read(timeout=0.01)

    return tuple(_files.values())
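
A usage sketch for the new helper, assuming a POSIX platform (it relies on `select`) and a made-up long-output command; the file objects passed must be the same handles given to `Popen`:

    from subprocess import Popen, PIPE

    from cylc.flow.pipe_poller import pipe_poller

    proc = Popen(
        ['python3', '-c', 'print("y" * 200_000)'],  # hypothetical command
        stdout=PIPE,
        stderr=PIPE,
        text=True,
    )
    # Drain both pipes while the process runs, then collect the results
    # in the order the files were passed.
    out, err = pipe_poller(proc, proc.stdout, proc.stderr)
    print(len(out), proc.returncode)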
46 changes: 28 additions & 18 deletions cylc/flow/scheduler_cli.py
@@ -37,7 +37,7 @@
 from cylc.flow.id import upgrade_legacy_ids
 from cylc.flow.host_select import select_workflow_host
 from cylc.flow.hostuserutil import is_remote_host
-from cylc.flow.id_cli import parse_ids
+from cylc.flow.id_cli import parse_ids_async
 from cylc.flow.loggingutil import (
     close_log,
     RotatingLogFileHandler,
@@ -354,7 +354,11 @@ def _open_logs(id_: str, no_detach: bool, restart_num: int) -> None:
     )


-def scheduler_cli(options: 'Values', workflow_id_raw: str) -> None:
+async def scheduler_cli(
+    options: 'Values',
+    workflow_id_raw: str,
+    parse_workflow_id: bool = True
+) -> None:
     """Run the workflow.

     This function should contain all of the command line facing
@@ -368,15 +372,18 @@ def scheduler_cli(options: 'Values', workflow_id_raw: str) -> None:
     # Parse workflow name but delay Cylc 7 suite.rc deprecation warning
     # until after the start-up splash is printed.
     # TODO: singleton
-    (workflow_id,), _ = parse_ids(
-        workflow_id_raw,
-        constraint='workflows',
-        max_workflows=1,
-        # warn_depr=False,  # TODO
-    )
+    if parse_workflow_id:
+        (workflow_id,), _ = await parse_ids_async(
+            workflow_id_raw,
+            constraint='workflows',
+            max_workflows=1,
+            # warn_depr=False,  # TODO
+        )
+    else:
+        workflow_id = workflow_id_raw

     # resume the workflow if it is already running
-    _resume(workflow_id, options)
+    await _resume(workflow_id, options)

     # check the workflow can be safely restarted with this version of Cylc
     db_file = Path(get_workflow_srv_dir(workflow_id), 'db')
@@ -400,9 +407,7 @@ def scheduler_cli(options: 'Values', workflow_id_raw: str) -> None:
     # NOTE: asyncio.run opens an event loop, runs your coro,
     # then shutdown async generators and closes the event loop
     scheduler = Scheduler(workflow_id, options)
-    asyncio.run(
-        _setup(scheduler)
-    )
+    await _setup(scheduler)

     # daemonize if requested
     # NOTE: asyncio event loops cannot persist across daemonization
@@ -419,9 +424,14 @@ def scheduler_cli(options: 'Values', workflow_id_raw: str) -> None:
     )

     # run the workflow
-    ret = asyncio.run(
-        _run(scheduler)
-    )
+    if options.no_detach:
+        ret = await _run(scheduler)
+    else:
+        # Note: The daemonization messes with asyncio so we have to start a
+        # new event loop if detaching
+        ret = asyncio.run(
+            _run(scheduler)
+        )

     # exit
     # NOTE: we must clean up all asyncio / threading stuff before exiting
@@ -432,7 +442,7 @@ def scheduler_cli(options: 'Values', workflow_id_raw: str) -> None:
     sys.exit(ret)


-def _resume(workflow_id, options):
+async def _resume(workflow_id, options):
     """Resume the workflow if it is already running."""
     try:
         detect_old_contact_file(workflow_id)
@@ -448,7 +458,7 @@ def _resume(workflow_id, options):
                 'wFlows': [workflow_id]
             }
         }
-        pclient('graphql', mutation_kwargs)
+        await pclient.async_request('graphql', mutation_kwargs)
         sys.exit(0)
     except CylcError as exc:
         LOG.error(exc)
@@ -651,4 +661,4 @@ def _play(parser: COP, options: 'Values', id_: str):
         *options.starttask,
         relative=True,
     )
-    return scheduler_cli(options, id_)
+    return asyncio.run(scheduler_cli(options, id_))
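
The `no_detach` branch above reflects a general asyncio constraint: an event loop cannot be carried across daemonization (a fork), so any loop used beforehand must have completed before detaching, and the daemon must start a fresh one. An illustrative sketch of that pattern (a simplified stand-in, not the Cylc implementation):

    import asyncio
    import os
    import sys


    async def setup() -> None:
        await asyncio.sleep(0)  # async start-up work


    async def run() -> int:
        await asyncio.sleep(0)  # the long-running main coroutine
        return 0


    def play(no_detach: bool) -> int:
        # Pre-daemonization phase: asyncio.run() opens a loop, runs the
        # coroutine, then closes the loop again.
        asyncio.run(setup())

        if not no_detach:
            # Simplified stand-in for daemonization: fork, parent exits.
            # The child must not reuse any loop created before the fork.
            if os.fork() != 0:
                sys.exit(0)

        # Post-daemonization phase: start a brand-new event loop.
        return asyncio.run(run())


    if __name__ == '__main__':
        sys.exit(play(no_detach=True))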