From 1061c9234003af5b9a5c6f7c5bcedece39cfbcc3 Mon Sep 17 00:00:00 2001 From: jan Iversen Date: Sun, 23 Oct 2022 16:04:52 +0200 Subject: [PATCH] Start/stop multiple servers. --- MAKE_RELEASE.rst | 9 ++ pymodbus/server/async_io.py | 158 +++++++++++++++++++++++------------- test/test_examples.py | 11 ++- 3 files changed, 119 insertions(+), 59 deletions(-) diff --git a/MAKE_RELEASE.rst b/MAKE_RELEASE.rst index 045400896..9d0d32095 100644 --- a/MAKE_RELEASE.rst +++ b/MAKE_RELEASE.rst @@ -13,6 +13,8 @@ Prepare/make release on dev. * Update README.rst "Supported versions" * Update CHANGELOG.rst * Add commits from last release, but selectively ! + git log --oneline v3.0.0..HEAD > commit.log + git log v3.0.0..HEAD | grep Author > contributors.log * Commit, push and merge. * Checkout master locally * git merge dev @@ -30,6 +32,13 @@ Prepare/make release on dev. * twine upload dist/* (upload to pypi) * Double check Read me docs are updated * trigger build https://readthedocs.org/projects/pymodbus/builds/ +* on local repo + * update github pages + git checkout gh-pages + git checkout origin/master -- README.rst + * Convert README.rst to index.md + https://cloudconvert.com/rst-to-md + * commit and push index.md * Make an announcement in discussions. diff --git a/pymodbus/server/async_io.py b/pymodbus/server/async_io.py index da791c0e9..6f27a368e 100755 --- a/pymodbus/server/async_io.py +++ b/pymodbus/server/async_io.py @@ -2,6 +2,7 @@ # pylint: disable=missing-type-doc import asyncio import logging +import platform import ssl import traceback from binascii import b2a_hex @@ -34,12 +35,6 @@ # --------------------------------------------------------------------------- # _logger = logging.getLogger(__name__) -# --------------------------------------------------------------------------- # -# Allow access to server object, to e.g. make a shutdown -# --------------------------------------------------------------------------- # -_server_stopped = None # pylint: disable=invalid-name -_server_stop = None # pylint: disable=invalid-name - def sslctx_provider( sslctx=None, certfile=None, keyfile=None, password=None, reqclicert=False @@ -916,31 +911,82 @@ async def serve_forever(self): # --------------------------------------------------------------------------- # -async def _helper_run_server(server, custom_functions): - """Help starting/stopping server.""" - global _server_stopped, _server_stop # pylint: disable=global-statement,invalid-name +class _serverList: + """Maintains a list of active servers. - for func in custom_functions: - server.decoder.register(func) - _server_stopped = asyncio.Event() - _server_stop = asyncio.Event() - try: - server_task = asyncio.create_task(server.serve_forever()) - except Exception as exc: # pylint: disable=broad-except - txt = f"Server caught exception: {exc}" - _logger.error(txt) - await _server_stop.wait() - await server.shutdown() - server_task.cancel() - owntask = asyncio.current_task() - for task in asyncio.all_tasks(): - if task != owntask: - task.cancel() - try: - await task - except asyncio.CancelledError: - pass - _server_stopped.set() + The list allows applications to have multiple servers and + being able to do shutdown gracefully. + """ + + _servers = [] + + def __init__(self, server, custom_functions, register): + """Register new server.""" + for func in custom_functions: + server.decoder.register(func) + self.server = server + if register: + self._servers.append(self) + self.job_stop = asyncio.Event() + self.job_is_stopped = asyncio.Event() + self.task = None + + @classmethod + def get_server(cls, index: int): + """Get server at index.""" + return cls._servers[index] + + def _remove(self): + """Remove server from active list.""" + for i in range(len(self._servers)): # pylint: disable=consider-using-enumerate + if self._servers[i] == self: + del self._servers[i] + break + + async def run(self): + """Help starting/stopping server.""" + try: + self.task = asyncio.create_task(self.server.serve_forever()) + except Exception as exc: # pylint: disable=broad-except + txt = f"Server caught exception: {exc}" + _logger.error(txt) + await self.job_stop.wait() + await self.server.shutdown() + self.task.cancel() + try: + await asyncio.wait_for(self.task, 10) + except asyncio.CancelledError: + pass + if platform.system().lower() == "windows": + owntask = asyncio.current_task() + for task in asyncio.all_tasks(): + if task != owntask: + task.cancel() + try: + await asyncio.wait_for(task, 10) + except asyncio.CancelledError: + pass + self.job_is_stopped.set() + + def request_stop(self): + """Request server stop.""" + self.job_stop.set() + + async def async_await_stop(self): + """Wait for server stop.""" + try: + await self.job_is_stopped.wait() + except asyncio.exceptions.CancelledError: + pass + self._remove() + + def await_stop(self): + """Wait for server stop.""" + for i in range(30): # Loop for 3 seconds + sleep(0.1) # in steps of 100 milliseconds. + if self.job_is_stopped.is_set(): + break + self._remove() async def StartAsyncTcpServer( # pylint: disable=invalid-name,dangerous-default-value @@ -964,12 +1010,13 @@ async def StartAsyncTcpServer( # pylint: disable=invalid-name,dangerous-default :param kwargs: The rest :return: an initialized but inactive server object coroutine """ - framer = kwargs.pop("framer", ModbusSocketFramer) - server = ModbusTcpServer(context, framer, identity, address, **kwargs) - + server = ModbusTcpServer( + context, kwargs.pop("framer", ModbusSocketFramer), identity, address, **kwargs + ) + job = _serverList(server, custom_functions, not defer_start) if defer_start: return server - await _helper_run_server(server, custom_functions) + await job.run() async def StartAsyncTlsServer( # pylint: disable=invalid-name,dangerous-default-value,too-many-arguments @@ -1008,10 +1055,9 @@ async def StartAsyncTlsServer( # pylint: disable=invalid-name,dangerous-default :param kwargs: The rest :return: an initialized but inactive server object coroutine """ - framer = kwargs.pop("framer", ModbusTlsFramer) server = ModbusTlsServer( context, - framer, + kwargs.pop("framer", ModbusTlsFramer), identity, address, sslctx, @@ -1023,9 +1069,10 @@ async def StartAsyncTlsServer( # pylint: disable=invalid-name,dangerous-default allow_reuse_port=allow_reuse_port, **kwargs, ) + job = _serverList(server, custom_functions, not defer_start) if defer_start: return server - await _helper_run_server(server, custom_functions) + await job.run() async def StartAsyncUdpServer( # pylint: disable=invalid-name,dangerous-default-value @@ -1048,11 +1095,13 @@ async def StartAsyncUdpServer( # pylint: disable=invalid-name,dangerous-default up without the ability to shut it off :param kwargs: """ - framer = kwargs.pop("framer", ModbusSocketFramer) - server = ModbusUdpServer(context, framer, identity, address, **kwargs) + server = ModbusUdpServer( + context, kwargs.pop("framer", ModbusSocketFramer), identity, address, **kwargs + ) + job = _serverList(server, custom_functions, not defer_start) if defer_start: return server - await _helper_run_server(server, custom_functions) + await job.run() async def StartAsyncSerialServer( # pylint: disable=invalid-name,dangerous-default-value @@ -1073,12 +1122,14 @@ async def StartAsyncSerialServer( # pylint: disable=invalid-name,dangerous-defa up without the ability to shut it off :param kwargs: The rest """ - framer = kwargs.pop("framer", ModbusAsciiFramer) - server = ModbusSerialServer(context, framer, identity=identity, **kwargs) + server = ModbusSerialServer( + context, kwargs.pop("framer", ModbusAsciiFramer), identity=identity, **kwargs + ) + job = _serverList(server, custom_functions, not defer_start) if defer_start: return server await server.start() - await _helper_run_server(server, custom_functions) + await job.run() def StartSerialServer(**kwargs): # pylint: disable=invalid-name @@ -1101,20 +1152,15 @@ def StartUdpServer(**kwargs): # pylint: disable=invalid-name return asyncio.run(StartAsyncUdpServer(**kwargs)) -async def ServerAsyncStop(): # pylint: disable=invalid-name +async def ServerAsyncStop(index: int = -1): # pylint: disable=invalid-name """Terminate server.""" - global _server_stopped, _server_stop # pylint: disable=invalid-name,global-variable-not-assigned + my_job = _serverList.get_server(index) + my_job.request_stop() + await my_job.async_await_stop() - _server_stop.set() - try: - await _server_stopped.wait() - except asyncio.exceptions.CancelledError: - pass - -def ServerStop(): # pylint: disable=invalid-name +def ServerStop(index: int = -1): # pylint: disable=invalid-name """Terminate server.""" - global _server_stopped, _server_stop # pylint: disable=invalid-name,global-variable-not-assigned - - _server_stop.set() - sleep(3) + my_job = _serverList.get_server(index) + my_job.request_stop() + my_job.await_stop() diff --git a/test/test_examples.py b/test/test_examples.py index f41d4288a..a219e1bc8 100755 --- a/test/test_examples.py +++ b/test/test_examples.py @@ -67,6 +67,9 @@ async def _helper_server( test_port, ): """Run server.""" + if pytest.IS_WINDOWS and test_comm == "serial": + yield + return args = Commandline args.comm = test_comm args.framer = test_framer @@ -102,6 +105,8 @@ async def test_exp_async_server_client( mock_run_server, ): """Run async client and server.""" + if pytest.IS_WINDOWS and test_comm == "serial": + return assert not mock_run_server args = Commandline args.framer = test_framer @@ -109,8 +114,7 @@ async def test_exp_async_server_client( args.port = test_port if isinstance(test_port, int): args.port += test_port_offset - if not pytest.IS_WINDOWS and test_comm == "serial": - await run_client(test_comm, None, args=args) + await run_client(test_comm, None, args=args) def test_exp_sync_server_client(): @@ -139,6 +143,8 @@ async def test_exp_client_calls( # pylint: disable=unused-argument mock_run_server, ): """Test client-server async with different framers and calls.""" + if pytest.IS_WINDOWS: + return if test_comm == "serial" and test_framer in (ModbusAsciiFramer, ModbusBinaryFramer): return args = Commandline @@ -163,7 +169,6 @@ async def test_exp_forwarder( # pylint: disable=unused-argument mock_run_server, ): """Test modbus forwarder.""" - pymodbus_apply_logging_config() cmd_args = Commandline cmd_args.comm = test_comm