Skip to content

Commit

Permalink
Type DecodePDU. (#2392)
Browse files Browse the repository at this point in the history
  • Loading branch information
janiversen authored Oct 18, 2024
1 parent b907133 commit 209bdff
Show file tree
Hide file tree
Showing 5 changed files with 34 additions and 43 deletions.
4 changes: 2 additions & 2 deletions pymodbus/client/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ async def connect(self) -> bool:
)
return await self.ctx.connect()

def register(self, custom_response_class: ModbusPDU) -> None:
def register(self, custom_response_class: type[ModbusPDU]) -> None:
"""Register a custom response class with the decoder (call **sync**).
:param custom_response_class: (optional) Modbus response class.
Expand Down Expand Up @@ -197,7 +197,7 @@ def __init__(
# ----------------------------------------------------------------------- #
# Client external interface
# ----------------------------------------------------------------------- #
def register(self, custom_response_class: ModbusPDU) -> None:
def register(self, custom_response_class: type[ModbusPDU]) -> None:
"""Register a custom response class with the decoder.
:param custom_response_class: (optional) Modbus response class.
Expand Down
58 changes: 26 additions & 32 deletions pymodbus/pdu/decoders.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
"""Modbus Request/Response Decoders."""
from collections.abc import Callable
from __future__ import annotations

import pymodbus.pdu.bit_read_message as bit_r_msg
import pymodbus.pdu.bit_write_message as bit_w_msg
Expand All @@ -17,7 +17,7 @@
class DecodePDU:
"""Decode pdu requests/responses (server/client)."""

_pdu_class_table = {
_pdu_class_table: set[tuple[type[base.ModbusPDU], type[base.ModbusPDU]]] = {
(reg_r_msg.ReadHoldingRegistersRequest, reg_r_msg.ReadHoldingRegistersResponse),
(bit_r_msg.ReadDiscreteInputsRequest, bit_r_msg.ReadDiscreteInputsResponse),
(reg_r_msg.ReadInputRegistersRequest, reg_r_msg.ReadInputRegistersResponse),
Expand All @@ -39,7 +39,7 @@ class DecodePDU:
(mei_msg.ReadDeviceInformationRequest, mei_msg.ReadDeviceInformationResponse),
}

_pdu_sub_class_table = [
_pdu_sub_class_table: set[tuple[type[base.ModbusPDU], type[base.ModbusPDU]]] = {
(diag_msg.ReturnQueryDataRequest, diag_msg.ReturnQueryDataResponse),
(diag_msg.RestartCommunicationsOptionRequest, diag_msg.RestartCommunicationsOptionResponse),
(diag_msg.ReturnDiagnosticRegisterRequest, diag_msg.ReturnDiagnosticRegisterResponse),
Expand All @@ -58,57 +58,51 @@ class DecodePDU:
(diag_msg.ClearOverrunCountRequest, diag_msg.ClearOverrunCountResponse),
(diag_msg.GetClearModbusPlusRequest, diag_msg.GetClearModbusPlusResponse),
(mei_msg.ReadDeviceInformationRequest, mei_msg.ReadDeviceInformationResponse),
]
}

def __init__(self, is_server: bool) -> None:
"""Initialize function_tables."""
inx = 0 if is_server else 1
self.lookup: dict[int, Callable] = {cl[inx].function_code: cl[inx] for cl in self._pdu_class_table} # type: ignore[attr-defined]
self.sub_lookup: dict[int, dict[int, Callable]] = {f: {} for f in self.lookup}
self.lookup: dict[int, type[base.ModbusPDU]] = {cl[inx].function_code: cl[inx] for cl in self._pdu_class_table}
self.sub_lookup: dict[int, dict[int, type[base.ModbusPDU]]] = {f: {} for f in self.lookup}
for f in self._pdu_sub_class_table:
self.sub_lookup[f[inx].function_code][f[inx].sub_function_code] = f[inx] # type: ignore[attr-defined]
self.sub_lookup[f[inx].function_code][f[inx].sub_function_code] = f[inx]

def lookupPduClass(self, function_code):
def lookupPduClass(self, function_code: int) -> type[base.ModbusPDU]:
"""Use `function_code` to determine the class of the PDU."""
return self.lookup.get(function_code, base.ExceptionResponse)

def register(self, function):
def register(self, custom_class: type[base.ModbusPDU]) -> None:
"""Register a function and sub function class with the decoder."""
if not issubclass(function, base.ModbusPDU):
if not issubclass(custom_class, base.ModbusPDU):
raise MessageRegisterException(
f'"{function.__class__.__name__}" is Not a valid Modbus Message'
f'"{custom_class.__class__.__name__}" is Not a valid Modbus Message'
". Class needs to be derived from "
"`pymodbus.pdu.ModbusPDU` "
)
self.lookup[function.function_code] = function
if hasattr(function, "sub_function_code"):
if function.function_code not in self.sub_lookup:
self.sub_lookup[function.function_code] = {}
self.sub_lookup[function.function_code][
function.sub_function_code
] = function
self.lookup[custom_class.function_code] = custom_class
if custom_class.sub_function_code >= 0:
if custom_class.function_code not in self.sub_lookup:
self.sub_lookup[custom_class.function_code] = {}
self.sub_lookup[custom_class.function_code][
custom_class.sub_function_code
] = custom_class

def decode(self, frame):
def decode(self, frame: bytes) -> base.ModbusPDU | None:
"""Decode a frame."""
try:
if (function_code := int(frame[0])) > 0x80:
pdu = base.ExceptionResponse(function_code & 0x7F)
pdu.decode(frame[1:])
return pdu
if (pdu := self.lookup.get(function_code, lambda: None)()):
fc_string = "{}: {}".format( # pylint: disable=consider-using-f-string
str(self.lookup[function_code]) # pylint: disable=use-maxsplit-arg
.split(".")[-1]
.rstrip('">"'),
function_code,
)
Log.debug("decode PDU for {}", fc_string)
else:
pdu_exp = base.ExceptionResponse(function_code & 0x7F)
pdu_exp.decode(frame[1:])
return pdu_exp
if not (pdu_type := self.lookup.get(function_code, None)):
Log.debug("decode PDU failed for function code {}", function_code)
raise ModbusException(f"Unknown response {function_code}")
pdu = pdu_type(0, 0, False)
Log.debug("decode PDU for {}", function_code)
pdu.decode(frame[1:])

if hasattr(pdu, "sub_function_code"):
if pdu.sub_function_code >= 0:
lookup = self.sub_lookup.get(pdu.function_code, {})
if subtype := lookup.get(pdu.sub_function_code, None):
pdu.__class__ = subtype
Expand Down
8 changes: 2 additions & 6 deletions pymodbus/pdu/diag_message.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,7 @@ def decode(self, data):
:param data: The data to decode into the function code
"""
(
self.sub_function_code, # pylint: disable=attribute-defined-outside-init
) = struct.unpack(">H", data[:2])
(self.sub_function_code, ) = struct.unpack(">H", data[:2])
if self.sub_function_code == ReturnQueryDataRequest.sub_function_code:
self.message = data[2:]
else:
Expand Down Expand Up @@ -123,9 +121,7 @@ def decode(self, data):
:param data: The data to decode into the function code
"""
(
self.sub_function_code, # pylint: disable=attribute-defined-outside-init
) = struct.unpack(">H", data[:2])
(self.sub_function_code, ) = struct.unpack(">H", data[:2])
data = data[2:]
if self.sub_function_code == ReturnQueryDataRequest.sub_function_code:
self.message = data
Expand Down
1 change: 1 addition & 0 deletions pymodbus/pdu/pdu.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ class ModbusPDU:
"""Base class for all Modbus messages."""

function_code: int = 0
sub_function_code: int = -1
_rtu_frame_size: int = 0
_rtu_byte_count_pos: int = 0

Expand Down
6 changes: 3 additions & 3 deletions pymodbus/server/simulator/http_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -390,7 +390,7 @@ def build_html_calls(self, params: dict, html: str) -> str:
for function in self.request_lookup.values():
selected = (
"selected"
if function.function_code == self.call_monitor.function #type: ignore[attr-defined]
if function.function_code == self.call_monitor.function
else ""
)
function_codes += f"<option value={function.function_code} {selected}>{function.function_code_name}</option>" #type: ignore[attr-defined]
Expand Down Expand Up @@ -556,9 +556,9 @@ def build_json_calls(self, params: dict) -> dict:
function_codes = []
for function in self.request_lookup.values():
function_codes.append({
"value": function.function_code, # type: ignore[attr-defined]
"value": function.function_code,
"text": function.function_code_name, # type: ignore[attr-defined]
"selected": function.function_code == self.call_monitor.function # type: ignore[attr-defined]
"selected": function.function_code == self.call_monitor.function
})

simulation_action = "ACTIVE" if self.call_response.active != RESPONSE_INACTIVE else ""
Expand Down

0 comments on commit 209bdff

Please sign in to comment.