Skip to content

MultiMessageBased¤

A resource that supports multiple interfaces for message-based communication.

MultiMessageBased (MessageBased) ¤

MultiMessageBased(equipment: Equipment)

A resource that supports multiple interfaces for message-based communication.

A Connection instance supports the same properties as MessageBased.

Parameters:

Name Type Description Default
equipment Equipment

An Equipment instance.

required
Source code in packages/resources/src/msl/equipment_resources/multi_message_based.py
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
def __init__(self, equipment: Equipment) -> None:
    """A resource that supports multiple interfaces for message-based communication.

    A [Connection][msl.equipment.schema.Connection] instance supports the same _properties_ as
    [MessageBased][msl.equipment.interfaces.message_based.MessageBased].

    Args:
        equipment: An [Equipment][] instance.
    """
    self._connected: bool = False
    super().__init__(equipment)

    c = equipment.connection
    assert c is not None  # noqa: S101

    try:
        # Let the address (not the manufacturer/model) decide which interface to use
        self._interface: MessageBased = Connection(c.address, **c.properties).connect()
    except MSLConnectionError as e:
        lines = str(e).splitlines()
        raise MSLConnectionError(self, message="\n".join(lines[1:])) from None

    self._connected = True
    self._set_interface_max_read_size()
    self._set_interface_timeout()

encoding property writable ¤

encoding: str

The encoding that is used for read and write operations.

equipment property ¤

equipment: Equipment

The Equipment associated with the interface.

max_read_size property writable ¤

max_read_size: int

The maximum number of bytes that can be read.

read_termination property writable ¤

read_termination: bytes | None

The termination character sequence that is used for a read operation.

Reading stops when the equipment stops sending data or the read_termination character sequence is detected. If you set the read_termination to be equal to a variable of type str, it will be encoded as bytes.

rstrip property writable ¤

rstrip: bool

Whether to remove trailing whitespace from read messages.

timeout property writable ¤

timeout: float | None

The timeout, in seconds, for read and write operations.

A value <0 will set the timeout to be None (blocking mode).

write_termination property writable ¤

write_termination: bytes | None

The termination character sequence that is appended to write messages.

If you set the write_termination to be equal to a variable of type str, it will be encoded as bytes.

disconnect ¤

disconnect() -> None

Disconnect from the equipment.

Source code in packages/resources/src/msl/equipment_resources/multi_message_based.py
69
70
71
72
73
74
def disconnect(self) -> None:  # pyright: ignore[reportImplicitOverride]
    """Disconnect from the equipment."""
    if self._connected:
        self._interface.disconnect()
        super().disconnect()
        self._connected = False

query ¤

query(
    message: bytes | str,
    *,
    delay: float = 0.0,
    decode: Literal[True] = True,
    dtype: None = None,
    fmt: MessageFormat = ...,
    size: int | None = ...
) -> str
query(
    message: bytes | str,
    *,
    delay: float = 0.0,
    decode: Literal[False] = False,
    dtype: None = None,
    fmt: MessageFormat = ...,
    size: int | None = ...
) -> bytes
query(
    message: bytes | str,
    *,
    delay: float = 0.0,
    decode: bool = ...,
    dtype: MessageDataType = ...,
    fmt: MessageFormat = ...,
    size: int | None = ...
) -> NumpyArray1D
query(
    message: bytes | str,
    *,
    delay: float = 0.0,
    decode: bool = True,
    dtype: MessageDataType | None = None,
    fmt: MessageFormat = None,
    size: int | None = None
) -> bytes | str | NumpyArray1D

Convenience method for performing a write followed by a read.

Parameters:

Name Type Description Default
message bytes | str

The message to write to the equipment.

required
delay float

Time delay, in seconds, to wait between the write and read operations.

0.0
decode bool

Whether to decode the returned message (i.e., convert the message to a str) or keep the message as bytes. Ignored if dtype is not None.

True
dtype MessageDataType | None

The data type of the elements in the returned message. Can be any object that numpy dtype supports. For messages that are of scalar type (i.e., a single number) it is more efficient to not specify dtype but to pass the returned message to the int or float class to convert the message to the appropriate numeric type. See MessageDataType for more details.

None
fmt MessageFormat

The format that the returned message data is in. Ignored if dtype is None. See MessageFormat for more details.

None
size int | None

The number of bytes to read. Ignored if the value is None.

None

Returns:

Type Description
bytes | str | NumpyArray1D

The message from the equipment. If dtype is specified, then the message is returned as a numpy ndarray, if decode is True then the message is returned as a str, otherwise the message is returned as bytes.

Source code in src/msl/equipment/interfaces/message_based.py
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
def query(
    self,
    message: bytes | str,
    *,
    delay: float = 0.0,
    decode: bool = True,
    dtype: MessageDataType | None = None,
    fmt: MessageFormat = None,
    size: int | None = None,
) -> bytes | str | NumpyArray1D:
    """Convenience method for performing a [write][msl.equipment.interfaces.message_based.MessageBased.write]
    followed by a [read][msl.equipment.interfaces.message_based.MessageBased.read].

    Args:
        message: The message to write to the equipment.
        delay: Time delay, in seconds, to wait between the _write_ and _read_ operations.
        decode: Whether to decode the returned message (i.e., convert the message to a [str][])
            or keep the message as [bytes][]. Ignored if `dtype` is not `None`.
        dtype: The data type of the elements in the returned message. Can be any object that numpy
            [dtype][numpy.dtype] supports. For messages that are of scalar type (i.e., a single number)
            it is more efficient to not specify `dtype` but to pass the returned message to the
            [int][] or [float][] class to convert the message to the appropriate numeric type.
            See [MessageDataType][msl.equipment._types.MessageDataType] for more details.
        fmt: The format that the returned message data is in. Ignored if `dtype` is `None`.
            See [MessageFormat][msl.equipment._types.MessageFormat] for more details.
        size: The number of bytes to read. Ignored if the value is `None`.

    Returns:
        The message from the equipment. If `dtype` is specified, then the message is
            returned as a numpy [ndarray][numpy.ndarray], if `decode` is `True` then the message
            is returned as a [str][], otherwise the message is returned as [bytes][].
    """  # noqa: D205
    _ = self.write(message)
    if delay > 0:
        time.sleep(delay)
    if dtype:
        return self.read(dtype=dtype, fmt=fmt, size=size)
    return self.read(decode=decode, size=size)

read ¤

read(
    *,
    decode: Literal[True] = True,
    dtype: None = None,
    fmt: MessageFormat = ...,
    size: int | None = ...
) -> str
read(
    *,
    decode: Literal[False] = False,
    dtype: None = None,
    fmt: MessageFormat = ...,
    size: int | None = ...
) -> bytes
read(
    *,
    decode: bool = ...,
    dtype: MessageDataType = ...,
    fmt: MessageFormat = ...,
    size: int | None = ...
) -> NumpyArray1D
read(
    *,
    decode: bool = True,
    dtype: MessageDataType | None = None,
    fmt: MessageFormat = None,
    size: int | None = None
) -> bytes | str | NumpyArray1D

Read a message from the equipment.

This method will block until one of the following conditions is fulfilled:

  1. size bytes have been received — only if size is not None.
  2. the read_termination byte(s) is(are) received — only if read_termination is not None.
  3. a timeout occurs — only if timeout is not None. If a timeout occurs, an MSLTimeoutError is raised.
  4. max_read_size bytes have been received. If the maximum number of bytes have been read, an MSLConnectionError is raised.

Tip

You may also want to set the rstrip value for the class instance.

Parameters:

Name Type Description Default
decode bool

Whether to decode the message (i.e., convert the message to a str) or keep the message as bytes. Ignored if dtype is not None.

True
dtype MessageDataType | None

The data type of the elements in the returned message. Can be any object that numpy dtype supports. For messages that are of scalar type (i.e., a single number) it is more efficient to not specify dtype but to pass the returned message to the int or float class to convert the message to the appropriate numeric type. See MessageDataType for more details.

None
fmt MessageFormat

The format that the returned message data is in. Ignored if dtype is None. See MessageFormat for more details.

None
size int | None

The number of bytes to read. Ignored if the value is None.

None

Returns:

Type Description
bytes | str | NumpyArray1D

The message from the equipment. If dtype is specified, then the message is returned as a numpy ndarray, if decode is True then the message is returned as a str, otherwise the message is returned as bytes.

Source code in src/msl/equipment/interfaces/message_based.py
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
def read(
    self,
    *,
    decode: bool = True,
    dtype: MessageDataType | None = None,
    fmt: MessageFormat = None,
    size: int | None = None,
) -> bytes | str | NumpyArray1D:
    """Read a message from the equipment.

    This method will block until one of the following conditions is fulfilled:

    1. `size` bytes have been received &mdash; only if `size` is not `None`.
    2. the [read_termination][msl.equipment.interfaces.message_based.MessageBased.read_termination]
       byte(s) is(are) received &mdash; only if
       [read_termination][msl.equipment.interfaces.message_based.MessageBased.read_termination]
       is not `None`.
    3. a timeout occurs &mdash; only if [timeout][msl.equipment.interfaces.message_based.MessageBased.timeout]
       is not `None`. If a timeout occurs, an
       [MSLTimeoutError][msl.equipment.interfaces.message_based.MSLTimeoutError] is raised.
    4. [max_read_size][msl.equipment.interfaces.message_based.MessageBased.max_read_size]
       bytes have been received. If the maximum number of bytes have been read, an
       [MSLConnectionError][msl.equipment.interfaces.message_based.MSLConnectionError] is raised.

    !!! tip
        You may also want to set the [rstrip][msl.equipment.interfaces.message_based.MessageBased.rstrip]
        value for the class instance.

    Args:
        decode: Whether to decode the message (i.e., convert the message to a [str][])
            or keep the message as [bytes][]. Ignored if `dtype` is not `None`.
        dtype: The data type of the elements in the returned message. Can be any object that numpy
            [dtype][numpy.dtype] supports. For messages that are of scalar type (i.e., a single number)
            it is more efficient to not specify `dtype` but to pass the returned message to the
            [int][] or [float][] class to convert the message to the appropriate numeric type.
            See [MessageDataType][msl.equipment._types.MessageDataType] for more details.
        fmt: The format that the returned message data is in. Ignored if `dtype` is `None`.
            See [MessageFormat][msl.equipment._types.MessageFormat] for more details.
        size: The number of bytes to read. Ignored if the value is `None`.

    Returns:
        The message from the equipment. If `dtype` is specified, then the message is returned
            as a numpy [ndarray][numpy.ndarray], if `decode` is `True` then the message
            is returned as a [str][], otherwise the message is returned as [bytes][].
    """
    if size is not None and size > self._max_read_size:
        msg = f"max_read_size is {self._max_read_size} bytes, requesting {size} bytes"
        raise MSLConnectionError(self, msg)

    try:
        message = self._read(size)
    except (serial.SerialTimeoutException, socket.timeout, TimeoutError, USBTimeoutError):
        raise MSLTimeoutError(self) from None
    except Exception as e:  # noqa: BLE001
        msg = f"{e.__class__.__name__}: {e}"
        raise MSLConnectionError(self, msg) from None

    if size is None:
        if dtype:
            logger.debug("%s.read(dtype=%r, fmt=%r) -> %r", self, dtype, fmt, message)
        else:
            logger.debug("%s.read() -> %r", self, message)
    else:
        if len(message) != size:
            msg = f"received {len(message)} bytes, requested {size} bytes"
            raise MSLConnectionError(self, msg)
        logger.debug("%s.read(size=%s) -> %r", self, size, message)

    if self._rstrip:
        message = message.rstrip()

    if dtype:
        return from_bytes(message, fmt=fmt, dtype=dtype)

    if decode:
        return message.decode(encoding=self._encoding)

    return message

write ¤

write(
    message: bytes | str,
    *,
    data: Sequence1D | None = None,
    dtype: MessageDataType = "<f",
    fmt: MessageFormat = "ieee"
) -> int

Write a message to the equipment.

Parameters:

Name Type Description Default
message bytes | str

The message to write to the equipment.

required
data Sequence1D | None

The data to append to message.

None
dtype MessageDataType

The data type to use to convert each element in data to bytes. Ignored if data is None. See MessageDataType for more details.

'<f'
fmt MessageFormat

The format to use to convert data to bytes. Ignored if data is None. See MessageFormat for more details.

'ieee'

Returns:

Type Description
int

The number of bytes written.

Source code in src/msl/equipment/interfaces/message_based.py
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
def write(
    self,
    message: bytes | str,
    *,
    data: Sequence1D | None = None,
    dtype: MessageDataType = "<f",
    fmt: MessageFormat = "ieee",
) -> int:
    """Write a message to the equipment.

    Args:
        message: The message to write to the equipment.
        data: The data to append to `message`.
        dtype: The data type to use to convert each element in `data` to bytes. Ignored
            if `data` is `None`. See [MessageDataType][msl.equipment._types.MessageDataType]
            for more details.
        fmt: The format to use to convert `data` to bytes. Ignored if `data` is `None`.
            See [MessageFormat][msl.equipment._types.MessageFormat] for more details.

    Returns:
        The number of bytes written.
    """
    if not isinstance(message, bytes):
        message = message.encode(encoding=self._encoding)

    if data is not None:
        message += to_bytes(data, fmt=fmt, dtype=dtype)

    if self._write_termination and not message.endswith(self._write_termination):
        message += self._write_termination

    logger.debug("%s.write(%r)", self, message)

    try:
        return self._write(message)
    except (serial.SerialTimeoutException, socket.timeout, TimeoutError, USBTimeoutError):
        raise MSLTimeoutError(self) from None
    except Exception as e:  # noqa: BLE001
        raise MSLConnectionError(self, str(e)) from None