Skip to content

Commit

Permalink
Ensure queued deltas are published before shutdown (#5098)
Browse files Browse the repository at this point in the history
Ensure final deltas published before WorkflowRuntimeServer shutdown

* Improve error logging
* Update changelog
  • Loading branch information
MetRonnie authored Sep 7, 2022
1 parent 51db763 commit ca2ab09
Show file tree
Hide file tree
Showing 8 changed files with 55 additions and 25 deletions.
3 changes: 3 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@ taskdefs removed before restart.
[#5091](https://github.com/cylc/cylc-flow/pull/5091) - Fix problems with
tutorial workflows.

[#5098](https://github.com/cylc/cylc-flow/pull/5098) - Fix bug where final task
status updates were not being sent to UI before shutdown.

[#5114](https://github.com/cylc/cylc-flow/pull/5114) - Fix bug where
validation errors during workflow startup were not printed to stderr before
daemonisation.
Expand Down
5 changes: 2 additions & 3 deletions cylc/flow/network/publisher.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,12 +81,11 @@ async def send_multi(self, topic, data, serializer=None):
"""
if self.socket:
# don't attempt to send anything if we are in the process of
# shutting down
self.topics.add(topic)
self.socket.send_multipart(
[topic, serialize_data(data, serializer)]
)
# else we are in the process of shutting down - don't send anything

async def publish(self, items):
"""Publish topics.
Expand All @@ -98,4 +97,4 @@ async def publish(self, items):
try:
await gather_coros(self.send_multi, items)
except Exception as exc:
LOG.error('publish: %s', exc)
LOG.exception(f"publish: {exc}")
1 change: 0 additions & 1 deletion cylc/flow/network/replier.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""Server for workflow runtime API."""

import getpass # noqa: F401
from queue import Queue

import zmq
Expand Down
47 changes: 34 additions & 13 deletions cylc/flow/network/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -205,22 +205,30 @@ def start(self, barrier):

self.operate()

async def stop(self, reason):
"""Stop the TCP servers, and clean up authentication."""
async def stop(self, reason: Union[BaseException, str]) -> None:
"""Stop the TCP servers, and clean up authentication.
This method must be called/awaited from a different thread to the
server's self.thread in order to interrupt the self.operate() loop
and wait for self.thread to terminate.
"""
self.queue.put('STOP')
if self.thread and self.thread.is_alive():
while not self.stopping:
# Non-async sleep - yield to other threads rather
# than event loop.
# Non-async sleep - yield to other threads rather than
# event loop (allows self.operate() running in different
# thread to return)
sleep(self.STOP_SLEEP_INTERVAL)

if self.replier:
self.replier.stop(stop_loop=False)
if self.publisher:
await self.publish_queued_items()
await self.publisher.publish(
[(b'shutdown', str(reason).encode('utf-8'))]
)
self.publisher.stop(stop_loop=False)
self.publisher = None
if self.curve_auth:
self.curve_auth.stop() # stop the authentication thread
if self.loop and self.loop.is_running():
Expand All @@ -230,8 +238,11 @@ async def stop(self, reason):

self.stopped = True

def operate(self):
def operate(self) -> None:
"""Orchestrate the receive, send, publish of messages."""
# Note: this cannot be an async method because the response part
# of the listener runs the event loop synchronously
# (in graphql AsyncioExecutor)
while True:
# process messages from the scheduler.
if self.queue.qsize():
Expand All @@ -243,15 +254,18 @@ def operate(self):

# Gather and respond to any requests.
self.replier.listener()

# Publish all requested/queued.
while self.publish_queue.qsize():
articles = self.publish_queue.get()
self.loop.run_until_complete(self.publisher.publish(articles))
self.loop.run_until_complete(self.publish_queued_items())

# Yield control to other threads
sleep(self.OPERATE_SLEEP_INTERVAL)

async def publish_queued_items(self):
"""Publish all queued items."""
while self.publish_queue.qsize():
articles = self.publish_queue.get()
await self.publisher.publish(articles)

def receiver(self, message):
"""Process incoming messages and coordinate response.
Expand Down Expand Up @@ -371,12 +385,19 @@ def graphql(
if executed.errors:
errors: List[Any] = []
for error in executed.errors:
LOG.error(error)
if hasattr(error, '__traceback__'):
import traceback
errors.append({'error': {
'message': str(error),
'traceback': traceback.format_exception(
error.__class__, error, error.__traceback__)}})
formatted_tb = traceback.format_exception(
type(error), error, error.__traceback__
)
LOG.error("".join(formatted_tb))
errors.append({
'error': {
'message': str(error),
'traceback': formatted_tb
}
})
continue
errors.append(getattr(error, 'message', None))
return errors
Expand Down
6 changes: 3 additions & 3 deletions cylc/flow/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -1712,7 +1712,7 @@ def check_workflow_stalled(self) -> bool:
self.timers[self.EVENT_STALL_TIMEOUT].reset()
return self.is_stalled

async def shutdown(self, reason: Exception) -> None:
async def shutdown(self, reason: BaseException) -> None:
"""Gracefully shut down the scheduler."""
# At the moment this method must be called from the main_loop.
# In the future it should shutdown the main_loop itself but
Expand All @@ -1733,7 +1733,7 @@ async def shutdown(self, reason: Exception) -> None:
# Re-raise exception to be caught higher up (sets the exit code)
raise exc from None

async def _shutdown(self, reason: Exception) -> None:
async def _shutdown(self, reason: BaseException) -> None:
"""Shutdown the workflow."""
shutdown_msg = "Workflow shutting down"
with patch_log_level(LOG):
Expand Down Expand Up @@ -1995,7 +1995,7 @@ def _check_startup_opts(self) -> None:
f"option --{opt}=reload is only valid for restart"
)

async def handle_exception(self, exc: Exception) -> NoReturn:
async def handle_exception(self, exc: BaseException) -> NoReturn:
"""Gracefully shut down the scheduler given a caught exception.
Re-raises the exception to be caught higher up (sets the exit code).
Expand Down
12 changes: 12 additions & 0 deletions tests/integration/test_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,15 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

from typing import Callable
from async_timeout import timeout
import asyncio
from getpass import getuser

import pytest

from cylc.flow.network.server import PB_METHOD_MAP
from cylc.flow.scheduler import Scheduler


@pytest.fixture(scope='module')
Expand Down Expand Up @@ -135,3 +137,13 @@ def _api(*args, **kwargs):
raise Exception('foo')
one.server.api = _api
assert 'error' in one.server.receiver(msg)


async def test_publish_before_shutdown(
one: Scheduler, start: Callable
):
"""Test that the server publishes final deltas before shutting down."""
async with start(one):
one.server.publish_queue.put([(b'fake', b'blah')])
await one.server.stop('i said stop!')
assert not one.server.publish_queue.qsize()
2 changes: 0 additions & 2 deletions tests/integration/test_zmq.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

from time import sleep

import pytest
import zmq

Expand Down
4 changes: 1 addition & 3 deletions tests/unit/network/test_publisher.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,7 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

from time import sleep

from cylc.flow.network.publisher import WorkflowPublisher, serialize_data
from cylc.flow.network.publisher import serialize_data


def test_serialize_data():
Expand Down

0 comments on commit ca2ab09

Please sign in to comment.