Modbus¤

Prerequisites¤

See this section if you are communicating with a Modbus device on Linux or macOS via the Serial interface. There are no prerequisites on Windows, or on any platform when a network Socket is used as the interface.

Modbus (Interface) ¤

Modbus(equipment: Equipment)

Base class for the Modbus protocol (specification v1.1b3).

Parameters:

Name Type Description Default
equipment Equipment

An Equipment instance.

required

A Connection instance supports the same properties as either Serial or Socket, depending on which underlying interface is used for the connection.

Source code in src/msl/equipment/interfaces/modbus.py
def __init__(self, equipment: Equipment) -> None:
    """Base class for the Modbus protocol (specification v1.1b3).

    Args:
        equipment: An [Equipment][] instance.

    A [Connection][msl.equipment.schema.Connection] instance supports the same _properties_
    as either [Serial][msl.equipment.interfaces.serial.Serial] or [Socket][msl.equipment.interfaces.socket.Socket],
    depending on which underlying interface is used for the connection.
    """
    super().__init__(equipment)

    assert equipment.connection is not None  # noqa: S101
    parsed = parse_modbus_address(equipment.connection.address)
    if parsed is None:
        msg = f"Invalid Modbus address {equipment.connection.address!r}"
        raise ValueError(msg)

    self._lock: Lock = Lock()
    self._repr: str = self._str[:-1] + f" at {parsed.address}>"
    self._parsed: ParsedModbusAddress = parsed
    self._connect()

equipment property ¤

equipment: Equipment

The Equipment associated with the interface.

interface property ¤

interface: Serial | Socket

The underlying communication interface instance, either Serial or Socket.

timeout property writable ¤

timeout: float | None

The timeout, in seconds, for read and write operations.

A value less than 0 sets the timeout to None (blocking mode).

disconnect ¤

disconnect() -> None

Disconnect from the Modbus equipment.

Source code in src/msl/equipment/interfaces/modbus.py
def disconnect(self) -> None:  # pyright: ignore[reportImplicitOverride]
    """Disconnect from the Modbus equipment."""
    if hasattr(self, "_framer"):
        self._framer.disconnect()
        super().disconnect()

mask_write_register ¤

mask_write_register(
    address: int, *, and_mask: int = 65535, or_mask: int = 0, device_id: int = 1
) -> ModbusResponse

Mask Write Register (function code 0x16).

Modifies the contents of the specified holding-register address using a combination of an AND mask, an OR mask and the register's current contents. This method can be used to set or clear individual bits in the holding register.

Parameters:

Name Type Description Default
address int

Holding-register address. Must be in the range [0, 65535].

required
and_mask int

The AND bitmask to apply to the register's current contents. Must be in the range [0, 65535].

65535
or_mask int

The OR bitmask to apply to the register's current contents. Must be in the range [0, 65535].

0
device_id int

Modbus device ID.

1

Returns:

Type Description
ModbusResponse

Modbus response. The response data is the result after the masks have been written.
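
The Modbus specification defines the result of this function as (current contents AND and_mask) OR (or_mask AND (NOT and_mask)). The sketch below reproduces that arithmetic in plain Python so that a mask pair can be checked before it is sent to a device. The helper names `apply_masks`, `set_bit_masks` and `clear_bit_masks` are hypothetical and are not part of the library.

```python
from __future__ import annotations


def apply_masks(current: int, and_mask: int = 0xFFFF, or_mask: int = 0x0000) -> int:
    """Return the value a register should hold after a Mask Write Register request."""
    # Formula from the Modbus specification, limited to 16 bits.
    return ((current & and_mask) | (or_mask & ~and_mask)) & 0xFFFF


def set_bit_masks(bit: int) -> tuple[int, int]:
    """Return (and_mask, or_mask) that sets one bit and leaves the others unchanged."""
    return 0xFFFF & ~(1 << bit), 1 << bit


def clear_bit_masks(bit: int) -> tuple[int, int]:
    """Return (and_mask, or_mask) that clears one bit and leaves the others unchanged."""
    return 0xFFFF & ~(1 << bit), 0x0000


# Worked example from the specification: 0x12 with masks 0xF2 and 0x25 gives 0x17
print(hex(apply_masks(0x12, 0xF2, 0x25)))  # 0x17
```

With the default masks (and_mask=65535, or_mask=0) the register contents are left unchanged.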

Source code in src/msl/equipment/interfaces/modbus.py
def mask_write_register(
    self, address: int, *, and_mask: int = 65535, or_mask: int = 0, device_id: int = 1
) -> ModbusResponse:
    """Mask Write Register (function code `0x16`).

    Modifies the contents of the specified holding-register address using a combination of an AND mask,
    an OR mask and the register's current contents. This method can be used to set or clear individual
    bits in the holding register.

    Args:
        address: Holding-register address. Must be in the range [0, 65535].
        and_mask: The AND bitmask to apply to the register's current contents. Must be in the range [0, 65535].
        or_mask: The OR bitmask to apply to the register's current contents. Must be in the range [0, 65535].
        device_id: Modbus device ID.

    Returns:
        Modbus response. The response data is the result after the masks have been written.
    """
    with self._lock:
        function_code = 0x16
        _ = self.write(function_code, data=pack(">HHH", address, and_mask, or_mask), device_id=device_id)
        device_id, response = self.read(10)
        mr = ModbusResponse(device_id, response[0], response[1:])
        self._check_function_code(function_code, mr)
        return mr

read ¤

read(size: int | None = None) -> tuple[int, bytes]

Read a Modbus message.

Parameters:

Name Type Description Default
size int | None

The number of bytes to read, i.e., the size of the Application Data Unit (ADU). Only used with RTU frames. If the third byte in the response message specifies the Byte Count (e.g., first: Device ID, second: Function Code, third: Byte Count), then the size parameter does not need to be specified. The size parameter is ignored for ASCII frames or if using the TCP/UDP interface.

None

Returns:

Type Description
tuple[int, bytes]

The Modbus device ID and the Protocol Data Unit of the response, i.e., (ID, PDU).
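
For RTU frames, the ADU wraps the PDU with a 1-byte device ID in front and a 2-byte CRC behind. This is why the methods on this page that expect a fixed-length response pass a fixed size, for example `read(8)` in write_register and `read(5)` in read_exception_status. A minimal sketch of that arithmetic follows. `rtu_adu_size` is a hypothetical helper, not part of the library.

```python
def rtu_adu_size(pdu_data_size: int) -> int:
    """Size in bytes of an RTU frame whose PDU carries `pdu_data_size` bytes after the function code."""
    device_id = 1      # address field
    function_code = 1  # first byte of the PDU
    crc = 2            # CRC-16 error check
    return device_id + function_code + pdu_data_size + crc


print(rtu_adu_size(4))  # 8: write_coil / write_register echo, address (2) + value (2)
print(rtu_adu_size(6))  # 10: mask_write_register echo, address (2) + and_mask (2) + or_mask (2)
print(rtu_adu_size(1))  # 5: read_exception_status, one status byte
```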

Source code in src/msl/equipment/interfaces/modbus.py
def read(self, size: int | None = None) -> tuple[int, bytes]:
    """Read a Modbus message.

    Args:
        size: The number of bytes to read, i.e., the size of the Application Data Unit (ADU).
            Only used with RTU frames. If the third byte in the response message specifies the
            *Byte Count* (e.g., first: *Device ID*, second: *Function Code*, third: *Byte Count*),
            then the `size` parameter does not need to be specified. The `size` parameter is
            ignored for ASCII frames or if using the TCP/UDP interface.

    Returns:
        The Modbus device ID and the Protocol Data Unit of the response, i.e., `(ID, PDU)`.
    """
    device_id, pdu = self._framer.read(size)
    if pdu[0] <= 0x80:  # noqa: PLR2004
        return device_id, pdu

    # Consider handling exception code 5 and 6 differently (i.e., don't raise an error)
    msg = EXCEPTIONS.get(pdu[1], f"Unknown Modbus exception code 0x{pdu[1]:02X}")
    raise MSLConnectionError(self, msg)

read_coils ¤

read_coils(
    address: int, *, count: int = 1, device_id: int = 1
) -> ModbusResponse

Read coils (function code 0x01).

Parameters:

Name Type Description Default
address int

Starting register address to read from. Must be in the range [0, 65535].

required
count int

The number of coils to read. Must be in the range [1, 2000].

1
device_id int

Modbus device ID.

1

Returns:

Type Description
ModbusResponse

Modbus response. Call the bits method to get the ON/OFF state of each coil.
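
In a Read Coils response the coil states are packed eight per byte. The first requested coil is in the least-significant bit of the first data byte, and unused bits in the last byte are padded with zeros. The sketch below shows that unpacking in plain Python. `unpack_bits` is a hypothetical helper for illustration and is not the implementation behind the bits method.

```python
from __future__ import annotations


def unpack_bits(data: bytes, count: int) -> list[bool]:
    """Unpack `count` ON/OFF states from bytes, least-significant bit first."""
    bits = [bool((byte >> i) & 1) for byte in data for i in range(8)]
    return bits[:count]  # drop the zero padding in the last byte


# Ten coils packed into two data bytes
print(unpack_bits(b"\xCD\x01", 10))
```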

Source code in src/msl/equipment/interfaces/modbus.py
def read_coils(self, address: int, *, count: int = 1, device_id: int = 1) -> ModbusResponse:
    """Read coils (function code `0x01`).

    Args:
        address: Starting register address to read from. Must be in the range [0, 65535].
        count: The number of coils to read. Must be in the range [1, 2000].
        device_id: Modbus device ID.

    Returns:
        Modbus response. Call the [bits][msl.equipment.interfaces.modbus.ModbusResponse.bits]
            method to get the ON/OFF state of each coil.
    """
    if count > 2000:  # noqa: PLR2004
        msg = f"Requesting to read {count} coils, maximum allowed is 2000"
        raise ValueError(msg)

    with self._lock:
        function_code = 0x01
        _ = self.write(function_code, data=pack(">HH", address, count), device_id=device_id)
        device_id, response = self.read()
        mr = ModbusResponse(device_id, response[0], response[2:], count=count)
        self._check_function_code(function_code, mr)
        return mr

read_device_identification ¤

read_device_identification(
    *, code_id: Literal[1, 2, 3, 4] = 1, object_id: int = 0, device_id: int = 1
) -> ModbusIdentification

Read device identification (function code 0x2B, Modbus Encapsulated Interface type 0x0E).

The read device identification interface is modelled as an address space composed of a set of addressable data elements. The data elements are called objects and an object ID identifies them.

Parameters:

Name Type Description Default
code_id Literal[1, 2, 3, 4]

Read device ID code.

  • 1 — Basic
  • 2 — Regular (also includes Basic)
  • 3 — Extended (also includes Regular)
  • 4 — A specific identification object
1
object_id int

The object ID to read.

  • 0 — Vendor name (Basic)
  • 1 — Product code (Basic)
  • 2 — Major/minor revision (Basic)
  • 3 — Vendor URL (Regular)
  • 4 — Product name (Regular)
  • 5 — Model name (Regular)
  • 6 — User application name (Regular)
  • 7 to 127 — Reserved for future use
  • 128 to 255 — Device dependent (Extended)
0
device_id int

Modbus device ID.

1

Returns:

Type Description
ModbusIdentification

Modbus device identification.
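
The request for this function is four bytes long: the function code 0x2B, the MEI type 0x0E, the read device ID code and the object ID. A small sketch using only the standard library is shown below. `device_identification_request` is a hypothetical name and not part of the library.

```python
from struct import pack


def device_identification_request(code_id: int = 1, object_id: int = 0) -> bytes:
    """Build the PDU of a Read Device Identification request."""
    if code_id not in (1, 2, 3, 4):
        msg = f"Invalid read device ID code {code_id}, must be 1, 2, 3 or 4"
        raise ValueError(msg)
    # function code, MEI type, read device ID code, object ID (one byte each)
    return pack(">BBBB", 0x2B, 0x0E, code_id, object_id)


print(device_identification_request().hex(" "))  # 2b 0e 01 00
```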

Source code in src/msl/equipment/interfaces/modbus.py
def read_device_identification(
    self, *, code_id: Literal[1, 2, 3, 4] = 1, object_id: int = 0, device_id: int = 1
) -> ModbusIdentification:
    """Read device identification (function code `0x2B`, Modbus Encapsulated Interface type `0x0E`).

    The read device identification interface is modelled as an address space composed
    of a set of addressable data elements. The data elements are called objects and an
    object ID identifies them.

    Args:
        code_id: Read device ID code.

            * `1` &mdash; *Basic*
            * `2` &mdash; *Regular* (also includes *Basic*)
            * `3` &mdash; *Extended* (also includes *Regular*)
            * `4` &mdash; A *specific* identification object

        object_id: The object ID to read.

            * `0` &mdash; Vendor name (*Basic*)
            * `1` &mdash; Product code (*Basic*)
            * `2` &mdash; Major/minor revision (*Basic*)
            * `3` &mdash; Vendor URL (*Regular*)
            * `4` &mdash; Product name (*Regular*)
            * `5` &mdash; Model name (*Regular*)
            * `6` &mdash; User application name (*Regular*)
            * `7` to `127` &mdash; Reserved for future use
            * `128` to `255` &mdash; Device dependent (*Extended*)

        device_id: Modbus device ID.

    Returns:
        Modbus device identification.
    """
    with self._lock:
        function_code = 0x2B
        _ = self.write(function_code, data=pack(">BBB", 0x0E, code_id, object_id), device_id=device_id)
        identification = ModbusIdentification(*self.read())
        self._check_function_code(function_code, identification)
        return identification

read_discrete_inputs ¤

read_discrete_inputs(
    address: int, *, count: int = 1, device_id: int = 1
) -> ModbusResponse

Read discrete inputs (function code 0x02).

Parameters:

Name Type Description Default
address int

Starting register address to read from. Must be in the range [0, 65535].

required
count int

The number of discrete inputs to read. Must be in the range [1, 2000].

1
device_id int

Modbus device ID.

1

Returns:

Type Description
ModbusResponse

Modbus response. Call the bits method to get the ON/OFF state of each discrete input.

Source code in src/msl/equipment/interfaces/modbus.py
def read_discrete_inputs(self, address: int, *, count: int = 1, device_id: int = 1) -> ModbusResponse:
    """Read discrete inputs (function code `0x02`).

    Args:
        address: Starting register address to read from. Must be in the range [0, 65535].
        count: The number of discrete inputs to read. Must be in the range [1, 2000].
        device_id: Modbus device ID.

    Returns:
        Modbus response. Call the [bits][msl.equipment.interfaces.modbus.ModbusResponse.bits]
            method to get the ON/OFF state of each discrete input.
    """
    if count > 2000:  # noqa: PLR2004
        msg = f"Requesting to read {count} discrete inputs, maximum allowed is 2000"
        raise ValueError(msg)

    with self._lock:
        function_code = 0x02
        _ = self.write(function_code, data=pack(">HH", address, count), device_id=device_id)
        device_id, response = self.read()
        mr = ModbusResponse(device_id, response[0], response[2:], count=count)
        self._check_function_code(function_code, mr)
        return mr

read_exception_status ¤

read_exception_status(*, device_id: int = 1) -> ModbusResponse

Read exception status (function code 0x07).

Parameters:

Name Type Description Default
device_id int

Modbus device ID.

1

Returns:

Type Description
ModbusResponse

Modbus response. Call the bits method to get the state of each exception-status bit.

Source code in src/msl/equipment/interfaces/modbus.py
def read_exception_status(self, *, device_id: int = 1) -> ModbusResponse:
    """Read exception status (function code `0x07`).

    Args:
        device_id: Modbus device ID.

    Returns:
        Modbus response. Call the [bits][msl.equipment.interfaces.modbus.ModbusResponse.bits]
            method to get the state of each exception-status bit.
    """
    with self._lock:
        function_code = 0x07
        _ = self.write(function_code, device_id=device_id)
        device_id, response = self.read(5)
        mr = ModbusResponse(device_id, response[0], data=response[1:], count=8)
        self._check_function_code(function_code, mr)
        return mr

read_holding_registers ¤

read_holding_registers(
    address: int, *, count: int = 1, device_id: int = 1
) -> ModbusResponse

Read holding registers (function code 0x03).

Parameters:

Name Type Description Default
address int

Starting register address to read from. Must be in the range [0, 65535].

required
count int

The number of 16-bit registers to read. Must be in the range [1, 125].

1
device_id int

Modbus device ID.

1

Returns:

Type Description
ModbusResponse

Modbus response.
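
As a rough illustration of what travels on the wire, the request PDU is the function code followed by the big-endian starting address and register count. The response PDU is the function code, a Byte Count, and then the register values as big-endian 16-bit integers. The helper names below are hypothetical and the response bytes are a made-up example, not output from a real device.

```python
from __future__ import annotations

from struct import pack, unpack


def read_registers_request(address: int, count: int) -> bytes:
    """Build the PDU of a Read Holding Registers request."""
    return pack(">BHH", 0x03, address, count)


def decode_registers(pdu: bytes) -> tuple[int, ...]:
    """Decode the 16-bit register values from a Read Holding Registers response PDU."""
    byte_count = pdu[1]             # second byte of the PDU is the Byte Count
    data = pdu[2 : 2 + byte_count]  # register values follow the Byte Count
    return unpack(f">{byte_count // 2}H", data)


request = read_registers_request(0x006B, 3)
response = bytes.fromhex("03 06 02 2B 00 00 00 64")  # example response for three registers
print(request.hex(" "))            # 03 00 6b 00 03
print(decode_registers(response))  # (555, 0, 100)
```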

Source code in src/msl/equipment/interfaces/modbus.py
def read_holding_registers(self, address: int, *, count: int = 1, device_id: int = 1) -> ModbusResponse:
    """Read holding registers (function code `0x03`).

    Args:
        address: Starting register address to read from. Must be in the range [0, 65535].
        count: The number of 16-bit registers to read. Must be in the range [1, 125].
        device_id: Modbus device ID.

    Returns:
        Modbus response.
    """
    if count > 125:  # noqa: PLR2004
        msg = f"Requesting to read {count} holding registers, maximum allowed is 125"
        raise ValueError(msg)

    with self._lock:
        function_code = 0x03
        _ = self.write(function_code, data=pack(">HH", address, count), device_id=device_id)
        device_id, response = self.read()
        mr = ModbusResponse(device_id, response[0], response[2:], count=count)
        self._check_function_code(function_code, mr)
        return mr

read_input_registers ¤

read_input_registers(
    address: int, *, count: int = 1, device_id: int = 1
) -> ModbusResponse

Read input registers (function code 0x04).

Parameters:

Name Type Description Default
address int

Starting register address to read from. Must be in the range [0, 65535].

required
count int

The number of 16-bit registers to read. Must be in the range [1, 125].

1
device_id int

Modbus device ID.

1

Returns:

Type Description
ModbusResponse

Modbus response.

Source code in src/msl/equipment/interfaces/modbus.py
def read_input_registers(self, address: int, *, count: int = 1, device_id: int = 1) -> ModbusResponse:
    """Read input registers (function code `0x04`).

    Args:
        address: Starting register address to read from. Must be in the range [0, 65535].
        count: The number of 16-bit registers to read. Must be in the range [1, 125].
        device_id: Modbus device ID.

    Returns:
        Modbus response.
    """
    if count > 125:  # noqa: PLR2004
        msg = f"Requesting to read {count} input registers, maximum allowed is 125"
        raise ValueError(msg)

    with self._lock:
        function_code = 0x04
        _ = self.write(function_code, data=pack(">HH", address, count), device_id=device_id)
        device_id, response = self.read()
        mr = ModbusResponse(device_id, response[0], response[2:], count=count)
        self._check_function_code(function_code, mr)
        return mr

read_write_registers ¤

read_write_registers(
    *,
    read_address: int = 0,
    read_count: int = 0,
    write_address: int = 0,
    address: int | None = None,
    values: int | Sequence[int] | NDArray[uint16] | None = None,
    device_id: int = 1
) -> ModbusResponse

Read/Write registers (function code 0x17).

Performs a combination of one read operation and one write operation in a single Modbus transaction. The write operation is performed before the read operation.

Parameters:

Name Type Description Default
read_address int

Starting holding-register address to read from. Must be in the range [0, 65535].

0
read_count int

The number of 16-bit registers to read. Must be in the range [1, 125].

0
write_address int

Starting holding-register address to write to. Must be in the range [0, 65535].

0
address int | None

Use as both the read and write address. Must be in the range [0, 65535].

None
values int | Sequence[int] | NDArray[uint16] | None

A single value to write or a sequence of values to write. The maximum sequence length is 121. Each value must be in the range [0, 65535]. See also to_register_values.

None
device_id int

Modbus device ID.

1

Returns:

Type Description
ModbusResponse

Modbus response.
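
The limits of 125 registers to read and 121 registers to write are consistent with the maximum PDU size of 253 bytes defined in the Modbus specification. The sketch below works through that arithmetic. `max_registers` is a hypothetical helper, and the overhead values are counted from the request layout shown in the source code for this method.

```python
MAX_PDU_SIZE = 253  # bytes; maximum PDU size in the Modbus specification


def max_registers(overhead: int) -> int:
    """Largest number of 16-bit registers that fit in a PDU with `overhead` non-register bytes."""
    return (MAX_PDU_SIZE - overhead) // 2


# Read response: function code (1) + byte count (1)
print(max_registers(2))   # 125
# write_registers request: function code (1) + address (2) + quantity (2) + byte count (1)
print(max_registers(6))   # 123
# read_write_registers request: function code (1) + four 16-bit fields (8) + byte count (1)
print(max_registers(10))  # 121
```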

Source code in src/msl/equipment/interfaces/modbus.py
def read_write_registers(
    self,
    *,
    read_address: int = 0,
    read_count: int = 0,
    write_address: int = 0,
    address: int | None = None,
    values: int | Sequence[int] | NDArray[np.uint16] | None = None,
    device_id: int = 1,
) -> ModbusResponse:
    """Read/Write registers (function code `0x17`).

    Performs a combination of one read operation and one write operation in a single Modbus transaction.
    The write operation is performed before the read operation.

    Args:
        read_address: Starting holding-register address to read from. Must be in the range [0, 65535].
        read_count: The number of 16-bit registers to read. Must be in the range [1, 125].
        write_address: Starting holding-register address to write to. Must be in the range [0, 65535].
        address: Use as both the read and write address. Must be in the range [0, 65535].
        values: A single value to write or a sequence of values to write. The maximum sequence
            length is 121. Each value must be in the range [0, 65535]. See also
            [to_register_values][msl.equipment.interfaces.modbus.Modbus.to_register_values].
        device_id: Modbus device ID.

    Returns:
        Modbus response.
    """
    if read_count > 125:  # noqa: PLR2004
        msg = f"Requesting to read {read_count} holding registers, maximum allowed is 125"
        raise ValueError(msg)

    if values is None:
        values = []
    elif isinstance(values, int):
        values = [values]

    n = len(values)
    if n > 121:  # noqa: PLR2004
        msg = f"Too many values, {n}, to write to the Modbus registers, must be <= 121"
        raise ValueError(msg)

    if isinstance(values, np.ndarray):
        if values.dtype.str != ">u2":
            msg = f"numpy array must have a dtype of '>u2', got {values.dtype.str!r}"
            raise ValueError(msg)
        data = values.tobytes()
    elif n == 0:
        data = b""
    else:
        data = pack(f">{n}H", *values)

    if address is not None:
        read_address = address
        write_address = address

    with self._lock:
        function_code = 0x17
        _ = self.write(
            function_code,
            data=pack(">HHHHB", read_address, read_count, write_address, n, 2 * n) + data,
            device_id=device_id,
        )
        device_id, response = self.read()
        mr = ModbusResponse(device_id, response[0], data=response[2:], count=read_count)
        self._check_function_code(function_code, mr)
        return mr

reconnect ¤

reconnect(max_attempts: int = 1) -> None

Reconnect to the Modbus equipment.

Parameters:

Name Type Description Default
max_attempts int

The maximum number of attempts to try to reconnect with the equipment. If ≤ 0, keep trying until a connection is successful. If the maximum number of attempts has been reached then an exception is raised.

1
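
The meaning of max_attempts can be sketched with a stand-alone retry loop. This is only an illustration of the rule described above. `retry` and `FlakyDevice` are hypothetical, and the built-in `ConnectionError` stands in for the library's own connection and timeout errors.

```python
from __future__ import annotations

from typing import Callable


def retry(connect: Callable[[], None], max_attempts: int = 1) -> int:
    """Call `connect` until it succeeds and return the number of attempts made."""
    attempt = 0
    while True:
        attempt += 1
        try:
            connect()
            return attempt
        except ConnectionError:
            # Give up only when a positive limit has been reached;
            # a limit <= 0 means keep trying forever.
            if 0 < max_attempts <= attempt:
                raise


class FlakyDevice:
    """Stand-in for equipment that refuses the first `failures` connection attempts."""

    def __init__(self, failures: int) -> None:
        self.failures = failures

    def connect(self) -> None:
        if self.failures > 0:
            self.failures -= 1
            raise ConnectionError("device not ready")


print(retry(FlakyDevice(2).connect, max_attempts=0))  # 3
```
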
Source code in src/msl/equipment/interfaces/modbus.py
def reconnect(self, max_attempts: int = 1) -> None:
    """Reconnect to the Modbus equipment.

    Args:
        max_attempts: The maximum number of attempts to try to reconnect with the equipment.
            If &le; 0, keep trying until a connection is successful. If the maximum number
            of attempts has been reached then an exception is raised.
    """
    self._framer.disconnect()

    attempt = 0
    while True:
        attempt += 1
        try:
            return self._connect()
        except (MSLConnectionError, MSLTimeoutError):
            if 0 < max_attempts <= attempt:
                raise

to_register_values staticmethod ¤

to_register_values(
    data: float | Sequence[float] | NDArray[number], dtype: DTypeLike = uint16
) -> NDArray[uint16]

Convert a value or a sequence of values to an unsigned, big-endian, 16-bit integer array.

Parameters:

Name Type Description Default
data float | Sequence[float] | NDArray[number]

The value(s) to convert. If a numpy array, the data type must be the same that the Modbus register address(es) require the value(s) to be in.

required
dtype DTypeLike

The numpy data type to use to initially create a numpy array. This should be the same data type that the Modbus register address(es) require the value(s) to be in. Only used if data is not already a numpy array.

uint16

Returns:

Type Description
NDArray[uint16]

An array that can be passed to write_registers or read_write_registers.
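
A value wider than 16 bits occupies more than one register, so a 32-bit float, for example, becomes two register values. The numpy-free sketch below illustrates the same big-endian reinterpretation with the standard `struct` module. It is not the library's implementation, and `float32_to_registers` and `int16_to_register` are hypothetical names.

```python
from __future__ import annotations

from struct import pack, unpack


def float32_to_registers(value: float) -> tuple[int, int]:
    """Split an IEEE 754 single-precision float into two big-endian 16-bit register values."""
    return unpack(">2H", pack(">f", value))


def int16_to_register(value: int) -> int:
    """Reinterpret a signed 16-bit integer as the unsigned value that a register stores."""
    return unpack(">H", pack(">h", value))[0]


print(float32_to_registers(1.0))  # (16256, 0), i.e. 0x3F80 and 0x0000
print(int16_to_register(-1))      # 65535
```

Some devices expect the two 16-bit words of a 32-bit value in the opposite order, so the device manual should be checked before writing.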

Source code in src/msl/equipment/interfaces/modbus.py
@staticmethod
def to_register_values(
    data: float | Sequence[float] | NDArray[np.number], dtype: DTypeLike = np.uint16
) -> NDArray[np.uint16]:
    """Convert a value or a sequence of values to an unsigned, big-endian, 16-bit integer array.

    Args:
        data: The value(s) to convert. If a numpy array, the data type must be the same that the
            Modbus register address(es) require the value(s) to be in.
        dtype: The numpy data type to use to initially create a numpy array. This should be the
            same data type that the Modbus register address(es) require the value(s) to be in.
            Only used if `data` is not already a numpy array.

    Returns:
        An array that can be passed to [write_registers][msl.equipment.interfaces.modbus.Modbus.write_registers]
            or [read_write_registers][msl.equipment.interfaces.modbus.Modbus.read_write_registers].
    """
    if isinstance(data, (float, int)):
        data = [data]

    dtype = data.dtype if isinstance(data, np.ndarray) else np.dtype(dtype)
    return np.asarray(data, dtype=dtype.newbyteorder(">")).view(">u2")

write ¤

write(
    function_code: int, *, data: bytes | None = None, device_id: int = 1
) -> int

Write a Modbus message.

Parameters:

Name Type Description Default
function_code int

Modbus function code.

required
data bytes | None

The data associated with the function_code.

None
device_id int

Modbus device ID.

1

Returns:

Type Description
int

The number of bytes written.

Source code in src/msl/equipment/interfaces/modbus.py
def write(self, function_code: int, *, data: bytes | None = None, device_id: int = 1) -> int:
    """Write a Modbus message.

    Args:
        function_code: Modbus function code.
        data: The data associated with the `function_code`.
        device_id: Modbus device ID.

    Returns:
        The number of bytes written.
    """
    return self._framer.write(device_id, function_code.to_bytes(1, "big") + (data or b""))

write_coil ¤

write_coil(address: int, value: bool, *, device_id: int = 1) -> ModbusResponse

Write single coil (function code 0x05).

Parameters:

Name Type Description Default
address int

Register address to write to. Must be in the range [0, 65535].

required
value bool

Boolean to write. Sets the ON/OFF state of a single coil in the device.

required
device_id int

Modbus device ID.

1

Returns:

Type Description
ModbusResponse

An echo of the request, after the register contents have been written.
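
On the wire, the ON state is encoded as 0xFF00 and the OFF state as 0x0000, as the source code for this method shows. A minimal sketch of the request PDU is given below. `write_coil_request` is a hypothetical helper and not part of the library.

```python
from struct import pack


def write_coil_request(address: int, value: bool) -> bytes:
    """Build the PDU of a Write Single Coil request."""
    # function code, coil address, then 0xFF00 for ON or 0x0000 for OFF
    return pack(">BHH", 0x05, address, 0xFF00 if value else 0x0000)


print(write_coil_request(0x00AC, True).hex(" "))   # 05 00 ac ff 00
print(write_coil_request(0x00AC, False).hex(" "))  # 05 00 ac 00 00
```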

Source code in src/msl/equipment/interfaces/modbus.py
def write_coil(self, address: int, value: bool, *, device_id: int = 1) -> ModbusResponse:  # noqa: FBT001
    """Write single coil (function code `0x05`).

    Args:
        address: Register address to write to. Must be in the range [0, 65535].
        value: Boolean to write. Sets the ON/OFF state of a single coil in the device.
        device_id: Modbus device ID.

    Returns:
        An echo of the request, after the register contents have been written.
    """
    with self._lock:
        function_code = 0x05
        _ = self.write(function_code, data=pack(">HH", address, 0xFF00 if value else 0x0000), device_id=device_id)
        device_id, response = self.read(8)
        mr = ModbusResponse(device_id, response[0], response[1:])
        self._check_function_code(function_code, mr)
        return mr

write_coils ¤

write_coils(
    address: int, values: Sequence[bool] | NDArray[bool], *, device_id: int = 1
) -> ModbusResponse

Write multiple coils (function code 0x0F).

Parameters:

Name Type Description Default
address int

Starting register address to write to. Must be in the range [0, 65535].

required
values Sequence[bool] | NDArray[bool]

A sequence of booleans to write. Sets the ON/OFF state of multiple coils in the device. The maximum sequence length is 1968.

required
device_id int

Modbus device ID.

1

Returns:

Type Description
ModbusResponse

The response. The data attribute is composed of the starting register address and the number of coils that were written to.
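
The request packs the booleans eight per byte, with the first value in the least-significant bit, and prefixes them with the starting address, the number of coils and the number of data bytes. The sketch below does the same packing without numpy. `pack_coils` and `write_coils_request` are hypothetical helpers and not part of the library.

```python
from __future__ import annotations

from struct import pack
from typing import Sequence


def pack_coils(values: Sequence[bool]) -> bytes:
    """Pack booleans into bytes, eight per byte, first value in the least-significant bit."""
    packed = bytearray((len(values) + 7) // 8)
    for i, value in enumerate(values):
        if value:
            packed[i // 8] |= 1 << (i % 8)
    return bytes(packed)


def write_coils_request(address: int, values: Sequence[bool]) -> bytes:
    """Build the PDU of a Write Multiple Coils request."""
    if len(values) > 1968:
        msg = f"Too many values, {len(values)}, must be <= 1968"
        raise ValueError(msg)
    data = pack_coils(values)
    # function code, starting address, number of coils, number of data bytes
    return pack(">BHHB", 0x0F, address, len(values), len(data)) + data


coils = [True, False, True, True, False, False, True, True, True, False]
print(write_coils_request(0x0013, coils).hex(" "))  # 0f 00 13 00 0a 02 cd 01
```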

Source code in src/msl/equipment/interfaces/modbus.py
def write_coils(
    self, address: int, values: Sequence[bool] | NDArray[np.bool], *, device_id: int = 1
) -> ModbusResponse:
    """Write multiple coils (function code `0x0F`).

    Args:
        address: Starting register address to write to. Must be in the range [0, 65535].
        values: A sequence of booleans to write. Sets the ON/OFF state of multiple coils
            in the device. The maximum sequence length is 1968.
        device_id: Modbus device ID.

    Returns:
        The response. The [data][msl.equipment.interfaces.modbus.ModbusResponse.data] attribute is
            composed of the starting register address and the number of coils that were written to.
    """
    n = len(values)
    if n > 1968:  # noqa: PLR2004
        msg = f"Too many values, {n}, to write to the Modbus coils, must be <= 1968"
        raise ValueError(msg)

    data = np.packbits(np.asarray(values, dtype=bool), bitorder="little").tobytes()
    data = pack(">HHB", address, n, len(data)) + data

    with self._lock:
        function_code = 0x0F
        _ = self.write(function_code, data=data, device_id=device_id)
        device_id, response = self.read(8)
        mr = ModbusResponse(device_id, response[0], response[1:])
        self._check_function_code(function_code, mr)
        return mr

write_register ¤

write_register(
    address: int, value: int, *, device_id: int = 1
) -> ModbusResponse

Write a single holding-register value (function code 0x06).

Parameters:

Name Type Description Default
address int

Register address to write to. Must be in the range [0, 65535].

required
value int

Value to write. Must be in the range [0, 65535].

required
device_id int

Modbus device ID.

1

Returns:

Type Description
ModbusResponse

An echo of the request, after the register contents have been written.

Source code in src/msl/equipment/interfaces/modbus.py
def write_register(self, address: int, value: int, *, device_id: int = 1) -> ModbusResponse:
    """Write a single holding-register value (function code `0x06`).

    Args:
        address: Register address to write to. Must be in the range [0, 65535].
        value: Value to write. Must be in the range [0, 65535].
        device_id: Modbus device ID.

    Returns:
        An echo of the request, after the register contents have been written.
    """
    with self._lock:
        function_code = 0x06
        _ = self.write(function_code, data=pack(">HH", address, value), device_id=device_id)
        device_id, response = self.read(8)
        mr = ModbusResponse(device_id, response[0], response[1:])
        self._check_function_code(function_code, mr)
        return mr

write_registers ¤

write_registers(
    address: int, values: Sequence[int] | NDArray[uint16], *, device_id: int = 1
) -> ModbusResponse

Write to a contiguous block of holding registers (function code 0x10).

Parameters:

Name Type Description Default
address int

Starting register address to write to. Must be in the range [0, 65535].

required
values Sequence[int] | NDArray[uint16]

A sequence of values to write. The maximum sequence length is 123. Each value must be in the range [0, 65535]. See also to_register_values.

required
device_id int

Modbus device ID.

1

Returns:

Type Description
ModbusResponse

The response. The data attribute is composed of the starting register address and the number of registers that were written to.
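
The request carries the starting address, the number of registers, the number of data bytes (twice the number of registers) and then each value as a big-endian 16-bit integer. A minimal sketch for a plain sequence of integers is shown below. `write_registers_request` is a hypothetical helper and not part of the library.

```python
from __future__ import annotations

from struct import pack
from typing import Sequence


def write_registers_request(address: int, values: Sequence[int]) -> bytes:
    """Build the PDU of a Write Multiple Registers request."""
    n = len(values)
    if n > 123:
        msg = f"Too many values, {n}, must be <= 123"
        raise ValueError(msg)
    # function code, starting address, register count, byte count, then the values
    return pack(f">BHHB{n}H", 0x10, address, n, 2 * n, *values)


print(write_registers_request(0x0001, [0x000A, 0x0102]).hex(" "))  # 10 00 01 00 02 04 00 0a 01 02
```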

Source code in src/msl/equipment/interfaces/modbus.py
def write_registers(
    self, address: int, values: Sequence[int] | NDArray[np.uint16], *, device_id: int = 1
) -> ModbusResponse:
    """Write to a contiguous block of holding registers (function code `0x10`).

    Args:
        address: Starting register address to write to. Must be in the range [0, 65535].
        values: A sequence of values to write. The maximum sequence length is 123.
            Each value must be in the range [0, 65535]. See also
            [to_register_values][msl.equipment.interfaces.modbus.Modbus.to_register_values].
        device_id: Modbus device ID.

    Returns:
        The response. The [data][msl.equipment.interfaces.modbus.ModbusResponse.data] attribute is
            composed of the starting register address and the number of registers that were written to.
    """
    n = len(values)
    if n > 123:  # noqa: PLR2004
        msg = f"Too many values, {n}, to write to the Modbus registers, must be <= 123"
        raise ValueError(msg)

    if isinstance(values, np.ndarray):
        if values.dtype.str != ">u2":
            msg = f"numpy array must have a dtype of '>u2', got {values.dtype.str!r}"
            raise ValueError(msg)
        data = pack(">HHB", address, n, 2 * n) + values.tobytes()
    else:
        data = pack(f">HHB{n}H", address, n, 2 * n, *values)

    with self._lock:
        function_code = 0x10
        _ = self.write(function_code, data=data, device_id=device_id)
        device_id, response = self.read(8)
        mr = ModbusResponse(device_id, response[0], response[1:])
        self._check_function_code(function_code, mr)
        return mr
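
The request payload built above can be reproduced outside the class to see the byte layout. This is a minimal sketch using only `struct.pack`; `build_write_registers_payload` is a hypothetical helper written for illustration and is not part of msl-equipment.

```python
from struct import pack


def build_write_registers_payload(address: int, values: list[int]) -> bytes:
    """Hypothetical helper (not part of msl-equipment) mirroring the packing shown above."""
    n = len(values)
    if n > 123:
        msg = f"Too many values, {n}, must be <= 123"
        raise ValueError(msg)
    # Starting address (2 bytes), register count (2 bytes), byte count (1 byte),
    # followed by each value as a big-endian, unsigned 16-bit integer.
    return pack(f">HHB{n}H", address, n, 2 * n, *values)


payload = build_write_registers_payload(16, [1000, 52100])
print(payload.hex(" "))  # 00 10 00 02 04 03 e8 cb 84
```

The function code and any framing (device ID, CRC or MBAP header) are not part of this payload; in the listing above they are handled by `self.write`.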

ModbusIdentification ¤

ModbusIdentification(device_id: int, response: bytes)

Modbus device identification.

Do not instantiate directly. This class is returned by the read_device_identification method.

Examples:

>>> identification
ModbusIdentification(code_id=1, conformity=0x83, more_follows=False, next_object_id=0, ids=[0, 1, 2])

>>> identification.objects
[ModbusObject(id=0, value=b'MSL'), ModbusObject(id=1, value=b'NZ'), ModbusObject(id=2, value=b'5.16')]

>>> for obj in identification:
...     print(f"{obj.id}: {obj.value}")
0: b'MSL'
1: b'NZ'
2: b'5.16'

>>> identification[0]  # object ID = 0, Manufacturer
b'MSL'

>>> identification[1]  # object ID = 1, Product code
b'NZ'

>>> identification[2]  # object ID = 2, Revision
b'5.16'

>>> identification[3]
Traceback (most recent call last):
...
KeyError: 'A device-identification object with id 3 is not in the Modbus response'

>>> assert identification.get(3) is None  # returns None instead of raising an error

Source code in src/msl/equipment/interfaces/modbus.py
def __init__(self, device_id: int, response: bytes) -> None:
    r"""Modbus device identification.

    Do not instantiate directly. This class is returned by the
    [read_device_identification][msl.equipment.interfaces.modbus.Modbus.read_device_identification] method.

    **_Examples_**:

    <!--
    >>> from msl.equipment.interfaces.modbus import ModbusIdentification
    >>> identification = ModbusIdentification(1, b"\x2b\x0e\x01\x83\x00\x00\x03\x00\x03MSL\x01\x02NZ\x02\x045.16")

    -->

    ```pycon
    >>> identification
    ModbusIdentification(code_id=1, conformity=0x83, more_follows=False, next_object_id=0, ids=[0, 1, 2])

    >>> identification.objects
    [ModbusObject(id=0, value=b'MSL'), ModbusObject(id=1, value=b'NZ'), ModbusObject(id=2, value=b'5.16')]

    >>> for obj in identification:
    ...     print(f"{obj.id}: {obj.value}")
    0: b'MSL'
    1: b'NZ'
    2: b'5.16'

    >>> identification[0]  # object ID = 0, Manufacturer
    b'MSL'

    >>> identification[1]  # object ID = 1, Product code
    b'NZ'

    >>> identification[2]  # object ID = 2, Revision
    b'5.16'

    >>> identification[3]
    Traceback (most recent call last):
    ...
    KeyError: 'A device-identification object with id 3 is not in the Modbus response'

    >>> assert identification.get(3) is None  # returns None instead of raising an error

    ```

    """
    self.function_code: int = response[0]
    """[int][] &mdash; Modbus function code."""

    self.mei_type: int = response[1]
    """[int][] &mdash; Modbus Encapsulated Interface (MEI) type."""

    self.code_id: int = response[2]
    """[int][] &mdash; Read device ID code of the request."""

    self.conformity: int = response[3]
    """[int][] &mdash; Identification conformity level of the device and type of supported access."""

    self.device_id: int = device_id
    """[int][] &mdash; Modbus device ID."""

    self.more_follows: bool = bool(response[4])
    """[bool][] &mdash; Whether the identification data does not fit into a single response and several
    request/response transactions are required."""

    self.next_object_id: int = response[5]
    """[int][] &mdash; If [more_follows][msl.equipment.interfaces.modbus.ModbusIdentification.more_follows]
    is `True`, the identification of the next object to be asked."""

    self.objects: list[ModbusObject] = []
    """[list][][[ModbusObject][msl.equipment.interfaces.modbus.ModbusObject]] &mdash; The
    device-identification objects."""

    offset = 7
    while offset < len(response):
        oid, length = response[offset : offset + 2]
        value = response[offset + 2 : offset + 2 + length]
        self.objects.append(ModbusObject(id=oid, value=value))
        offset += 2 + length

code_id instance-attribute ¤

code_id: int

int — Read device ID code of the request.

conformity instance-attribute ¤

conformity: int

int — Identification conformity level of the device and type of supported access.

device_id instance-attribute ¤

device_id: int

int — Modbus device ID.

function_code instance-attribute ¤

function_code: int

int — Modbus function code.

mei_type instance-attribute ¤

mei_type: int

int — Modbus Encapsulated Interface (MEI) type.

more_follows instance-attribute ¤

more_follows: bool

bool — Whether the identification data does not fit into a single response and several request/response transactions are required.

next_object_id instance-attribute ¤

next_object_id: int

int — If more_follows is True, the identification of the next object to be asked.

objects instance-attribute ¤

objects: list[ModbusObject]

list[ModbusObject] — The device-identification objects.

get ¤

get(object_id: int) -> bytes | None

Get the value of an object ID.

Parameters:

Name Type Description Default
object_id int

The ID of a device-identification object.

required

Returns:

Type Description
bytes | None

The corresponding value or None if a device-identification object with id object_id is not in the Modbus response.

Source code in src/msl/equipment/interfaces/modbus.py
def get(self, object_id: int) -> bytes | None:
    """Get the value of an object ID.

    Args:
        object_id: The ID of a device-identification object.

    Returns:
        The corresponding value or `None` if a device-identification object with id `object_id`
            is not in the Modbus response.
    """
    for o in self:
        if o.id == object_id:
            return o.value
    return None

ModbusObject (NamedTuple) ¤

Modbus device-identification object.

id instance-attribute ¤

id: int

int — The device-identification object ID.

value instance-attribute ¤

value: bytes

bytes — The value associated with the device-identification object ID.

ModbusResponse ¤

ModbusResponse(
    device_id: int, function_code: int, data: bytes, count: int | None = None
)

Modbus response.

Do not instantiate directly. This class is returned by most Modbus methods.

Examples:

>>> mr = device.read_holding_registers(1, count=4)
>>> mr.array("uint16")
array([ 1000,    16, 52100,  4325], dtype='>u2')

>>> mr = device.read_input_registers(780, count=2)
>>> mr.float32()
21.5

>>> mr = device.read_coils(1250, count=6)
>>> mr.bits()
array([False,  True, False, False, False,  True])

Source code in src/msl/equipment/interfaces/modbus.py
def __init__(self, device_id: int, function_code: int, data: bytes, count: int | None = None) -> None:
    """Modbus response.

    Do not instantiate directly. This class is returned by most [Modbus][msl.equipment.interfaces.modbus.Modbus]
    methods.

    **_Examples_**:

    <!--
    >>> import numpy
    >>> if tuple(map(int, numpy.__version__.split(".")))[0] < 2:
    ...     import pytest
    ...     pytest.skip("numpy too old, prints dtype='uint16'")
    >>> device = Connection("Modbus::/mock://").connect()
    >>> device._framer.interface.serial.add_response(bytes([1, 3, 8, 3, 232, 0, 16, 203, 132, 16, 229, 206, 73]))
    >>> device._framer.interface.serial.add_response(bytes([1, 4, 4, 65, 172, 0, 0, 47, 153]))
    >>> device._framer.interface.serial.add_response(bytes([1, 1, 1, 98, 208, 97]))

    -->

    ```pycon
    >>> mr = device.read_holding_registers(1, count=4)
    >>> mr.array("uint16")
    array([ 1000,    16, 52100,  4325], dtype='>u2')

    >>> mr = device.read_input_registers(780, count=2)
    >>> mr.float32()
    21.5

    >>> mr = device.read_coils(1250, count=6)
    >>> mr.bits()
    array([False,  True, False, False, False,  True])

    ```

    """
    self.count: int | None = count
    """[int][] &mdash; The number of registers/coils that were requested to read."""

    self.device_id: int = device_id
    """[int][] &mdash; Modbus device ID."""

    self.function_code: int = function_code
    """[int][] &mdash; Modbus function code."""

    self.data: bytes = data
    """[bytes][] &mdash; Modbus data."""

count instance-attribute ¤

count: int | None

int — The number of registers/coils that were requested to read.

data instance-attribute ¤

data: bytes

bytes — Modbus data.

device_id instance-attribute ¤

device_id: int

int — Modbus device ID.

function_code instance-attribute ¤

function_code: int

int — Modbus function code.

array ¤

array(dtype: DTypeLike) -> NDArray[Any]

numpy.ndarray — Returns the register data as a numpy.ndarray of the specified dtype.

Source code in src/msl/equipment/interfaces/modbus.py
def array(self, dtype: DTypeLike) -> NDArray[Any]:
    """[numpy.ndarray][] &mdash; Returns the register data as a [numpy.ndarray][] of the specified `dtype`."""
    if isinstance(dtype, type) or (isinstance(dtype, str) and dtype[0] not in "<>=|"):
        dtype = np.dtype(dtype).newbyteorder(">")  # force big endian
    return np.frombuffer(self.data, dtype=">u2").view(dtype)

bits ¤

bits(bit_order: Literal['big', 'little'] = 'little') -> NDArray[bool]

numpy.ndarray — Returns the states of the register bits for the specified bit_order.

Source code in src/msl/equipment/interfaces/modbus.py
def bits(self, bit_order: Literal["big", "little"] = "little") -> NDArray[np.bool]:
    """[numpy.ndarray][] &mdash; Returns the states of the register bits for the specified `bit_order`."""
    data = np.frombuffer(self.data, dtype=np.uint8)
    return np.unpackbits(data, count=self.count, bitorder=bit_order).astype(bool)
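
To make the default `"little"` bit order concrete, here is a pure-Python sketch of the same unpacking without numpy. `unpack_bits_little` is a hypothetical helper, and the single data byte `98` is assumed from the mocked `read_coils` response in the `ModbusResponse` docstring.

```python
def unpack_bits_little(data: bytes, count: int) -> list[bool]:
    """Hypothetical pure-Python stand-in for np.unpackbits(..., bitorder="little")."""
    bits: list[bool] = []
    for byte in data:
        # Least-significant bit first: bit 0 of the byte is the first coil
        for i in range(8):
            bits.append(bool((byte >> i) & 1))
    return bits[:count]  # discard the padding bits beyond the requested count


states = unpack_bits_little(bytes([98]), 6)  # 98 == 0b01100010
print(states)  # [False, True, False, False, False, True]
```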

decode ¤

decode(encoding: str = 'utf-8') -> str

str — Returns the decoded response data using the encoding codec.

Source code in src/msl/equipment/interfaces/modbus.py
def decode(self, encoding: str = "utf-8") -> str:
    """[str][] &mdash; Returns the decoded response data using the `encoding` codec."""
    return self.data.decode(encoding)

float32 ¤

float32(byte_order: Literal['big', 'little'] = 'big') -> float

float — Returns the register data as a 32-bit, floating-point number for the specified byte_order.

Source code in src/msl/equipment/interfaces/modbus.py
def float32(self, byte_order: Literal["big", "little"] = "big") -> float:
    """[float][] &mdash; Returns the register data as a 32-bit, floating-point number for the specified `byte_order`."""  # noqa: E501
    b = ">" if byte_order == "big" else "<"
    f32: float = unpack(b + "f", self.data)[0]
    return f32
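
As a quick check of the conversion, the sketch below decodes two registers by hand with `struct.unpack`. The four data bytes are assumed from the mocked `read_input_registers` response in the `ModbusResponse` docstring.

```python
from struct import unpack

# Two 16-bit registers (0x41AC and 0x0000) that together hold one IEEE 754 float
data = bytes([65, 172, 0, 0])

value = unpack(">f", data)[0]  # ">" selects big-endian, the default byte_order
print(value)  # 21.5
```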

float64 ¤

float64(byte_order: Literal['big', 'little'] = 'big') -> float

float — Returns the register data as a 64-bit, floating-point number for the specified byte_order.

Source code in src/msl/equipment/interfaces/modbus.py
def float64(self, byte_order: Literal["big", "little"] = "big") -> float:
    """[float][] &mdash; Returns the register data as a 64-bit, floating-point number for the specified `byte_order`."""  # noqa: E501
    b = ">" if byte_order == "big" else "<"
    f64: float = unpack(b + "d", self.data)[0]
    return f64

int16 ¤

int16(byte_order: Literal['big', 'little'] = 'big') -> int

int — Returns the register data as a signed, 16-bit integer for the specified byte_order.

Source code in src/msl/equipment/interfaces/modbus.py
def int16(self, byte_order: Literal["big", "little"] = "big") -> int:
    """[int][] &mdash; Returns the register data as a signed, 16-bit integer for the specified `byte_order`."""
    b = ">" if byte_order == "big" else "<"
    i16: int = unpack(b + "h", self.data)[0]
    return i16

int32 ¤

int32(byte_order: Literal['big', 'little'] = 'big') -> int

int — Returns the register data as a signed, 32-bit integer for the specified byte_order.

Source code in src/msl/equipment/interfaces/modbus.py
def int32(self, byte_order: Literal["big", "little"] = "big") -> int:
    """[int][] &mdash; Returns the register data as a signed, 32-bit integer for the specified `byte_order`."""
    b = ">" if byte_order == "big" else "<"
    i32: int = unpack(b + "i", self.data)[0]
    return i32

int64 ¤

int64(byte_order: Literal['big', 'little'] = 'big') -> int

int — Returns the register data as a signed, 64-bit integer for the specified byte_order.

Source code in src/msl/equipment/interfaces/modbus.py
def int64(self, byte_order: Literal["big", "little"] = "big") -> int:
    """[int][] &mdash; Returns the register data as a signed, 64-bit integer for the specified `byte_order`."""
    b = ">" if byte_order == "big" else "<"
    i64: int = unpack(b + "q", self.data)[0]
    return i64

uint16 ¤

uint16(byte_order: Literal['big', 'little'] = 'big') -> int

int — Returns the register data as an unsigned, 16-bit integer for the specified byte_order.

Source code in src/msl/equipment/interfaces/modbus.py
def uint16(self, byte_order: Literal["big", "little"] = "big") -> int:
    """[int][] &mdash; Returns the register data as an unsigned, 16-bit integer for the specified `byte_order`."""
    b = ">" if byte_order == "big" else "<"
    u16: int = unpack(b + "H", self.data)[0]
    return u16

uint32 ¤

uint32(byte_order: Literal['big', 'little'] = 'big') -> int

int — Returns the register data as an unsigned, 32-bit integer for the specified byte_order.

Source code in src/msl/equipment/interfaces/modbus.py
def uint32(self, byte_order: Literal["big", "little"] = "big") -> int:
    """[int][] &mdash; Returns the register data as an unsigned, 32-bit integer for the specified `byte_order`."""
    b = ">" if byte_order == "big" else "<"
    u32: int = unpack(b + "I", self.data)[0]
    return u32
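
The effect of `byte_order` on a multi-register value can be seen with plain `struct.unpack`. This is an illustrative sketch with made-up data bytes; note that `"little"` reverses all four bytes, which is not the same as swapping the two 16-bit words.

```python
from struct import unpack

data = bytes([0x00, 0x01, 0x00, 0x02])  # made-up register data for illustration

big = unpack(">I", data)[0]  # bytes read as 0x00010002
little = unpack("<I", data)[0]  # bytes read as 0x02000100

print(big, little)  # 65538 33554688
```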

uint64 ¤

uint64(byte_order: Literal['big', 'little'] = 'big') -> int

int — Returns the register data as an unsigned, 64-bit integer for the specified byte_order.

Source code in src/msl/equipment/interfaces/modbus.py
def uint64(self, byte_order: Literal["big", "little"] = "big") -> int:
    """[int][] &mdash; Returns the register data as an unsigned, 64-bit integer for the specified `byte_order`."""
    b = ">" if byte_order == "big" else "<"
    u64: int = unpack(b + "Q", self.data)[0]
    return u64

unpack ¤

unpack(format: str) -> tuple[Any, ...]

tuple — Returns a tuple containing the register data unpacked according to the format string.

See Byte Order, Size, and Alignment and Format Characters for more details about the format parameter. Modbus data is typically in big-endian byte order.

Source code in src/msl/equipment/interfaces/modbus.py
def unpack(self, format: str) -> tuple[Any, ...]:  # noqa: A002
    """[tuple][] &mdash; Returns a tuple containing the register data unpacked according to the `format` string.

    See [Byte Order, Size, and Alignment][struct-alignment] and [Format Characters][format-characters]
    for more details about the `format` parameter. Modbus data is typically in big-endian byte order.
    """
    return unpack(format, self.data)
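
When the registers hold values of mixed types, a single format string can decode them all at once. The sketch below calls `struct.unpack` directly on made-up data bytes, which is what this method does with `self.data`.

```python
from struct import unpack

# Made-up data: one unsigned 16-bit, one signed 16-bit and one 32-bit float value
data = bytes([3, 232, 255, 254, 65, 172, 0, 0])

# ">" = big-endian with no padding, "H" = uint16, "h" = int16, "f" = float32
values = unpack(">Hhf", data)
print(values)  # (1000, -2, 21.5)
```

The format string must describe exactly as many bytes as the data contains, otherwise `struct.error` is raised.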