Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Start/stop multiple servers. #1138

Merged
merged 1 commit into from
Oct 24, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions MAKE_RELEASE.rst
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ Prepare/make release on dev.
* Update README.rst "Supported versions"
* Update CHANGELOG.rst
* Add commits from last release, but selectively !
git log --oneline v3.0.0..HEAD > commit.log
git log v3.0.0..HEAD | grep Author > contributors.log
* Commit, push and merge.
* Checkout master locally
* git merge dev
Expand All @@ -30,6 +32,13 @@ Prepare/make release on dev.
* twine upload dist/* (upload to pypi)
* Double check Read me docs are updated
* trigger build https://readthedocs.org/projects/pymodbus/builds/
* on local repo
* update github pages
git checkout gh-pages
git checkout origin/master -- README.rst
* Convert README.rst to index.md
https://cloudconvert.com/rst-to-md
* commit and push index.md
* Make an announcement in discussions.


Expand Down
158 changes: 102 additions & 56 deletions pymodbus/server/async_io.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
# pylint: disable=missing-type-doc
import asyncio
import logging
import platform
import ssl
import traceback
from binascii import b2a_hex
Expand Down Expand Up @@ -34,12 +35,6 @@
# --------------------------------------------------------------------------- #
_logger = logging.getLogger(__name__)

# --------------------------------------------------------------------------- #
# Allow access to server object, to e.g. make a shutdown
# --------------------------------------------------------------------------- #
_server_stopped = None # pylint: disable=invalid-name
_server_stop = None # pylint: disable=invalid-name


def sslctx_provider(
sslctx=None, certfile=None, keyfile=None, password=None, reqclicert=False
Expand Down Expand Up @@ -916,31 +911,82 @@ async def serve_forever(self):
# --------------------------------------------------------------------------- #


async def _helper_run_server(server, custom_functions):
"""Help starting/stopping server."""
global _server_stopped, _server_stop # pylint: disable=global-statement,invalid-name
class _serverList:
"""Maintains a list of active servers.

for func in custom_functions:
server.decoder.register(func)
_server_stopped = asyncio.Event()
_server_stop = asyncio.Event()
try:
server_task = asyncio.create_task(server.serve_forever())
except Exception as exc: # pylint: disable=broad-except
txt = f"Server caught exception: {exc}"
_logger.error(txt)
await _server_stop.wait()
await server.shutdown()
server_task.cancel()
owntask = asyncio.current_task()
for task in asyncio.all_tasks():
if task != owntask:
task.cancel()
try:
await task
except asyncio.CancelledError:
pass
_server_stopped.set()
The list allows applications to have multiple servers and
being able to do shutdown gracefully.
"""

_servers = []

def __init__(self, server, custom_functions, register):
"""Register new server."""
for func in custom_functions:
server.decoder.register(func)
self.server = server
if register:
self._servers.append(self)
self.job_stop = asyncio.Event()
self.job_is_stopped = asyncio.Event()
self.task = None

@classmethod
def get_server(cls, index: int):
"""Get server at index."""
return cls._servers[index]

def _remove(self):
"""Remove server from active list."""
for i in range(len(self._servers)): # pylint: disable=consider-using-enumerate
if self._servers[i] == self:
del self._servers[i]
break

async def run(self):
"""Help starting/stopping server."""
try:
self.task = asyncio.create_task(self.server.serve_forever())
except Exception as exc: # pylint: disable=broad-except
txt = f"Server caught exception: {exc}"
_logger.error(txt)
await self.job_stop.wait()
await self.server.shutdown()
self.task.cancel()
try:
await asyncio.wait_for(self.task, 10)
except asyncio.CancelledError:
pass
if platform.system().lower() == "windows":
owntask = asyncio.current_task()
for task in asyncio.all_tasks():
if task != owntask:
task.cancel()
try:
await asyncio.wait_for(task, 10)
except asyncio.CancelledError:
pass
self.job_is_stopped.set()

def request_stop(self):
"""Request server stop."""
self.job_stop.set()

async def async_await_stop(self):
"""Wait for server stop."""
try:
await self.job_is_stopped.wait()
except asyncio.exceptions.CancelledError:
pass
self._remove()

def await_stop(self):
"""Wait for server stop."""
for i in range(30): # Loop for 3 seconds
sleep(0.1) # in steps of 100 milliseconds.
if self.job_is_stopped.is_set():
break
self._remove()


async def StartAsyncTcpServer( # pylint: disable=invalid-name,dangerous-default-value
Expand All @@ -964,12 +1010,13 @@ async def StartAsyncTcpServer( # pylint: disable=invalid-name,dangerous-default
:param kwargs: The rest
:return: an initialized but inactive server object coroutine
"""
framer = kwargs.pop("framer", ModbusSocketFramer)
server = ModbusTcpServer(context, framer, identity, address, **kwargs)

server = ModbusTcpServer(
context, kwargs.pop("framer", ModbusSocketFramer), identity, address, **kwargs
)
job = _serverList(server, custom_functions, not defer_start)
if defer_start:
return server
await _helper_run_server(server, custom_functions)
await job.run()


async def StartAsyncTlsServer( # pylint: disable=invalid-name,dangerous-default-value,too-many-arguments
Expand Down Expand Up @@ -1008,10 +1055,9 @@ async def StartAsyncTlsServer( # pylint: disable=invalid-name,dangerous-default
:param kwargs: The rest
:return: an initialized but inactive server object coroutine
"""
framer = kwargs.pop("framer", ModbusTlsFramer)
server = ModbusTlsServer(
context,
framer,
kwargs.pop("framer", ModbusTlsFramer),
identity,
address,
sslctx,
Expand All @@ -1023,9 +1069,10 @@ async def StartAsyncTlsServer( # pylint: disable=invalid-name,dangerous-default
allow_reuse_port=allow_reuse_port,
**kwargs,
)
job = _serverList(server, custom_functions, not defer_start)
if defer_start:
return server
await _helper_run_server(server, custom_functions)
await job.run()


async def StartAsyncUdpServer( # pylint: disable=invalid-name,dangerous-default-value
Expand All @@ -1048,11 +1095,13 @@ async def StartAsyncUdpServer( # pylint: disable=invalid-name,dangerous-default
up without the ability to shut it off
:param kwargs:
"""
framer = kwargs.pop("framer", ModbusSocketFramer)
server = ModbusUdpServer(context, framer, identity, address, **kwargs)
server = ModbusUdpServer(
context, kwargs.pop("framer", ModbusSocketFramer), identity, address, **kwargs
)
job = _serverList(server, custom_functions, not defer_start)
if defer_start:
return server
await _helper_run_server(server, custom_functions)
await job.run()


async def StartAsyncSerialServer( # pylint: disable=invalid-name,dangerous-default-value
Expand All @@ -1073,12 +1122,14 @@ async def StartAsyncSerialServer( # pylint: disable=invalid-name,dangerous-defa
up without the ability to shut it off
:param kwargs: The rest
"""
framer = kwargs.pop("framer", ModbusAsciiFramer)
server = ModbusSerialServer(context, framer, identity=identity, **kwargs)
server = ModbusSerialServer(
context, kwargs.pop("framer", ModbusAsciiFramer), identity=identity, **kwargs
)
job = _serverList(server, custom_functions, not defer_start)
if defer_start:
return server
await server.start()
await _helper_run_server(server, custom_functions)
await job.run()


def StartSerialServer(**kwargs): # pylint: disable=invalid-name
Expand All @@ -1101,20 +1152,15 @@ def StartUdpServer(**kwargs): # pylint: disable=invalid-name
return asyncio.run(StartAsyncUdpServer(**kwargs))


async def ServerAsyncStop(): # pylint: disable=invalid-name
async def ServerAsyncStop(index: int = -1): # pylint: disable=invalid-name
"""Terminate server."""
global _server_stopped, _server_stop # pylint: disable=invalid-name,global-variable-not-assigned
my_job = _serverList.get_server(index)
my_job.request_stop()
await my_job.async_await_stop()

_server_stop.set()
try:
await _server_stopped.wait()
except asyncio.exceptions.CancelledError:
pass


def ServerStop(): # pylint: disable=invalid-name
def ServerStop(index: int = -1): # pylint: disable=invalid-name
"""Terminate server."""
global _server_stopped, _server_stop # pylint: disable=invalid-name,global-variable-not-assigned

_server_stop.set()
sleep(3)
my_job = _serverList.get_server(index)
my_job.request_stop()
my_job.await_stop()
11 changes: 8 additions & 3 deletions test/test_examples.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,9 @@ async def _helper_server(
test_port,
):
"""Run server."""
if pytest.IS_WINDOWS and test_comm == "serial":
yield
return
args = Commandline
args.comm = test_comm
args.framer = test_framer
Expand Down Expand Up @@ -102,15 +105,16 @@ async def test_exp_async_server_client(
mock_run_server,
):
"""Run async client and server."""
if pytest.IS_WINDOWS and test_comm == "serial":
return
assert not mock_run_server
args = Commandline
args.framer = test_framer
args.comm = test_comm
args.port = test_port
if isinstance(test_port, int):
args.port += test_port_offset
if not pytest.IS_WINDOWS and test_comm == "serial":
await run_client(test_comm, None, args=args)
await run_client(test_comm, None, args=args)


def test_exp_sync_server_client():
Expand Down Expand Up @@ -139,6 +143,8 @@ async def test_exp_client_calls( # pylint: disable=unused-argument
mock_run_server,
):
"""Test client-server async with different framers and calls."""
if pytest.IS_WINDOWS:
return
if test_comm == "serial" and test_framer in (ModbusAsciiFramer, ModbusBinaryFramer):
return
args = Commandline
Expand All @@ -163,7 +169,6 @@ async def test_exp_forwarder( # pylint: disable=unused-argument
mock_run_server,
):
"""Test modbus forwarder."""

pymodbus_apply_logging_config()
cmd_args = Commandline
cmd_args.comm = test_comm
Expand Down