Interfaces

Generic interface classes for computer control

  • Interface — Base class for all interfaces
  • MessageBased — Base class for all message-based interfaces

Interface

Interface(equipment: Equipment)

Base class for all interfaces.

Parameters:

  • equipment (Equipment, required): An Equipment instance.

Source code in src/msl/equipment/schema.py
def __init__(self, equipment: Equipment) -> None:
    """Base class for all interfaces.

    Args:
        equipment: An [Equipment][] instance.
    """
    # __str__ and __repr__ may be called often when logging messages, so cache the values
    self._str: str = f"{self.__class__.__name__}<{equipment.manufacturer}|{equipment.model}|{equipment.serial}>"
    self._repr: str = self._str  # updated later

    self._equipment: Equipment = equipment

    if equipment.connection is None:
        msg = f"A Connection is not associated with {equipment}"
        raise TypeError(msg)

    self._repr = self._repr[:-1] + f" at {equipment.connection.address}>"
    logger.debug("Connecting to %r", self)

equipment property

equipment: Equipment

The Equipment associated with the interface.

disconnect

disconnect() -> None

Disconnect from the equipment.

This method can be overridden in the subclass if the subclass must implement tasks that need to be performed in order to safely disconnect from the equipment.

For example,

  • to clean up system resources from memory (e.g., if using a manufacturer's SDK)
  • to configure the equipment to be in a state that is safe for people working in the lab when the equipment is not in use

Tip

This method gets called automatically when the Interface instance gets garbage collected, which happens when the reference count is 0.

Source code in src/msl/equipment/schema.py
def disconnect(self) -> None:
    """Disconnect from the equipment.

    This method can be overridden in the subclass if the subclass must implement
    tasks that need to be performed in order to safely disconnect from the equipment.

    For example,

    * to clean up system resources from memory (e.g., if using a manufacturer's SDK)
    * to configure the equipment to be in a state that is safe for people
      working in the lab when the equipment is not in use

    !!! tip
        This method gets called automatically when the [Interface][msl.equipment.schema.Interface]
        instance gets garbage collected, which happens when the reference count is 0.
    """
    logger.debug("Disconnected from %r", self)
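
As a minimal sketch of overriding disconnect in a subclass, the example below uses a stand-in Interface class rather than the real one, so it runs without any hardware or the package installed. FakeLaser, shutter_open and sdk_handle are hypothetical names chosen for illustration only.

```python
import logging

logger = logging.getLogger(__name__)


class Interface:
    """Stand-in for the documented base class (not the real implementation)."""

    def disconnect(self) -> None:
        logger.debug("Disconnected from %r", self)


class FakeLaser(Interface):
    """Hypothetical subclass that must be made safe before disconnecting."""

    def __init__(self) -> None:
        self.shutter_open = True
        self.sdk_handle = object()  # pretend resource owned by a manufacturer's SDK

    def disconnect(self) -> None:
        # Put the equipment in a state that is safe for people in the lab.
        self.shutter_open = False
        # Release the resource held by the (pretend) SDK.
        self.sdk_handle = None
        # Finally, let the base class do its own work (here, only logging).
        super().disconnect()


laser = FakeLaser()
laser.disconnect()
```

Calling super().disconnect() last keeps the base-class behaviour while making sure the safety steps run first.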

MessageBased (Interface)

MessageBased(equipment: Equipment)

Base class for equipment that uses message-based communication.

Parameters:

  • equipment (Equipment, required): An Equipment instance.

A Connection instance supports the following properties for message-based communication.

Connection Properties:

  • encoding (str): Encoding to use for read and write operations. Default: utf-8
  • max_read_size (int): Maximum number of bytes that can be read. Default: 1048576 (1 MB)
  • read_termination (bytes | str): Termination character(s) to use for read messages. Default: \n
  • rstrip (bool): Whether to remove trailing whitespace from read messages. Default: False
  • termination (bytes | str): Sets both read_termination and write_termination to the same termination character(s).
  • timeout (float | None): Timeout, in seconds, for read and write operations. Default: None
  • write_termination (bytes | str): Termination character(s) to use for write messages. Default: \r\n
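
The precedence between termination, read_termination and write_termination can be sketched as follows. This is an illustration that follows the constructor logic in the source listing for this class; resolve_terminations and to_bytes_or_none are hypothetical helper names, and encoding a str with the configured encoding is an assumption about what the property setters do.

```python
from __future__ import annotations


def to_bytes_or_none(value: bytes | str | None, encoding: str = "utf-8") -> bytes | None:
    """Encode a str termination to bytes; pass bytes and None through unchanged."""
    if isinstance(value, str):
        return value.encode(encoding)
    return value


def resolve_terminations(props: dict[str, object]) -> tuple[bytes | None, bytes | None]:
    """Return (read_termination, write_termination) for a properties mapping."""
    encoding = str(props.get("encoding", "utf-8"))
    if "termination" in props:
        # A single "termination" entry is used for both reading and writing.
        read = write = props["termination"]
    else:
        # Otherwise each direction falls back to its own default.
        read = props.get("read_termination", b"\n")
        write = props.get("write_termination", b"\r\n")
    return to_bytes_or_none(read, encoding), to_bytes_or_none(write, encoding)


defaults = resolve_terminations({})
shared = resolve_terminations({"termination": "\r"})
```

Note that when termination is present, any read_termination or write_termination entries in the same mapping are ignored.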

Source code in src/msl/equipment/interfaces/message_based.py
def __init__(self, equipment: Equipment) -> None:
    r"""Base class for equipment that use message-based communication.

    Args:
        equipment: An [Equipment][] instance.

    A [Connection][msl.equipment.schema.Connection] instance supports the following
    _properties_ for message-based communication.

    Attributes: Connection Properties:
        encoding (str): Encoding to use for
            [read][msl.equipment.interfaces.message_based.MessageBased.read] and
            [write][msl.equipment.interfaces.message_based.MessageBased.write] operations.
            _Default: `utf-8`_
        max_read_size (int): Maximum number of bytes that can be
            [read][msl.equipment.interfaces.message_based.MessageBased.read].
            _Default: `1048576` (1 MB)_
        read_termination (bytes | str): Termination character(s) to use for
            [read][msl.equipment.interfaces.message_based.MessageBased.read] messages.
            _Default: `\n`_
        rstrip (bool): Whether to remove trailing whitespace from
            [read][msl.equipment.interfaces.message_based.MessageBased.read] messages.
            _Default: `False`_
        termination (bytes | str): Sets both `read_termination` and `write_termination`
            to the same termination character(s).
        timeout (float | None): Timeout, in seconds, for
            [read][msl.equipment.interfaces.message_based.MessageBased.read] and
            [write][msl.equipment.interfaces.message_based.MessageBased.write] operations.
            _Default: `None`_
        write_termination (bytes | str): Termination character(s) to use for
            [write][msl.equipment.interfaces.message_based.MessageBased.write] messages.
            _Default: `\r\n`_
    """
    super().__init__(equipment)
    assert equipment.connection is not None  # noqa: S101

    self._encoding: str = "utf-8"
    self._read_termination: bytes | None = None
    self._write_termination: bytes | None = None
    self._max_read_size: int = 1048576  # 1 << 20 (1 MB)
    self._timeout: float | None = None
    self._rstrip: bool = False

    p = equipment.connection.properties

    self.max_read_size = p.get("max_read_size", self._max_read_size)
    self.timeout = p.get("timeout", self._timeout)
    self.encoding = p.get("encoding", self._encoding)
    self.rstrip = p.get("rstrip", self._rstrip)

    if "termination" in p:
        self.read_termination = p["termination"]
        self.write_termination = p["termination"]
    else:
        self.read_termination = p.get("read_termination", b"\n")
        self.write_termination = p.get("write_termination", b"\r\n")

encoding property writable

encoding: str

The encoding that is used for read and write operations.

equipment property

equipment: Equipment

The Equipment associated with the interface.

max_read_size property writable

max_read_size: int

The maximum number of bytes that can be read.

read_termination property writable

read_termination: bytes | None

The termination character sequence that is used for a read operation.

Reading stops when the equipment stops sending data or the read_termination character sequence is detected. If read_termination is set to a str, the value is encoded to bytes.

rstrip property writable

rstrip: bool

Whether to remove trailing whitespace from read messages.

timeout property writable

timeout: float | None

The timeout, in seconds, for read and write operations.

A value less than 0 sets the timeout to None (blocking mode).
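
The rule for negative values can be sketched as a small normalising function. The property setter itself is not shown on this page, so normalize_timeout is a hypothetical helper and the conversion to float is an assumption.

```python
from __future__ import annotations


def normalize_timeout(value: float | None) -> float | None:
    """Map None and negative values to None (blocking mode); keep other values."""
    if value is None or value < 0:
        return None
    return float(value)


blocking = normalize_timeout(-1)
two_seconds = normalize_timeout(2)
```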

write_termination property writable

write_termination: bytes | None

The termination character sequence that is appended to write messages.

If write_termination is set to a str, the value is encoded to bytes.

disconnect

disconnect() -> None

Disconnect from the equipment.

This method can be overridden in the subclass if the subclass must implement tasks that need to be performed in order to safely disconnect from the equipment.

For example,

  • to clean up system resources from memory (e.g., if using a manufacturer's SDK)
  • to configure the equipment to be in a state that is safe for people working in the lab when the equipment is not in use

Tip

This method gets called automatically when the Interface instance gets garbage collected, which happens when the reference count is 0.

Source code in src/msl/equipment/schema.py
def disconnect(self) -> None:
    """Disconnect from the equipment.

    This method can be overridden in the subclass if the subclass must implement
    tasks that need to be performed in order to safely disconnect from the equipment.

    For example,

    * to clean up system resources from memory (e.g., if using a manufacturer's SDK)
    * to configure the equipment to be in a state that is safe for people
      working in the lab when the equipment is not in use

    !!! tip
        This method gets called automatically when the [Interface][msl.equipment.schema.Interface]
        instance gets garbage collected, which happens when the reference count is 0.
    """
    logger.debug("Disconnected from %r", self)

query

query(
    message: bytes | str,
    *,
    delay: float = 0.0,
    decode: Literal[True] = True,
    dtype: None = None,
    fmt: MessageFormat = ...,
    size: int | None = ...
) -> str
query(
    message: bytes | str,
    *,
    delay: float = 0.0,
    decode: Literal[False] = False,
    dtype: None = None,
    fmt: MessageFormat = ...,
    size: int | None = ...
) -> bytes
query(
    message: bytes | str,
    *,
    delay: float = 0.0,
    decode: bool = ...,
    dtype: MessageDataType = ...,
    fmt: MessageFormat = ...,
    size: int | None = ...
) -> NumpyArray1D
query(
    message: bytes | str,
    *,
    delay: float = 0.0,
    decode: bool = True,
    dtype: MessageDataType | None = None,
    fmt: MessageFormat = None,
    size: int | None = None
) -> bytes | str | NumpyArray1D

Convenience method for performing a write followed by a read.

Parameters:

  • message (bytes | str, required): The message to write to the equipment.
  • delay (float, default 0.0): Time delay, in seconds, to wait between the write and read operations.
  • decode (bool, default True): Whether to decode the returned message (i.e., convert the message to a str) or keep the message as bytes. Ignored if dtype is not None.
  • dtype (MessageDataType | None, default None): The data type of the elements in the returned message. Can be any object that numpy dtype supports. For messages that are of scalar type (i.e., a single number) it is more efficient to not specify dtype but to pass the returned message to the int or float class to convert the message to the appropriate numeric type. See MessageDataType for more details.
  • fmt (MessageFormat, default None): The format that the returned message data is in. Ignored if dtype is None. See MessageFormat for more details.
  • size (int | None, default None): The number of bytes to read. Ignored if the value is None.

Returns:

  • bytes | str | NumpyArray1D: The message from the equipment. If dtype is specified, the message is returned as a numpy ndarray. Otherwise, the message is returned as a str if decode is True, or as bytes if decode is False.
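
The write, optional delay, then read sequence can be sketched with an in-memory stand-in. EchoDevice is a hypothetical class that returns a fixed reply; it does not use the real interface classes, and it omits the dtype, fmt and size parameters.

```python
from __future__ import annotations

import time


class EchoDevice:
    """Hypothetical in-memory device that answers every message with a fixed reply."""

    def __init__(self, reply: bytes) -> None:
        self._reply = reply
        self.written: list[bytes] = []

    def write(self, message: bytes | str) -> int:
        data = message if isinstance(message, bytes) else message.encode("utf-8")
        self.written.append(data)
        return len(data)

    def read(self, *, decode: bool = True) -> bytes | str:
        return self._reply.decode("utf-8") if decode else self._reply

    def query(self, message: bytes | str, *, delay: float = 0.0, decode: bool = True) -> bytes | str:
        # Same sequence as the documented method: write, optional pause, read.
        self.write(message)
        if delay > 0:
            time.sleep(delay)
        return self.read(decode=decode)


dev = EchoDevice(b"ACME,Model1,SN123")
as_text = dev.query("*IDN?")
as_bytes = dev.query("*IDN?", decode=False, delay=0.01)
```

The delay is useful for equipment that needs time to prepare a reply after receiving a command.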

Source code in src/msl/equipment/interfaces/message_based.py
def query(
    self,
    message: bytes | str,
    *,
    delay: float = 0.0,
    decode: bool = True,
    dtype: MessageDataType | None = None,
    fmt: MessageFormat = None,
    size: int | None = None,
) -> bytes | str | NumpyArray1D:
    """Convenience method for performing a [write][msl.equipment.interfaces.message_based.MessageBased.write]
    followed by a [read][msl.equipment.interfaces.message_based.MessageBased.read].

    Args:
        message: The message to write to the equipment.
        delay: Time delay, in seconds, to wait between the _write_ and _read_ operations.
        decode: Whether to decode the returned message (i.e., convert the message to a [str][])
            or keep the message as [bytes][]. Ignored if `dtype` is not `None`.
        dtype: The data type of the elements in the returned message. Can be any object that numpy
            [dtype][numpy.dtype] supports. For messages that are of scalar type (i.e., a single number)
            it is more efficient to not specify `dtype` but to pass the returned message to the
            [int][] or [float][] class to convert the message to the appropriate numeric type.
            See [MessageDataType][msl.equipment._types.MessageDataType] for more details.
        fmt: The format that the returned message data is in. Ignored if `dtype` is `None`.
            See [MessageFormat][msl.equipment._types.MessageFormat] for more details.
        size: The number of bytes to read. Ignored if the value is `None`.

    Returns:
        The message from the equipment. If `dtype` is specified, then the message is
            returned as a numpy [ndarray][numpy.ndarray], if `decode` is `True` then the message
            is returned as a [str][], otherwise the message is returned as [bytes][].
    """  # noqa: D205
    _ = self.write(message)
    if delay > 0:
        time.sleep(delay)
    if dtype:
        return self.read(dtype=dtype, fmt=fmt, size=size)
    return self.read(decode=decode, size=size)

read

read(
    *,
    decode: Literal[True] = True,
    dtype: None = None,
    fmt: MessageFormat = ...,
    size: int | None = ...
) -> str
read(
    *,
    decode: Literal[False] = False,
    dtype: None = None,
    fmt: MessageFormat = ...,
    size: int | None = ...
) -> bytes
read(
    *,
    decode: bool = ...,
    dtype: MessageDataType = ...,
    fmt: MessageFormat = ...,
    size: int | None = ...
) -> NumpyArray1D
read(
    *,
    decode: bool = True,
    dtype: MessageDataType | None = None,
    fmt: MessageFormat = None,
    size: int | None = None
) -> bytes | str | NumpyArray1D

Read a message from the equipment.

This method will block until one of the following conditions is fulfilled:

  1. size bytes have been received (only if size is not None).
  2. the read_termination byte sequence is received (only if read_termination is not None).
  3. a timeout occurs (only if timeout is not None). If a timeout occurs, an MSLTimeoutError is raised.
  4. max_read_size bytes have been received. If the maximum number of bytes has been read, an MSLConnectionError is raised.

Tip

You may also want to set the rstrip value for the class instance.
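
The stopping conditions listed above can be sketched as a read loop over an in-memory stream. This is an illustration, not the library's implementation: read_message is a hypothetical function, ValueError stands in for MSLConnectionError, and the timeout (condition 3) is not modelled, so the loop simply stops when the stream runs out of data.

```python
from __future__ import annotations

import io


def read_message(
    stream: io.BufferedIOBase,
    *,
    size: int | None = None,
    termination: bytes | None = b"\n",
    max_read_size: int = 1 << 20,
) -> bytes:
    """Read from `stream` until `size` bytes, the termination, or the end of the data."""
    if size is not None and size > max_read_size:
        raise ValueError(f"max_read_size is {max_read_size} bytes, requesting {size} bytes")

    buffer = bytearray()
    while True:
        if size is not None and len(buffer) >= size:
            break  # condition 1: the requested number of bytes was received
        if termination and buffer.endswith(termination):
            break  # condition 2: the termination sequence was received
        if len(buffer) >= max_read_size:
            # condition 4: too many bytes without a complete message
            raise ValueError(f"received {len(buffer)} bytes without completing the message")
        chunk = stream.read(1)
        if not chunk:
            break  # no more data; a real interface would wait for the timeout here
        buffer += chunk
    return bytes(buffer)


stream = io.BytesIO(b"1.5\n2.5\n")
first = read_message(stream)
second = read_message(stream, size=2)
```

Reading one byte at a time keeps the sketch short; a real interface would normally read in larger chunks.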

Parameters:

  • decode (bool, default True): Whether to decode the message (i.e., convert the message to a str) or keep the message as bytes. Ignored if dtype is not None.
  • dtype (MessageDataType | None, default None): The data type of the elements in the returned message. Can be any object that numpy dtype supports. For messages that are of scalar type (i.e., a single number) it is more efficient to not specify dtype but to pass the returned message to the int or float class to convert the message to the appropriate numeric type. See MessageDataType for more details.
  • fmt (MessageFormat, default None): The format that the returned message data is in. Ignored if dtype is None. See MessageFormat for more details.
  • size (int | None, default None): The number of bytes to read. Ignored if the value is None.

Returns:

  • bytes | str | NumpyArray1D: The message from the equipment. If dtype is specified, the message is returned as a numpy ndarray. Otherwise, the message is returned as a str if decode is True, or as bytes if decode is False.
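
For a reply that holds a single number, the advice in the dtype description amounts to converting the decoded str directly. The reply strings below are made-up examples, not output from real equipment.

```python
reply = "+1.2345E-03\n"  # hypothetical reply to a single-value query

# float() and int() ignore leading and trailing whitespace,
# so a trailing termination does not need to be stripped first.
value = float(reply)
count = int(" 42\r\n")

# A short comma-separated reply can be converted the same way.
values = [float(item) for item in "1.0,2.5,4.0".split(",")]
```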

Source code in src/msl/equipment/interfaces/message_based.py
def read(
    self,
    *,
    decode: bool = True,
    dtype: MessageDataType | None = None,
    fmt: MessageFormat = None,
    size: int | None = None,
) -> bytes | str | NumpyArray1D:
    """Read a message from the equipment.

    This method will block until one of the following conditions is fulfilled:

    1. `size` bytes have been received &mdash; only if `size` is not `None`.
    2. the [read_termination][msl.equipment.interfaces.message_based.MessageBased.read_termination]
       byte(s) is(are) received &mdash; only if
       [read_termination][msl.equipment.interfaces.message_based.MessageBased.read_termination]
       is not `None`.
    3. a timeout occurs &mdash; only if [timeout][msl.equipment.interfaces.message_based.MessageBased.timeout]
       is not `None`. If a timeout occurs, an
       [MSLTimeoutError][msl.equipment.interfaces.message_based.MSLTimeoutError] is raised.
    4. [max_read_size][msl.equipment.interfaces.message_based.MessageBased.max_read_size]
       bytes have been received. If the maximum number of bytes have been read, an
       [MSLConnectionError][msl.equipment.interfaces.message_based.MSLConnectionError] is raised.

    !!! tip
        You may also want to set the [rstrip][msl.equipment.interfaces.message_based.MessageBased.rstrip]
        value for the class instance.

    Args:
        decode: Whether to decode the message (i.e., convert the message to a [str][])
            or keep the message as [bytes][]. Ignored if `dtype` is not `None`.
        dtype: The data type of the elements in the returned message. Can be any object that numpy
            [dtype][numpy.dtype] supports. For messages that are of scalar type (i.e., a single number)
            it is more efficient to not specify `dtype` but to pass the returned message to the
            [int][] or [float][] class to convert the message to the appropriate numeric type.
            See [MessageDataType][msl.equipment._types.MessageDataType] for more details.
        fmt: The format that the returned message data is in. Ignored if `dtype` is `None`.
            See [MessageFormat][msl.equipment._types.MessageFormat] for more details.
        size: The number of bytes to read. Ignored if the value is `None`.

    Returns:
        The message from the equipment. If `dtype` is specified, then the message is returned
            as a numpy [ndarray][numpy.ndarray], if `decode` is `True` then the message
            is returned as a [str][], otherwise the message is returned as [bytes][].
    """
    if size is not None and size > self._max_read_size:
        msg = f"max_read_size is {self._max_read_size} bytes, requesting {size} bytes"
        raise MSLConnectionError(self, msg)

    try:
        message = self._read(size)
    except (serial.SerialTimeoutException, socket.timeout, TimeoutError, USBTimeoutError):
        raise MSLTimeoutError(self) from None
    except Exception as e:  # noqa: BLE001
        msg = f"{e.__class__.__name__}: {e}"
        raise MSLConnectionError(self, msg) from None

    if size is None:
        if dtype:
            logger.debug("%s.read(dtype=%r, fmt=%r) -> %r", self, dtype, fmt, message)
        else:
            logger.debug("%s.read() -> %r", self, message)
    else:
        if len(message) != size:
            msg = f"received {len(message)} bytes, requested {size} bytes"
            raise MSLConnectionError(self, msg)
        logger.debug("%s.read(size=%s) -> %r", self, size, message)

    if self._rstrip:
        message = message.rstrip()

    if dtype:
        return from_bytes(message, fmt=fmt, dtype=dtype)

    if decode:
        return message.decode(encoding=self._encoding)

    return message

write

write(
    message: bytes | str,
    *,
    data: Sequence1D | None = None,
    dtype: MessageDataType = "<f",
    fmt: MessageFormat = "ieee"
) -> int

Write a message to the equipment.

Parameters:

  • message (bytes | str, required): The message to write to the equipment.
  • data (Sequence1D | None, default None): The data to append to message.
  • dtype (MessageDataType, default '<f'): The data type to use to convert each element in data to bytes. Ignored if data is None. See MessageDataType for more details.
  • fmt (MessageFormat, default 'ieee'): The format to use to convert data to bytes. Ignored if data is None. See MessageFormat for more details.

Returns:

  • int: The number of bytes written.
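
How the outgoing bytes are assembled can be sketched as follows, based on the source listing for this method. build_message is a hypothetical helper; it leaves out the data, dtype and fmt handling and does not send anything.

```python
from __future__ import annotations


def build_message(
    message: bytes | str,
    *,
    encoding: str = "utf-8",
    write_termination: bytes | None = b"\r\n",
) -> bytes:
    """Encode `message` and append the termination unless it is already present."""
    if not isinstance(message, bytes):
        message = message.encode(encoding)
    if write_termination and not message.endswith(write_termination):
        message += write_termination
    return message


plain = build_message("*IDN?")
already_terminated = build_message(b"*IDN?\r\n")
no_termination = build_message("*RST", write_termination=None)
```

Because of the endswith check, a message that already ends with the termination is not terminated twice.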

Source code in src/msl/equipment/interfaces/message_based.py
def write(
    self,
    message: bytes | str,
    *,
    data: Sequence1D | None = None,
    dtype: MessageDataType = "<f",
    fmt: MessageFormat = "ieee",
) -> int:
    """Write a message to the equipment.

    Args:
        message: The message to write to the equipment.
        data: The data to append to `message`.
        dtype: The data type to use to convert each element in `data` to bytes. Ignored
            if `data` is `None`. See [MessageDataType][msl.equipment._types.MessageDataType]
            for more details.
        fmt: The format to use to convert `data` to bytes. Ignored if `data` is `None`.
            See [MessageFormat][msl.equipment._types.MessageFormat] for more details.

    Returns:
        The number of bytes written.
    """
    if not isinstance(message, bytes):
        message = message.encode(encoding=self._encoding)

    if data is not None:
        message += to_bytes(data, fmt=fmt, dtype=dtype)

    if self._write_termination and not message.endswith(self._write_termination):
        message += self._write_termination

    logger.debug("%s.write(%r)", self, message)

    try:
        return self._write(message)
    except (serial.SerialTimeoutException, socket.timeout, TimeoutError, USBTimeoutError):
        raise MSLTimeoutError(self) from None
    except Exception as e:  # noqa: BLE001
        raise MSLConnectionError(self, str(e)) from None