import sys
import time
import serial
from multiprocessing import Process, Queue
from .rtumessage import RtuMessage, RtuRequest, RtuResponse, RtuMessageError
__author__ = "Keil Hubbard"
__license__ = "MIT"
__version__ = "0.1.0"
if sys.version_info < (3, 6, 0):
raise ImportError("Python Version Must be >=3.6.0")
STANDARD_FUNCTION_CODES = {
"read_coils": 0x01,
"read_discrete_inputs": 0x02,
"read_holding_registers": 0x03,
"read_input_registers": 0x04,
"write_single_coil": 0x05,
"write_single_register": 0x06,
"diagnostics": 0x08,
"comm_event_counter": 0x0B,
"write_multiple_coils": 0x0F,
"write_multiple_registers": 0x10,
"report_device_id": 0x11,
"mask_write_register": 0x16,
"read_write_multiple_registers": 0x17}
MINIMUM_MESSAGE_BYTES = 2
MAXIMUM_MESSAGE_BYTES = 256
MINIMUM_ONE_BYTE_VALUE = 0x00
MAXIMUM_ONE_BYTE_VALUE = 0xFF
MINIMUM_TWO_BYTE_VALUE = 0x0000
MAXIMUM_TWO_BYTE_VALUE = 0xFFFF
COIL_ON = 0xFF00
COIL_OFF = 0x0000
[docs]class TinyModbusRtu:
"""
TinyModbusRtu Class for communicating with Modbus RTU
Supports Modbus RTU Protocol over Serial ONLY.
NOT SUPPORTED: ASCII Protocol, Modbus over TCP
Allows the following messages:
Standard Modbus RTU Read requests,
Standard Modbus RTU Write requests,
Fully custom Modbus RTU requests,
Custom function code trigger requests
Allows Optionally Disabling the CRC16 Validity check for Hobbyist Development
Note: Disables CRC16 Check for both requests and responses
See Official Modbus Documentation for RTU request formatting:
https://www.modbustools.com/modbus.html
"""
# Values Per Modbus Protocol
_MINIMUM_FRAME_TIME_SECONDS = 0.00175
_MINIMUM_CHARACTER_TIME = 3.5
_BITS_PER_CHARACTER = 8
_CRC_TABLE = [0, 49345, 49537, 320, 49921, 960, 640, 49729, 50689, 1728, 1920, 51009, 1280, 50625, 50305, 1088,
52225, 3264, 3456, 52545, 3840, 53185, 52865, 3648, 2560, 51905, 52097, 2880, 51457, 2496, 2176,
51265, 55297, 6336, 6528, 55617, 6912, 56257, 55937, 6720, 7680, 57025, 57217, 8000, 56577, 7616,
7296, 56385, 5120, 54465, 54657, 5440, 55041, 6080, 5760, 54849, 53761, 4800, 4992, 54081, 4352,
53697, 53377, 4160, 61441, 12480, 12672, 61761, 13056, 62401, 62081, 12864, 13824, 63169, 63361,
14144, 62721, 13760, 13440, 62529, 15360, 64705, 64897, 15680, 65281, 16320, 16000, 65089, 64001,
15040, 15232, 64321, 14592, 63937, 63617, 14400, 10240, 59585, 59777, 10560, 60161, 11200, 10880,
59969, 60929, 11968, 12160, 61249, 11520, 60865, 60545, 11328, 58369, 9408, 9600, 58689, 9984, 59329,
59009, 9792, 8704, 58049, 58241, 9024, 57601, 8640, 8320, 57409, 40961, 24768, 24960, 41281, 25344,
41921, 41601, 25152, 26112, 42689, 42881, 26432, 42241, 26048, 25728, 42049, 27648, 44225, 44417,
27968, 44801, 28608, 28288, 44609, 43521, 27328, 27520, 43841, 26880, 43457, 43137, 26688, 30720,
47297, 47489, 31040, 47873, 31680, 31360, 47681, 48641, 32448, 32640, 48961, 32000, 48577, 48257,
31808, 46081, 29888, 30080, 46401, 30464, 47041, 46721, 30272, 29184, 45761, 45953, 29504, 45313,
29120, 28800, 45121, 20480, 37057, 37249, 20800, 37633, 21440, 21120, 37441, 38401, 22208, 22400,
38721, 21760, 38337, 38017, 21568, 39937, 23744, 23936, 40257, 24320, 40897, 40577, 24128, 23040,
39617, 39809, 23360, 39169, 22976, 22656, 38977, 34817, 18624, 18816, 35137, 19200, 35777, 35457,
19008, 19968, 36545, 36737, 20288, 36097, 19904, 19584, 35905, 17408, 33985, 34177, 17728, 34561,
18368, 18048, 34369, 33281, 17088, 17280, 33601, 16640, 33217, 32897, 16448]
_connection: serial.Serial
_frame_time: float
_crc_enabled: bool
_timeout: float
def __init__(self,
serial_connection: serial.Serial = None,
crc_enabled: bool = True,
timeout: float = 0.5):
"""
:param serial_connection: Active PySerial Object to use as transport layer
:param crc_enabled: Whether to include crc16 integrity check bytes
:param timeout: Time to wait for messages before closing transport layer
"""
self._frame_time = TinyModbusRtu._calculate_frame_time(serial_connection.baudrate)
self._crc_enabled = crc_enabled
self._connection = serial_connection
self._timeout = timeout
self._connection.close()
# --- Initialization Methods --- #
@staticmethod
def _calculate_frame_time(baudrate: int) -> float:
"""
Calculates the appropriate silent frame time to be enforced between messages
:param baudrate: serial baudrate
:return: silent frame time
"""
bit_time = 1 / baudrate
return max((bit_time * TinyModbusRtu._BITS_PER_CHARACTER * TinyModbusRtu._MINIMUM_CHARACTER_TIME),
TinyModbusRtu._MINIMUM_FRAME_TIME_SECONDS)
"""
Method used to generate the crc16 lookup table
def _generate_crc_table() -> list[int]:
generator_polynomial = 0xA001
crc_table = []
for byte in range(256):
crc = 0x0000
for _ in range(8):
if (byte ^ crc) & 0x0001:
crc = (crc >> 1) ^ generator_polynomial
else:
crc >>= 1
byte >>= 1
crc_table.append(crc)
return crc_table
"""
[docs] def send(self, message: RtuMessage) -> None:
"""
Send Byte Message on underlying serial connection
:param message: message to send
"""
message.set_crc(self._calculate_crc(message))
self._validate_message_length(message)
time.sleep(self._frame_time)
self._connection.open()
self._connection.flush()
self._connection.write(bytearray(message.encode(crc_enabled=self._crc_enabled)))
self._connection.close()
def _run_listener(self, queue: Queue) -> None:
"""Background listening process"""
self._connection.open()
message = RtuMessage()
incoming_message = b''
listening = True
reading_message = False
message_complete = False
while listening and not message_complete:
first_byte = self._connection.read()
if len(first_byte) != 0:
incoming_message += first_byte
reading_message = True
while reading_message and not message_complete:
message_complete = True
byte_read = self._connection.read()
if len(byte_read) != 0:
message_complete = False
incoming_message += byte_read
message.decode(message=incoming_message, crc_enabled=self._crc_enabled)
queue.put(message)
self._connection.close()
[docs] def listen(self) -> RtuMessage:
"""
Listen for incoming messages
Terminates upon receiving a complete message, must call listen() again to receive another message
:return: message received
"""
listening_queue = Queue()
serial_listener = Process(target=self._run_listener, args=(listening_queue,), name="ModbusSerialListener")
serial_listener.start()
serial_listener.join(timeout=self._timeout)
serial_listener.terminate()
self._connection.close()
if serial_listener.exitcode == 0:
message = listening_queue.get()
self._validate_crc(message)
self._validate_message_length(message)
return message
raise NoMessageReceived
def _calculate_crc(self, message: RtuMessage) -> bytes:
"""
Calculates Modbus crc16 for a given message
:param message: Message to generate crc16
:return: Crc16 to be appended to message
"""
if self._crc_enabled:
message_bytes = message.encode(crc_enabled=False)
crc = 0xFFFF
for byte in message_bytes:
index = self._CRC_TABLE[(crc ^ int(byte)) & 0xFF]
crc = ((crc >> 8) & 0xFF) ^ index
crc = ((crc << 8) & 0xFF00) | ((crc >> 8) & 0x00FF)
return crc.to_bytes(2, "big")
return b''
def _validate_crc(self, message: RtuMessage) -> None:
""" Message Integrity - Check message CRC16"""
if self._crc_enabled:
if message.crc_bytes != self._calculate_crc(message):
raise FailedCRCValidation()
@staticmethod
def _validate_message_length(message: RtuMessage) -> None:
""" Error Handling - Invalid Message Length """
if message.length < MINIMUM_MESSAGE_BYTES or message.length > MAXIMUM_MESSAGE_BYTES:
raise IllegalMessageSize(MINIMUM_MESSAGE_BYTES, MAXIMUM_MESSAGE_BYTES, message.length)
@staticmethod
def _validate_function_code(message: RtuMessage) -> None:
""" Error Handling - Invalid Function Code """
if message.function_code < MINIMUM_ONE_BYTE_VALUE or message.function_code > MAXIMUM_ONE_BYTE_VALUE:
raise IllegalFunctionCode(MINIMUM_ONE_BYTE_VALUE, MAXIMUM_ONE_BYTE_VALUE, message.function_code)
@staticmethod
def _validate_server_id(message: RtuMessage) -> None:
""" Error Handling - Invalid Server Id """
if message.server_id < MINIMUM_ONE_BYTE_VALUE or message.server_id > MAXIMUM_ONE_BYTE_VALUE:
raise IllegalFunctionCode(MINIMUM_ONE_BYTE_VALUE, MAXIMUM_ONE_BYTE_VALUE, message.server_id)
@property
def crc_enabled(self) -> bool:
return self._crc_enabled
[docs]class TinyModbusClient(TinyModbusRtu):
""" Client Object for communicating with one or more MODBUS RTU Servers """
def _read(self, server_id: int, function_code: int, address: int, count: int) -> bytes:
"""
Perform a Modbus Rtu read action
:param server_id: Intended recipient
:param function_code: Function code of request
:param address: Starting register address
:param count: Count of registers to read
:return: Data bytes of response
"""
TinyModbusClient._validate_address(address)
TinyModbusClient._validate_register_count(count)
message = RtuRequest(server_id=server_id,
function_code=function_code,
address=address,
count=count)
self.send(message)
return self.listen().data_bytes
def _write(self, server_id: int, function_code: int, address: int, value: int) -> bool:
"""
Perform a Modbus Rtu write action
:param server_id: Intended recipient
:param function_code: Function code of request
:param address: Destination register address
:param value: Value to write to register
:return: True if write request succeeded, false otherwise
"""
TinyModbusClient._validate_address(address)
TinyModbusClient._validate_write_value(value)
message = RtuRequest(server_id=server_id,
function_code=function_code,
address=address,
value=value)
self.send(message)
return self.listen() == message
def _send_custom(self, server_id: int, function_code: int, data_bytes: bytes = b'') -> bytes:
"""
Send a fully custom rtu message
:param server_id: Intended Recipient
:param function_code: Function code of request
:param data_bytes: Raw data bytes to send
:return: data bytes of response message
"""
message = RtuMessage(server_id=server_id,
function_code=function_code,
data_bytes=data_bytes)
self.send(message)
return self.listen().data_bytes
@staticmethod
def _validate_address(address: int) -> None:
""" Error Handling - Invalid Register Address"""
if address < MINIMUM_TWO_BYTE_VALUE or address > MAXIMUM_TWO_BYTE_VALUE:
raise IllegalAddress(MINIMUM_TWO_BYTE_VALUE, MAXIMUM_TWO_BYTE_VALUE, address)
@staticmethod
def _validate_register_count(count: int) -> None:
""" Error Handling - Invalid Register Count"""
if count < MINIMUM_TWO_BYTE_VALUE or count > MAXIMUM_TWO_BYTE_VALUE:
raise IllegalWriteValue(MINIMUM_TWO_BYTE_VALUE, MAXIMUM_TWO_BYTE_VALUE, count)
@staticmethod
def _validate_write_value(value: int) -> None:
""" Error Handling - Invalid Write Value"""
if value < MINIMUM_TWO_BYTE_VALUE or value > MAXIMUM_TWO_BYTE_VALUE:
raise IllegalWriteValue(MINIMUM_TWO_BYTE_VALUE, MAXIMUM_TWO_BYTE_VALUE, value)
# --- Read Request Methods --- #
[docs] def read_coils(self, server_id: int, address: int, count: int) -> bytes:
"""
Sends Modbus RTU request to read server coils
:param server_id: Intended recipient
:param address: Starting coil address
:param count: Count of coils to read
:return: Data of response
"""
return self._read(server_id,
STANDARD_FUNCTION_CODES.get("read_coils"),
address,
count)
[docs] def read_holding_registers(self, server_id: int, address: int, count: int) -> bytes:
"""
Sends Modbus RTU request to read server holding registers
:param server_id: Intended recipient
:param address: Starting holding register address
:param count: Count of holding registers to read
:return: Data of response
"""
return self._read(server_id,
STANDARD_FUNCTION_CODES.get("read_holding_registers"),
address,
count)
# --- Write Request Methods --- #
[docs] def write_single_coil(self, server_id: int, address: int, coil_status: int) -> bool:
"""
Sends Modbus RTU request to write a single coil to "ON"(65280) or "OFF"(0)
Recommend using module attributes COIL_ON and COIL_OFF
:param server_id: Intended recipient
:param address: Coil Address to be written
:param coil_status: Coil Status to be set
:return: True if coil written successfully, False otherwise
"""
if coil_status == COIL_ON:
value = COIL_ON
elif coil_status == COIL_OFF:
value = COIL_OFF
else:
raise IllegalCoilStatus()
return self._write(server_id,
STANDARD_FUNCTION_CODES.get("write_single_coil"),
address,
value)
[docs] def write_single_register(self, server_id: int, address: int, value: int) -> bool:
"""
Sends Modbus RTU request to write a single register to a given value
:param server_id: Intended recipient
:param address: Register Address to Write
:param value: Value to be written to Register
:return: True if register written successfully, False otherwise
"""
return self._write(server_id,
STANDARD_FUNCTION_CODES.get("write_single_register"),
address,
value)
[docs] def write_multiple_coils(self, server_id: int, starting_address: int, values: list[bool]) -> bool:
"""
Sends Modbus RTU request to write multiple coils to 'ON' or 'OFF from a given list
Not Supported in this version of TinyModbusRtu
:param server_id: Intended recipient
:param starting_address: Starting Address of Coils to Write
:param values: List of values to write to Coils in Order
:return: True if write operation is successful, False otherwise
"""
function_code = STANDARD_FUNCTION_CODES.get("write_multiple_coils")
# TODO: Develop Logic to handle writing multiple coils
raise TinyModbusError("Writing Multiple Coils Not Supported in this version of TinyModbusRtu")
[docs] def write_multiple_registers(self, server_id: int, starting_address: int, values: list[int]) -> bool:
"""
Sends Modbus RTU request to write multiple registers to values in a given list
:param server_id: Intended recipient
:param starting_address: Starting Address of Registers to Write
:param values: List of values to write to Registers in Order
:return: True if write operation is successful, False otherwise
"""
function_code = STANDARD_FUNCTION_CODES.get("write_multiple_registers")
# TODO: Develop Logic to handle writing multiple registers
raise TinyModbusError("Writing Multiple Registers Not Supported in this version of TinyModbusRtu")
[docs] def send_function_code(self, server_id: int, function_code: int) -> bytes:
"""
Send only a custom function code to a server
:param server_id: Intended recipient
:param function_code: Custom function code
"""
return self._send_custom(server_id=server_id, function_code=function_code)
[docs] def send_custom_message(self, server_id: int, function_code: int, data_bytes: bytes) -> bytes:
"""
Send a fully custom message to a server
:param server_id: Intended recipient
:param function_code: Custom function code
:param data_bytes: Custom message contents
"""
return self._send_custom(server_id=server_id, function_code=function_code, data_bytes=data_bytes)
[docs]class TinyModbusServer(TinyModbusRtu):
_server_id: int
def __init__(self,
serial_connection=None,
server_id=0x00,
crc_enabled=True):
self._server_id = server_id
super().__init__(serial_connection=serial_connection,
crc_enabled=crc_enabled,
timeout=None)
[docs] def listen(self) -> RtuMessage:
message = super().listen()
if message.server_id == self._server_id:
return message
return self.listen()
def respond_to_read_request(self, function_code: int, byte_count: int, data: list[int]):
if len(data) == byte_count:
pass
def respond_to_write_request(self):
pass
class TinyModbusError(Exception):
"""Base Class for all TinyModbus Related Exceptions"""
pass
class IllegalCoilStatus(TinyModbusError):
def __str__(self):
return f"Coils may only be set to {COIL_ON} \'TinyModbusRtu.COIL_ON\' " + \
f"or {COIL_OFF} \'TinyModbusRtu.COIL_OFF\'"
class IllegalValue(TinyModbusError):
"""Base Class for all TinyModbus Illegal Value Exceptions"""
def __init__(self, minimum: int, maximum: int, value: int):
self._min = minimum
self._max = maximum
self._value = value
def __str__(self):
return f"{self._value} is not in the allowable range {self._min, self._max}"
class IllegalServerId(IllegalValue):
"""TinyModbusRtu class has been passed a Server ID outside the allowed range"""
class IllegalFunctionCode(IllegalValue):
"""TinyModbusRtu has been passed a Function Code outside the allowed range"""
class IllegalByteCount(IllegalValue):
"""TinyModbusRtu has been passed a Byte Count outside the allowed range"""
class IllegalAddress(IllegalValue):
"""TinyModbusRtu has been passed a Register Address outside the allowed range"""
class IllegalRegisterCount(IllegalValue):
"""TinyModbusRtu has been passed a Register Count outside the allowed range"""
class IllegalWriteValue(IllegalValue):
"""TinyModbusRtu has been passed a Value to Write outside the allowed range"""
class IllegalMessageSize(IllegalValue):
"""The Size of the Message exceeds the count allowed per the Modbus Standard"""
class FailedCRCValidation(TinyModbusError):
"""Response From the Server Failed CRC16 Data Integrity Check"""
def __str__(self):
return "Response From the Server Failed CRC16 Data Integrity Check"
class MessageMalformed(TinyModbusError):
"""Response From the Server Could Not be Processed"""
def __str__(self):
return "Response From the Server Could Not be Processed"
class NoMessageReceived(TinyModbusError):
"""No Message Received Within Specified Timeout"""
def __str__(self):
return "No Message Received Within Specified Timeout"
class ServerNoResponse(TinyModbusError):
"""No Response Received From Server Within Specified Timeout"""
def __str__(self):
return "No Response Received From Server Within Specified Timeout"