Skip to content

Commit

Permalink
#82: Using error response API to check for error
Browse files Browse the repository at this point in the history
  • Loading branch information
rahul committed Jan 10, 2018
1 parent bac869e commit b64b342
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 38 deletions.
16 changes: 8 additions & 8 deletions examples/common/changing_framers.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,23 +14,23 @@
tcp transport with an RTU framer). However, please let us know of any
success cases that are not documented!
"""
# --------------------------------------------------------------------------- #
# --------------------------------------------------------------------------- #
# import the modbus client and the framers
# --------------------------------------------------------------------------- #
# --------------------------------------------------------------------------- #
from pymodbus.client.sync import ModbusTcpClient as ModbusClient

# --------------------------------------------------------------------------- #
# --------------------------------------------------------------------------- #
# Import the modbus framer that you want
# --------------------------------------------------------------------------- #
# --------------------------------------------------------------------------- #
# --------------------------------------------------------------------------- #
# --------------------------------------------------------------------------- #
#from pymodbus.transaction import ModbusSocketFramer as ModbusFramer
from pymodbus.transaction import ModbusRtuFramer as ModbusFramer
#from pymodbus.transaction import ModbusBinaryFramer as ModbusFramer
#from pymodbus.transaction import ModbusAsciiFramer as ModbusFramer

# --------------------------------------------------------------------------- #
# --------------------------------------------------------------------------- #
# configure the client logging
# --------------------------------------------------------------------------- #
# --------------------------------------------------------------------------- #
import logging
logging.basicConfig()
log = logging.getLogger()
Expand All @@ -48,7 +48,7 @@
# ----------------------------------------------------------------------- #
rq = client.write_coil(1, True)
rr = client.read_coils(1,1)
assert(rq.function_code < 0x80) # test that we are not an error
assert(rq.isError() is False) # test that we are not an error
assert(rr.bits[0] == True) # test the expected value

# ----------------------------------------------------------------------- #
Expand Down
60 changes: 30 additions & 30 deletions examples/common/synchronous_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,16 @@
result = client.read_coils(1,10)
print result
"""
# --------------------------------------------------------------------------- #
# --------------------------------------------------------------------------- #
# import the various server implementations
# --------------------------------------------------------------------------- #
# --------------------------------------------------------------------------- #
from pymodbus.client.sync import ModbusTcpClient as ModbusClient
#from pymodbus.client.sync import ModbusUdpClient as ModbusClient
# from pymodbus.client.sync import ModbusSerialClient as ModbusClient

# --------------------------------------------------------------------------- #
# --------------------------------------------------------------------------- #
# configure the client logging
# --------------------------------------------------------------------------- #
# --------------------------------------------------------------------------- #
import logging
logging.basicConfig()
log = logging.getLogger()
Expand All @@ -32,9 +32,9 @@


def run_sync_client():
# ------------------------------------------------------------------------#
# ------------------------------------------------------------------------#
# choose the client you want
# ------------------------------------------------------------------------#
# ------------------------------------------------------------------------#
# make sure to start an implementation to hit against. For this
# you can use an existing device, the reference implementation in the tools
# directory, or start a pymodbus server.
Expand All @@ -58,23 +58,23 @@ def run_sync_client():
# Here is an example of using these options::
#
# client = ModbusClient('localhost', retries=3, retry_on_empty=True)
# ------------------------------------------------------------------------#
# ------------------------------------------------------------------------#
client = ModbusClient('localhost', port=5020)
# client = ModbusClient(method='ascii', port='/dev/pts/2', timeout=1)
# client = ModbusClient(method='rtu', port='/dev/ttyp0', timeout=1)
client.connect()
# ------------------------------------------------------------------------#

# ------------------------------------------------------------------------#
# specify slave to query
# ------------------------------------------------------------------------#
# ------------------------------------------------------------------------#
# The slave to query is specified in an optional parameter for each
# individual request. This can be done by specifying the `unit` parameter
# which defaults to `0x00`
# ----------------------------------------------------------------------- #
log.debug("Reading Coils")
rr = client.read_coils(1, 1, unit=0x01)
print(rr)

# ----------------------------------------------------------------------- #
# example requests
# ----------------------------------------------------------------------- #
Expand All @@ -90,49 +90,49 @@ def run_sync_client():
log.debug("Write to a Coil and read back")
rq = client.write_coil(0, True, unit=UNIT)
rr = client.read_coils(0, 1, unit=UNIT)
assert(rq.function_code < 0x80) # test that we are not an error
assert(rq.isError() is False) # test that we are not an error
assert(rr.bits[0] == True) # test the expected value

log.debug("Write to multiple coils and read back- test 1")
rq = client.write_coils(1, [True]*8, unit=UNIT)
assert(rq.function_code < 0x80) # test that we are not an error
assert(rq.isError() is False) # test that we are not an error
rr = client.read_coils(1, 21, unit=UNIT)
assert(rr.function_code < 0x80) # test that we are not an error
assert(rr.isError() is False) # test that we are not an error
resp = [True]*21

# If the returned output quantity is not a multiple of eight,
# the remaining bits in the final data byte will be padded with zeros
# (toward the high order end of the byte).

resp.extend([False]*3)
assert(rr.bits == resp) # test the expected value

log.debug("Write to multiple coils and read back - test 2")
rq = client.write_coils(1, [False]*8, unit=UNIT)
rr = client.read_coils(1, 8, unit=UNIT)
assert(rq.function_code < 0x80) # test that we are not an error
assert(rq.isError() is False) # test that we are not an error
assert(rr.bits == [False]*8) # test the expected value

log.debug("Read discrete inputs")
rr = client.read_discrete_inputs(0, 8, unit=UNIT)
assert(rq.function_code < 0x80) # test that we are not an error
assert(rq.isError() is False) # test that we are not an error

log.debug("Write to a holding register and read back")
rq = client.write_register(1, 10, unit=UNIT)
rr = client.read_holding_registers(1, 1, unit=UNIT)
assert(rq.function_code < 0x80) # test that we are not an error
assert(rq.isError() is False) # test that we are not an error
assert(rr.registers[0] == 10) # test the expected value

log.debug("Write to multiple holding registers and read back")
rq = client.write_registers(1, [10]*8, unit=UNIT)
rr = client.read_holding_registers(1, 8, unit=UNIT)
assert(rq.function_code < 0x80) # test that we are not an error
assert(rq.isError() is False) # test that we are not an error
assert(rr.registers == [10]*8) # test the expected value

log.debug("Read input registers")
rr = client.read_input_registers(1, 8, unit=UNIT)
assert(rq.function_code < 0x80) # test that we are not an error
assert(rq.isError() is False) # test that we are not an error

arguments = {
'read_address': 1,
'read_count': 8,
Expand All @@ -142,10 +142,10 @@ def run_sync_client():
log.debug("Read write registeres simulataneously")
rq = client.readwrite_registers(unit=UNIT, **arguments)
rr = client.read_holding_registers(1, 8, unit=UNIT)
assert(rq.function_code < 0x80) # test that we are not an error
assert(rq.isError() is False) # test that we are not an error
assert(rq.registers == [20]*8) # test the expected value
assert(rr.registers == [20]*8) # test the expected value

# ----------------------------------------------------------------------- #
# close the client
# ----------------------------------------------------------------------- #
Expand Down

0 comments on commit b64b342

Please sign in to comment.