Skip to content

Commit

Permalink
Merge pull request #4386 from MetRonnie/fix-uiserver-stop
Browse files Browse the repository at this point in the history
Fix mutation error when stopping workflow in UI
  • Loading branch information
wxtim authored Sep 7, 2021
2 parents 4c10b6e + e69a9a3 commit 4735043
Show file tree
Hide file tree
Showing 6 changed files with 73 additions and 59 deletions.
4 changes: 4 additions & 0 deletions cylc/flow/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,10 @@ class WorkflowEventError(CylcError):
"""Exception for errors in Cylc event handlers."""


class CommandFailedError(CylcError):
"""Exception for when scheduler commands fail."""


class ServiceFileError(CylcError):
"""Exception for errors related to workflow service files."""

Expand Down
35 changes: 16 additions & 19 deletions cylc/flow/network/resolvers.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import logging
import queue
from time import time
from typing import Iterable, Tuple, TYPE_CHECKING
from typing import Iterable, Optional, Tuple, TYPE_CHECKING
from uuid import uuid4

from graphene.utils.str_converters import to_snake_case
Expand All @@ -38,6 +38,7 @@
if TYPE_CHECKING:
from cylc.flow.data_store_mgr import DataStoreMgr
from cylc.flow.scheduler import Scheduler
from cylc.flow.workflow_status import StopMode


logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -733,29 +734,25 @@ def force_spawn_children(self, tasks, outputs):
return (True, 'Command queued')

def stop(
self,
mode=None,
cycle_point=None,
clock_time=None,
task=None,
flow_label=None
):
self,
mode: 'StopMode',
cycle_point: Optional[str] = None,
clock_time: Optional[str] = None,
task: Optional[str] = None,
flow_label: Optional[str] = None
) -> Tuple[bool, str]:
"""Stop the workflow or specific flow from spawning any further.
Args:
mode (StopMode.Value): Stop mode to set
cycle_point (str): Cycle point after which to stop.
clock_time (str): Wallclock time after which to stop.
task (str): Stop after this task succeeds.
flow_label (str): The flow to sterilise.
mode: Stop mode to set
cycle_point: Cycle point after which to stop.
clock_time: Wallclock time after which to stop.
task: Stop after this task succeeds.
flow_label: The flow to sterilise.
Returns:
tuple: (outcome, message)
outcome (bool)
True if command successfully queued.
message (str)
Information about outcome.
outcome: True if command successfully queued.
message: Information about outcome.
"""
self.schd.command_queue.put((
Expand Down
13 changes: 6 additions & 7 deletions cylc/flow/network/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -1446,13 +1446,12 @@ class WorkflowStopMode(Enum):

# NOTE: using a different enum because:
# * Graphene requires special enums.
# * We only want to offer a subset of stop modes.
# * We only want to offer a subset of stop modes (REQUEST_* only).

# Note: contains only the REQUEST_* values from StopMode
Clean = StopMode.REQUEST_CLEAN.value
Kill = StopMode.REQUEST_KILL.value
Now = StopMode.REQUEST_NOW.value
NowNow = StopMode.REQUEST_NOW_NOW.value
Clean = StopMode.REQUEST_CLEAN
Kill = StopMode.REQUEST_KILL
Now = StopMode.REQUEST_NOW
NowNow = StopMode.REQUEST_NOW_NOW

@property
def description(self):
Expand Down Expand Up @@ -1726,7 +1725,7 @@ class Meta:
class Arguments:
workflows = List(WorkflowID, required=True)
mode = WorkflowStopMode(
default_value=WorkflowStopMode.Clean.value # type: ignore
default_value=WorkflowStopMode.Clean.value
)
cycle_point = CyclePoint(
description='Stop after the workflow reaches this cycle.'
Expand Down
63 changes: 36 additions & 27 deletions cylc/flow/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import asyncio
from contextlib import suppress
from collections import deque
from cylc.flow.parsec.exceptions import TemplateVarLanguageClash
from dataclasses import dataclass
import logging
from optparse import Values
Expand All @@ -45,7 +44,7 @@
from cylc.flow.cycling.loader import get_point
from cylc.flow.data_store_mgr import DataStoreMgr, parse_job_item
from cylc.flow.exceptions import (
CyclingError, CylcError, UserInputError
CommandFailedError, CyclingError, CylcError, UserInputError
)
import cylc.flow.flags
from cylc.flow.host_select import select_workflow_host
Expand All @@ -63,6 +62,7 @@
from cylc.flow.network.server import WorkflowRuntimeServer
from cylc.flow.network.publisher import WorkflowPublisher
from cylc.flow.option_parsers import verbosity_to_env
from cylc.flow.parsec.exceptions import TemplateVarLanguageClash
from cylc.flow.parsec.OrderedDict import DictTree
from cylc.flow.parsec.util import printcfg
from cylc.flow.parsec.validate import DurationFloat
Expand Down Expand Up @@ -150,6 +150,13 @@ class Scheduler:
START_PUB_MESSAGE_PREFIX +
'url=%(comms_method)s://%(host)s:%(port)s')

# flow information
workflow: str
owner: str
host: str
id: str # noqa: A003 (instance attr not local)
uuid_str: str

# managers
profiler: Profiler
pool: TaskPool
Expand All @@ -168,24 +175,23 @@ class Scheduler:
ext_trigger_queue: Queue

# configuration
config: WorkflowConfig # flow config
options: Values
cylc_config: DictTree # [scheduler] config
flow_file: Optional[str] = None
flow_file_update_time: Optional[float] = None

# Note: attributes without a default must come before those with defaults

# flow information
workflow: Optional[str] = None
owner: Optional[str] = None
host: Optional[str] = None
id: Optional[str] = None # noqa: A003 (instance attr not local)
uuid_str: Optional[str] = None
contact_data: Optional[dict] = None
bad_hosts: Optional[Set[str]] = None

# configuration
config: Optional[WorkflowConfig] = None # flow config
flow_file: Optional[str] = None
flow_file_update_time: Optional[float] = None

# run options
is_restart: Optional[bool] = None
template_vars: Optional[dict] = None
options: Optional[Values] = None

# workflow params
stop_mode: Optional[StopMode] = None
Expand Down Expand Up @@ -237,7 +243,7 @@ class Scheduler:
time_next_kill: Optional[float] = None
already_timed_out: bool = False

def __init__(self, reg, options):
def __init__(self, reg: str, options: Values) -> None:
# flow information
self.workflow = reg
self.owner = get_user()
Expand Down Expand Up @@ -793,9 +799,13 @@ def process_command_queue(self) -> None:
except SchedulerStop:
LOG.info(f"Command succeeded: {cmdstr}")
raise
except Exception as exc:
except (CommandFailedError, Exception) as exc:
# Don't let a bad command bring the workflow down.
LOG.warning(traceback.format_exc())
if (
cylc.flow.flags.verbosity > 1 or
not isinstance(exc, CommandFailedError)
):
LOG.warning(traceback.format_exc())
LOG.warning(str(exc))
LOG.warning(f"Command failed: {cmdstr}")
else:
Expand All @@ -818,14 +828,14 @@ def info_get_graph_raw(self, cto, ctn, grouping=None):
)

def command_stop(
self,
mode=None,
cycle_point=None,
# NOTE clock_time YYYY/MM/DD-HH:mm back-compat removed
clock_time=None,
task=None,
flow_label=None
):
self,
mode: 'StopMode',
cycle_point: Optional[str] = None,
# NOTE clock_time YYYY/MM/DD-HH:mm back-compat removed
clock_time: Optional[str] = None,
task: Optional[str] = None,
flow_label: Optional[str] = None
) -> None:
if flow_label:
self.pool.stop_flow(flow_label)
return
Expand All @@ -843,9 +853,9 @@ def command_stop(
elif clock_time:
# schedule shutdown after wallclock time passes provided time
parser = TimePointParser()
clock_time = parser.parse(clock_time)
self.set_stop_clock(
int(clock_time.get("seconds_since_unix_epoch")))
int(parser.parse(clock_time).get("seconds_since_unix_epoch"))
)
elif task:
# schedule shutdown after task succeeds
task_id = TaskID.get_standardised_taskid(task)
Expand All @@ -859,13 +869,12 @@ def command_stop(
try:
mode = StopMode(mode)
except ValueError:
LOG.error(f'Invalid stop mode {mode}')
return
raise CommandFailedError(f"Invalid stop mode: '{mode}'")
self._set_stop(mode)
if mode is StopMode.REQUEST_KILL:
self.time_next_kill = time()

def _set_stop(self, stop_mode=None):
def _set_stop(self, stop_mode: Optional[StopMode] = None) -> None:
"""Set shutdown mode."""
self.proc_pool.set_stopping()
self.stop_mode = stop_mode
Expand Down
7 changes: 4 additions & 3 deletions cylc/flow/scripts/stop.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
from cylc.flow.command_polling import Poller
from cylc.flow.exceptions import ClientError, ClientTimeout
from cylc.flow.network.client_factory import get_client
from cylc.flow.network.schema import WorkflowStopMode
from cylc.flow.option_parsers import CylcOptionParser as COP
from cylc.flow.task_id import TaskID
from cylc.flow.terminal import cli_function
Expand Down Expand Up @@ -182,11 +183,11 @@ def main(
# not a task ID, may be a cycle point
cycle_point = shutdown_arg
elif options.kill:
mode = 'Kill'
mode = WorkflowStopMode.Kill.name
elif options.now > 1:
mode = 'NowNow'
mode = WorkflowStopMode.NowNow.name
elif options.now:
mode = 'Now'
mode = WorkflowStopMode.Now.name

mutation_kwargs = {
'request_string': MUTATION,
Expand Down
10 changes: 7 additions & 3 deletions tests/unit/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,10 +145,14 @@ class _DefaultCycler:
def xtrigger_mgr() -> XtriggerManager:
"""A fixture to build an XtriggerManager which uses a mocked proc_pool,
and uses a mocked broadcast_mgr."""
workflow_name = "sample_workflow"
user = "john-foo"
return XtriggerManager(
workflow="sample_workflow",
user="john-foo",
workflow=workflow_name,
user=user,
proc_pool=Mock(put_command=lambda *a, **k: True),
broadcast_mgr=Mock(put_broadcast=lambda *a, **k: True),
data_store_mgr=DataStoreMgr(create_autospec(Scheduler))
data_store_mgr=DataStoreMgr(
create_autospec(Scheduler, workflow=workflow_name, owner=user)
)
)

0 comments on commit 4735043

Please sign in to comment.