Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Type DecodePDU. #2392

Merged
merged 8 commits into from
Oct 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions pymodbus/client/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ async def connect(self) -> bool:
)
return await self.ctx.connect()

def register(self, custom_response_class: ModbusPDU) -> None:
def register(self, custom_response_class: type[ModbusPDU]) -> None:
"""Register a custom response class with the decoder (call **sync**).

:param custom_response_class: (optional) Modbus response class.
Expand Down Expand Up @@ -197,7 +197,7 @@ def __init__(
# ----------------------------------------------------------------------- #
# Client external interface
# ----------------------------------------------------------------------- #
def register(self, custom_response_class: ModbusPDU) -> None:
def register(self, custom_response_class: type[ModbusPDU]) -> None:
"""Register a custom response class with the decoder.

:param custom_response_class: (optional) Modbus response class.
Expand Down
58 changes: 26 additions & 32 deletions pymodbus/pdu/decoders.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
"""Modbus Request/Response Decoders."""
from collections.abc import Callable
from __future__ import annotations

import pymodbus.pdu.bit_read_message as bit_r_msg
import pymodbus.pdu.bit_write_message as bit_w_msg
Expand All @@ -17,7 +17,7 @@
class DecodePDU:
"""Decode pdu requests/responses (server/client)."""

_pdu_class_table = {
_pdu_class_table: set[tuple[type[base.ModbusPDU], type[base.ModbusPDU]]] = {
(reg_r_msg.ReadHoldingRegistersRequest, reg_r_msg.ReadHoldingRegistersResponse),
(bit_r_msg.ReadDiscreteInputsRequest, bit_r_msg.ReadDiscreteInputsResponse),
(reg_r_msg.ReadInputRegistersRequest, reg_r_msg.ReadInputRegistersResponse),
Expand All @@ -39,7 +39,7 @@ class DecodePDU:
(mei_msg.ReadDeviceInformationRequest, mei_msg.ReadDeviceInformationResponse),
}

_pdu_sub_class_table = [
_pdu_sub_class_table: set[tuple[type[base.ModbusPDU], type[base.ModbusPDU]]] = {
(diag_msg.ReturnQueryDataRequest, diag_msg.ReturnQueryDataResponse),
(diag_msg.RestartCommunicationsOptionRequest, diag_msg.RestartCommunicationsOptionResponse),
(diag_msg.ReturnDiagnosticRegisterRequest, diag_msg.ReturnDiagnosticRegisterResponse),
Expand All @@ -58,57 +58,51 @@ class DecodePDU:
(diag_msg.ClearOverrunCountRequest, diag_msg.ClearOverrunCountResponse),
(diag_msg.GetClearModbusPlusRequest, diag_msg.GetClearModbusPlusResponse),
(mei_msg.ReadDeviceInformationRequest, mei_msg.ReadDeviceInformationResponse),
]
}

def __init__(self, is_server: bool) -> None:
"""Initialize function_tables."""
inx = 0 if is_server else 1
self.lookup: dict[int, Callable] = {cl[inx].function_code: cl[inx] for cl in self._pdu_class_table} # type: ignore[attr-defined]
self.sub_lookup: dict[int, dict[int, Callable]] = {f: {} for f in self.lookup}
self.lookup: dict[int, type[base.ModbusPDU]] = {cl[inx].function_code: cl[inx] for cl in self._pdu_class_table}
self.sub_lookup: dict[int, dict[int, type[base.ModbusPDU]]] = {f: {} for f in self.lookup}
for f in self._pdu_sub_class_table:
self.sub_lookup[f[inx].function_code][f[inx].sub_function_code] = f[inx] # type: ignore[attr-defined]
self.sub_lookup[f[inx].function_code][f[inx].sub_function_code] = f[inx]

def lookupPduClass(self, function_code):
def lookupPduClass(self, function_code: int) -> type[base.ModbusPDU]:
"""Use `function_code` to determine the class of the PDU."""
return self.lookup.get(function_code, base.ExceptionResponse)

def register(self, function):
def register(self, custom_class: type[base.ModbusPDU]) -> None:
"""Register a function and sub function class with the decoder."""
if not issubclass(function, base.ModbusPDU):
if not issubclass(custom_class, base.ModbusPDU):
raise MessageRegisterException(
f'"{function.__class__.__name__}" is Not a valid Modbus Message'
f'"{custom_class.__class__.__name__}" is Not a valid Modbus Message'
". Class needs to be derived from "
"`pymodbus.pdu.ModbusPDU` "
)
self.lookup[function.function_code] = function
if hasattr(function, "sub_function_code"):
if function.function_code not in self.sub_lookup:
self.sub_lookup[function.function_code] = {}
self.sub_lookup[function.function_code][
function.sub_function_code
] = function
self.lookup[custom_class.function_code] = custom_class
if custom_class.sub_function_code >= 0:
if custom_class.function_code not in self.sub_lookup:
self.sub_lookup[custom_class.function_code] = {}
self.sub_lookup[custom_class.function_code][
custom_class.sub_function_code
] = custom_class

def decode(self, frame):
def decode(self, frame: bytes) -> base.ModbusPDU | None:
"""Decode a frame."""
try:
if (function_code := int(frame[0])) > 0x80:
pdu = base.ExceptionResponse(function_code & 0x7F)
pdu.decode(frame[1:])
return pdu
if (pdu := self.lookup.get(function_code, lambda: None)()):
fc_string = "{}: {}".format( # pylint: disable=consider-using-f-string
str(self.lookup[function_code]) # pylint: disable=use-maxsplit-arg
.split(".")[-1]
.rstrip('">"'),
function_code,
)
Log.debug("decode PDU for {}", fc_string)
else:
pdu_exp = base.ExceptionResponse(function_code & 0x7F)
pdu_exp.decode(frame[1:])
return pdu_exp
if not (pdu_type := self.lookup.get(function_code, None)):
Log.debug("decode PDU failed for function code {}", function_code)
raise ModbusException(f"Unknown response {function_code}")
pdu = pdu_type(0, 0, False)
Log.debug("decode PDU for {}", function_code)
pdu.decode(frame[1:])

if hasattr(pdu, "sub_function_code"):
if pdu.sub_function_code >= 0:
lookup = self.sub_lookup.get(pdu.function_code, {})
if subtype := lookup.get(pdu.sub_function_code, None):
pdu.__class__ = subtype
Expand Down
8 changes: 2 additions & 6 deletions pymodbus/pdu/diag_message.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,7 @@ def decode(self, data):

:param data: The data to decode into the function code
"""
(
self.sub_function_code, # pylint: disable=attribute-defined-outside-init
) = struct.unpack(">H", data[:2])
(self.sub_function_code, ) = struct.unpack(">H", data[:2])
if self.sub_function_code == ReturnQueryDataRequest.sub_function_code:
self.message = data[2:]
else:
Expand Down Expand Up @@ -123,9 +121,7 @@ def decode(self, data):

:param data: The data to decode into the function code
"""
(
self.sub_function_code, # pylint: disable=attribute-defined-outside-init
) = struct.unpack(">H", data[:2])
(self.sub_function_code, ) = struct.unpack(">H", data[:2])
data = data[2:]
if self.sub_function_code == ReturnQueryDataRequest.sub_function_code:
self.message = data
Expand Down
1 change: 1 addition & 0 deletions pymodbus/pdu/pdu.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ class ModbusPDU:
"""Base class for all Modbus messages."""

function_code: int = 0
sub_function_code: int = -1
_rtu_frame_size: int = 0
_rtu_byte_count_pos: int = 0

Expand Down
6 changes: 3 additions & 3 deletions pymodbus/server/simulator/http_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -390,7 +390,7 @@ def build_html_calls(self, params: dict, html: str) -> str:
for function in self.request_lookup.values():
selected = (
"selected"
if function.function_code == self.call_monitor.function #type: ignore[attr-defined]
if function.function_code == self.call_monitor.function
else ""
)
function_codes += f"<option value={function.function_code} {selected}>{function.function_code_name}</option>" #type: ignore[attr-defined]
Expand Down Expand Up @@ -556,9 +556,9 @@ def build_json_calls(self, params: dict) -> dict:
function_codes = []
for function in self.request_lookup.values():
function_codes.append({
"value": function.function_code, # type: ignore[attr-defined]
"value": function.function_code,
"text": function.function_code_name, # type: ignore[attr-defined]
"selected": function.function_code == self.call_monitor.function # type: ignore[attr-defined]
"selected": function.function_code == self.call_monitor.function
})

simulation_action = "ACTIVE" if self.call_response.active != RESPONSE_INACTIVE else ""
Expand Down