interfaces¤

Interface classes for computer control.

Generic interface classes

  • Interface — Base class for all interfaces
  • MessageBased — Base class for all message-based interfaces

Specific interfaces

  • GPIB — Base class for GPIB communication
  • HiSLIP — Base class for the HiSLIP communication protocol
  • Prologix — Use Prologix hardware to establish a connection
  • SDK — Base class for equipment that uses the manufacturer's Software Development Kit
  • Serial — Base class for equipment that is connected through a serial port
  • Socket — Base class for equipment that is connected through a socket
  • VXI11 — Base class for the VXI-11 communication protocol
  • ZeroMQ — Base class for equipment that uses the ZeroMQ communication protocol

Backend (external package) interfaces

  • NIDAQ — Use NI-DAQmx as the backend to communicate with the equipment
  • PyVISA — Use PyVISA as the backend to communicate with the equipment

Interface ¤

Interface(equipment: Equipment)

Base class for all interfaces.

Parameters:

Name Type Description Default
equipment Equipment

An Equipment instance.

required
Source code in src/msl/equipment/schema.py
def __init__(self, equipment: Equipment) -> None:
    """Base class for all interfaces.

    Args:
        equipment: An [Equipment][] instance.
    """
    assert equipment.connection is not None  # noqa: S101
    self._equipment: Equipment = equipment

    # __str__ and __repr__ may be called often when logging messages, so cache the values
    self.__str: str = f"{self.__class__.__name__}<{equipment.manufacturer}|{equipment.model}|{equipment.serial}>"
    self.__repr: str = (
        f"{self.__class__.__name__}"
        f"<{equipment.manufacturer}|{equipment.model}|{equipment.serial} at {equipment.connection.address}>"
    )

    logger.debug("Connecting to %r", self)

equipment property ¤

equipment: Equipment

The Equipment associated with the interface.

disconnect ¤

disconnect() -> None

Disconnect from the equipment.

This method can be overridden in the subclass if the subclass must implement tasks that need to be performed in order to safely disconnect from the equipment.

For example,

  • to clean up system resources from memory (e.g., if using a manufacturer's SDK)
  • to configure the equipment to be in a state that is safe for people working in the lab when the equipment is not in use

Tip

This method gets called automatically when the Interface instance gets garbage collected, which happens when the reference count is 0.
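
A minimal sketch of overriding disconnect in a subclass, under stated assumptions: BaseInterface is a stand-in for the Interface base class, and ShutterController, shutter_open and sdk_handle are hypothetical names invented for illustration. The subclass first puts the equipment in a safe state, then releases its resources, and finally defers to the base class.

```python
import logging

logger = logging.getLogger(__name__)


class BaseInterface:
    """Stand-in for the Interface base class (hypothetical, illustration only)."""

    def disconnect(self) -> None:
        logger.debug("Disconnected from %r", self)


class ShutterController(BaseInterface):
    """Hypothetical subclass that must leave the equipment in a safe state."""

    def __init__(self) -> None:
        self.shutter_open = True
        self.sdk_handle = object()  # pretend resource owned by a manufacturer's SDK

    def disconnect(self) -> None:
        # Put the equipment in a state that is safe for people in the lab.
        self.shutter_open = False
        # Release the (pretend) SDK resource.
        self.sdk_handle = None
        # Let the base class do its own bookkeeping (here, a log message).
        super().disconnect()


device = ShutterController()
device.disconnect()
```

Calling super().disconnect() last keeps the base-class log message accurate, because it is emitted only after the subclass cleanup has run.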

Source code in src/msl/equipment/schema.py
def disconnect(self) -> None:
    """Disconnect from the equipment.

    This method can be overridden in the subclass if the subclass must implement
    tasks that need to be performed in order to safely disconnect from the equipment.

    For example,

    * to clean up system resources from memory (e.g., if using a manufacturer's SDK)
    * to configure the equipment to be in a state that is safe for people
      working in the lab when the equipment is not in use

    !!! tip
        This method gets called automatically when the [Interface][msl.equipment.schema.Interface]
        instance gets garbage collected, which happens when the reference count is 0.
    """
    logger.debug("Disconnected from %r", self)

MessageBased ¤

MessageBased(equipment: Equipment)

Bases: Interface

Base class for equipment that uses message-based communication.

Parameters:

Name Type Description Default
equipment Equipment

An Equipment instance.

required

A Connection instance supports the following properties for message-based communication.

Connection Properties:

Name Type Description
encoding str

Encoding to use for read and write operations. Default: utf-8

max_read_size int

Maximum number of bytes that can be read. Default: 1048576 (1 MB)

read_termination bytes | str

Termination character(s) to use for read messages. Default: \n

rstrip bool

Whether to remove trailing whitespace from read messages. Default: False

termination bytes | str

Sets both read_termination and write_termination to the same termination character(s).

timeout float | None

Timeout, in seconds, for read and write operations. Default: None

write_termination bytes | str

Termination character(s) to use for write messages. Default: \r\n
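
The interaction between termination, read_termination and write_termination can be sketched as a small standalone function. It mirrors the precedence used in the constructor source shown on this page; resolve_terminations is a hypothetical helper name and is not part of the package.

```python
def resolve_terminations(properties: dict) -> tuple:
    """Return (read_termination, write_termination) for a properties mapping."""
    if "termination" in properties:
        # A single "termination" value applies to both directions and
        # takes precedence over the direction-specific keys.
        term = properties["termination"]
        return term, term

    # Otherwise each direction falls back to its own default.
    read_term = properties.get("read_termination", b"\n")
    write_term = properties.get("write_termination", b"\r\n")
    return read_term, write_term


print(resolve_terminations({}))
print(resolve_terminations({"termination": b"\r"}))
print(resolve_terminations({"read_termination": None}))
```

Note that when termination is present, any read_termination or write_termination entry in the same mapping is ignored.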

Source code in src/msl/equipment/interfaces/message_based.py
def __init__(self, equipment: Equipment) -> None:
    r"""Base class for equipment that uses message-based communication.

    Args:
        equipment: An [Equipment][] instance.

    A [Connection][msl.equipment.schema.Connection] instance supports the following
    _properties_ for message-based communication.

    Attributes: Connection Properties:
        encoding (str): Encoding to use for
            [read][msl.equipment.interfaces.message_based.MessageBased.read] and
            [write][msl.equipment.interfaces.message_based.MessageBased.write] operations.
            _Default: `utf-8`_
        max_read_size (int): Maximum number of bytes that can be
            [read][msl.equipment.interfaces.message_based.MessageBased.read].
            _Default: `1048576` (1 MB)_
        read_termination (bytes | str): Termination character(s) to use for
            [read][msl.equipment.interfaces.message_based.MessageBased.read] messages.
            _Default: `\n`_
        rstrip (bool): Whether to remove trailing whitespace from
            [read][msl.equipment.interfaces.message_based.MessageBased.read] messages.
            _Default: `False`_
        termination (bytes | str): Sets both `read_termination` and `write_termination`
            to the same termination character(s).
        timeout (float | None): Timeout, in seconds, for
            [read][msl.equipment.interfaces.message_based.MessageBased.read] and
            [write][msl.equipment.interfaces.message_based.MessageBased.write] operations.
            _Default: `None`_
        write_termination (bytes | str): Termination character(s) to use for
            [write][msl.equipment.interfaces.message_based.MessageBased.write] messages.
            _Default: `\r\n`_
    """
    super().__init__(equipment)
    assert equipment.connection is not None  # noqa: S101

    self._encoding: str = "utf-8"
    self._read_termination: bytes | None = None
    self._write_termination: bytes | None = None
    self._max_read_size: int = 1048576  # 1 << 20 (1 MB)
    self._timeout: float | None = None
    self._rstrip: bool = False

    p = equipment.connection.properties

    self.max_read_size = p.get("max_read_size", self._max_read_size)
    self.timeout = p.get("timeout", self._timeout)
    self.encoding = p.get("encoding", self._encoding)
    self.rstrip = p.get("rstrip", self._rstrip)

    if "termination" in p:
        self.read_termination = p["termination"]
        self.write_termination = p["termination"]
    else:
        self.read_termination = p.get("read_termination", b"\n")
        self.write_termination = p.get("write_termination", b"\r\n")

encoding property writable ¤

encoding: str

The encoding that is used for read and write operations.

equipment property ¤

equipment: Equipment

The Equipment associated with the interface.

max_read_size property writable ¤

max_read_size: int

The maximum number of bytes that can be read.

read_termination property writable ¤

read_termination: bytes | None

The termination character sequence that is used for a read operation.

Reading stops when the equipment stops sending data or the read_termination character sequence is detected. If you set the read_termination to be equal to a variable of type str, it will be encoded as bytes.

rstrip property writable ¤

rstrip: bool

Whether to remove trailing whitespace from read messages.

timeout property writable ¤

timeout: float | None

The timeout, in seconds, for read and write operations.

A value <0 will set the timeout to be None (blocking mode).
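
The rule for negative values can be illustrated with a small sketch. The setter's source is not shown on this page, so normalize_timeout is a hypothetical helper that only reproduces the documented behaviour (None and negative values both mean blocking mode); it is not the library's implementation.

```python
from __future__ import annotations


def normalize_timeout(value: float | None) -> float | None:
    """Map None and negative values to None (blocking mode)."""
    if value is None or value < 0:
        return None
    return float(value)


print(normalize_timeout(2))
print(normalize_timeout(-1))
print(normalize_timeout(None))
```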

write_termination property writable ¤

write_termination: bytes | None

The termination character sequence that is appended to write messages.

If you set the write_termination to be equal to a variable of type str, it will be encoded as bytes.

disconnect ¤

disconnect() -> None

Disconnect from the equipment.

This method can be overridden in the subclass if the subclass must implement tasks that need to be performed in order to safely disconnect from the equipment.

For example,

  • to clean up system resources from memory (e.g., if using a manufacturer's SDK)
  • to configure the equipment to be in a state that is safe for people working in the lab when the equipment is not in use

Tip

This method gets called automatically when the Interface instance gets garbage collected, which happens when the reference count is 0.

Source code in src/msl/equipment/schema.py
def disconnect(self) -> None:
    """Disconnect from the equipment.

    This method can be overridden in the subclass if the subclass must implement
    tasks that need to be performed in order to safely disconnect from the equipment.

    For example,

    * to clean up system resources from memory (e.g., if using a manufacturer's SDK)
    * to configure the equipment to be in a state that is safe for people
      working in the lab when the equipment is not in use

    !!! tip
        This method gets called automatically when the [Interface][msl.equipment.schema.Interface]
        instance gets garbage collected, which happens when the reference count is 0.
    """
    logger.debug("Disconnected from %r", self)

query ¤

query(
    message: bytes | str,
    *,
    delay: float = 0.0,
    decode: Literal[True] = True,
    dtype: None = None,
    fmt: MessageFormat | None = ...,
    size: int | None = ...
) -> str
query(
    message: bytes | str,
    *,
    delay: float = 0.0,
    decode: Literal[False] = False,
    dtype: None = None,
    fmt: MessageFormat | None = ...,
    size: int | None = ...
) -> bytes
query(
    message: bytes | str,
    *,
    delay: float = 0.0,
    decode: bool = ...,
    dtype: MessageDataType = ...,
    fmt: MessageFormat | None = ...,
    size: int | None = ...
) -> NumpyArray1D
query(
    message: bytes | str,
    *,
    delay: float = 0.0,
    decode: bool = True,
    dtype: MessageDataType | None = None,
    fmt: MessageFormat | None = None,
    size: int | None = None
) -> bytes | str | NumpyArray1D

Convenience method for performing a write followed by a read.
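
The write-then-read pattern can be sketched with an in-memory stand-in. EchoInstrument is a hypothetical class, the command string and the fixed reply are invented for illustration, and no real equipment or package API is involved. The sketch also shows converting a scalar reply with float rather than requesting an array.

```python
import time


class EchoInstrument:
    """In-memory stand-in for a message-based interface (hypothetical)."""

    def __init__(self) -> None:
        self._reply = b""

    def write(self, message: str) -> int:
        data = message.encode("utf-8")
        # Pretend the equipment answers a measurement request with a fixed reading.
        self._reply = b"1.2345\n" if message.startswith("MEAS") else b"\n"
        return len(data)

    def read(self, decode: bool = True):
        reply = self._reply.rstrip()
        return reply.decode("utf-8") if decode else reply

    def query(self, message: str, delay: float = 0.0, decode: bool = True):
        # A query is a write, an optional pause, then a read.
        self.write(message)
        if delay > 0:
            time.sleep(delay)
        return self.read(decode=decode)


inst = EchoInstrument()
reply = inst.query("MEAS:VOLT?")               # decoded text
voltage = float(reply)                         # scalar reply converted directly
raw = inst.query("MEAS:VOLT?", decode=False)   # undecoded bytes
```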

Parameters:

Name Type Description Default
message bytes | str

The message to write to the equipment.

required
delay float

Time delay, in seconds, to wait between the write and read operations.

0.0
decode bool

Whether to decode the returned message (i.e., convert the message to a str) or keep the message as bytes. Ignored if dtype is not None.

True
dtype MessageDataType | None

The data type of the elements in the returned message. Can be any object that numpy dtype supports. See from_bytes for more details. For messages that are of scalar type (i.e., a single number) it is more efficient to not specify dtype but to pass the returned message to the int or float class to convert the message to the appropriate numeric type.

None
fmt MessageFormat | None

The format that the returned message data is in. Ignored if dtype is None. See from_bytes for more details.

None
size int | None

The number of bytes to read. Ignored if the value is None.

None

Returns:

Type Description
bytes | str | NumpyArray1D

The message from the equipment. If dtype is specified, the message is returned as a numpy ndarray. Otherwise, the message is returned as a str if decode is True, or as bytes if decode is False.

Source code in src/msl/equipment/interfaces/message_based.py
def query(  # noqa: PLR0913
    self,
    message: bytes | str,
    *,
    delay: float = 0.0,
    decode: bool = True,
    dtype: MessageDataType | None = None,
    fmt: MessageFormat | None = None,
    size: int | None = None,
) -> bytes | str | NumpyArray1D:
    """Convenience method for performing a [write][msl.equipment.interfaces.message_based.MessageBased.write]
    followed by a [read][msl.equipment.interfaces.message_based.MessageBased.read].

    Args:
        message: The message to write to the equipment.
        delay: Time delay, in seconds, to wait between the _write_ and _read_ operations.
        decode: Whether to decode the returned message (i.e., convert the message to a [str][])
            or keep the message as [bytes][]. Ignored if `dtype` is not `None`.
        dtype: The data type of the elements in the returned message. Can be any object
            that numpy [dtype][numpy.dtype] supports. See [from_bytes][msl.equipment.utils.from_bytes]
            for more details. For messages that are of scalar type (i.e., a single number)
            it is more efficient to not specify `dtype` but to pass the returned message to the
            [int][] or [float][] class to convert the message to the appropriate numeric type.
        fmt: The format that the returned message data is in. Ignored if `dtype` is `None`.
             See [from_bytes][msl.equipment.utils.from_bytes] for more details.
        size: The number of bytes to read. Ignored if the value is `None`.

    Returns:
        The message from the equipment. If `dtype` is specified, the message is
            returned as a numpy [ndarray][numpy.ndarray]. Otherwise, the message is returned
            as a [str][] if `decode` is `True`, or as [bytes][] if `decode` is `False`.
    """  # noqa: D205
    _ = self.write(message)
    if delay > 0:
        time.sleep(delay)
    if dtype:
        return self.read(dtype=dtype, fmt=fmt, size=size)
    return self.read(decode=decode, size=size)

read ¤

read(
    *,
    decode: Literal[True] = True,
    dtype: None = None,
    fmt: MessageFormat | None = ...,
    size: int | None = ...
) -> str
read(
    *,
    decode: Literal[False] = False,
    dtype: None = None,
    fmt: MessageFormat | None = ...,
    size: int | None = ...
) -> bytes
read(
    *,
    decode: bool = ...,
    dtype: MessageDataType = ...,
    fmt: MessageFormat | None = ...,
    size: int | None = ...
) -> NumpyArray1D
read(
    *,
    decode: bool = True,
    dtype: MessageDataType | None = None,
    fmt: MessageFormat | None = None,
    size: int | None = None
) -> bytes | str | NumpyArray1D

Read a message from the equipment.

This method will block until one of the following conditions is fulfilled:

  1. size bytes have been received (only if size is not None).
  2. the read_termination byte sequence is received (only if read_termination is not None).
  3. a timeout occurs (only if timeout is not None). If a timeout occurs, an MSLTimeoutError is raised.
  4. max_read_size bytes have been received. If the maximum number of bytes has been read, an MSLConnectionError is raised.

Tip

You may also want to set the rstrip value for the class instance.
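
The stop conditions listed above can be sketched as a loop over an in-memory byte stream. This is an illustration under stated assumptions, not the library's _read implementation: read_message is a hypothetical function, the built-in ConnectionError stands in for MSLConnectionError, timeouts are not modelled, and the sketch assumes the termination sequence is only checked when size is None.

```python
import io


def read_message(stream, *, size=None, termination=b"\n", max_read_size=1 << 20):
    """Read from a binary stream until one of the stop conditions is met."""
    if size is not None and size > max_read_size:
        msg = f"max_read_size is {max_read_size} bytes, requesting {size} bytes"
        raise ConnectionError(msg)

    message = bytearray()
    while True:
        if size is not None and len(message) == size:
            break  # condition 1: the requested number of bytes has arrived
        if size is None and termination and message.endswith(termination):
            break  # condition 2: the termination sequence was received
        if len(message) >= max_read_size:
            # condition 4: too much data without a valid stop condition
            raise ConnectionError(f"maximum read size of {max_read_size} bytes exceeded")
        byte = stream.read(1)
        if not byte:
            break  # the stream has no more data to offer
        message += byte
    return bytes(message)


stream = io.BytesIO(b"hello\nworld\n")
first = read_message(stream)           # stops at the termination sequence
second = read_message(stream, size=3)  # stops after exactly three bytes
```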

Parameters:

Name Type Description Default
decode bool

Whether to decode the message (i.e., convert the message to a str) or keep the message as bytes. Ignored if dtype is not None.

True
dtype MessageDataType | None

The data type of the elements in the message. Can be any object that numpy dtype supports. See from_bytes for more details. For messages that are of scalar type (i.e., a single number) it is more efficient to not specify dtype but to pass the returned message to the int or float class to convert the message to the appropriate numeric type.

None
fmt MessageFormat | None

The format that the message data is in. Ignored if dtype is None. See from_bytes for more details.

None
size int | None

The number of bytes to read. Ignored if the value is None.

None

Returns:

Type Description
bytes | str | NumpyArray1D

The message from the equipment. If dtype is specified, the message is returned as a numpy ndarray. Otherwise, the message is returned as a str if decode is True, or as bytes if decode is False.

Source code in src/msl/equipment/interfaces/message_based.py
def read(
    self,
    *,
    decode: bool = True,
    dtype: MessageDataType | None = None,
    fmt: MessageFormat | None = None,
    size: int | None = None,
) -> bytes | str | NumpyArray1D:
    """Read a message from the equipment.

    This method will block until one of the following conditions is fulfilled:

    1. `size` bytes have been received &mdash; only if `size` is not `None`.
    2. the [read_termination][msl.equipment.interfaces.message_based.MessageBased.read_termination]
       byte sequence is received &mdash; only if
       [read_termination][msl.equipment.interfaces.message_based.MessageBased.read_termination]
       is not `None`.
    3. a timeout occurs &mdash; only if [timeout][msl.equipment.interfaces.message_based.MessageBased.timeout]
       is not `None`. If a timeout occurs, an
       [MSLTimeoutError][msl.equipment.interfaces.message_based.MSLTimeoutError] is raised.
    4. [max_read_size][msl.equipment.interfaces.message_based.MessageBased.max_read_size]
       bytes have been received. If the maximum number of bytes has been read, an
       [MSLConnectionError][msl.equipment.interfaces.message_based.MSLConnectionError] is raised.

    !!! tip
        You may also want to set the [rstrip][msl.equipment.interfaces.message_based.MessageBased.rstrip]
        value for the class instance.

    Args:
        decode: Whether to decode the message (i.e., convert the message to a [str][])
            or keep the message as [bytes][]. Ignored if `dtype` is not `None`.
        dtype: The data type of the elements in the message. Can be any object
            that numpy [dtype][numpy.dtype] supports. See [from_bytes][msl.equipment.utils.from_bytes]
            for more details. For messages that are of scalar type (i.e., a single number)
            it is more efficient to not specify `dtype` but to pass the returned message to the
            [int][] or [float][] class to convert the message to the appropriate numeric type.
        fmt: The format that the message data is in. Ignored if `dtype` is `None`.
             See [from_bytes][msl.equipment.utils.from_bytes] for more details.
        size: The number of bytes to read. Ignored if the value is `None`.

    Returns:
        The message from the equipment. If `dtype` is specified, the message is returned
            as a numpy [ndarray][numpy.ndarray]. Otherwise, the message is returned
            as a [str][] if `decode` is `True`, or as [bytes][] if `decode` is `False`.
    """
    if size is not None and size > self._max_read_size:
        msg = f"max_read_size is {self._max_read_size} bytes, requesting {size} bytes"
        raise MSLConnectionError(self, msg)

    try:
        message = self._read(size)
    except (serial.SerialTimeoutException, socket.timeout, TimeoutError):
        raise MSLTimeoutError(self) from None
    except Exception as e:  # noqa: BLE001
        msg = f"{e.__class__.__name__}: {e}"
        raise MSLConnectionError(self, msg) from None

    if size is None:
        if dtype:
            logger.debug("%s.read(dtype=%r, fmt=%r) -> %r", self, dtype, fmt, message)
        else:
            logger.debug("%s.read() -> %r", self, message)
    else:
        if len(message) != size:
            msg = f"received {len(message)} bytes, requested {size} bytes"
            raise MSLConnectionError(self, msg)
        logger.debug("%s.read(size=%s) -> %r", self, size, message)

    if self._rstrip:
        message = message.rstrip()

    if dtype:
        return from_bytes(message, fmt=fmt, dtype=dtype)

    if decode:
        return message.decode(encoding=self._encoding)

    return message

write ¤

write(
    message: bytes | str,
    *,
    data: Sequence1D | None = None,
    dtype: MessageDataType = "<f",
    fmt: MessageFormat | None = "ieee"
) -> int

Write a message to the equipment.

Parameters:

Name Type Description Default
message bytes | str

The message to write to the equipment.

required
data Sequence1D | None

The data to append to message. See to_bytes for more details.

None
dtype MessageDataType

The data type to use to convert each element in data to bytes. Ignored if data is None. See to_bytes for more details.

'<f'
fmt MessageFormat | None

The format to use to convert data to bytes. Ignored if data is None. See to_bytes for more details.

'ieee'

Returns:

Type Description
int

The number of bytes written.
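
How the outgoing bytes are assembled can be sketched as follows. The encoding step and the conditional termination follow the write source shown on this page. The data framing is an assumption: the sketch takes fmt="ieee" to mean an IEEE 488.2 definite-length block (a "#", the number of digits in the byte count, the byte count, then the payload) and packs little-endian 32-bit floats for dtype="<f". build_message is a hypothetical helper; to_bytes remains the authority on the real formats.

```python
import struct


def build_message(message, *, data=None, write_termination=b"\r\n", encoding="utf-8"):
    """Assemble the bytes that would be sent to the equipment."""
    if not isinstance(message, bytes):
        message = message.encode(encoding)

    if data is not None:
        # Assumed framing: IEEE 488.2 definite-length block of little-endian floats.
        payload = struct.pack(f"<{len(data)}f", *data)
        count = str(len(payload)).encode("ascii")
        header = b"#" + str(len(count)).encode("ascii") + count
        message += header + payload

    # The termination is appended only when the message does not already end with it.
    if write_termination and not message.endswith(write_termination):
        message += write_termination
    return message


plain = build_message("*IDN?")
already_terminated = build_message(b"*IDN?\r\n")
with_data = build_message("DATA ", data=[1.0, 2.0])
```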

Source code in src/msl/equipment/interfaces/message_based.py
def write(
    self,
    message: bytes | str,
    *,
    data: Sequence1D | None = None,
    dtype: MessageDataType = "<f",
    fmt: MessageFormat | None = "ieee",
) -> int:
    """Write a message to the equipment.

    Args:
        message: The message to write to the equipment.
        data: The data to append to `message`. See [to_bytes][msl.equipment.utils.to_bytes]
            for more details.
        dtype: The data type to use to convert each element in `data` to bytes. Ignored
            if `data` is `None`. See [to_bytes][msl.equipment.utils.to_bytes] for more details.
        fmt: The format to use to convert `data` to bytes. Ignored if `data` is `None`.
            See [to_bytes][msl.equipment.utils.to_bytes] for more details.

    Returns:
        The number of bytes written.
    """
    if not isinstance(message, bytes):
        message = message.encode(encoding=self._encoding)

    if data is not None:
        message += to_bytes(data, fmt=fmt, dtype=dtype)

    if self._write_termination and not message.endswith(self._write_termination):
        message += self._write_termination

    logger.debug("%s.write(%r)", self, message)

    try:
        return self._write(message)
    except (serial.SerialTimeoutException, socket.timeout, TimeoutError):
        raise MSLTimeoutError(self) from None
    except Exception as e:  # noqa: BLE001
        raise MSLConnectionError(self, str(e)) from None

GPIB ¤

GPIB(equipment: Equipment)

Bases: MessageBased

Base class for GPIB communication.

Parameters:

Name Type Description Default
equipment Equipment

An Equipment instance.

required

A Connection instance supports the following properties for the GPIB communication protocol, as well as the properties defined in MessageBased.

Connection Properties:

Name Type Description
eos_mode int

The end-of-string mode. Default: 0

send_eoi bool

Whether to enable (True) or disable (False) the assertion of the EOI signal. Default: True

Source code in src/msl/equipment/interfaces/gpib.py
def __init__(self, equipment: Equipment) -> None:
    """Base class for GPIB communication.

    Args:
        equipment: An [Equipment][] instance.

    A [Connection][msl.equipment.schema.Connection] instance supports the following _properties_
    for the GPIB communication protocol, as well as the _properties_ defined in
    [MessageBased][msl.equipment.interfaces.message_based.MessageBased].

    Attributes: Connection Properties:
        eos_mode (int): The end-of-string mode. _Default: `0`_
        send_eoi (bool): Whether to enable (`True`) or disable (`False`) the assertion of the EOI signal.
            _Default: `True`_
    """
    self._own: bool = True
    self._handle: int = -1

    assert equipment.connection is not None  # noqa: S101
    info = parse_gpib_address(equipment.connection.address)
    if not info:
        msg = f"Invalid GPIB address {equipment.connection.address!r}"
        raise ValueError(msg)

    props = equipment.connection.properties
    _ = props.setdefault("read_termination", None)

    _load_library()
    assert _gpib_library is not None  # noqa: S101
    self._lib: Any = _gpib_library.lib

    if info.name:
        # a board or device object from a name in a gpib.conf file
        self._handle = self._get_ibfind_handle(info.name)
    elif info.pad is None:
        # a board object with the given board number
        self._handle = info.board
        self._own = False
    else:
        # a device object
        send_eoi = int(props.get("send_eoi", 1))
        eos_mode = int(props.get("eos_mode", 0))
        sad = 0 if info.sad is None else info.sad
        if sad != 0 and sad < 0x60:  # noqa: PLR2004
            # NI's unfortunate convention of adding 0x60 to secondary addresses
            sad += 0x60
        info.sad = sad
        timeout = _convert_timeout(props.get("timeout", None))
        self._handle = self._get_ibdev_handle(info.board, info.pad, sad, timeout, send_eoi, eos_mode)

    # keep this reference assignment after the if/else condition since the
    # value of the secondary address may have been updated
    self._address_info: ParsedGPIBAddress = info

    # check if the handle corresponds to a system controller (INTFC)
    self._is_board: bool
    try:
        self._is_board = bool(self.ask(0xA))  # IbaSC = 0xa
    except GPIBLibraryError:
        # asking IbaSC for a GPIB device raises EHDL error
        self._is_board = False

    super().__init__(equipment)
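
The secondary-address handling in the constructor above can be isolated into a small function for clarity. normalize_secondary_address is a hypothetical name; the logic is copied from the source listing, where a missing secondary address becomes 0 and a non-zero value below 0x60 is offset by 0x60.

```python
def normalize_secondary_address(sad):
    """Apply the 0x60 offset convention to a GPIB secondary address."""
    sad = 0 if sad is None else sad
    if sad != 0 and sad < 0x60:
        # Values below 0x60 are treated as not yet offset.
        sad += 0x60
    return sad


print(normalize_secondary_address(None))
print(hex(normalize_secondary_address(1)))
print(hex(normalize_secondary_address(0x61)))
```

Because values that are already 0x60 or greater pass through unchanged, applying the function twice gives the same result as applying it once.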

encoding property writable ¤

encoding: str

The encoding that is used for read and write operations.

equipment property ¤

equipment: Equipment

The Equipment associated with the interface.

handle property ¤

handle: int

Returns the handle of the instantiated board or device.

library_path property ¤

library_path: str

Returns the path to the GPIB library.

max_read_size property writable ¤

max_read_size: int

The maximum number of bytes that can be read.

read_termination property writable ¤

read_termination: bytes | None

The termination character sequence that is used for the read method.

By default, reading stops when the EOI line is asserted.

rstrip property writable ¤

rstrip: bool

Whether to remove trailing whitespace from read messages.

timeout property writable ¤

timeout: float | None

The timeout, in seconds, for read and write operations.

A value <0 will set the timeout to be None (blocking mode).

write_termination property writable ¤

write_termination: bytes | None

The termination character sequence that is appended to write messages.

If you set the write_termination to be equal to a variable of type str, it will be encoded as bytes.

ask ¤

ask(option: int, *, handle: int | None = None) -> int

Get a configuration setting (board or device).

This method is the ibask function. It should not be confused with the query method.

Parameters:

Name Type Description Default
option int

A configuration setting to get the value of.

required
handle int | None

Board or device descriptor. Default is the handle of the instantiated class.

None

Returns:

Type Description
int

The value of the configuration setting.
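
The value is returned through an output parameter, a common pattern in C libraries. The sketch below shows that pattern with ctypes alone and does not load any GPIB library: fake_ibask is a hypothetical Python callback that stands in for the C function, and the settings table is invented for illustration (only the option 0xA, named IbaSC in the source on this page, is taken from the document).

```python
from ctypes import CFUNCTYPE, POINTER, byref, c_int

# Invented settings table standing in for state held by a GPIB library.
_SETTINGS = {0xA: 1}  # 0xA is the IbaSC option mentioned in the source


@CFUNCTYPE(c_int, c_int, c_int, POINTER(c_int))
def fake_ibask(handle, option, result):
    # Write the requested setting through the pointer, as a C function would.
    result[0] = _SETTINGS.get(option, 0)
    return 0  # pretend status value


def ask(option, handle=0):
    """Get a setting by passing a c_int by reference and reading it back."""
    setting = c_int()
    fake_ibask(handle, option, byref(setting))
    return setting.value


print(ask(0xA))
```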

Source code in src/msl/equipment/interfaces/gpib.py
def ask(self, option: int, *, handle: int | None = None) -> int:
    """Get a configuration setting (board or device).

    This method is the [ibask](https://linux-gpib.sourceforge.io/doc_html/reference-function-ibask.html)
    function. It should not be confused with the [query][msl.equipment.interfaces.message_based.MessageBased.query]
    method.

    Args:
        option: A configuration setting to get the value of.
        handle: Board or device descriptor. Default is the handle of the instantiated class.

    Returns:
        The value of the configuration setting.
    """
    if handle is None:
        handle = self._handle
    setting = c_int()
    self._lib.ibask(handle, option, byref(setting))
    return setting.value

clear ¤

clear(*, handle: int | None = None) -> int

Send the clear command (device).

This method is the ibclr function.

Parameters:

Name Type Description Default
handle int | None

Board or device descriptor. Default is the handle of the instantiated class.

None

Returns:

Type Description
int

The status value (ibsta).

Source code in src/msl/equipment/interfaces/gpib.py
def clear(self, *, handle: int | None = None) -> int:
    """Send the clear command (device).

    This method is the [ibclr](https://linux-gpib.sourceforge.io/doc_html/reference-function-ibclr.html) function.

    Args:
        handle: Board or device descriptor. Default is the handle of the instantiated class.

    Returns:
        The status value (`ibsta`).
    """
    if handle is None:
        handle = self._handle
    ibsta: int = self._lib.ibclr(handle)
    return ibsta

command ¤

command(data: bytes, *, handle: int | None = None) -> int

Write command bytes (board).

This method is the ibcmd function.

Parameters:

Name Type Description Default
data bytes

The commands to write to the bus.

required
handle int | None

Board or device descriptor. Default is the handle of the instantiated class.

None

Returns:

Type Description
int

The status value (ibsta).

Source code in src/msl/equipment/interfaces/gpib.py
def command(self, data: bytes, *, handle: int | None = None) -> int:
    """Write command bytes (board).

    This method is the [ibcmd](https://linux-gpib.sourceforge.io/doc_html/reference-function-ibcmd.html) function.

    Args:
        data: The [commands](https://linux-gpib.sourceforge.io/doc_html/gpib-protocol.html#REFERENCE-COMMAND-BYTES)
            to write to the bus.
        handle: Board or device descriptor. Default is the handle of the instantiated class.

    Returns:
        The status value (`ibsta`).
    """
    if handle is None:
        handle = self._handle
    ibsta: int = self._lib.ibcmd(handle, data, len(data))
    return ibsta

config ¤

config(
    option: int, value: int, *, handle: int | None = None
) -> int

Change configuration settings (board or device).

This method is the ibconfig function.

Parameters:

Name Type Description Default
option int

A configuration setting to change the value of.

required
value int

The new configuration setting value.

required
handle int | None

Board or device descriptor. Default is the handle of the instantiated class.

None

Returns:

Type Description
int

The status value (ibsta).

Source code in src/msl/equipment/interfaces/gpib.py
def config(self, option: int, value: int, *, handle: int | None = None) -> int:
    """Change configuration settings (board or device).

    This method is the [ibconfig](https://linux-gpib.sourceforge.io/doc_html/reference-function-ibconfig.html)
    function.

    Args:
        option: A configuration setting to change the value of.
        value: The new configuration setting value.
        handle: Board or device descriptor. Default is the handle of the instantiated class.

    Returns:
        The status value (`ibsta`).
    """
    if handle is None:
        handle = self._handle
    ibsta: int = self._lib.ibconfig(handle, option, value)
    return ibsta

control_atn ¤

control_atn(
    state: int, *, handle: int | None = None
) -> int

Set the state of the ATN line (board).

This method mimics the PyVISA-py implementation.

Parameters:

Name Type Description Default
state int

The state of the ATN line or the active controller. Allowed values are:

  • 0: ATN_DEASSERT
  • 1: ATN_ASSERT
  • 2: ATN_DEASSERT_HANDSHAKE
  • 3: ATN_ASSERT_IMMEDIATE
required
handle int | None

Board or device descriptor. Default is the handle of the instantiated class.

None

Returns:

Type Description
int

The status value (ibsta).

Source code in src/msl/equipment/interfaces/gpib.py
def control_atn(self, state: int, *, handle: int | None = None) -> int:
    """Set the state of the ATN line (board).

    This method mimics the PyVISA-py implementation.

    Args:
        state: The state of the ATN line or the active controller. Allowed values are:

            * 0: ATN_DEASSERT
            * 1: ATN_ASSERT
            * 2: ATN_DEASSERT_HANDSHAKE
            * 3: ATN_ASSERT_IMMEDIATE

        handle: Board or device descriptor. Default is the handle of the instantiated class.

    Returns:
        The status value (`ibsta`).
    """
    ibsta: int
    if handle is None:
        handle = self._handle
    if state == ATN_DEASSERT:
        ibsta = self._lib.ibgts(handle, 0)
        return ibsta
    if state == ATN_ASSERT:
        ibsta = self._lib.ibcac(handle, 0)
        return ibsta
    if state == ATN_DEASSERT_HANDSHAKE:
        ibsta = self._lib.ibgts(handle, 1)
        return ibsta
    if state == ATN_ASSERT_IMMEDIATE:
        ibsta = self._lib.ibcac(handle, 1)
        return ibsta

    msg = f"Invalid ATN {state=}"
    raise MSLConnectionError(self, message=msg)
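
The four states reduce to two library calls with a flag, which can be summarised as a lookup table. This is only a sketch of the mapping shown in the source above: the integer values come from the list of allowed states, `describe_atn_call` is a hypothetical helper, and a plain `ValueError` stands in for `MSLConnectionError`.

```python
from __future__ import annotations

# Values taken from the list of allowed states.
ATN_DEASSERT = 0
ATN_ASSERT = 1
ATN_DEASSERT_HANDSHAKE = 2
ATN_ASSERT_IMMEDIATE = 3

# state -> (library function name, second argument)
ATN_DISPATCH: dict[int, tuple[str, int]] = {
    ATN_DEASSERT: ("ibgts", 0),
    ATN_ASSERT: ("ibcac", 0),
    ATN_DEASSERT_HANDSHAKE: ("ibgts", 1),
    ATN_ASSERT_IMMEDIATE: ("ibcac", 1),
}


def describe_atn_call(state: int) -> str:
    """Return the library call that corresponds to an ATN state."""
    try:
        name, arg = ATN_DISPATCH[state]
    except KeyError:
        msg = f"Invalid ATN {state=}"
        raise ValueError(msg) from None
    return f"{name}(handle, {arg})"


for state in range(4):
    print(state, "->", describe_atn_call(state))
```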

control_ren ¤

control_ren(
    state: int, *, handle: int | None = None
) -> int

Controls the state of the GPIB Remote Enable (REN) interface line.

Optionally the remote/local state of the device is also controlled.

This method mimics the PyVISA-py implementation.

Parameters:

Name Type Description Default
state int

Specifies the state of the REN line and optionally the device remote/local state. Allowed values are:

  • 0: REN_DEASSERT
  • 1: REN_ASSERT
  • 2: REN_DEASSERT_GTL
  • 3: REN_ASSERT_ADDRESS
  • 4: REN_ASSERT_LLO
  • 5: REN_ASSERT_ADDRESS_LLO
  • 6: REN_ADDRESS_GTL
required
handle int | None

Board or device descriptor. Default is the handle of the instantiated class.

None

Returns:

Type Description
int

The status value (ibsta).

Source code in src/msl/equipment/interfaces/gpib.py
def control_ren(self, state: int, *, handle: int | None = None) -> int:
    """Controls the state of the GPIB Remote Enable (REN) interface line.

    Optionally the remote/local state of the device is also controlled.

    This method mimics the PyVISA-py implementation.

    Args:
        state: Specifies the state of the REN line and optionally the device remote/local state.
            Allowed values are:

            * 0: REN_DEASSERT
            * 1: REN_ASSERT
            * 2: REN_DEASSERT_GTL
            * 3: REN_ASSERT_ADDRESS
            * 4: REN_ASSERT_LLO
            * 5: REN_ASSERT_ADDRESS_LLO
            * 6: REN_ADDRESS_GTL

        handle: Board or device descriptor. Default is the handle of the instantiated class.

    Returns:
        The status value (`ibsta`).
    """
    if handle is None:
        handle = self._handle

    ibsta = 0
    if self._is_board and state not in (REN_ASSERT, REN_DEASSERT, REN_ASSERT_LLO):
        msg = f"Invalid REN {state=} for INTFC"
        raise MSLConnectionError(self, message=msg)

    if state == REN_DEASSERT_GTL:
        ibsta = self.command(b"\x01", handle=handle)  # GTL = 0x1

    if state in (REN_DEASSERT, REN_DEASSERT_GTL):
        ibsta = self.remote_enable(state=False, handle=handle)

    if state == REN_ASSERT_LLO:
        ibsta = self.command(b"\x11", handle=handle)  # LLO = 0x11
    elif state == REN_ADDRESS_GTL:
        ibsta = self.command(b"\x01", handle=handle)  # GTL = 0x1
    elif state == REN_ASSERT_ADDRESS_LLO:
        pass
    elif state in (REN_ASSERT, REN_ASSERT_ADDRESS):
        ibsta = self.remote_enable(state=True, handle=handle)
        if not self._is_board and state == REN_ASSERT_ADDRESS:
            assert self._address_info.pad is not None  # noqa: S101
            ibsta = int(self.listener(self._address_info.pad, sad=self._address_info.sad or 0, handle=handle))

    return ibsta
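
For readability, the integer states can be wrapped in an enumeration. The sketch below is an assumption-laden illustration, not part of the package: `RenState` and `allowed_for_board` are hypothetical names, the values are those listed for `state`, and the board check reproduces only the validation performed in the source above.

```python
from enum import IntEnum


class RenState(IntEnum):
    """REN states, using the integer values listed for the `state` parameter."""

    DEASSERT = 0
    ASSERT = 1
    DEASSERT_GTL = 2
    ASSERT_ADDRESS = 3
    ASSERT_LLO = 4
    ASSERT_ADDRESS_LLO = 5
    ADDRESS_GTL = 6


# States that the source accepts when the descriptor refers to a board (INTFC).
BOARD_STATES = frozenset({RenState.ASSERT, RenState.DEASSERT, RenState.ASSERT_LLO})


def allowed_for_board(state: int) -> bool:
    """Return whether `state` passes the board validation in control_ren."""
    return state in BOARD_STATES


print([s.name for s in RenState if allowed_for_board(s)])
```

Because `IntEnum` members compare equal to plain integers, a member such as `RenState.ASSERT_LLO` can be passed wherever an `int` state is expected.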

count ¤

count() -> int

Get the number of bytes sent or received.

This method is the ibcntl function.

Source code in src/msl/equipment/interfaces/gpib.py
def count(self) -> int:
    """Get the number of bytes sent or received.

    This method is the [ibcntl](https://linux-gpib.sourceforge.io/doc_html/reference-globals-ibcnt.html) function.
    """
    return int(self._lib.ibcntl())

disconnect ¤

disconnect() -> None

Close the GPIB connection.

Source code in src/msl/equipment/interfaces/gpib.py
def disconnect(self) -> None:  # pyright: ignore[reportImplicitOverride]
    """Close the GPIB connection."""
    if self._own and self._handle > 0:
        with contextlib.suppress(GPIBLibraryError):
            _ = self.online(state=False, handle=self._handle)
        self._own = False
        super().disconnect()

interface_clear ¤

interface_clear(*, handle: int | None = None) -> int

Perform interface clear (board).

Resets the GPIB bus by asserting the interface clear (IFC) bus line for a duration of at least 100 microseconds.

This method is the ibsic function.

Parameters:

Name Type Description Default
handle int | None

Board or device descriptor. Default is the handle of the instantiated class.

None

Returns:

Type Description
int

The status value (ibsta).

Source code in src/msl/equipment/interfaces/gpib.py
def interface_clear(self, *, handle: int | None = None) -> int:
    """Perform interface clear (board).

    Resets the GPIB bus by asserting the *interface clear* (IFC) bus line for a duration of at
    least 100 microseconds.

    This method is the [ibsic](https://linux-gpib.sourceforge.io/doc_html/reference-function-ibsic.html)
    function.

    Args:
        handle: Board or device descriptor. Default is the handle of the instantiated class.

    Returns:
        The status value (`ibsta`).
    """
    if handle is None:
        handle = self._handle
    ibsta: int = self._lib.ibsic(handle)
    return ibsta

lines ¤

lines(*, handle: int | None = None) -> int

Returns the status of the control and handshaking bus lines (board).

This method is the iblines function.

Parameters:

Name Type Description Default
handle int | None

Board or device descriptor. Default is the handle of the instantiated class.

None

Source code in src/msl/equipment/interfaces/gpib.py
def lines(self, *, handle: int | None = None) -> int:
    """Returns the status of the control and handshaking bus lines (board).

    This method is the [iblines](https://linux-gpib.sourceforge.io/doc_html/reference-function-iblines.html)
    function.

    Args:
        handle: Board or device descriptor. Default is the handle of the instantiated class.
    """
    if handle is None:
        handle = self._handle
    status = c_short()
    self._lib.iblines(handle, byref(status))
    return status.value
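
The returned integer is a bit mask. The sketch below decodes it under the assumption that the bit layout follows the linux-gpib header (`gpib_user.h`): the low byte flags which lines the board is able to monitor and the high byte holds the corresponding line states. `decode_lines` is a hypothetical helper, not a method of this class.

```python
from __future__ import annotations

# Bit order assumed from linux-gpib: ValidDAV = 0x01 ... ValidEOI = 0x80,
# BusDAV = 0x0100 ... BusEOI = 0x8000.
LINE_NAMES = ("DAV", "NDAC", "NRFD", "IFC", "REN", "SRQ", "ATN", "EOI")


def decode_lines(status: int) -> dict[str, bool | None]:
    """Map each bus line to True/False, or None if the board cannot monitor it."""
    status &= 0xFFFF  # the value comes from a signed c_short, so undo a negative value
    decoded: dict[str, bool | None] = {}
    for bit, name in enumerate(LINE_NAMES):
        valid = bool(status & (1 << bit))
        asserted = bool(status & (1 << (bit + 8)))
        decoded[name] = asserted if valid else None
    return decoded


# Example value: all lines valid (0xFF), with REN (0x1000) and ATN (0x4000) asserted.
print(decode_lines(0x50FF))
```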

listener ¤

listener(
    pad: int, sad: int = 0, *, handle: int | None = None
) -> bool

Check if a listener is present (board or device).

This method is the ibln function.

Parameters:

Name Type Description Default
pad int

Primary address of the GPIB device.

required
sad int

Secondary address of the GPIB device.

0
handle int | None

Board or device descriptor. Default is the handle of the instantiated class.

None

Returns:

Type Description
bool

Whether a listener is present.

Source code in src/msl/equipment/interfaces/gpib.py
def listener(self, pad: int, sad: int = 0, *, handle: int | None = None) -> bool:
    """Check if a listener is present (board or device).

    This method is the [ibln](https://linux-gpib.sourceforge.io/doc_html/reference-function-ibln.html)
    function.

    Args:
        pad: Primary address of the GPIB device.
        sad: Secondary address of the GPIB device.
        handle: Board or device descriptor. Default is the handle of the instantiated class.

    Returns:
        Whether a listener is present.
    """
    if handle is None:
        handle = self._handle
    listener = c_short()
    self._lib.ibln(handle, pad, sad, byref(listener))
    return bool(listener.value)

local ¤

local(*, handle: int | None = None) -> int

Go to local mode (board or device).

This method is the ibloc function.

Parameters:

Name Type Description Default
handle int | None

Board or device descriptor. Default is the handle of the instantiated class.

None

Returns:

Type Description
int

The status value (ibsta).

Source code in src/msl/equipment/interfaces/gpib.py
def local(self, *, handle: int | None = None) -> int:
    """Go to local mode (board or device).

    This method is the [ibloc](https://linux-gpib.sourceforge.io/doc_html/reference-function-ibloc.html)
    function.

    Args:
        handle: Board or device descriptor. Default is the handle of the instantiated class.

    Returns:
        The status value (`ibsta`).
    """
    if handle is None:
        handle = self._handle
    ibsta: int = self._lib.ibloc(handle)
    return ibsta

online ¤

online(*, state: bool, handle: int | None = None) -> int

Close or reinitialize descriptor (board or device).

This method is the ibonl function.

If you want to close the connection for the GPIB board or device that was instantiated, use disconnect.

Parameters:

Name Type Description Default
state bool

If False, closes the connection. If True, then all settings associated with the descriptor (GPIB address, end-of-string mode, timeout, etc.) are reset to their default values. The default values are the settings the descriptor had when it was first obtained.

required
handle int | None

Board or device descriptor. Default is the handle of the instantiated class.

None

Returns:

Type Description
int

The status value (ibsta).

Source code in src/msl/equipment/interfaces/gpib.py
def online(self, *, state: bool, handle: int | None = None) -> int:
    """Close or reinitialize descriptor (board or device).

    This method is the [ibonl](https://linux-gpib.sourceforge.io/doc_html/reference-function-ibonl.html)
    function.

    If you want to close the connection for the GPIB board or device that was instantiated,
    use [disconnect][msl.equipment.interfaces.gpib.GPIB.disconnect].

    Args:
        state: If `False`, closes the connection. If `True`, then all settings associated with the
            descriptor (GPIB address, end-of-string mode, timeout, etc.) are reset to their *default*
            values. The *default* values are the settings the descriptor had when it was first obtained.
        handle: Board or device descriptor. Default is the handle of the instantiated class.

    Returns:
        The status value (`ibsta`).
    """
    if handle is None:
        handle = self._handle
    ibsta: int = self._lib.ibonl(handle, int(state))
    return ibsta

pass_control ¤

pass_control(
    *,
    handle: int | None = None,
    name: str | None = None,
    board: int | None = None,
    pad: int = 0,
    sad: int = NO_SEC_ADDR
) -> int

Set a GPIB board or device to become the controller-in-charge (CIC).

This method is the ibpct function.

If no arguments are specified, the instantiated class becomes the CIC.

Parameters:

Name Type Description Default
handle int | None

Board or device descriptor. If specified, name, board, pad and sad are ignored.

None
name str | None

The name of a GPIB board or device. If specified, board, pad and sad are ignored.

None
board int | None

Index of the GPIB interface board.

None
pad int

Primary address of the GPIB device.

0
sad int

Secondary address of the GPIB device.

NO_SEC_ADDR

Returns:

Type Description
int

The handle of the board or device that became CIC.

Source code in src/msl/equipment/interfaces/gpib.py
def pass_control(
    self,
    *,
    handle: int | None = None,
    name: str | None = None,
    board: int | None = None,
    pad: int = 0,
    sad: int = NO_SEC_ADDR,
) -> int:
    """Set a GPIB board or device to become the controller-in-charge (CIC).

    This method is the [ibpct](https://linux-gpib.sourceforge.io/doc_html/reference-function-ibpct.html)
    function.

    If no arguments are specified, the instantiated class becomes the CIC.

    Args:
        handle: Board or device descriptor. If specified, `name`, `board`, `pad` and `sad` are ignored.
        name: The name of a GPIB board or device. If specified, `board`, `pad` and `sad` are ignored.
        board: Index of the GPIB interface board.
        pad: Primary address of the GPIB device.
        sad: Secondary address of the GPIB device.

    Returns:
        The handle of the board or device that became CIC.
    """
    if handle is not None:
        pass
    elif name is not None:
        handle = self._get_ibfind_handle(name)
    elif board is not None:
        handle = self._get_ibdev_handle(board, pad, sad, 13, 1, 0)  # T10s = 13
    else:
        handle = self._handle

    self._lib.ibpct(handle)
    return handle
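
The keyword arguments are resolved in a fixed order of precedence. The sketch below only illustrates that order as it appears in the source above; `descriptor_source` is a hypothetical helper that reports which argument would be used and does not talk to any GPIB library.

```python
from __future__ import annotations


def descriptor_source(
    *, handle: int | None = None, name: str | None = None, board: int | None = None
) -> str:
    """Report which argument determines the descriptor passed to ibpct."""
    if handle is not None:
        return "handle"
    if name is not None:
        return "name"
    if board is not None:
        return "board/pad/sad"
    return "instance"


print(descriptor_source(handle=3, name="dev1", board=0))  # handle wins
print(descriptor_source(name="dev1", board=0))  # name wins over board
print(descriptor_source())  # falls back to the instantiated class
```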

query ¤

query(
    message: bytes | str,
    *,
    delay: float = 0.0,
    decode: Literal[True] = True,
    dtype: None = None,
    fmt: MessageFormat | None = ...,
    size: int | None = ...
) -> str
query(
    message: bytes | str,
    *,
    delay: float = 0.0,
    decode: Literal[False] = False,
    dtype: None = None,
    fmt: MessageFormat | None = ...,
    size: int | None = ...
) -> bytes
query(
    message: bytes | str,
    *,
    delay: float = 0.0,
    decode: bool = ...,
    dtype: MessageDataType = ...,
    fmt: MessageFormat | None = ...,
    size: int | None = ...
) -> NumpyArray1D
query(
    message: bytes | str,
    *,
    delay: float = 0.0,
    decode: bool = True,
    dtype: MessageDataType | None = None,
    fmt: MessageFormat | None = None,
    size: int | None = None
) -> bytes | str | NumpyArray1D

Convenience method for performing a write followed by a read.

Parameters:

Name Type Description Default
message bytes | str

The message to write to the equipment.

required
delay float

Time delay, in seconds, to wait between the write and read operations.

0.0
decode bool

Whether to decode the returned message (i.e., convert the message to a str) or keep the message as bytes. Ignored if dtype is not None.

True
dtype MessageDataType | None

The data type of the elements in the returned message. Can be any object that numpy dtype supports. See from_bytes for more details. For messages that are of scalar type (i.e., a single number) it is more efficient to not specify dtype but to pass the returned message to the int or float class to convert the message to the appropriate numeric type.

None
fmt MessageFormat | None

The format that the returned message data is in. Ignored if dtype is None. See from_bytes for more details.

None
size int | None

The number of bytes to read. Ignored if the value is None.

None

Returns:

Type Description
bytes | str | NumpyArray1D

The message from the equipment. If dtype is specified, the message is returned as a numpy ndarray; otherwise, if decode is True, the message is returned as a str, else as bytes.

Source code in src/msl/equipment/interfaces/message_based.py
def query(  # noqa: PLR0913
    self,
    message: bytes | str,
    *,
    delay: float = 0.0,
    decode: bool = True,
    dtype: MessageDataType | None = None,
    fmt: MessageFormat | None = None,
    size: int | None = None,
) -> bytes | str | NumpyArray1D:
    """Convenience method for performing a [write][msl.equipment.interfaces.message_based.MessageBased.write]
    followed by a [read][msl.equipment.interfaces.message_based.MessageBased.read].

    Args:
        message: The message to write to the equipment.
        delay: Time delay, in seconds, to wait between the _write_ and _read_ operations.
        decode: Whether to decode the returned message (i.e., convert the message to a [str][])
            or keep the message as [bytes][]. Ignored if `dtype` is not `None`.
        dtype: The data type of the elements in the returned message. Can be any object
            that numpy [dtype][numpy.dtype] supports. See [from_bytes][msl.equipment.utils.from_bytes]
            for more details. For messages that are of scalar type (i.e., a single number)
            it is more efficient to not specify `dtype` but to pass the returned message to the
            [int][] or [float][] class to convert the message to the appropriate numeric type.
        fmt: The format that the returned message data is in. Ignored if `dtype` is `None`.
             See [from_bytes][msl.equipment.utils.from_bytes] for more details.
        size: The number of bytes to read. Ignored if the value is `None`.

    Returns:
        The message from the equipment. If `dtype` is specified, then the message is
            returned as a numpy [ndarray][numpy.ndarray], if `decode` is `True` then the message
            is returned as a [str][], otherwise the message is returned as [bytes][].
    """  # noqa: D205
    _ = self.write(message)
    if delay > 0:
        time.sleep(delay)
    if dtype:
        return self.read(dtype=dtype, fmt=fmt, size=size)
    return self.read(decode=decode, size=size)
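
To show how the `decode` argument affects the return type without needing hardware, the sketch below uses a hypothetical in-memory stand-in, `EchoInstrument`. It is not part of this package and implements only a simplified write, delay, read sequence. The last line follows the advice for scalar replies: convert the returned `str` with `float` instead of specifying `dtype`.

```python
from __future__ import annotations

import time


class EchoInstrument:
    """Hypothetical in-memory stand-in for a message-based connection."""

    def __init__(self) -> None:
        self._reply = b""

    def write(self, message: bytes | str) -> int:
        if isinstance(message, str):
            message = message.encode()
        # Pretend the equipment answers a measurement query with a fixed value.
        self._reply = b"1.2345\n" if message.startswith(b"MEAS") else b"\n"
        return len(message)

    def read(self, *, decode: bool = True) -> bytes | str:
        reply = self._reply.rstrip()
        return reply.decode() if decode else reply

    def query(self, message: bytes | str, *, delay: float = 0.0, decode: bool = True) -> bytes | str:
        _ = self.write(message)
        if delay > 0:
            time.sleep(delay)
        return self.read(decode=decode)


dmm = EchoInstrument()
text = dmm.query("MEAS:VOLT?")  # returned as str
raw = dmm.query("MEAS:VOLT?", decode=False)  # returned as bytes
voltage = float(text)  # scalar reply: convert the str directly
print(repr(text), repr(raw), voltage)
```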

read ¤

read(
    *,
    decode: Literal[True] = True,
    dtype: None = None,
    fmt: MessageFormat | None = ...,
    size: int | None = ...
) -> str
read(
    *,
    decode: Literal[False] = False,
    dtype: None = None,
    fmt: MessageFormat | None = ...,
    size: int | None = ...
) -> bytes
read(
    *,
    decode: bool = ...,
    dtype: MessageDataType = ...,
    fmt: MessageFormat | None = ...,
    size: int | None = ...
) -> NumpyArray1D
read(
    *,
    decode: bool = True,
    dtype: MessageDataType | None = None,
    fmt: MessageFormat | None = None,
    size: int | None = None
) -> bytes | str | NumpyArray1D

Read a message from the equipment.

This method will block until one of the following conditions is fulfilled:

  1. size bytes have been received (only if size is not None).
  2. the read_termination byte(s) are received (only if read_termination is not None).
  3. a timeout occurs (only if timeout is not None). If a timeout occurs, an MSLTimeoutError is raised.
  4. max_read_size bytes have been received. If the maximum number of bytes has been read, an MSLConnectionError is raised.

Tip

You may also want to set the rstrip value for the class instance.

Parameters:

Name Type Description Default
decode bool

Whether to decode the message (i.e., convert the message to a str) or keep the message as bytes. Ignored if dtype is not None.

True
dtype MessageDataType | None

The data type of the elements in the message. Can be any object that numpy dtype supports. See from_bytes for more details. For messages that are of scalar type (i.e., a single number) it is more efficient to not specify dtype but to pass the returned message to the int or float class to convert the message to the appropriate numeric type.

None
fmt MessageFormat | None

The format that the message data is in. Ignored if dtype is None. See from_bytes for more details.

None
size int | None

The number of bytes to read. Ignored if the value is None.

None

Returns:

Type Description
bytes | str | NumpyArray1D

The message from the equipment. If dtype is specified, the message is returned as a numpy ndarray; otherwise, if decode is True, the message is returned as a str, else as bytes.

Source code in src/msl/equipment/interfaces/message_based.py
def read(
    self,
    *,
    decode: bool = True,
    dtype: MessageDataType | None = None,
    fmt: MessageFormat | None = None,
    size: int | None = None,
) -> bytes | str | NumpyArray1D:
    """Read a message from the equipment.

    This method will block until one of the following conditions is fulfilled:

    1. `size` bytes have been received &mdash; only if `size` is not `None`.
    2. the [read_termination][msl.equipment.interfaces.message_based.MessageBased.read_termination]
       byte(s) are received &mdash; only if
       [read_termination][msl.equipment.interfaces.message_based.MessageBased.read_termination]
       is not `None`.
    3. a timeout occurs &mdash; only if [timeout][msl.equipment.interfaces.message_based.MessageBased.timeout]
       is not `None`. If a timeout occurs, an
       [MSLTimeoutError][msl.equipment.interfaces.message_based.MSLTimeoutError] is raised.
    4. [max_read_size][msl.equipment.interfaces.message_based.MessageBased.max_read_size]
       bytes have been received. If the maximum number of bytes has been read, an
       [MSLConnectionError][msl.equipment.interfaces.message_based.MSLConnectionError] is raised.

    !!! tip
        You may also want to set the [rstrip][msl.equipment.interfaces.message_based.MessageBased.rstrip]
        value for the class instance.

    Args:
        decode: Whether to decode the message (i.e., convert the message to a [str][])
            or keep the message as [bytes][]. Ignored if `dtype` is not `None`.
        dtype: The data type of the elements in the message. Can be any object
            that numpy [dtype][numpy.dtype] supports. See [from_bytes][msl.equipment.utils.from_bytes]
            for more details. For messages that are of scalar type (i.e., a single number)
            it is more efficient to not specify `dtype` but to pass the returned message to the
            [int][] or [float][] class to convert the message to the appropriate numeric type.
        fmt: The format that the message data is in. Ignored if `dtype` is `None`.
             See [from_bytes][msl.equipment.utils.from_bytes] for more details.
        size: The number of bytes to read. Ignored if the value is `None`.

    Returns:
        The message from the equipment. If `dtype` is specified, then the message is returned
            as a numpy [ndarray][numpy.ndarray], if `decode` is `True` then the message
            is returned as a [str][], otherwise the message is returned as [bytes][].
    """
    if size is not None and size > self._max_read_size:
        msg = f"max_read_size is {self._max_read_size} bytes, requesting {size} bytes"
        raise MSLConnectionError(self, msg)

    try:
        message = self._read(size)
    except (serial.SerialTimeoutException, socket.timeout, TimeoutError):
        raise MSLTimeoutError(self) from None
    except Exception as e:  # noqa: BLE001
        msg = f"{e.__class__.__name__}: {e}"
        raise MSLConnectionError(self, msg) from None

    if size is None:
        if dtype:
            logger.debug("%s.read(dtype=%r, fmt=%r) -> %r", self, dtype, fmt, message)
        else:
            logger.debug("%s.read() -> %r", self, message)
    else:
        if len(message) != size:
            msg = f"received {len(message)} bytes, requested {size} bytes"
            raise MSLConnectionError(self, msg)
        logger.debug("%s.read(size=%s) -> %r", self, size, message)

    if self._rstrip:
        message = message.rstrip()

    if dtype:
        return from_bytes(message, fmt=fmt, dtype=dtype)

    if decode:
        return message.decode(encoding=self._encoding)

    return message
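
The blocking conditions can be pictured as a loop over incoming bytes. The sketch below is a simplified illustration, not the package's `_read` implementation: `read_until` is a hypothetical function, an exhausted `io.BytesIO` stands in for a timeout, and the builtin `TimeoutError` and `ValueError` stand in for `MSLTimeoutError` and `MSLConnectionError`.

```python
from __future__ import annotations

import io


def read_until(
    stream: io.BytesIO,
    *,
    size: int | None = None,
    termination: bytes | None = b"\n",
    max_read_size: int = 1024,
) -> bytes:
    """Hypothetical reader: stop on `size` or `termination`, fail at `max_read_size`."""
    if size is not None and size > max_read_size:
        msg = f"max_read_size is {max_read_size} bytes, requesting {size} bytes"
        raise ValueError(msg)

    message = bytearray()
    while True:
        if size is not None and len(message) == size:
            return bytes(message)  # condition 1: size bytes received
        if termination and message.endswith(termination):
            return bytes(message)  # condition 2: termination received
        if len(message) >= max_read_size:
            msg = f"max_read_size of {max_read_size} bytes reached"
            raise ValueError(msg)  # condition 4: too many bytes
        byte = stream.read(1)
        if not byte:
            raise TimeoutError  # condition 3: no more data arrived (stand-in for a timeout)
        message.extend(byte)


print(read_until(io.BytesIO(b"+1.5E-3\nleftover")))  # stops at the termination byte
print(read_until(io.BytesIO(b"ABCDEF"), size=4, termination=None))  # stops after 4 bytes
```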

remote_enable ¤

remote_enable(
    *, state: bool, handle: int | None = None
) -> int

Set remote enable (board).

This method is the ibsre function.

Parameters:

Name Type Description Default
state bool

If True, the board asserts the REN line. Otherwise, the REN line is not asserted. The board must be the system controller.

required
handle int | None

Board or device descriptor. Default is the handle of the instantiated class.

None

Returns:

Type Description
int

The status value (ibsta).

Source code in src/msl/equipment/interfaces/gpib.py
def remote_enable(self, *, state: bool, handle: int | None = None) -> int:
    """Set remote enable (board).

    This method is the [ibsre](https://linux-gpib.sourceforge.io/doc_html/reference-function-ibsre.html)
    function.

    Args:
        state: If `True`, the board asserts the REN line. Otherwise, the REN line is not asserted.
            The board must be the system controller.
        handle: Board or device descriptor. Default is the handle of the instantiated class.

    Returns:
        The status value (`ibsta`).
    """
    # ibsre was removed from ni4882.dll, use ibconfig instead (IbcSRE = 0xb)
    ibsta: int = self.config(0xB, int(state), handle=handle)
    return ibsta

serial_poll ¤

serial_poll(*, handle: int | None = None) -> int

Read status byte / serial poll (device).

This method is the ibrsp function.

Parameters:

Name Type Description Default
handle int | None

Board or device descriptor. Default is the handle of the instantiated class.

None

Returns:

Type Description
int

The status byte.

Source code in src/msl/equipment/interfaces/gpib.py
def serial_poll(self, *, handle: int | None = None) -> int:
    """Read status byte / serial poll (device).

    This method is the [ibrsp](https://linux-gpib.sourceforge.io/doc_html/reference-function-ibrsp.html)
    function.

    Args:
        handle: Board or device descriptor. Default is the handle of the instantiated class.

    Returns:
        The status byte.
    """
    if handle is None:
        handle = self._handle
    status = create_string_buffer(1)
    self._lib.ibrsp(handle, status)
    return ord(status.value)
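
The status byte is usually inspected bit by bit. The sketch below assumes the instrument follows IEEE 488.2, which defines bit 4 (MAV), bit 5 (ESB) and bit 6 (RQS); the remaining bits are device-specific and older instruments may use a different layout. `describe_status_byte` is a hypothetical helper.

```python
# Bit masks defined by IEEE 488.2 for the status byte.
MAV = 0x10  # Message Available
ESB = 0x20  # Event Status Bit (summary of the Standard Event Status Register)
RQS = 0x40  # Request Service (the device asserted SRQ)


def describe_status_byte(stb: int) -> list[str]:
    """Return the names of the IEEE 488.2 summary bits that are set in `stb`."""
    flags = {"MAV": MAV, "ESB": ESB, "RQS": RQS}
    return [name for name, mask in flags.items() if stb & mask]


print(describe_status_byte(0x50))  # MAV and RQS are set
```

With a connected device, the argument would be the value returned by `serial_poll()`.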

spoll_bytes ¤

spoll_bytes(*, handle: int | None = None) -> int

Get the length of the serial poll bytes queue (device).

This method is the ibspb function.

Parameters:

Name Type Description Default
handle int | None

Board or device descriptor. Default is the handle of the instantiated class.

None

Source code in src/msl/equipment/interfaces/gpib.py
def spoll_bytes(self, *, handle: int | None = None) -> int:
    """Get the length of the serial poll bytes queue (device).

    This method is the [ibspb](https://linux-gpib.sourceforge.io/doc_html/reference-function-ibspb.html)
    function.

    Args:
        handle: Board or device descriptor. Default is the handle of the instantiated class.
    """
    if handle is None:
        handle = self._handle
    length = c_short()
    self._lib.ibspb(handle, byref(length))
    return length.value

status ¤

status() -> int

Returns the status value ibsta.

Source code in src/msl/equipment/interfaces/gpib.py
def status(self) -> int:
    """Returns the status value [ibsta](https://linux-gpib.sourceforge.io/doc_html/reference-globals-ibsta.html)."""
    return int(self._lib.ThreadIbsta())

trigger ¤

trigger(*, handle: int | None = None) -> int

Trigger device.

This method is the ibtrg function.

Parameters:

Name Type Description Default
handle int | None

Board or device descriptor. Default is the handle of the instantiated class.

None

Returns:

Type Description
int

The status value (ibsta).

Source code in src/msl/equipment/interfaces/gpib.py
def trigger(self, *, handle: int | None = None) -> int:
    """Trigger device.

    This method is the [ibtrg](https://linux-gpib.sourceforge.io/doc_html/reference-function-ibtrg.html)
    function.

    Args:
        handle: Board or device descriptor. Default is the handle of the instantiated class.

    Returns:
        The status value (`ibsta`).
    """
    if handle is None:
        handle = self._handle
    ibsta: int = self._lib.ibtrg(handle)
    return ibsta

version ¤

version() -> str

Returns the version of the GPIB library (Linux only).

Source code in src/msl/equipment/interfaces/gpib.py
def version(self) -> str:
    """Returns the version of the GPIB library (Linux only)."""
    try:
        version = c_char_p()
        self._lib.ibvers(byref(version))
        assert version.value is not None  # noqa: S101
        return version.value.decode()
    except AttributeError:
        return ""

wait ¤

wait(mask: int, *, handle: int | None = None) -> int

Wait for an event (board or device).

This method is the ibwait function.

Parameters:

Name Type Description Default
mask int

Wait until one of the conditions specified in mask is true.

required
handle int | None

Board or device descriptor. Default is the handle of the instantiated class.

None

Returns:

Type Description
int

The status value (ibsta).

Source code in src/msl/equipment/interfaces/gpib.py
def wait(self, mask: int, *, handle: int | None = None) -> int:
    """Wait for an event (board or device).

    This method is the [ibwait](https://linux-gpib.sourceforge.io/doc_html/reference-function-ibwait.html)
    function.

    Args:
        mask: Wait until one of the conditions specified in `mask` is true.
        handle: Board or device descriptor. Default is the handle of the instantiated class.

    Returns:
        The status value (`ibsta`).
    """
    if handle is None:
        handle = self._handle
    ibsta: int = self._lib.ibwait(handle, mask)
    return ibsta

wait_for_srq ¤

wait_for_srq(*, handle: int | None = None) -> int

Wait for the SRQ line to be asserted (board or device).

Parameters:

Name Type Description Default
handle int | None

Board or device descriptor. Default is the handle of the instantiated class.

None

Returns:

Type Description
int

The status value (ibsta).

Source code in src/msl/equipment/interfaces/gpib.py
def wait_for_srq(self, *, handle: int | None = None) -> int:
    """Wait for the SRQ line to be asserted (board or device).

    Args:
        handle: Board or device descriptor. Default is the handle of the instantiated class.

    Returns:
        The status value (`ibsta`).
    """
    return self.wait(0x1000, handle=handle)  # SRQI = 0x1000
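
The `mask` passed to `wait` and the returned status use the same `ibsta` bit definitions. The sketch below lists a subset of those bits as defined by the NI-488.2 / linux-gpib API (`SRQI = 0x1000` matches the value used above) and shows one way to interpret the result of waiting on `SRQI | TIMO`. `srq_received` is a hypothetical helper, and `RuntimeError` is used only for illustration.

```python
from __future__ import annotations

# A subset of the ibsta status bits.
ERR = 0x8000  # an error occurred
TIMO = 0x4000  # the timeout period elapsed
END = 0x2000  # EOI or end-of-string detected
SRQI = 0x1000  # SRQ line asserted (board)
RQS = 0x0800  # device requested service
CMPL = 0x0100  # I/O operation complete


def srq_received(ibsta: int) -> bool:
    """Interpret the status returned from waiting on `SRQI | TIMO`."""
    if ibsta & ERR:
        msg = f"GPIB error, ibsta={ibsta:#06x}"
        raise RuntimeError(msg)
    return bool(ibsta & SRQI) and not ibsta & TIMO


mask = SRQI | TIMO  # return when SRQ is asserted or the timeout elapses
print(hex(mask))
print(srq_received(SRQI | CMPL), srq_received(TIMO | CMPL))
```

With a connected board, the status could be obtained with `ibsta = gpib.wait(mask)`, where `gpib` is an assumed `GPIB` instance.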

write ¤

write(
    message: bytes | str,
    *,
    data: Sequence1D | None = None,
    dtype: MessageDataType = "<f",
    fmt: MessageFormat | None = "ieee"
) -> int

Write a message to the equipment.

Parameters:

Name Type Description Default
message bytes | str

The message to write to the equipment.

required
data Sequence1D | None

The data to append to message. See to_bytes for more details.

None
dtype MessageDataType

The data type to use to convert each element in data to bytes. Ignored if data is None. See to_bytes for more details.

'<f'
fmt MessageFormat | None

The format to use to convert data to bytes. Ignored if data is None. See to_bytes for more details.

'ieee'

Returns:

Type Description
int

The number of bytes written.

Source code in src/msl/equipment/interfaces/message_based.py
def write(
    self,
    message: bytes | str,
    *,
    data: Sequence1D | None = None,
    dtype: MessageDataType = "<f",
    fmt: MessageFormat | None = "ieee",
) -> int:
    """Write a message to the equipment.

    Args:
        message: The message to write to the equipment.
        data: The data to append to `message`. See [to_bytes][msl.equipment.utils.to_bytes]
            for more details.
        dtype: The data type to use to convert each element in `data` to bytes. Ignored
            if `data` is `None`. See [to_bytes][msl.equipment.utils.to_bytes] for more details.
        fmt: The format to use to convert `data` to bytes. Ignored if `data` is `None`.
            See [to_bytes][msl.equipment.utils.to_bytes] for more details.

    Returns:
        The number of bytes written.
    """
    if not isinstance(message, bytes):
        message = message.encode(encoding=self._encoding)

    if data is not None:
        message += to_bytes(data, fmt=fmt, dtype=dtype)

    if self._write_termination and not message.endswith(self._write_termination):
        message += self._write_termination

    logger.debug("%s.write(%r)", self, message)

    try:
        return self._write(message)
    except (serial.SerialTimeoutException, socket.timeout, TimeoutError):
        raise MSLTimeoutError(self) from None
    except Exception as e:  # noqa: BLE001
        raise MSLConnectionError(self, str(e)) from None
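
The way `write` assembles the outgoing bytes can be sketched without any hardware. The helper below is hypothetical and only mirrors the three steps in the listing above (encode, append data, append the termination). It assumes that `fmt="ieee"` refers to the IEEE 488.2 definite-length block (`#`, digit count, byte count, payload) and that `dtype="<f"` is a little-endian 32-bit float; the `to_bytes` documentation is the authority on both.

```python
import struct


def build_message(message, *, data=None, termination=b"\n", encoding="utf-8"):
    """Hypothetical stand-in that mimics how `write` builds its byte string."""
    if not isinstance(message, bytes):
        message = message.encode(encoding)

    if data is not None:
        # Pack each element as a little-endian 32-bit float (dtype "<f").
        payload = struct.pack(f"<{len(data)}f", *data)
        # Assumed IEEE 488.2 block header: '#' + number of digits + byte count.
        length = str(len(payload)).encode("ascii")
        message += b"#" + str(len(length)).encode("ascii") + length + payload

    # Append the termination only if the message does not already end with it.
    if termination and not message.endswith(termination):
        message += termination
    return message
```

Note that, as in the listing above, the termination is not duplicated when the message already ends with it.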

write_async ¤

write_async(
    message: bytes, *, handle: int | None = None
) -> int

Write a message asynchronously (board or device).

This method is the ibwrta function.

Parameters:

Name Type Description Default
message bytes

The data to send.

required
handle int | None

Board or device descriptor. Default is the handle of the instantiated class.

None

Returns:

Type Description
int

The status value (ibsta).

Source code in src/msl/equipment/interfaces/gpib.py
def write_async(self, message: bytes, *, handle: int | None = None) -> int:
    """Write a message asynchronously (board or device).

    This method is the [ibwrta](https://linux-gpib.sourceforge.io/doc_html/reference-function-ibwrta.html) function.

    Args:
        message: The data to send.
        handle: Board or device descriptor. Default is the handle of the instantiated class.

    Returns:
        The status value (`ibsta`).
    """
    if handle is None:
        handle = self._handle
    ibsta: int = self._lib.ibwrta(handle, message, len(message))
    return ibsta

HiSLIP ¤

HiSLIP(equipment: Equipment)

Bases: MessageBased

Base class for the HiSLIP communication protocol.

Parameters:

Name Type Description Default
equipment Equipment

An Equipment instance.

required

A Connection instance supports the following properties for the HiSLIP communication protocol, as well as the properties defined in MessageBased.

Connection Properties:

Name Type Description
buffer_size int

The maximum number of bytes to read at a time. Default: 4096

lock_timeout float

The timeout (in seconds) to wait for a lock (0 means wait forever). Default: 0

Source code in src/msl/equipment/interfaces/hislip.py
def __init__(self, equipment: Equipment) -> None:
    """Base class for the [HiSLIP] communication protocol.

    [HiSLIP]: https://www.ivifoundation.org/downloads/Protocol%20Specifications/IVI-6.1_HiSLIP-2.0-2020-04-23.pdf

    Args:
        equipment: An [Equipment][] instance.

    A [Connection][msl.equipment.schema.Connection] instance supports the following _properties_
    for the HiSLIP communication protocol, as well as the _properties_ defined in
    [MessageBased][msl.equipment.interfaces.message_based.MessageBased].

    Attributes: Connection Properties:
        buffer_size (int): The maximum number of bytes to read at a time. _Default: `4096`_
        lock_timeout (float): The timeout (in seconds) to wait for a lock (0 means wait forever). _Default: `0`_
    """
    super().__init__(equipment)

    assert equipment.connection is not None  # noqa: S101

    info = parse_hislip_address(equipment.connection.address)
    if info is None:
        msg = f"Invalid HiSLIP address {equipment.connection.address!r}"
        raise ValueError(msg)

    self._info: ParsedHiSLIPAddress = info

    # HiSLIP does not support termination characters
    self.write_termination = None  # pyright: ignore[reportUnannotatedClassAttribute]
    self.read_termination = None  # pyright: ignore[reportUnannotatedClassAttribute]

    props = equipment.connection.properties
    self._buffer_size: int = props.get("buffer_size", 4096)
    self._lock_timeout: float = props.get("lock_timeout", 0)
    self.lock_timeout = self._lock_timeout

    self._sync: SyncClient
    self._async: AsyncClient
    self._connect()
    self._set_interface_timeout()
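
The handling of the connection properties in the constructor above can be sketched with a plain dictionary. `hislip_settings` is a hypothetical helper, not part of the package; it applies the documented defaults and the rule that a `lock_timeout` of `None` or ≤0 means wait forever.

```python
def hislip_settings(properties: dict) -> dict:
    """Resolve HiSLIP connection properties using the documented defaults."""
    buffer_size = int(properties.get("buffer_size", 4096))
    lock_timeout = properties.get("lock_timeout", 0)
    # A value of None or <= 0 is documented to mean "wait forever".
    waits_forever = lock_timeout is None or lock_timeout <= 0
    return {
        "buffer_size": buffer_size,
        "lock_timeout": lock_timeout,
        "waits_forever": waits_forever,
    }
```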

asynchronous property ¤

asynchronous: AsyncClient

The reference to the asynchronous client.

encoding property writable ¤

encoding: str

The encoding that is used for read and write operations.

equipment property ¤

equipment: Equipment

The Equipment associated with the interface.

lock_timeout property writable ¤

lock_timeout: float

The time, in seconds, to wait to acquire a lock.

Setting the value to ≤0 (or None) means wait forever.

max_read_size property writable ¤

max_read_size: int

The maximum number of bytes that can be read.

rstrip property writable ¤

rstrip: bool

Whether to remove trailing whitespace from read messages.

synchronous property ¤

synchronous: SyncClient

The reference to the synchronous client.

timeout property writable ¤

timeout: float | None

The timeout, in seconds, for read and write operations.

A value <0 will set the timeout to be None (blocking mode).

clear ¤

clear() -> None

Send the clear command to the device.

Source code in src/msl/equipment/interfaces/hislip.py
def clear(self) -> None:
    """Send the `clear` command to the device."""
    # IVI-6.1: IVI High-Speed LAN Instrument Protocol (HiSLIP)
    # 23 April 2020 (Revision 2.0)
    # Section 6.12: Device Clear Transaction
    #
    # This Connection class does not use the asynchronous client in an
    # asynchronous manner, therefore there should not be any pending
    # requests that need to be waited on to finish
    acknowledged = self._async.async_device_clear()
    _ = self._sync.device_clear_complete(acknowledged.feature_bitmap)

disconnect ¤

disconnect() -> None

Close the connection to the HiSLIP server.

Source code in src/msl/equipment/interfaces/hislip.py
def disconnect(self) -> None:  # pyright: ignore[reportImplicitOverride]
    """Close the connection to the HiSLIP server."""
    if not hasattr(self, "_async"):
        return

    if self._async.socket is None and self._sync.socket is None:
        return

    self._async.close()
    self._sync.close()
    super().disconnect()

lock ¤

lock(lock_string: str = '') -> bool

Acquire the device's lock.

Parameters:

Name Type Description Default
lock_string str

An ASCII string that identifies this lock. If not specified, then an exclusive lock is requested, otherwise the string indicates an identification of a shared-lock request.

''

Returns:

Type Description
bool

Whether acquiring the lock was successful.

Source code in src/msl/equipment/interfaces/hislip.py
def lock(self, lock_string: str = "") -> bool:
    """Acquire the device's lock.

    Args:
        lock_string: An ASCII string that identifies this lock. If not specified, then
            an exclusive lock is requested, otherwise the string indicates an
            identification of a shared-lock request.

    Returns:
        Whether acquiring the lock was successful.
    """
    status = self._async.async_lock_request(timeout=self._lock_timeout, lock_string=lock_string)
    return status.success

lock_status ¤

lock_status() -> tuple[bool, int]

Request the lock status from the HiSLIP server.

Returns:

Type Description
tuple[bool, int]

Whether the HiSLIP server has an exclusive lock with a client and the number of HiSLIP clients that have a lock with the HiSLIP server.

Source code in src/msl/equipment/interfaces/hislip.py
def lock_status(self) -> tuple[bool, int]:
    """Request the lock status from the HiSLIP server.

    Returns:
        Whether the HiSLIP server has an exclusive lock with a client and
            the number of HiSLIP clients that have a lock with the HiSLIP server.
    """
    reply = self._async.async_lock_info()
    return reply.exclusive, reply.num_locks

query ¤

query(
    message: bytes | str,
    *,
    delay: float = 0.0,
    decode: Literal[True] = True,
    dtype: None = None,
    fmt: MessageFormat | None = ...,
    size: int | None = ...
) -> str
query(
    message: bytes | str,
    *,
    delay: float = 0.0,
    decode: Literal[False] = False,
    dtype: None = None,
    fmt: MessageFormat | None = ...,
    size: int | None = ...
) -> bytes
query(
    message: bytes | str,
    *,
    delay: float = 0.0,
    decode: bool = ...,
    dtype: MessageDataType = ...,
    fmt: MessageFormat | None = ...,
    size: int | None = ...
) -> NumpyArray1D
query(
    message: bytes | str,
    *,
    delay: float = 0.0,
    decode: bool = True,
    dtype: MessageDataType | None = None,
    fmt: MessageFormat | None = None,
    size: int | None = None
) -> bytes | str | NumpyArray1D

Convenience method for performing a write followed by a read.

Parameters:

Name Type Description Default
message bytes | str

The message to write to the equipment.

required
delay float

Time delay, in seconds, to wait between the write and read operations.

0.0
decode bool

Whether to decode the returned message (i.e., convert the message to a str) or keep the message as bytes. Ignored if dtype is not None.

True
dtype MessageDataType | None

The data type of the elements in the returned message. Can be any object that numpy dtype supports. See from_bytes for more details. For messages that are of scalar type (i.e., a single number) it is more efficient to not specify dtype but to pass the returned message to the int or float class to convert the message to the appropriate numeric type.

None
fmt MessageFormat | None

The format that the returned message data is in. Ignored if dtype is None. See from_bytes for more details.

None
size int | None

The number of bytes to read. Ignored if the value is None.

None

Returns:

Type Description
bytes | str | NumpyArray1D

The message from the equipment. If dtype is specified, then the message is returned as a numpy ndarray, if decode is True then the message is returned as a str, otherwise the message is returned as bytes.

Source code in src/msl/equipment/interfaces/message_based.py
def query(  # noqa: PLR0913
    self,
    message: bytes | str,
    *,
    delay: float = 0.0,
    decode: bool = True,
    dtype: MessageDataType | None = None,
    fmt: MessageFormat | None = None,
    size: int | None = None,
) -> bytes | str | NumpyArray1D:
    """Convenience method for performing a [write][msl.equipment.interfaces.message_based.MessageBased.write]
    followed by a [read][msl.equipment.interfaces.message_based.MessageBased.read].

    Args:
        message: The message to write to the equipment.
        delay: Time delay, in seconds, to wait between the _write_ and _read_ operations.
        decode: Whether to decode the returned message (i.e., convert the message to a [str][])
            or keep the message as [bytes][]. Ignored if `dtype` is not `None`.
        dtype: The data type of the elements in the returned message. Can be any object
            that numpy [dtype][numpy.dtype] supports. See [from_bytes][msl.equipment.utils.from_bytes]
            for more details. For messages that are of scalar type (i.e., a single number)
            it is more efficient to not specify `dtype` but to pass the returned message to the
            [int][] or [float][] class to convert the message to the appropriate numeric type.
        fmt: The format that the returned message data is in. Ignored if `dtype` is `None`.
             See [from_bytes][msl.equipment.utils.from_bytes] for more details.
        size: The number of bytes to read. Ignored if the value is `None`.

    Returns:
        The message from the equipment. If `dtype` is specified, then the message is
            returned as a numpy [ndarray][numpy.ndarray], if `decode` is `True` then the message
            is returned as a [str][], otherwise the message is returned as [bytes][].
    """  # noqa: D205
    _ = self.write(message)
    if delay > 0:
        time.sleep(delay)
    if dtype:
        return self.read(dtype=dtype, fmt=fmt, size=size)
    return self.read(decode=decode, size=size)
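
The advice about scalar replies can be illustrated with plain strings. The replies below are made up for the example; with real equipment they would come from a call such as `query("MEAS?")`, where the command is also only illustrative.

```python
# Made-up replies, standing in for the str returned by query() when decode=True.
voltage_reply = "+1.23456E-03\n"
count_reply = "42\n"

# float() and int() ignore surrounding whitespace, so a trailing newline
# does not need to be stripped before converting.
voltage = float(voltage_reply)
count = int(count_reply)
```

Converting a single number this way avoids creating a numpy array for one element, which is the efficiency point made in the `dtype` description.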

read ¤

read(
    *,
    decode: Literal[True] = True,
    dtype: None = None,
    fmt: MessageFormat | None = ...,
    size: int | None = ...
) -> str
read(
    *,
    decode: Literal[False] = False,
    dtype: None = None,
    fmt: MessageFormat | None = ...,
    size: int | None = ...
) -> bytes
read(
    *,
    decode: bool = ...,
    dtype: MessageDataType = ...,
    fmt: MessageFormat | None = ...,
    size: int | None = ...
) -> NumpyArray1D
read(
    *,
    decode: bool = True,
    dtype: MessageDataType | None = None,
    fmt: MessageFormat | None = None,
    size: int | None = None
) -> bytes | str | NumpyArray1D

Read a message from the equipment.

This method will block until one of the following conditions is fulfilled:

  1. size bytes have been received — only if size is not None.
  2. the read_termination byte(s) is(are) received — only if read_termination is not None.
  3. a timeout occurs — only if timeout is not None. If a timeout occurs, an MSLTimeoutError is raised.
  4. max_read_size bytes have been received. If the maximum number of bytes have been read, an MSLConnectionError is raised.

Tip

You may also want to set the rstrip value for the class instance.

Parameters:

Name Type Description Default
decode bool

Whether to decode the message (i.e., convert the message to a str) or keep the message as bytes. Ignored if dtype is not None.

True
dtype MessageDataType | None

The data type of the elements in the message. Can be any object that numpy dtype supports. See from_bytes for more details. For messages that are of scalar type (i.e., a single number) it is more efficient to not specify dtype but to pass the returned message to the int or float class to convert the message to the appropriate numeric type.

None
fmt MessageFormat | None

The format that the message data is in. Ignored if dtype is None. See from_bytes for more details.

None
size int | None

The number of bytes to read. Ignored if the value is None.

None

Returns:

Type Description
bytes | str | NumpyArray1D

The message from the equipment. If dtype is specified, then the message is returned as a numpy ndarray, if decode is True then the message is returned as a str, otherwise the message is returned as bytes.

Source code in src/msl/equipment/interfaces/message_based.py
def read(
    self,
    *,
    decode: bool = True,
    dtype: MessageDataType | None = None,
    fmt: MessageFormat | None = None,
    size: int | None = None,
) -> bytes | str | NumpyArray1D:
    """Read a message from the equipment.

    This method will block until one of the following conditions is fulfilled:

    1. `size` bytes have been received &mdash; only if `size` is not `None`.
    2. the [read_termination][msl.equipment.interfaces.message_based.MessageBased.read_termination]
       byte(s) is(are) received &mdash; only if
       [read_termination][msl.equipment.interfaces.message_based.MessageBased.read_termination]
       is not `None`.
    3. a timeout occurs &mdash; only if [timeout][msl.equipment.interfaces.message_based.MessageBased.timeout]
       is not `None`. If a timeout occurs, an
       [MSLTimeoutError][msl.equipment.interfaces.message_based.MSLTimeoutError] is raised.
    4. [max_read_size][msl.equipment.interfaces.message_based.MessageBased.max_read_size]
       bytes have been received. If the maximum number of bytes have been read, an
       [MSLConnectionError][msl.equipment.interfaces.message_based.MSLConnectionError] is raised.

    !!! tip
        You may also want to set the [rstrip][msl.equipment.interfaces.message_based.MessageBased.rstrip]
        value for the class instance.

    Args:
        decode: Whether to decode the message (i.e., convert the message to a [str][])
            or keep the message as [bytes][]. Ignored if `dtype` is not `None`.
        dtype: The data type of the elements in the message. Can be any object
            that numpy [dtype][numpy.dtype] supports. See [from_bytes][msl.equipment.utils.from_bytes]
            for more details. For messages that are of scalar type (i.e., a single number)
            it is more efficient to not specify `dtype` but to pass the returned message to the
            [int][] or [float][] class to convert the message to the appropriate numeric type.
        fmt: The format that the message data is in. Ignored if `dtype` is `None`.
             See [from_bytes][msl.equipment.utils.from_bytes] for more details.
        size: The number of bytes to read. Ignored if the value is `None`.

    Returns:
        The message from the equipment. If `dtype` is specified, then the message is returned
            as a numpy [ndarray][numpy.ndarray], if `decode` is `True` then the message
            is returned as a [str][], otherwise the message is returned as [bytes][].
    """
    if size is not None and size > self._max_read_size:
        msg = f"max_read_size is {self._max_read_size} bytes, requesting {size} bytes"
        raise MSLConnectionError(self, msg)

    try:
        message = self._read(size)
    except (serial.SerialTimeoutException, socket.timeout, TimeoutError):
        raise MSLTimeoutError(self) from None
    except Exception as e:  # noqa: BLE001
        msg = f"{e.__class__.__name__}: {e}"
        raise MSLConnectionError(self, msg) from None

    if size is None:
        if dtype:
            logger.debug("%s.read(dtype=%r, fmt=%r) -> %r", self, dtype, fmt, message)
        else:
            logger.debug("%s.read() -> %r", self, message)
    else:
        if len(message) != size:
            msg = f"received {len(message)} bytes, requested {size} bytes"
            raise MSLConnectionError(self, msg)
        logger.debug("%s.read(size=%s) -> %r", self, size, message)

    if self._rstrip:
        message = message.rstrip()

    if dtype:
        return from_bytes(message, fmt=fmt, dtype=dtype)

    if decode:
        return message.decode(encoding=self._encoding)

    return message
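
The four blocking conditions listed in the docstring can be sketched as a byte-by-byte loop. This is a simplified illustration, not the implementation used by any interface in this package: `read_message` and its `recv` argument are hypothetical, and built-in exceptions stand in for `MSLTimeoutError` and `MSLConnectionError`.

```python
import io
import time


def read_message(recv, *, size=None, termination=b"\n", timeout=1.0, max_read_size=1024):
    """Collect bytes from `recv(1)` until one of the four stop conditions is met."""
    deadline = None if timeout is None else time.monotonic() + timeout
    buffer = bytearray()
    while True:
        if size is not None and len(buffer) >= size:
            return bytes(buffer)  # 1. `size` bytes have been received
        if termination and buffer.endswith(termination):
            return bytes(buffer)  # 2. the termination byte(s) were received
        if deadline is not None and time.monotonic() > deadline:
            raise TimeoutError("timeout while reading")  # 3. a timeout occurred
        if len(buffer) >= max_read_size:
            raise ConnectionError("max_read_size bytes received")  # 4. too much data
        buffer += recv(1)  # an empty result means no data is available yet


# A BytesIO object plays the role of the equipment in this sketch.
reply = read_message(io.BytesIO(b"1.5\nrest").read)
```

In the sketch the returned bytes still include the termination; in the real method the `rstrip` setting mentioned in the tip controls whether trailing whitespace is removed.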

read_stb ¤

read_stb() -> int

Read the status byte from the device.

Returns:

Type Description
int

The status byte.

Source code in src/msl/equipment/interfaces/hislip.py
def read_stb(self) -> int:
    """Read the status byte from the device.

    Returns:
        The status byte.
    """
    reply = self._async.async_status_query(self._sync)
    return reply.status
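
The integer returned by `read_stb` is usually interpreted bit by bit. The sketch below assumes the equipment follows the IEEE 488.2 status-byte layout (MAV, ESB and RQS/MSS in bits 4, 5 and 6); the remaining bits are device specific, and `decode_stb` is a hypothetical helper.

```python
from __future__ import annotations

# Status-byte bits defined by IEEE 488.2 (assumed to apply to the device).
STB_BITS = {
    "MAV": 0x10,      # a message is available in the output queue
    "ESB": 0x20,      # a bit is set in the standard event status register
    "RQS/MSS": 0x40,  # the device is requesting service
}


def decode_stb(stb: int) -> dict[str, bool]:
    """Map each IEEE 488.2 status bit name to whether it is set in `stb`."""
    return {name: bool(stb & bit) for name, bit in STB_BITS.items()}
```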

reconnect ¤

reconnect(max_attempts: int = 1) -> None

Reconnect to the equipment.

Parameters:

Name Type Description Default
max_attempts int

The maximum number of attempts to try to reconnect with the equipment. If <1, keep trying until a connection is successful. If the maximum number of attempts has been reached then an exception is raised.

1
Source code in src/msl/equipment/interfaces/hislip.py
def reconnect(self, max_attempts: int = 1) -> None:
    """Reconnect to the equipment.

    Args:
        max_attempts: The maximum number of attempts to try to reconnect with the equipment.
            If &lt;1, keep trying until a connection is successful. If the maximum number
            of attempts has been reached then an exception is raised.
    """
    attempt = 0
    while True:
        attempt += 1
        try:
            return self._connect()
        except (MSLConnectionError, MSLTimeoutError):
            if 0 < max_attempts <= attempt:
                raise
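
The effect of `max_attempts` can be seen by running the same loop against a connection that fails a fixed number of times. Everything in this sketch is a stand-in: `retry_connect` copies the loop from the listing above, `Flaky` simulates an unreliable connection, and the built-in `ConnectionError` replaces the package's exceptions.

```python
def retry_connect(connect, max_attempts=1):
    """Call `connect` until it succeeds or `max_attempts` is reached."""
    attempt = 0
    while True:
        attempt += 1
        try:
            return connect()
        except ConnectionError:
            # max_attempts < 1 never satisfies this test, so the loop continues.
            if 0 < max_attempts <= attempt:
                raise


class Flaky:
    """Simulated connection that fails `failures` times before succeeding."""

    def __init__(self, failures):
        self.failures = failures
        self.calls = 0

    def __call__(self):
        self.calls += 1
        if self.calls <= self.failures:
            raise ConnectionError("not ready")
        return "connected"
```

With two simulated failures, `max_attempts=3` succeeds on the third call, `max_attempts=2` re-raises the second failure, and `max_attempts=0` keeps trying until the call succeeds.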

remote_local_control ¤

remote_local_control(request: int) -> None

Send a GPIB-like remote/local control request.

Parameters:

Name Type Description Default
request int

The request to perform.

  • 0 — Disable remote, VI_GPIB_REN_DEASSERT
  • 1 — Enable remote, VI_GPIB_REN_ASSERT
  • 2 — Disable remote and go to local, VI_GPIB_REN_DEASSERT_GTL
  • 3 — Enable remote and go to remote, VI_GPIB_REN_ASSERT_ADDRESS
  • 4 — Enable remote and lock out local, VI_GPIB_REN_ASSERT_LLO
  • 5 — Enable remote, go to remote, and set local lockout, VI_GPIB_REN_ASSERT_ADDRESS_LLO
  • 6 — Go to local without changing REN or lockout state, VI_GPIB_REN_ADDRESS_GTL
required
Source code in src/msl/equipment/interfaces/hislip.py
def remote_local_control(self, request: int) -> None:
    """Send a GPIB-like remote/local control request.

    Args:
        request: The request to perform.

            * 0 &mdash; Disable remote, `VI_GPIB_REN_DEASSERT`
            * 1 &mdash; Enable remote, `VI_GPIB_REN_ASSERT`
            * 2 &mdash; Disable remote and go to local, `VI_GPIB_REN_DEASSERT_GTL`
            * 3 &mdash; Enable remote and go to remote, `VI_GPIB_REN_ASSERT_ADDRESS`
            * 4 &mdash; Enable remote and lock out local, `VI_GPIB_REN_ASSERT_LLO`
            * 5 &mdash; Enable remote, go to remote, and set local lockout, `VI_GPIB_REN_ASSERT_ADDRESS_LLO`
            * 6 &mdash; Go to local without changing REN or lockout state, `VI_GPIB_REN_ADDRESS_GTL`
    """
    _ = self._async.async_remote_local_control(request, self._sync.message_id)
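
Because `request` is a bare integer, named constants can make calling code easier to read. The enumeration below is a hypothetical convenience that is not provided by the package; its member names are shortened forms of the `VI_GPIB_REN_*` names in the list above.

```python
from enum import IntEnum


class RemoteLocal(IntEnum):
    """Hypothetical names for the `request` values of remote_local_control."""

    REN_DEASSERT = 0            # disable remote
    REN_ASSERT = 1              # enable remote
    REN_DEASSERT_GTL = 2        # disable remote and go to local
    REN_ASSERT_ADDRESS = 3      # enable remote and go to remote
    REN_ASSERT_LLO = 4          # enable remote and lock out local
    REN_ASSERT_ADDRESS_LLO = 5  # enable remote, go to remote, set local lockout
    REN_ADDRESS_GTL = 6         # go to local without changing REN or lockout
```

An `IntEnum` member is an `int`, so a call such as `device.remote_local_control(RemoteLocal.REN_ASSERT_LLO)` satisfies the documented signature (`device` being a connected HiSLIP instance).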

trigger ¤

trigger() -> None

Send the trigger message (emulates a GPIB Group Execute Trigger event).

Source code in src/msl/equipment/interfaces/hislip.py
def trigger(self) -> None:
    """Send the trigger message (emulates a GPIB Group Execute Trigger event)."""
    self._sync.trigger()

unlock ¤

unlock() -> bool

Release the lock acquired by lock.

Returns:

Type Description
bool

Whether releasing the lock was successful.

Source code in src/msl/equipment/interfaces/hislip.py
def unlock(self) -> bool:
    """Release the lock acquired by [lock][msl.equipment.interfaces.hislip.HiSLIP.lock].

    Returns:
        Whether releasing the lock was successful.
    """
    status = self._async.async_lock_release(self._sync.message_id)
    return status.success
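
Since `lock` and `unlock` come in pairs, one possible pattern is to wrap them in a context manager so that the lock is released even if an exception occurs. This is a sketch under assumptions: `locked` is a hypothetical helper, and `FakeDevice` only imitates the documented `lock(lock_string) -> bool` and `unlock() -> bool` signatures so that the example runs without hardware.

```python
from contextlib import contextmanager


@contextmanager
def locked(device, lock_string=""):
    """Hold the device's lock for the duration of a `with` block."""
    if not device.lock(lock_string):
        raise RuntimeError("could not acquire the lock")
    try:
        yield device
    finally:
        device.unlock()  # always release, even if the block raised


class FakeDevice:
    """Stand-in for a connected HiSLIP instance (for illustration only)."""

    def __init__(self):
        self.is_locked = False

    def lock(self, lock_string=""):
        self.is_locked = True
        return True

    def unlock(self):
        self.is_locked = False
        return True
```

Passing a non-empty `lock_string` would request a shared lock, as described for `lock`.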

write ¤

write(
    message: bytes | str,
    *,
    data: Sequence1D | None = None,
    dtype: MessageDataType = "<f",
    fmt: MessageFormat | None = "ieee"
) -> int

Write a message to the equipment.

Parameters:

Name Type Description Default
message bytes | str

The message to write to the equipment.

required
data Sequence1D | None

The data to append to message. See to_bytes for more details.

None
dtype MessageDataType

The data type to use to convert each element in data to bytes. Ignored if data is None. See to_bytes for more details.

'<f'
fmt MessageFormat | None

The format to use to convert data to bytes. Ignored if data is None. See to_bytes for more details.

'ieee'

Returns:

Type Description
int

The number of bytes written.

Source code in src/msl/equipment/interfaces/message_based.py
def write(
    self,
    message: bytes | str,
    *,
    data: Sequence1D | None = None,
    dtype: MessageDataType = "<f",
    fmt: MessageFormat | None = "ieee",
) -> int:
    """Write a message to the equipment.

    Args:
        message: The message to write to the equipment.
        data: The data to append to `message`. See [to_bytes][msl.equipment.utils.to_bytes]
            for more details.
        dtype: The data type to use to convert each element in `data` to bytes. Ignored
            if `data` is `None`. See [to_bytes][msl.equipment.utils.to_bytes] for more details.
        fmt: The format to use to convert `data` to bytes. Ignored if `data` is `None`.
            See [to_bytes][msl.equipment.utils.to_bytes] for more details.

    Returns:
        The number of bytes written.
    """
    if not isinstance(message, bytes):
        message = message.encode(encoding=self._encoding)

    if data is not None:
        message += to_bytes(data, fmt=fmt, dtype=dtype)

    if self._write_termination and not message.endswith(self._write_termination):
        message += self._write_termination

    logger.debug("%s.write(%r)", self, message)

    try:
        return self._write(message)
    except (serial.SerialTimeoutException, socket.timeout, TimeoutError):
        raise MSLTimeoutError(self) from None
    except Exception as e:  # noqa: BLE001
        raise MSLConnectionError(self, str(e)) from None

NIDAQ ¤

NIDAQ(equipment: Equipment)

Bases: Interface

Use NI-DAQmx as the backend to communicate with the equipment.

The backend value must be equal to NIDAQ to use this class for the communication backend.

The returned object from calling the connect method is equivalent to importing the NI-DAQmx package, e.g.,

from msl.equipment import Backend, Connection

connection = Connection(address="Dev1", backend=Backend.NIDAQ)
nidaqmx = connection.connect()

with nidaqmx.Task() as task:
    task.ai_channels.add_ai_voltage_chan(f"{nidaqmx.address}/ai0")
    voltage = task.read()

is equivalent to

import nidaqmx

with nidaqmx.Task() as task:
    task.ai_channels.add_ai_voltage_chan("Dev1/ai0")
    voltage = task.read()

You can also combine the packages: use msl-equipment to manage information about the equipment and use nidaqmx directly for the connection. With this combination, your editor may offer better support for features like code completion and type checking.

import nidaqmx
from msl.equipment import Config

# config.xml contains <equipment eid="MSLE.0.142" name="daq" manufacturer="NI"/>
# and specifies where the equipment registers are and the connections file.
cfg = Config("config.xml")
equipment = cfg.equipment["daq"]
address = equipment.connection.address

with nidaqmx.Task() as task:
    task.ai_channels.add_ai_voltage_chan(f"{address}/ai0")
    voltage = task.read()

    # You could now use the `equipment` instance to apply a correction to the `voltage`

See the examples on the NI-DAQmx repository to learn how to use the nidaqmx package.

Parameters:

Name Type Description Default
equipment Equipment

An Equipment instance.

required
Source code in src/msl/equipment/interfaces/nidaq.py
def __init__(self, equipment: Equipment) -> None:
    """Use [NI-DAQmx] as the backend to communicate with the equipment.

    The [backend][msl.equipment.schema.Connection.backend] value must be equal
    to `NIDAQ` to use this class for the communication backend.

    The returned object from calling the [connect][msl.equipment.schema.Equipment.connect]
    method is equivalent to importing the [NI-DAQmx] package, e.g.,

    ```python
    from msl.equipment import Backend, Connection

    connection = Connection(address="Dev1", backend=Backend.NIDAQ)
    nidaqmx = connection.connect()

    with nidaqmx.Task() as task:
        task.ai_channels.add_ai_voltage_chan(f"{nidaqmx.address}/ai0")
        voltage = task.read()
    ```

    is equivalent to

    ```python
    import nidaqmx

    with nidaqmx.Task() as task:
        task.ai_channels.add_ai_voltage_chan("Dev1/ai0")
        voltage = task.read()
    ```

    You can also combine the packages: use `msl-equipment` to manage information
    about the equipment and use `nidaqmx` directly for the connection. With this
    combination, your editor may offer better support for features like code
    completion and type checking.

    ```python
    import nidaqmx
    from msl.equipment import Config

    # config.xml contains <equipment eid="MSLE.0.142" name="daq" manufacturer="NI"/>
    # and specifies where the equipment registers are and the connections file.
    cfg = Config("config.xml")
    equipment = cfg.equipment["daq"]
    address = equipment.connection.address

    with nidaqmx.Task() as task:
        task.ai_channels.add_ai_voltage_chan(f"{address}/ai0")
        voltage = task.read()

        # You could now use the `equipment` instance to apply a correction to the `voltage`
    ```

    See the [examples](https://github.com/ni/nidaqmx-python/tree/master/examples)
    on the [NI-DAQmx repository](https://github.com/ni/nidaqmx-python) to learn
    how to use the `nidaqmx` package.

    [NI-DAQmx]: https://nidaqmx-python.readthedocs.io/en/stable/index.html

    Args:
        equipment: An [Equipment][] instance.
    """
    super().__init__(equipment)

    if nidaqmx is None:  # pragma: no branch
        msg = "nidaqmx is not installed, run: pip install nidaqmx"
        raise RuntimeError(msg)

address property ¤

address: str

Returns the address of the Connection.

equipment property ¤

equipment: Equipment

The Equipment associated with the interface.

disconnect ¤

disconnect() -> None

Disconnect from the equipment.

This method can be overridden in the subclass if the subclass must implement tasks that need to be performed in order to safely disconnect from the equipment.

For example,

  • to clean up system resources from memory (e.g., if using a manufacturer's SDK)
  • to configure the equipment to be in a state that is safe for people working in the lab when the equipment is not in use

Tip

This method gets called automatically when the Interface instance gets garbage collected, which happens when the reference count is 0.

Source code in src/msl/equipment/schema.py
def disconnect(self) -> None:
    """Disconnect from the equipment.

    This method can be overridden in the subclass if the subclass must implement
    tasks that need to be performed in order to safely disconnect from the equipment.

    For example,

    * to clean up system resources from memory (e.g., if using a manufacturer's SDK)
    * to configure the equipment to be in a state that is safe for people
      working in the lab when the equipment is not in use

    !!! tip
        This method gets called automatically when the [Interface][msl.equipment.schema.Interface]
        instance gets garbage collected, which happens when the reference count is 0.
    """
    logger.debug("Disconnected from %r", self)
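A minimal sketch of overriding `disconnect` in a subclass follows. The `Interface` class below is only a stand-in that reproduces the logging call shown above, and `Shutter`, `is_open` and `connected` are hypothetical names used for illustration. Because `disconnect` may be called again when the instance is garbage collected, the override guards against running twice.

```python
import logging

logger = logging.getLogger("interface_sketch")


class Interface:
    """Stand-in for the base class; only `disconnect` is modelled here."""

    def disconnect(self) -> None:
        logger.debug("Disconnected from %r", self)


class Shutter(Interface):
    """Hypothetical equipment that must be left closed when not in use."""

    def __init__(self) -> None:
        self.is_open = True
        self.connected = True

    def disconnect(self) -> None:
        if not self.connected:
            return  # already disconnected, nothing left to clean up
        self.is_open = False  # put the equipment in a safe state first
        self.connected = False
        super().disconnect()


shutter = Shutter()
shutter.disconnect()
shutter.disconnect()  # a second call is harmless
```

Calling `super().disconnect()` last keeps the log message from the base class accurate: it is emitted only after the subclass has finished its own cleanup.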

Prologix ¤

Prologix(equipment: Equipment)

Bases: Interface

Use Prologix hardware to establish a connection.

For the GPIB-ETHERNET Controller, the format of the address string is Prologix::HOST::1234::PAD[::SAD], where HOST is the hostname or IP address of the Prologix hardware, 1234 is the ethernet port that is open on the Prologix hardware, PAD (Primary GPIB Address) is an integer value between 0 and 30, and SAD (Secondary GPIB Address) is an integer value between 96 and 126 (SAD is optional). For example,

  • Prologix::192.168.1.110::1234::6
  • Prologix::192.168.1.110::1234::6::96
  • Prologix::prologix-00-21-69-01-31-04::1234::6
    (typically, the hostname is prologix-<MAC Address>)

For the GPIB-USB Controller, the format of the address string is Prologix::PORT::PAD[::SAD], where PORT is the name of the serial port of the Prologix hardware, PAD (Primary GPIB Address) is an integer value between 0 and 30, and SAD (Secondary GPIB Address) is an integer value between 96 and 126 (SAD is optional). For example,

  • Prologix::COM3::6
  • Prologix::/dev/ttyUSB0::6::112

Alternatively, to clearly separate the Prologix hardware address from the GPIB address you may include GPIB:: in the address, for example,

  • Prologix::192.168.1.110::1234::GPIB::6
  • Prologix::COM3::GPIB::22::96
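The address formats above can be illustrated with a small parser. This is a sketch under stated assumptions, not the library's own `parse_prologix_address`: the names `PrologixAddress` and `split_prologix_address` are hypothetical, and the parser relies on the fact that a PAD (at most 30) and a SAD (at least 96) never overlap.

```python
from __future__ import annotations

from dataclasses import dataclass

# Ranges quoted in the text above
MIN_PAD, MAX_PAD = 0, 30
MIN_SAD, MAX_SAD = 96, 126


@dataclass(frozen=True)
class PrologixAddress:
    """Hypothetical container, not the object returned by the library."""

    hw_address: str  # hostname, IP address or serial-port name
    enet_port: int | None  # None for a GPIB-USB Controller
    pad: int
    sad: int | None


def split_prologix_address(address: str) -> PrologixAddress:
    """Split `Prologix::HW[::PORT][::GPIB]::PAD[::SAD]` into its parts."""
    parts = address.split("::")
    if len(parts) < 3 or parts[0] != "Prologix":
        raise ValueError(f"Invalid Prologix address {address!r}")

    hw_address, rest = parts[1], parts[2:]
    if "GPIB" in rest:
        # An explicit GPIB:: separates the hardware part from the GPIB part
        i = rest.index("GPIB")
        port_part = [int(p) for p in rest[:i]]
        gpib_part = [int(p) for p in rest[i + 1 :]]
    else:
        numbers = [int(p) for p in rest]
        # A PAD is at most 30 and a SAD is at least 96, so the last value
        # reveals whether the address ends with PAD or with PAD::SAD
        n_gpib = 2 if len(numbers) > 1 and numbers[-1] >= MIN_SAD else 1
        port_part, gpib_part = numbers[:-n_gpib], numbers[-n_gpib:]

    if len(port_part) > 1 or len(gpib_part) not in (1, 2):
        raise ValueError(f"Invalid Prologix address {address!r}")

    pad = gpib_part[0]
    sad = gpib_part[1] if len(gpib_part) == 2 else None
    if not MIN_PAD <= pad <= MAX_PAD:
        raise ValueError(f"Invalid primary GPIB address {pad}")
    if sad is not None and not MIN_SAD <= sad <= MAX_SAD:
        raise ValueError(f"Invalid secondary GPIB address {sad}")

    enet_port = port_part[0] if port_part else None
    return PrologixAddress(hw_address, enet_port, pad, sad)
```

An `enet_port` of `None` indicates a GPIB-USB Controller, which mirrors how the source below chooses between a serial and a socket connection.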

Parameters:

Name Type Description Default
equipment Equipment

An Equipment instance.

required

A Connection instance supports the following properties for using Prologix hardware, as well as the properties defined in Serial (for a GPIB-USB Controller) and in Socket (for a GPIB-ETHERNET Controller).

Connection Properties:

Name Type Description
eoi int

Whether to use the End or Identify line, either 0 (disable) or 1 (enable).

eos int

GPIB termination character(s): 0 (CR+LF), 1 (CR), 2 (LF) or 3 (no termination).

eot_char int

A user-specified character to append to network output when eot_enable is set to 1 and EOI is detected. Must be an ASCII value <256, e.g., eot_char=42 appends * (ASCII 42) when EOI is detected.

eot_enable int

Enables (1) or disables (0) the appending of a user-specified character, eot_char.

mode int

Configure the Prologix hardware to be a CONTROLLER (1) or DEVICE (0). Default: 1

read_tmo_ms int

The inter-character timeout value, in milliseconds, to be used in the read command and the spoll command, i.e., the delay since the last character was read. The read_tmo_ms timeout value is not to be confused with the total time for which data is read. The read_tmo_ms value must be between 1 and 3000 milliseconds.
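The way these properties become Prologix `++` commands can be sketched as follows. The function name `prologix_setup_commands` is hypothetical; the option names, their order and the default `mode` of 1 follow the `__init__` source listed below, but the sketch only builds the command strings and does not write them to any hardware.

```python
from __future__ import annotations


def prologix_setup_commands(props: dict[str, int]) -> list[str]:
    """Return the `++` commands implied by a Connection properties mapping."""
    # CONTROLLER mode (1) is used when `mode` is not specified
    commands = [f"++mode {props.get('mode', 1)}"]

    # Only options that the user has specified produce a command
    for option in ("eoi", "eos", "eot_enable", "eot_char", "read_tmo_ms"):
        value = props.get(option)
        if value is not None:
            commands.append(f"++{option} {value}")
    return commands


print(prologix_setup_commands({"eoi": 1, "read_tmo_ms": 500}))
```

Options that are omitted are left at whatever value is already configured in the Prologix hardware, since no command is generated for them.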

Source code in src/msl/equipment/interfaces/prologix.py
def __init__(self, equipment: Equipment) -> None:
    """Use [Prologix](https://prologix.biz/) hardware to establish a connection.

    For the GPIB-ETHERNET Controller, the format of the [address][msl.equipment.schema.Connection.address]
    string is `Prologix::HOST::1234::PAD[::SAD]`, where `HOST` is the hostname or IP address of the Prologix
    hardware, `1234` is the ethernet port that is open on the Prologix hardware, PAD (Primary GPIB Address)
    is an integer value between 0 and 30, and SAD (Secondary GPIB Address) is an integer value between
    96 and 126 (SAD is optional). For example,

    * `Prologix::192.168.1.110::1234::6`
    * `Prologix::192.168.1.110::1234::6::96`
    * `Prologix::prologix-00-21-69-01-31-04::1234::6` <br/>
       (typically, the hostname is `prologix-<MAC Address>`)

    For the GPIB-USB Controller, the format of the [address][msl.equipment.schema.Connection.address]
    string is `Prologix::PORT::PAD[::SAD]`, where `PORT` is the name of the serial port of the Prologix
    hardware, `PAD` (Primary GPIB Address) is an integer value between 0 and 30, and SAD (Secondary
    GPIB Address) is an integer value between 96 and 126 (SAD is optional). For example,

    * `Prologix::COM3::6`
    * `Prologix::/dev/ttyUSB0::6::112`

    Alternatively, to clearly separate the Prologix hardware address from the GPIB address you may include
    `GPIB::` in the address, for example,

    * `Prologix::192.168.1.110::1234::GPIB::6`
    * `Prologix::COM3::GPIB::22::96`

    Args:
        equipment: An [Equipment][] instance.

    A [Connection][msl.equipment.schema.Connection] instance supports the following _properties_
    for using Prologix hardware, as well as the _properties_ defined in
    [Serial][msl.equipment.interfaces.serial.Serial] (for a GPIB-USB Controller) and in
    [Socket][msl.equipment.interfaces.socket.Socket] (for a GPIB-ETHERNET Controller).

    Attributes: Connection Properties:
        eoi (int): Whether to use the End or Identify line, either `0` (disable) or `1` (enable).
        eos (int): GPIB termination character(s): 0 (CR+LF), 1 (CR), 2 (LF) or 3 (no termination).
        eot_char (int): A user-specified character to append to network output when `eot_enable`
            is set to 1 and EOI is detected. Must be an ASCII value &lt;256, e.g., `eot_char=42`
            appends `*` (ASCII 42) when EOI is detected.
        eot_enable (int): Enables (1) or disables (0) the appending of a user-specified character, `eot_char`.
        mode (int): Configure the Prologix hardware to be a CONTROLLER (1) or DEVICE (0). _Default: `1`_
        read_tmo_ms (int): The inter-character timeout value, in milliseconds, to be used in the _read_
            command and the _spoll_ command, i.e., the delay since the last character was read. The
            `read_tmo_ms` timeout value is not to be confused with the total time for which data is
            read. The `read_tmo_ms` value must be between 1 and 3000 milliseconds.
    """
    self._addr: str = ""
    super().__init__(equipment)

    assert equipment.connection is not None  # noqa: S101
    info = parse_prologix_address(equipment.connection.address)
    if info is None:
        msg = f"Invalid Prologix address {equipment.connection.address!r}"
        raise ValueError(msg)

    pad = info.pad
    if pad < MIN_PAD_ADDRESS or pad > MAX_PAD_ADDRESS:
        msg = f"Invalid primary GPIB address {pad}, must be in the range [{MIN_PAD_ADDRESS}, {MAX_PAD_ADDRESS}]"
        raise ValueError(msg)

    sad = info.sad
    if sad is not None and (sad < MIN_SAD_ADDRESS or sad > MAX_SAD_ADDRESS):
        msg = f"Invalid secondary GPIB address {sad}, must be in the range [{MIN_SAD_ADDRESS}, {MAX_SAD_ADDRESS}]"
        raise ValueError(msg)

    self._addr = f"++addr {pad}" if sad is None else f"++addr {pad} {sad}"
    self._query_auto: bool = True
    self._hw_address: str = info.hw_address

    props = equipment.connection.properties

    try:
        self._controller: Serial | Socket = Prologix._controllers[self._hw_address]
    except KeyError:
        address = f"TCP::{self._hw_address}::{info.enet_port}" if info.enet_port else f"ASRL{self._hw_address}"
        e = Equipment(connection=Connection(address, **props))
        self._controller = PrologixEthernet(e) if info.enet_port else PrologixUSB(e)
        Prologix._controllers[self._hw_address] = self._controller
        Prologix._selected_addresses[self._hw_address] = ""

    # default is CONTROLLER mode
    mode = props.get("mode", 1)
    _ = self._controller.write(f"++mode {mode}")

    # set the options provided by the user
    for option in ["eoi", "eos", "eot_enable", "eot_char", "read_tmo_ms"]:
        value = props.get(option)
        if value is not None:
            _ = self._controller.write(f"++{option} {value}")

    self._ensure_gpib_address_selected()

controller property ¤

controller: Serial | Socket

The connection to the Prologix Controller for this equipment.

The returned type depends on whether a GPIB-USB or a GPIB-ETHERNET Controller is used to communicate with the equipment.

encoding property writable ¤

encoding: str

The encoding that is used for read and write operations.

equipment property ¤

equipment: Equipment

The Equipment associated with the interface.

max_read_size property writable ¤

max_read_size: int

The maximum number of bytes that can be read.

query_auto property writable ¤

query_auto: bool

Whether to send ++auto 1 before and ++auto 0 after a query to the Prologix Controller.

read_termination property writable ¤

read_termination: bytes | None

The termination character sequence that is used for a read operation.

Reading stops when the equipment stops sending data or the read_termination character sequence is detected. If you set the read_termination to be equal to a variable of type str, it will be encoded as bytes.

rstrip property writable ¤

rstrip: bool

Whether to remove trailing whitespace from read messages.

timeout property writable ¤

timeout: float | None

The timeout, in seconds, to use for the connection to the Prologix hardware.

This timeout value is not to be confused with the read_tmo_ms command that Prologix Controllers accept. To set the inter-character delay (the delay since the last character was read, which also applies to the spoll command), write the ++read_tmo_ms <time> message to the Controller.

write_termination property writable ¤

write_termination: bytes | None

The termination character sequence that is appended to write messages.

If you set the write_termination to be equal to a variable of type str, it will be encoded as bytes.

disconnect ¤

disconnect() -> None

Disconnect from the equipment.

Calling this method does not close the underlying Serial or Socket connection to the Prologix Controller since the connection to the Prologix Controller may still be required to send messages to other GPIB devices that are attached to the Controller.

Source code in src/msl/equipment/interfaces/prologix.py
def disconnect(self) -> None:  # pyright: ignore[reportImplicitOverride]
    """Disconnect from the equipment.

    Calling this method does not close the underlying [Serial][msl.equipment.interfaces.serial.Serial]
    or [Socket][msl.equipment.interfaces.socket.Socket] connection to the Prologix Controller since
    the connection to the Prologix Controller may still be required to send messages to other GPIB
    devices that are attached to the Controller.
    """
    if self._addr:
        self._addr = ""
        super().disconnect()
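The reason the Controller connection stays open can be seen in a small sketch of the sharing pattern used by `__init__`: one connection per hardware address is cached at the class level and reused by every GPIB device behind it. `FakeController` and `GpibDevice` are hypothetical stand-ins, and no real serial or socket connection is created.

```python
from __future__ import annotations


class FakeController:
    """Hypothetical stand-in for the Serial or Socket connection."""

    def __init__(self, hw_address: str) -> None:
        self.hw_address = hw_address


class GpibDevice:
    """Hypothetical device that shares one controller per hardware address."""

    _controllers: dict[str, FakeController] = {}

    def __init__(self, hw_address: str, pad: int) -> None:
        try:
            self.controller = GpibDevice._controllers[hw_address]
        except KeyError:
            # First device on this hardware: create and cache the connection
            self.controller = FakeController(hw_address)
            GpibDevice._controllers[hw_address] = self.controller
        self._addr = f"++addr {pad}"

    def disconnect(self) -> None:
        # Only forget the GPIB address; the shared controller is left alone
        self._addr = ""


dmm = GpibDevice("COM3", 6)
scope = GpibDevice("COM3", 22)
dmm.disconnect()
print(scope.controller is GpibDevice._controllers["COM3"])
```

After `dmm.disconnect()` the second device can still use the cached controller, which is the behaviour described above.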

group_execute_trigger ¤

group_execute_trigger(*addresses: int) -> int

Send the Group Execute Trigger command to equipment at the specified addresses.

Up to 15 addresses may be specified. If no address is specified then the Group Execute Trigger command is issued to the currently-addressed equipment.

Parameters:

Name Type Description Default
addresses int

The primary (and optional secondary) GPIB addresses. If a secondary address is specified then it must follow its corresponding primary address, for example,

  • group_execute_trigger(1, 11, 17) → primary, primary, primary
  • group_execute_trigger(3, 96, 12, 21) → primary, secondary, primary, primary
()

Returns:

Type Description
int

The number of bytes written.

Source code in src/msl/equipment/interfaces/prologix.py
def group_execute_trigger(self, *addresses: int) -> int:
    """Send the Group Execute Trigger command to equipment at the specified addresses.

    Up to 15 addresses may be specified. If no address is specified then the
    Group Execute Trigger command is issued to the currently-addressed equipment.

    Args:
        addresses: The primary (and optional secondary) GPIB addresses. If a secondary address is
            specified then it must follow its corresponding primary address, for example,

            * group_execute_trigger(1, 11, 17) &#8594; primary, primary, primary
            * group_execute_trigger(3, 96, 12, 21) &#8594; primary, secondary, primary, primary

    Returns:
        The number of bytes written.
    """
    command = "++trg"
    if addresses:
        command += " " + " ".join(str(a) for a in addresses)
    return self._controller.write(command)
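The command string that is sent can be sketched without any hardware. The helper `build_trg_command` is hypothetical, and its checks are additions that the source above does not perform: it assumes that the limit of 15 counts primary addresses, and it uses the PAD range 0 to 30 and SAD range 96 to 126 to verify that each secondary address follows a primary address.

```python
def build_trg_command(*addresses: int) -> str:
    """Build the `++trg` message, validating the order of the addresses."""
    devices = 0
    previous_was_primary = False
    for address in addresses:
        if 0 <= address <= 30:
            devices += 1
            previous_was_primary = True
        elif 96 <= address <= 126 and previous_was_primary:
            # A secondary address belongs to the primary address before it
            previous_was_primary = False
        else:
            raise ValueError(f"Unexpected GPIB address {address}")

    if devices > 15:
        raise ValueError("At most 15 addresses may be specified")

    # With no addresses the command targets the currently-addressed equipment
    return " ".join(["++trg", *map(str, addresses)])


print(build_trg_command(3, 96, 12, 21))
```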

query ¤

query(
    message: bytes | str,
    *,
    delay: float = 0.0,
    decode: Literal[True] = True,
    dtype: None = None,
    fmt: MessageFormat | None = ...,
    size: int | None = ...
) -> str
query(
    message: bytes | str,
    *,
    delay: float = 0.0,
    decode: Literal[False] = False,
    dtype: None = None,
    fmt: MessageFormat | None = ...,
    size: int | None = ...
) -> bytes
query(
    message: bytes | str,
    *,
    delay: float = 0.0,
    decode: bool = ...,
    dtype: MessageDataType = ...,
    fmt: MessageFormat | None = ...,
    size: int | None = ...
) -> NumpyArray1D
query(
    message: bytes | str,
    *,
    delay: float = 0.0,
    decode: bool = True,
    dtype: MessageDataType | None = None,
    fmt: MessageFormat | None = None,
    size: int | None = None
) -> bytes | str | NumpyArray1D

Convenience method for performing a write followed by a read.

Parameters:

Name Type Description Default
message bytes | str

The message to write to the equipment.

required
delay float

Time delay, in seconds, to wait between the write and read operations.

0.0
decode bool

Whether to decode the returned message (i.e., convert the message to a str) or keep the message as bytes. Ignored if dtype is not None.

True
dtype MessageDataType | None

The data type of the elements in the returned message. Can be any object that numpy dtype supports. See from_bytes for more details. For messages that are of scalar type (i.e., a single number) it is more efficient to not specify dtype but to pass the returned message to the int or float class to convert the message to the appropriate numeric type.

None
fmt MessageFormat | None

The format that the returned message data is in. Ignored if dtype is None. See from_bytes for more details.

None
size int | None

The number of bytes to read. Ignored if the value is None.

None

Returns:

Type Description
bytes | str | NumpyArray1D

The message from the equipment. If dtype is specified, then the message is returned as a numpy.ndarray, if decode is True then the message is returned as a str, otherwise the message is returned as bytes.

Source code in src/msl/equipment/interfaces/prologix.py
def query(  # noqa: PLR0913
    self,
    message: bytes | str,
    *,
    delay: float = 0.0,
    decode: bool = True,
    dtype: MessageDataType | None = None,
    fmt: MessageFormat | None = None,
    size: int | None = None,
) -> bytes | str | NumpyArray1D:
    """Convenience method for performing a [write][msl.equipment.interfaces.prologix.Prologix.write]
    followed by a [read][msl.equipment.interfaces.prologix.Prologix.read].

    Args:
        message: The message to write to the equipment.
        delay: Time delay, in seconds, to wait between the _write_ and _read_ operations.
        decode: Whether to decode the returned message (i.e., convert the message to a [str][])
            or keep the message as [bytes][]. Ignored if `dtype` is not `None`.
        dtype: The data type of the elements in the returned message. Can be any object
            that numpy [dtype][numpy.dtype] supports. See [from_bytes][msl.equipment.utils.from_bytes]
            for more details. For messages that are of scalar type (i.e., a single number)
            it is more efficient to not specify `dtype` but to pass the returned message to the
            [int][] or [float][] class to convert the message to the appropriate numeric type.
        fmt: The format that the returned message data is in. Ignored if `dtype` is `None`.
             See [from_bytes][msl.equipment.utils.from_bytes] for more details.
        size: The number of bytes to read. Ignored if the value is `None`.

    Returns:
        The message from the equipment. If `dtype` is specified, then the message is
            returned as a [numpy.ndarray][], if `decode` is `True` then the message
            is returned as a [str][], otherwise the message is returned as [bytes][].
    """  # noqa: D205
    self._ensure_gpib_address_selected()

    if self._query_auto:
        _ = self._controller.write(b"++auto 1")

    reply = self._controller.query(message, delay=delay, decode=decode, dtype=dtype, fmt=fmt, size=size)  # type: ignore[misc, arg-type]

    if self._query_auto:
        _ = self._controller.write(b"++auto 0")

    return reply
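The order of messages that a query produces can be sketched with a recording stand-in for the Controller. `RecordingController` and `query_with_auto` are hypothetical names, and the step that selects the GPIB address (`_ensure_gpib_address_selected`) is omitted because its implementation is not shown here.

```python
from __future__ import annotations


class RecordingController:
    """Hypothetical stand-in that records messages instead of sending them."""

    def __init__(self, reply: str) -> None:
        self.sent: list[str] = []
        self._reply = reply

    def write(self, message: str) -> int:
        self.sent.append(message)
        return len(message)

    def query(self, message: str) -> str:
        self.write(message)
        return self._reply


def query_with_auto(controller: RecordingController, message: str, *, query_auto: bool = True) -> str:
    """Wrap a query in `++auto 1` and `++auto 0` when `query_auto` is True."""
    if query_auto:
        controller.write("++auto 1")
    reply = controller.query(message)
    if query_auto:
        controller.write("++auto 0")
    return reply


controller = RecordingController("simulated reply")
query_with_auto(controller, "*IDN?")
print(controller.sent)
```

Setting `query_auto` to `False` sends only the message itself, which corresponds to disabling the `query_auto` property.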

read ¤

read(
    *,
    decode: Literal[True] = True,
    dtype: None = None,
    fmt: MessageFormat | None = ...,
    size: int | None = ...
) -> str
read(
    *,
    decode: Literal[False] = False,
    dtype: None = None,
    fmt: MessageFormat | None = ...,
    size: int | None = ...
) -> bytes
read(
    *,
    decode: bool = ...,
    dtype: MessageDataType = ...,
    fmt: MessageFormat | None = ...,
    size: int | None = ...
) -> NumpyArray1D
read(
    *,
    decode: bool = True,
    dtype: MessageDataType | None = None,
    fmt: MessageFormat | None = None,
    size: int | None = None
) -> bytes | str | NumpyArray1D

Read a message from the equipment.

See MessageBased.read() for more details.

Parameters:

Name Type Description Default
decode bool

Whether to decode the message (i.e., convert the message to a str) or keep the message as bytes. Ignored if dtype is not None.

True
dtype MessageDataType | None

The data type of the elements in the message. Can be any object that numpy dtype supports. See from_bytes for more details. For messages that are of scalar type (i.e., a single number) it is more efficient to not specify dtype but to pass the returned message to the int or float class to convert the message to the appropriate numeric type.

None
fmt MessageFormat | None

The format that the message data is in. Ignored if dtype is None. See from_bytes for more details.

None
size int | None

The number of bytes to read. Ignored if the value is None.

None

Returns:

Type Description
bytes | str | NumpyArray1D

The message from the equipment. If dtype is specified, then the message is returned as a numpy ndarray, if decode is True then the message is returned as a str, otherwise the message is returned as bytes.

Source code in src/msl/equipment/interfaces/prologix.py
def read(
    self,
    *,
    decode: bool = True,
    dtype: MessageDataType | None = None,
    fmt: MessageFormat | None = None,
    size: int | None = None,
) -> bytes | str | NumpyArray1D:
    """Read a message from the equipment.

    See [MessageBased.read()][msl.equipment.interfaces.message_based.MessageBased.read] for more details.

    Args:
        decode: Whether to decode the message (i.e., convert the message to a [str][])
            or keep the message as [bytes][]. Ignored if `dtype` is not `None`.
        dtype: The data type of the elements in the message. Can be any object
            that numpy [dtype][numpy.dtype] supports. See [from_bytes][msl.equipment.utils.from_bytes]
            for more details. For messages that are of scalar type (i.e., a single number)
            it is more efficient to not specify `dtype` but to pass the returned message to the
            [int][] or [float][] class to convert the message to the appropriate numeric type.
        fmt: The format that the message data is in. Ignored if `dtype` is `None`.
             See [from_bytes][msl.equipment.utils.from_bytes] for more details.
        size: The number of bytes to read. Ignored if the value is `None`.

    Returns:
        The message from the equipment. If `dtype` is specified, then the message is returned
            as a numpy [ndarray][numpy.ndarray], if `decode` is `True` then the message
            is returned as a [str][], otherwise the message is returned as [bytes][].
    """
    self._ensure_gpib_address_selected()
    return self._controller.read(decode=decode, dtype=dtype, fmt=fmt, size=size)  # type: ignore[arg-type]
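The advice about scalar replies can be illustrated as follows. This is a sketch that uses a simulated reply rather than a real `read` call; the function name `parse_scalar` and the reply bytes are assumptions made for illustration.

```python
def parse_scalar(reply: bytes, encoding: str = "utf-8") -> float:
    """Convert a single-number reply to a float without using `dtype`."""
    # Equivalent to reading with decode=True and then calling float()
    return float(reply.decode(encoding).strip())


simulated_reply = b"+1.5E-03\r\n"  # what a voltmeter might send back
print(parse_scalar(simulated_reply))
```

For a reply that holds a single number, this avoids creating a one-element array only to extract its value.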

write ¤

write(
    message: bytes | str,
    *,
    data: Sequence1D | None = None,
    dtype: MessageDataType = "<f",
    fmt: MessageFormat | None = "ieee"
) -> int

Write a message to the equipment.

Parameters:

Name Type Description Default
message bytes | str

The message to write to the equipment.

required
data Sequence1D | None

The data to append to message. See to_bytes for more details.

None
dtype MessageDataType

The data type to use to convert each element in data to bytes. Ignored if data is None. See to_bytes for more details.

'<f'
fmt MessageFormat | None

The format to use to convert data to bytes. Ignored if data is None. See to_bytes for more details.

'ieee'

Returns:

Type Description
int

The number of bytes written.

Source code in src/msl/equipment/interfaces/prologix.py
def write(
    self,
    message: bytes | str,
    *,
    data: Sequence1D | None = None,
    dtype: MessageDataType = "<f",
    fmt: MessageFormat | None = "ieee",
) -> int:
    """Write a message to the equipment.

    Args:
        message: The message to write to the equipment.
        data: The data to append to `message`. See [to_bytes][msl.equipment.utils.to_bytes]
            for more details.
        dtype: The data type to use to convert each element in `data` to bytes. Ignored
            if `data` is `None`. See [to_bytes][msl.equipment.utils.to_bytes] for more details.
        fmt: The format to use to convert `data` to bytes. Ignored if `data` is `None`.
            See [to_bytes][msl.equipment.utils.to_bytes] for more details.

    Returns:
        The number of bytes written.
    """
    self._ensure_gpib_address_selected()
    return self._controller.write(message, data=data, fmt=fmt, dtype=dtype)
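To give a feel for what the default `dtype="<f"` and `fmt="ieee"` imply, the sketch below builds an IEEE-488.2 definite-length block from little-endian 32-bit floats. It assumes that `"ieee"` refers to that block format (`#`, a digit count, the byte length, then the data); the helper `ieee_block` is hypothetical and is not the library's `to_bytes` function.

```python
import struct


def ieee_block(values: list) -> bytes:
    """Pack values as little-endian float32 in a definite-length block."""
    payload = struct.pack(f"<{len(values)}f", *values)
    length = str(len(payload)).encode("ascii")
    # Header: '#', the number of digits in the length, then the length itself
    return b"#" + str(len(length)).encode("ascii") + length + payload


block = ieee_block([1.0, 2.0, 3.0])
print(block[:4], len(block))
```

Three 4-byte floats give a 12-byte payload, so the header is `#212` and the block is 16 bytes long in total.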

PyVISA ¤

PyVISA(equipment: Equipment)

Bases: Interface

Use PyVISA as the backend to communicate with the equipment.

The backend value must be equal to PyVISA to use this class for the communication backend.

The PYVISA_LIBRARY environment variable is used (if it exists) to create the ResourceManager. This environment variable can be defined in a configuration file or by defining the environment variable in your code before connecting to the equipment using PyVISA for the first time. The default value is @ivi if this environment variable is not defined.
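The lookup described above can be sketched with the standard library alone. The helper `resolve_visa_library` is hypothetical and only shows the fallback to `@ivi`; the value `@py` (the pyvisa-py backend) is used as an example of an alternative setting.

```python
import os


def resolve_visa_library(environ=None) -> str:
    """Return the VISA library specification, falling back to '@ivi'."""
    env = os.environ if environ is None else environ
    return env.get("PYVISA_LIBRARY", "@ivi")


# In your own code, define the variable before the first PyVISA connection:
#     os.environ["PYVISA_LIBRARY"] = "@py"
print(resolve_visa_library({"PYVISA_LIBRARY": "@py"}))
print(resolve_visa_library({}))
```

Because the Resource Manager is created once and then reused, changing the variable after the first connection would not be expected to have any effect.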

The object returned when using msl-equipment to connect to the equipment is equivalent to the object returned by open_resource, e.g.,

from msl.equipment import Backend, Connection

connection = Connection("GPIB::12", backend=Backend.PyVISA)
inst = connection.connect()
print(inst.query("*IDN?"))

is equivalent to

import pyvisa

rm = pyvisa.ResourceManager()
inst = rm.open_resource("GPIB::12")
print(inst.query("*IDN?"))

You can also combine the packages: use msl-equipment to manage information about the equipment and use pyvisa directly for the connection. If you use this combination, the editor you use to develop your code may have better support for features like code completion and type checking.

import pyvisa
from msl.equipment import Config

# config.xml contains <equipment eid="MSLE.0.063" name="dmm"/>
# and specifies where the equipment registers are and the connections file.
cfg = Config("config.xml")
equipment = cfg.equipment["dmm"]

rm = pyvisa.ResourceManager()
inst = rm.open_resource(equipment.connection.address)
data = inst.query("READ?")

# You could now use the `equipment` instance to apply a correction to the `data`

Parameters:

Name Type Description Default
equipment Equipment

An Equipment instance.

required
Source code in src/msl/equipment/interfaces/pyvisa.py
def __init__(self, equipment: Equipment) -> None:
    """Use [PyVISA] as the backend to communicate with the equipment.

    The [backend][msl.equipment.schema.Connection.backend] value must be equal to
    `PyVISA` to use this class for the communication backend.

    The `PYVISA_LIBRARY` environment variable is used (if it exists) to create the
    [ResourceManager][pyvisa.highlevel.ResourceManager]. This environment variable
    can be defined in a [configuration file][config-xml-example] or by defining the
    environment variable in your code before connecting to the equipment using
    [PyVISA][msl.equipment.interfaces.pyvisa.PyVISA] for the first time. The default
    value is `@ivi` if this environment variable is not defined.

    The object returned when using `msl-equipment` to connect to the equipment is equivalent
    to the object returned by [open_resource][pyvisa.highlevel.ResourceManager.open_resource], e.g.,

    ```python
    from msl.equipment import Backend, Connection

    connection = Connection("GPIB::12", backend=Backend.PyVISA)
    inst = connection.connect()
    print(inst.query("*IDN?"))
    ```

    is equivalent to

    ```python
    import pyvisa

    rm = pyvisa.ResourceManager()
    inst = rm.open_resource("GPIB::12")
    print(inst.query("*IDN?"))
    ```

    You can also combine the packages: use `msl-equipment` to manage information
    about the equipment and use `pyvisa` directly for the connection. If you use this
    combination, the editor you use to develop your code may have better support for
    features like code completion and type checking.

    ```python
    import pyvisa
    from msl.equipment import Config

    # config.xml contains <equipment eid="MSLE.0.063" name="dmm"/>
    # and specifies where the equipment registers are and the connections file.
    cfg = Config("config.xml")
    equipment = cfg.equipment["dmm"]

    rm = pyvisa.ResourceManager()
    inst = rm.open_resource(equipment.connection.address)
    data = inst.query("READ?")

    # You could now use the `equipment` instance to apply a correction to the `data`
    ```

    [PyVISA]: https://pyvisa.readthedocs.io/en/stable/

    Args:
        equipment: An [Equipment][] instance.
    """
    self._resource: Resource | None = None
    super().__init__(equipment)

    if pyvisa is None:  # pragma: no branch
        msg = "pyvisa is not installed, run: pip install pyvisa"  # type: ignore[unreachable]
        raise RuntimeError(msg)

    assert equipment.connection is not None  # noqa: S101
    kwargs = _prepare_kwargs(equipment.connection.properties)

    if PyVISA.rm is None:
        PyVISA.rm = pyvisa.ResourceManager()

    self._resource = PyVISA.rm.open_resource(equipment.connection.address, **kwargs)

equipment property ¤

equipment: Equipment

The Equipment associated with the interface.

rm class-attribute instance-attribute ¤

rm: ResourceManager | None = None

PyVISA Resource Manager.

disconnect ¤

disconnect() -> None

Calls pyvisa.resources.Resource.close.

Source code in src/msl/equipment/interfaces/pyvisa.py
def disconnect(self) -> None:  # pyright: ignore[reportImplicitOverride]
    """Calls [pyvisa.resources.Resource.close][]."""
    if self._resource is not None:
        self._resource.close()
        logger.debug("Disconnected from %s", self)
        self._resource = None
    super().disconnect()

SDK ¤

SDK(
    equipment: Equipment,
    libtype: LibType | None = None,
    path: PathLike | None = None,
)

Bases: Interface

Base class for equipment that uses the manufacturer's Software Development Kit (SDK).

You can use the configuration file to add the directory that contains the SDK to the PATH environment variable.

Parameters:

Name Type Description Default
equipment Equipment

An Equipment instance.

required
libtype LibType | None

The library type. See LoadLibrary for more details.

None
path PathLike | None

The path to the SDK. Specifying this value will take precedence over the address value.

None
Source code in src/msl/equipment/interfaces/sdk.py
def __init__(self, equipment: Equipment, libtype: LibType | None = None, path: PathLike | None = None) -> None:
    """Base class for equipment that uses the manufacturer's Software Development Kit (SDK).

    You can use the [configuration file][config-xml-example] to add the directory that
    contains the SDK to the `PATH` environment variable.

    Args:
        equipment: An [Equipment][] instance.
        libtype: The library type. See [LoadLibrary][msl.loadlib.load_library.LoadLibrary] for more details.
        path: The path to the SDK. Specifying this value will take precedence over the
            [address][msl.equipment.schema.Connection.address] value.
    """
    super().__init__(equipment)

    if path is None:
        assert equipment.connection is not None  # noqa: S101
        info = parse_sdk_address(equipment.connection.address)
        if info is None:
            msg = f"Invalid SDK interface address {equipment.connection.address!r}"
            raise ValueError(msg)
        path = info.path

    self._load_library: LoadLibrary = LoadLibrary(path, libtype)
    self._sdk: Any = self._load_library.lib

assembly property ¤

assembly: Any

The reference to the .NET assembly.

equipment property ¤

equipment: Equipment

The Equipment associated with the interface.

gateway property ¤

gateway: Any

The reference to the Java gateway.

path property ¤

path: str

The path to the SDK file.

sdk property ¤

sdk: Any

The reference to the SDK object.

disconnect ¤

disconnect() -> None

Clean up references to the SDK library.

Source code in src/msl/equipment/interfaces/sdk.py
def disconnect(self) -> None:  # pyright: ignore[reportImplicitOverride]
    """Clean up references to the SDK library."""
    if hasattr(self, "_sdk") and self._sdk is not None:
        self._load_library.cleanup()
        self._sdk = None
        super().disconnect()

log_errcheck ¤

log_errcheck(
    result: Any, func: Any, arguments: tuple[Any, ...]
) -> Any

Convenience method for logging an errcheck from ctypes.

Source code in src/msl/equipment/interfaces/sdk.py
def log_errcheck(self, result: Any, func: Any, arguments: tuple[Any, ...]) -> Any:  # noqa: ANN401
    """Convenience method for logging an [errcheck][ctypes._CFuncPtr.errcheck] from [ctypes][]."""
    logger.debug("%s.%s%s -> %s", self.__class__.__name__, func.__name__, arguments, result)
    return result
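A sketch of how an `errcheck` callback of this shape is attached to a ctypes function follows. It assumes CPython, because it uses `ctypes.pythonapi` as a library that is always available; with a real SDK the function would come from the loaded library instead. The standalone `log_errcheck` function and the logger name are stand-ins for the method above.

```python
import ctypes
import logging

logger = logging.getLogger("sdk_sketch")


def log_errcheck(result, func, arguments):
    """Log the call and pass the result through unchanged."""
    logger.debug("%s%s -> %s", func.__name__, arguments, result)
    return result


# Py_IsInitialized takes no arguments and returns an int
is_initialized = ctypes.pythonapi.Py_IsInitialized
is_initialized.restype = ctypes.c_int
is_initialized.argtypes = []
is_initialized.errcheck = log_errcheck  # called after every invocation

status = is_initialized()
print(status)
```

The value returned by the callback becomes the result of the function call, which is why the callback returns `result` after logging it.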

Serial ¤

Serial(equipment: Equipment)

Bases: MessageBased

Base class for equipment that is connected through a serial port.

Parameters:

Name Type Description Default
equipment Equipment

An Equipment instance.

required

A Connection instance supports the following properties for the serial communication protocol, as well as the properties defined in MessageBased. The DataBits, Parity and StopBits enumeration names and values may also be used. For properties that specify an alias, you may also use the alternative name as the property name. See serial.Serial for more details.

Connection Properties:

Name Type Description
baud_rate int

The baud rate (alias: baudrate). Default: 9600

data_bits int

The number of data bits, e.g. 5, 6, 7, 8 (alias: bytesize). Default: 8

dsr_dtr bool

Whether to enable hardware (DSR/DTR) flow control (alias: dsrdtr). Default: False

inter_byte_timeout float | None

The inter-character timeout. Default: None

parity str

Parity checking, e.g. 'even', 'odd'. Default: none

rts_cts bool

Whether to enable hardware (RTS/CTS) flow control (alias: rtscts). Default: False

stop_bits int | float

The number of stop bits, e.g. 1, 1.5, 2 (alias: stopbits). Default: 1

xon_xoff bool

Whether to enable software flow control (alias: xonxoff). Default: False
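The alias rule can be pictured with a small stand-alone sketch. The `normalize_serial_properties` helper below is hypothetical and is not part of the library; it only copies the alias names and defaults from the table above to show how a mix of canonical names and aliases could resolve to one set of settings.

```python
# Alias -> canonical property name, copied from the table above.
ALIASES = {
    "baudrate": "baud_rate",
    "bytesize": "data_bits",
    "dsrdtr": "dsr_dtr",
    "rtscts": "rts_cts",
    "stopbits": "stop_bits",
    "xonxoff": "xon_xoff",
}

# Defaults copied from the table above.
DEFAULTS = {
    "baud_rate": 9600,
    "data_bits": 8,
    "dsr_dtr": False,
    "inter_byte_timeout": None,
    "parity": "none",
    "rts_cts": False,
    "stop_bits": 1,
    "xon_xoff": False,
}


def normalize_serial_properties(properties):
    """Hypothetical helper: resolve aliases, then fill in the documented defaults."""
    resolved = dict(DEFAULTS)
    for name, value in properties.items():
        resolved[ALIASES.get(name, name)] = value
    return resolved


settings = normalize_serial_properties({"baudrate": 19200, "parity": "even"})
```

Here `settings["baud_rate"]` is `19200` even though the alias `baudrate` was supplied, and every property that was not supplied keeps its documented default.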

Source code in src/msl/equipment/interfaces/serial.py
def __init__(self, equipment: Equipment) -> None:
    """Base class for equipment that is connected through a serial port.

    Args:
        equipment: An [Equipment][] instance.

    A [Connection][msl.equipment.schema.Connection] instance supports the following _properties_
    for the serial communication protocol, as well as the _properties_ defined in
    [MessageBased][msl.equipment.interfaces.message_based.MessageBased]. The
    [DataBits][msl.equipment.enumerations.DataBits], [Parity][msl.equipment.enumerations.Parity]
    and [StopBits][msl.equipment.enumerations.StopBits] enumeration names and values may also
    be used. For properties that specify an _alias_, you may also use the alternative name as
    the property name. See [serial.Serial][] for more details.

    Attributes: Connection Properties:
        baud_rate (int): The baud rate (_alias:_ baudrate). _Default: `9600`_
        data_bits (int): The number of data bits, e.g. 5, 6, 7, 8 (_alias:_ bytesize). _Default: `8`_
        dsr_dtr (bool): Whether to enable hardware (DSR/DTR) flow control (_alias:_ dsrdtr). _Default: `False`_
        inter_byte_timeout (float | None): The inter-character timeout. _Default: `None`_
        parity (str): Parity checking, e.g. 'even', 'odd'. _Default: `none`_
        rts_cts (bool): Whether to enable hardware (RTS/CTS) flow control (_alias:_ rtscts). _Default: `False`_
        stop_bits (int | float): The number of stop bits, e.g. 1, 1.5, 2 (_alias:_ stopbits). _Default: `1`_
        xon_xoff (bool): Whether to enable software flow control (_alias:_ xonxoff). _Default: `False`_
    """
    super().__init__(equipment)

    assert equipment.connection is not None  # noqa: S101
    info = parse_serial_address(equipment.connection.address)
    if info is None:
        msg = f"Invalid serial address {equipment.connection.address!r}"
        raise ValueError(msg)

    self._serial: serial.Serial = _init_serial(info.port, equipment.connection.properties)
    self._set_interface_timeout()

    try:
        self._serial.open()
    except serial.SerialException as e:
        raise MSLConnectionError(self, str(e)) from None

encoding property writable ¤

encoding: str

The encoding that is used for read and write operations.

equipment property ¤

equipment: Equipment

The Equipment associated with the interface.

max_read_size property writable ¤

max_read_size: int

The maximum number of bytes that can be read.

read_termination property writable ¤

read_termination: bytes | None

The termination character sequence that is used for a read operation.

Reading stops when the equipment stops sending data or the read_termination character sequence is detected. If you set read_termination to a str, it is encoded as bytes.

rstrip property writable ¤

rstrip: bool

Whether to remove trailing whitespace from read messages.

serial property ¤

serial: Serial

Returns the reference to the serial instance.

timeout property writable ¤

timeout: float | None

The timeout, in seconds, for read and write operations.

A value <0 will set the timeout to be None (blocking mode).

write_termination property writable ¤

write_termination: bytes | None

The termination character sequence that is appended to write messages.

If you set write_termination to a str, it is encoded as bytes.

disconnect ¤

disconnect() -> None

Close the serial port.

Source code in src/msl/equipment/interfaces/serial.py
def disconnect(self) -> None:  # pyright: ignore[reportImplicitOverride]
    """Close the serial port."""
    if hasattr(self, "_serial") and self._serial.is_open:
        self._serial.close()
        super().disconnect()

query ¤

query(
    message: bytes | str,
    *,
    delay: float = 0.0,
    decode: Literal[True] = True,
    dtype: None = None,
    fmt: MessageFormat | None = ...,
    size: int | None = ...
) -> str
query(
    message: bytes | str,
    *,
    delay: float = 0.0,
    decode: Literal[False] = False,
    dtype: None = None,
    fmt: MessageFormat | None = ...,
    size: int | None = ...
) -> bytes
query(
    message: bytes | str,
    *,
    delay: float = 0.0,
    decode: bool = ...,
    dtype: MessageDataType = ...,
    fmt: MessageFormat | None = ...,
    size: int | None = ...
) -> NumpyArray1D
query(
    message: bytes | str,
    *,
    delay: float = 0.0,
    decode: bool = True,
    dtype: MessageDataType | None = None,
    fmt: MessageFormat | None = None,
    size: int | None = None
) -> bytes | str | NumpyArray1D

Convenience method for performing a write followed by a read.

Parameters:

Name Type Description Default
message bytes | str

The message to write to the equipment.

required
delay float

Time delay, in seconds, to wait between the write and read operations.

0.0
decode bool

Whether to decode the returned message (i.e., convert the message to a str) or keep the message as bytes. Ignored if dtype is not None.

True
dtype MessageDataType | None

The data type of the elements in the returned message. Can be any object that numpy dtype supports. See from_bytes for more details. For messages that are of scalar type (i.e., a single number) it is more efficient to not specify dtype but to pass the returned message to the int or float class to convert the message to the appropriate numeric type.

None
fmt MessageFormat | None

The format that the returned message data is in. Ignored if dtype is None. See from_bytes for more details.

None
size int | None

The number of bytes to read. Ignored if the value is None.

None

Returns:

Type Description
bytes | str | NumpyArray1D

The message from the equipment. If dtype is specified, the message is returned as a numpy ndarray; otherwise it is returned as a str if decode is True, or as bytes if decode is False.

Source code in src/msl/equipment/interfaces/message_based.py
def query(  # noqa: PLR0913
    self,
    message: bytes | str,
    *,
    delay: float = 0.0,
    decode: bool = True,
    dtype: MessageDataType | None = None,
    fmt: MessageFormat | None = None,
    size: int | None = None,
) -> bytes | str | NumpyArray1D:
    """Convenience method for performing a [write][msl.equipment.interfaces.message_based.MessageBased.write]
    followed by a [read][msl.equipment.interfaces.message_based.MessageBased.read].

    Args:
        message: The message to write to the equipment.
        delay: Time delay, in seconds, to wait between the _write_ and _read_ operations.
        decode: Whether to decode the returned message (i.e., convert the message to a [str][])
            or keep the message as [bytes][]. Ignored if `dtype` is not `None`.
        dtype: The data type of the elements in the returned message. Can be any object
            that numpy [dtype][numpy.dtype] supports. See [from_bytes][msl.equipment.utils.from_bytes]
            for more details. For messages that are of scalar type (i.e., a single number)
            it is more efficient to not specify `dtype` but to pass the returned message to the
            [int][] or [float][] class to convert the message to the appropriate numeric type.
        fmt: The format that the returned message data is in. Ignored if `dtype` is `None`.
             See [from_bytes][msl.equipment.utils.from_bytes] for more details.
        size: The number of bytes to read. Ignored if the value is `None`.

    Returns:
        The message from the equipment. If `dtype` is specified, then the message is
            returned as a numpy [ndarray][numpy.ndarray], if `decode` is `True` then the message
            is returned as a [str][], otherwise the message is returned as [bytes][].
    """  # noqa: D205
    _ = self.write(message)
    if delay > 0:
        time.sleep(delay)
    if dtype:
        return self.read(dtype=dtype, fmt=fmt, size=size)
    return self.read(decode=decode, size=size)
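The write, optional delay, then read sequence can be tried without hardware. The `LoopbackDevice` class below is a hypothetical stand-in for an interface, and the `*IDN?` command and its reply are invented for the example; only the order of operations inside `query` follows the source above.

```python
import time


class LoopbackDevice:
    """Hypothetical stand-in for an interface; it answers one fixed command."""

    def __init__(self):
        self._pending = b""

    def write(self, message):
        data = message.encode() if isinstance(message, str) else message
        if data.strip() == b"*IDN?":
            self._pending = b"ACME,Model 1,SN123\n"  # invented reply
        return len(data)

    def read(self, *, decode=True):
        reply, self._pending = self._pending.rstrip(), b""
        return reply.decode() if decode else reply

    def query(self, message, *, delay=0.0, decode=True):
        # Same order of operations as the source above: write, optional sleep, read.
        self.write(message)
        if delay > 0:
            time.sleep(delay)
        return self.read(decode=decode)


device = LoopbackDevice()
identity = device.query("*IDN?")            # returned as str
raw = device.query(b"*IDN?", decode=False)  # returned as bytes
```

A non-zero `delay` may help with equipment that needs time to prepare a reply before it can be read.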

read ¤

read(
    *,
    decode: Literal[True] = True,
    dtype: None = None,
    fmt: MessageFormat | None = ...,
    size: int | None = ...
) -> str
read(
    *,
    decode: Literal[False] = False,
    dtype: None = None,
    fmt: MessageFormat | None = ...,
    size: int | None = ...
) -> bytes
read(
    *,
    decode: bool = ...,
    dtype: MessageDataType = ...,
    fmt: MessageFormat | None = ...,
    size: int | None = ...
) -> NumpyArray1D
read(
    *,
    decode: bool = True,
    dtype: MessageDataType | None = None,
    fmt: MessageFormat | None = None,
    size: int | None = None
) -> bytes | str | NumpyArray1D

Read a message from the equipment.

This method will block until one of the following conditions is fulfilled:

  1. size bytes have been received (only if size is not None).
  2. the read_termination bytes are received (only if read_termination is not None).
  3. a timeout occurs (only if timeout is not None). If a timeout occurs, an MSLTimeoutError is raised.
  4. max_read_size bytes have been received. If the maximum number of bytes has been read, an MSLConnectionError is raised.

Tip

You may also want to set the rstrip value for the class instance.

Parameters:

Name Type Description Default
decode bool

Whether to decode the message (i.e., convert the message to a str) or keep the message as bytes. Ignored if dtype is not None.

True
dtype MessageDataType | None

The data type of the elements in the message. Can be any object that numpy dtype supports. See from_bytes for more details. For messages that are of scalar type (i.e., a single number) it is more efficient to not specify dtype but to pass the returned message to the int or float class to convert the message to the appropriate numeric type.

None
fmt MessageFormat | None

The format that the message data is in. Ignored if dtype is None. See from_bytes for more details.

None
size int | None

The number of bytes to read. Ignored if the value is None.

None

Returns:

Type Description
bytes | str | NumpyArray1D

The message from the equipment. If dtype is specified, the message is returned as a numpy ndarray; otherwise it is returned as a str if decode is True, or as bytes if decode is False.

Source code in src/msl/equipment/interfaces/message_based.py
def read(
    self,
    *,
    decode: bool = True,
    dtype: MessageDataType | None = None,
    fmt: MessageFormat | None = None,
    size: int | None = None,
) -> bytes | str | NumpyArray1D:
    """Read a message from the equipment.

    This method will block until one of the following conditions is fulfilled:

    1. `size` bytes have been received &mdash; only if `size` is not `None`.
    2. the [read_termination][msl.equipment.interfaces.message_based.MessageBased.read_termination]
       byte(s) is(are) received &mdash; only if
       [read_termination][msl.equipment.interfaces.message_based.MessageBased.read_termination]
       is not `None`.
    3. a timeout occurs &mdash; only if [timeout][msl.equipment.interfaces.message_based.MessageBased.timeout]
       is not `None`. If a timeout occurs, an
       [MSLTimeoutError][msl.equipment.interfaces.message_based.MSLTimeoutError] is raised.
    4. [max_read_size][msl.equipment.interfaces.message_based.MessageBased.max_read_size]
       bytes have been received. If the maximum number of bytes have been read, an
       [MSLConnectionError][msl.equipment.interfaces.message_based.MSLConnectionError] is raised.

    !!! tip
        You may also want to set the [rstrip][msl.equipment.interfaces.message_based.MessageBased.rstrip]
        value for the class instance.

    Args:
        decode: Whether to decode the message (i.e., convert the message to a [str][])
            or keep the message as [bytes][]. Ignored if `dtype` is not `None`.
        dtype: The data type of the elements in the message. Can be any object
            that numpy [dtype][numpy.dtype] supports. See [from_bytes][msl.equipment.utils.from_bytes]
            for more details. For messages that are of scalar type (i.e., a single number)
            it is more efficient to not specify `dtype` but to pass the returned message to the
            [int][] or [float][] class to convert the message to the appropriate numeric type.
        fmt: The format that the message data is in. Ignored if `dtype` is `None`.
             See [from_bytes][msl.equipment.utils.from_bytes] for more details.
        size: The number of bytes to read. Ignored if the value is `None`.

    Returns:
        The message from the equipment. If `dtype` is specified, then the message is returned
            as a numpy [ndarray][numpy.ndarray], if `decode` is `True` then the message
            is returned as a [str][], otherwise the message is returned as [bytes][].
    """
    if size is not None and size > self._max_read_size:
        msg = f"max_read_size is {self._max_read_size} bytes, requesting {size} bytes"
        raise MSLConnectionError(self, msg)

    try:
        message = self._read(size)
    except (serial.SerialTimeoutException, socket.timeout, TimeoutError):
        raise MSLTimeoutError(self) from None
    except Exception as e:  # noqa: BLE001
        msg = f"{e.__class__.__name__}: {e}"
        raise MSLConnectionError(self, msg) from None

    if size is None:
        if dtype:
            logger.debug("%s.read(dtype=%r, fmt=%r) -> %r", self, dtype, fmt, message)
        else:
            logger.debug("%s.read() -> %r", self, message)
    else:
        if len(message) != size:
            msg = f"received {len(message)} bytes, requested {size} bytes"
            raise MSLConnectionError(self, msg)
        logger.debug("%s.read(size=%s) -> %r", self, size, message)

    if self._rstrip:
        message = message.rstrip()

    if dtype:
        return from_bytes(message, fmt=fmt, dtype=dtype)

    if decode:
        return message.decode(encoding=self._encoding)

    return message
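The four stop conditions listed for read can be sketched with an in-memory stream. The `read_message` function below is hypothetical and is not the library's `_read` implementation. It uses built-in exceptions where the library raises MSLTimeoutError and MSLConnectionError, treats an exhausted stream as the timeout case, and assumes the termination check is skipped when `size` is given.

```python
import io


def read_message(stream, *, size=None, termination=b"\n", max_read_size=1024):
    """Hypothetical reader that mirrors the documented stop conditions."""
    if size is not None and size > max_read_size:
        raise ValueError(f"max_read_size is {max_read_size} bytes, requesting {size} bytes")

    message = bytearray()
    while True:
        if size is not None and len(message) == size:
            return bytes(message)  # condition 1: size bytes received
        if size is None and termination and message.endswith(termination):
            return bytes(message)  # condition 2: termination received
        if len(message) >= max_read_size:
            # Condition 4: the library raises MSLConnectionError here.
            raise ValueError(f"read {max_read_size} bytes without finishing the message")
        byte = stream.read(1)
        if not byte:
            # Stand-in for condition 3: an exhausted stream plays the role of a timeout.
            raise TimeoutError("no more data")
        message += byte


stream = io.BytesIO(b"1.5\n2.5\n")
first = read_message(stream)           # stops at the termination byte
second = read_message(stream, size=3)  # stops after exactly 3 bytes
```

After the bytes are collected, the source above optionally strips trailing whitespace (rstrip) and then either converts with from_bytes, decodes to a str, or returns the bytes unchanged.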

write ¤

write(
    message: bytes | str,
    *,
    data: Sequence1D | None = None,
    dtype: MessageDataType = "<f",
    fmt: MessageFormat | None = "ieee"
) -> int

Write a message to the equipment.

Parameters:

Name Type Description Default
message bytes | str

The message to write to the equipment.

required
data Sequence1D | None

The data to append to message. See to_bytes for more details.

None
dtype MessageDataType

The data type to use to convert each element in data to bytes. Ignored if data is None. See to_bytes for more details.

'<f'
fmt MessageFormat | None

The format to use to convert data to bytes. Ignored if data is None. See to_bytes for more details.

'ieee'

Returns:

Type Description
int

The number of bytes written.

Source code in src/msl/equipment/interfaces/message_based.py
def write(
    self,
    message: bytes | str,
    *,
    data: Sequence1D | None = None,
    dtype: MessageDataType = "<f",
    fmt: MessageFormat | None = "ieee",
) -> int:
    """Write a message to the equipment.

    Args:
        message: The message to write to the equipment.
        data: The data to append to `message`. See [to_bytes][msl.equipment.utils.to_bytes]
            for more details.
        dtype: The data type to use to convert each element in `data` to bytes. Ignored
            if `data` is `None`. See [to_bytes][msl.equipment.utils.to_bytes] for more details.
        fmt: The format to use to convert `data` to bytes. Ignored if `data` is `None`.
            See [to_bytes][msl.equipment.utils.to_bytes] for more details.

    Returns:
        The number of bytes written.
    """
    if not isinstance(message, bytes):
        message = message.encode(encoding=self._encoding)

    if data is not None:
        message += to_bytes(data, fmt=fmt, dtype=dtype)

    if self._write_termination and not message.endswith(self._write_termination):
        message += self._write_termination

    logger.debug("%s.write(%r)", self, message)

    try:
        return self._write(message)
    except (serial.SerialTimeoutException, socket.timeout, TimeoutError):
        raise MSLTimeoutError(self) from None
    except Exception as e:  # noqa: BLE001
        raise MSLConnectionError(self, str(e)) from None

Socket ¤

Socket(equipment: Equipment)

Bases: MessageBased

Base class for equipment that is connected through a socket.

Parameters:

Name Type Description Default
equipment Equipment

An Equipment instance.

required

A Connection instance supports the following properties for the socket communication protocol, as well as the properties defined in MessageBased.

Connection Properties:

Name Type Description
buffer_size int

The maximum number of bytes to read at a time. Default: 4096

Source code in src/msl/equipment/interfaces/socket.py
def __init__(self, equipment: Equipment) -> None:
    """Base class for equipment that is connected through a socket.

    Args:
        equipment: An [Equipment][] instance.

    A [Connection][msl.equipment.schema.Connection] instance supports the following _properties_
    for the socket communication protocol, as well as the _properties_ defined in
    [MessageBased][msl.equipment.interfaces.message_based.MessageBased].

    Attributes: Connection Properties:
        buffer_size (int): The maximum number of bytes to read at a time. _Default: `4096`_
    """
    super().__init__(equipment)

    assert equipment.connection is not None  # noqa: S101

    info = parse_socket_address(equipment.connection.address)
    if info is None:
        msg = f"Invalid socket address {equipment.connection.address!r}"
        raise ValueError(msg)

    self._info: ParsedSocketAddress = info

    props = equipment.connection.properties
    self._buffer_size: int = props.get("buffer_size", 4096)
    self._byte_buffer: bytearray = bytearray()

    typ: int = socket.SOCK_DGRAM if equipment.connection.address.startswith("UDP") else socket.SOCK_STREAM
    self._is_stream: bool = typ == socket.SOCK_STREAM
    self._socket: socket.socket = socket.socket(family=socket.AF_INET, type=typ)
    self._connect()
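The constructor chooses the socket type from the address prefix and reads buffer_size from the connection properties. A minimal sketch of those two decisions is shown below. The `socket_type_for` helper and the address strings are hypothetical; only the `"UDP"` prefix test and the `4096` default come from the source above.

```python
import socket


def socket_type_for(address):
    """Return SOCK_DGRAM for addresses starting with "UDP", otherwise SOCK_STREAM."""
    return socket.SOCK_DGRAM if address.startswith("UDP") else socket.SOCK_STREAM


# Hypothetical address strings; only the "UDP" prefix matters for this check.
udp_type = socket_type_for("UDP::192.168.1.100::5000")
tcp_type = socket_type_for("TCP::192.168.1.100::5000")

# A missing buffer_size property falls back to the documented default.
props = {}
buffer_size = props.get("buffer_size", 4096)
```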

encoding property writable ¤

encoding: str

The encoding that is used for read and write operations.

equipment property ¤

equipment: Equipment

The Equipment associated with the interface.

max_read_size property writable ¤

max_read_size: int

The maximum number of bytes that can be read.

read_termination property writable ¤

read_termination: bytes | None

The termination character sequence that is used for a read operation.

Reading stops when the equipment stops sending data or the read_termination character sequence is detected. If you set read_termination to a str, it is encoded as bytes.

rstrip property writable ¤

rstrip: bool

Whether to remove trailing whitespace from read messages.

socket property ¤

socket: socket

Returns a reference to the underlying socket.

timeout property writable ¤

timeout: float | None

The timeout, in seconds, for read and write operations.

A value <0 will set the timeout to be None (blocking mode).

write_termination property writable ¤

write_termination: bytes | None

The termination character sequence that is appended to write messages.

If you set write_termination to a str, it is encoded as bytes.

disconnect ¤

disconnect() -> None

Close the socket.

Source code in src/msl/equipment/interfaces/socket.py
def disconnect(self) -> None:  # pyright: ignore[reportImplicitOverride]
    """Close the socket."""
    if hasattr(self, "_socket") and self._socket.fileno() != -1:
        self._socket.close()
        super().disconnect()

query ¤

query(
    message: bytes | str,
    *,
    delay: float = 0.0,
    decode: Literal[True] = True,
    dtype: None = None,
    fmt: MessageFormat | None = ...,
    size: int | None = ...
) -> str
query(
    message: bytes | str,
    *,
    delay: float = 0.0,
    decode: Literal[False] = False,
    dtype: None = None,
    fmt: MessageFormat | None = ...,
    size: int | None = ...
) -> bytes
query(
    message: bytes | str,
    *,
    delay: float = 0.0,
    decode: bool = ...,
    dtype: MessageDataType = ...,
    fmt: MessageFormat | None = ...,
    size: int | None = ...
) -> NumpyArray1D
query(
    message: bytes | str,
    *,
    delay: float = 0.0,
    decode: bool = True,
    dtype: MessageDataType | None = None,
    fmt: MessageFormat | None = None,
    size: int | None = None
) -> bytes | str | NumpyArray1D

Convenience method for performing a write followed by a read.

Parameters:

Name Type Description Default
message bytes | str

The message to write to the equipment.

required
delay float

Time delay, in seconds, to wait between the write and read operations.

0.0
decode bool

Whether to decode the returned message (i.e., convert the message to a str) or keep the message as bytes. Ignored if dtype is not None.

True
dtype MessageDataType | None

The data type of the elements in the returned message. Can be any object that numpy dtype supports. See from_bytes for more details. For messages that are of scalar type (i.e., a single number) it is more efficient to not specify dtype but to pass the returned message to the int or float class to convert the message to the appropriate numeric type.

None
fmt MessageFormat | None

The format that the returned message data is in. Ignored if dtype is None. See from_bytes for more details.

None
size int | None

The number of bytes to read. Ignored if the value is None.

None

Returns:

Type Description
bytes | str | NumpyArray1D

The message from the equipment. If dtype is specified, the message is returned as a numpy ndarray; otherwise it is returned as a str if decode is True, or as bytes if decode is False.

Source code in src/msl/equipment/interfaces/message_based.py
def query(  # noqa: PLR0913
    self,
    message: bytes | str,
    *,
    delay: float = 0.0,
    decode: bool = True,
    dtype: MessageDataType | None = None,
    fmt: MessageFormat | None = None,
    size: int | None = None,
) -> bytes | str | NumpyArray1D:
    """Convenience method for performing a [write][msl.equipment.interfaces.message_based.MessageBased.write]
    followed by a [read][msl.equipment.interfaces.message_based.MessageBased.read].

    Args:
        message: The message to write to the equipment.
        delay: Time delay, in seconds, to wait between the _write_ and _read_ operations.
        decode: Whether to decode the returned message (i.e., convert the message to a [str][])
            or keep the message as [bytes][]. Ignored if `dtype` is not `None`.
        dtype: The data type of the elements in the returned message. Can be any object
            that numpy [dtype][numpy.dtype] supports. See [from_bytes][msl.equipment.utils.from_bytes]
            for more details. For messages that are of scalar type (i.e., a single number)
            it is more efficient to not specify `dtype` but to pass the returned message to the
            [int][] or [float][] class to convert the message to the appropriate numeric type.
        fmt: The format that the returned message data is in. Ignored if `dtype` is `None`.
             See [from_bytes][msl.equipment.utils.from_bytes] for more details.
        size: The number of bytes to read. Ignored if the value is `None`.

    Returns:
        The message from the equipment. If `dtype` is specified, then the message is
            returned as a numpy [ndarray][numpy.ndarray], if `decode` is `True` then the message
            is returned as a [str][], otherwise the message is returned as [bytes][].
    """  # noqa: D205
    _ = self.write(message)
    if delay > 0:
        time.sleep(delay)
    if dtype:
        return self.read(dtype=dtype, fmt=fmt, size=size)
    return self.read(decode=decode, size=size)

read ¤

read(
    *,
    decode: Literal[True] = True,
    dtype: None = None,
    fmt: MessageFormat | None = ...,
    size: int | None = ...
) -> str
read(
    *,
    decode: Literal[False] = False,
    dtype: None = None,
    fmt: MessageFormat | None = ...,
    size: int | None = ...
) -> bytes
read(
    *,
    decode: bool = ...,
    dtype: MessageDataType = ...,
    fmt: MessageFormat | None = ...,
    size: int | None = ...
) -> NumpyArray1D
read(
    *,
    decode: bool = True,
    dtype: MessageDataType | None = None,
    fmt: MessageFormat | None = None,
    size: int | None = None
) -> bytes | str | NumpyArray1D

Read a message from the equipment.

This method will block until one of the following conditions is fulfilled:

  1. size bytes have been received (only if size is not None).
  2. the read_termination bytes are received (only if read_termination is not None).
  3. a timeout occurs (only if timeout is not None). If a timeout occurs, an MSLTimeoutError is raised.
  4. max_read_size bytes have been received. If the maximum number of bytes has been read, an MSLConnectionError is raised.

Tip

You may also want to set the rstrip value for the class instance.

Parameters:

Name Type Description Default
decode bool

Whether to decode the message (i.e., convert the message to a str) or keep the message as bytes. Ignored if dtype is not None.

True
dtype MessageDataType | None

The data type of the elements in the message. Can be any object that numpy dtype supports. See from_bytes for more details. For messages that are of scalar type (i.e., a single number) it is more efficient to not specify dtype but to pass the returned message to the int or float class to convert the message to the appropriate numeric type.

None
fmt MessageFormat | None

The format that the message data is in. Ignored if dtype is None. See from_bytes for more details.

None
size int | None

The number of bytes to read. Ignored if the value is None.

None

Returns:

Type Description
bytes | str | NumpyArray1D

The message from the equipment. If dtype is specified, the message is returned as a numpy ndarray; otherwise it is returned as a str if decode is True, or as bytes if decode is False.

Source code in src/msl/equipment/interfaces/message_based.py
def read(
    self,
    *,
    decode: bool = True,
    dtype: MessageDataType | None = None,
    fmt: MessageFormat | None = None,
    size: int | None = None,
) -> bytes | str | NumpyArray1D:
    """Read a message from the equipment.

    This method will block until one of the following conditions is fulfilled:

    1. `size` bytes have been received &mdash; only if `size` is not `None`.
    2. the [read_termination][msl.equipment.interfaces.message_based.MessageBased.read_termination]
       byte(s) is(are) received &mdash; only if
       [read_termination][msl.equipment.interfaces.message_based.MessageBased.read_termination]
       is not `None`.
    3. a timeout occurs &mdash; only if [timeout][msl.equipment.interfaces.message_based.MessageBased.timeout]
       is not `None`. If a timeout occurs, an
       [MSLTimeoutError][msl.equipment.interfaces.message_based.MSLTimeoutError] is raised.
    4. [max_read_size][msl.equipment.interfaces.message_based.MessageBased.max_read_size]
       bytes have been received. If the maximum number of bytes have been read, an
       [MSLConnectionError][msl.equipment.interfaces.message_based.MSLConnectionError] is raised.

    !!! tip
        You may also want to set the [rstrip][msl.equipment.interfaces.message_based.MessageBased.rstrip]
        value for the class instance.

    Args:
        decode: Whether to decode the message (i.e., convert the message to a [str][])
            or keep the message as [bytes][]. Ignored if `dtype` is not `None`.
        dtype: The data type of the elements in the message. Can be any object
            that numpy [dtype][numpy.dtype] supports. See [from_bytes][msl.equipment.utils.from_bytes]
            for more details. For messages that are of scalar type (i.e., a single number)
            it is more efficient to not specify `dtype` but to pass the returned message to the
            [int][] or [float][] class to convert the message to the appropriate numeric type.
        fmt: The format that the message data is in. Ignored if `dtype` is `None`.
             See [from_bytes][msl.equipment.utils.from_bytes] for more details.
        size: The number of bytes to read. Ignored if the value is `None`.

    Returns:
        The message from the equipment. If `dtype` is specified, then the message is returned
            as a numpy [ndarray][numpy.ndarray], if `decode` is `True` then the message
            is returned as a [str][], otherwise the message is returned as [bytes][].
    """
    if size is not None and size > self._max_read_size:
        msg = f"max_read_size is {self._max_read_size} bytes, requesting {size} bytes"
        raise MSLConnectionError(self, msg)

    try:
        message = self._read(size)
    except (serial.SerialTimeoutException, socket.timeout, TimeoutError):
        raise MSLTimeoutError(self) from None
    except Exception as e:  # noqa: BLE001
        msg = f"{e.__class__.__name__}: {e}"
        raise MSLConnectionError(self, msg) from None

    if size is None:
        if dtype:
            logger.debug("%s.read(dtype=%r, fmt=%r) -> %r", self, dtype, fmt, message)
        else:
            logger.debug("%s.read() -> %r", self, message)
    else:
        if len(message) != size:
            msg = f"received {len(message)} bytes, requested {size} bytes"
            raise MSLConnectionError(self, msg)
        logger.debug("%s.read(size=%s) -> %r", self, size, message)

    if self._rstrip:
        message = message.rstrip()

    if dtype:
        return from_bytes(message, fmt=fmt, dtype=dtype)

    if decode:
        return message.decode(encoding=self._encoding)

    return message
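For a socket, data arrives in chunks of at most buffer_size bytes, so one chunk may contain the end of one message and the start of the next. The sketch below shows one way to handle that with a byte buffer. The `BufferedReader` class is hypothetical and is not the library's `_read` implementation, and retaining leftover bytes between reads is an assumption based on the `_byte_buffer` attribute created in the constructor. An in-memory stream plays the role of `socket.recv`.

```python
import io


class BufferedReader:
    """Hypothetical reader that pulls at most buffer_size bytes per call."""

    def __init__(self, recv, buffer_size=4096):
        self._recv = recv  # callable with the same shape as socket.recv(bufsize)
        self._buffer_size = buffer_size
        self._byte_buffer = bytearray()

    def read_until(self, termination):
        while True:
            index = self._byte_buffer.find(termination)
            if index != -1:
                end = index + len(termination)
                message = bytes(self._byte_buffer[:end])
                del self._byte_buffer[:end]  # keep any extra bytes for the next read
                return message
            chunk = self._recv(self._buffer_size)
            if not chunk:
                raise ConnectionError("peer closed the connection")
            self._byte_buffer += chunk


stream = io.BytesIO(b"first\nsecond\n")
reader = BufferedReader(stream.read, buffer_size=8)
first = reader.read_until(b"\n")
second = reader.read_until(b"\n")
```

With `buffer_size=8` the first chunk is `b"first\nse"`, so the two leftover bytes stay in the buffer and become the start of the second message.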

reconnect ¤

reconnect(max_attempts: int = 1) -> None

Reconnect to the equipment.

Parameters:

Name Type Description Default
max_attempts int

The maximum number of attempts to try to reconnect with the equipment. If <1, keep trying until a connection is successful. If the maximum number of attempts has been reached then an exception is raised.

1
Source code in src/msl/equipment/interfaces/socket.py
def reconnect(self, max_attempts: int = 1) -> None:
    """Reconnect to the equipment.

    Args:
        max_attempts: The maximum number of attempts to try to reconnect with the equipment.
            If &lt;1, keep trying until a connection is successful. If the maximum number
            of attempts has been reached then an exception is raised.
    """
    self._socket.close()
    self._socket = socket.socket(family=self._socket.family, type=self._socket.type)

    attempt = 0
    while True:
        attempt += 1
        try:
            return self._connect()
        except (MSLConnectionError, MSLTimeoutError):
            if 0 < max_attempts <= attempt:
                raise
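
The retry rule in the loop above (re-raise only when `0 < max_attempts <= attempt`) can be tried out in isolation. The following is a minimal sketch, not the library's code: `retry`, `make_flaky_connect` and the use of the built-in `ConnectionError` are hypothetical stand-ins for `_connect` and the MSL exception types.

```python
from typing import Callable


def retry(connect: Callable[[], None], max_attempts: int = 1) -> int:
    """Call connect() until it succeeds and return the number of attempts used.

    A value < 1 retries forever; otherwise the last exception is
    re-raised once max_attempts attempts have been made.
    """
    attempt = 0
    while True:
        attempt += 1
        try:
            connect()
        except ConnectionError:
            if 0 < max_attempts <= attempt:
                raise
        else:
            return attempt


def make_flaky_connect(failures: int) -> Callable[[], None]:
    """Return a hypothetical connect() that fails `failures` times, then succeeds."""
    remaining = [failures]

    def connect() -> None:
        if remaining[0] > 0:
            remaining[0] -= 1
            raise ConnectionError("device not ready")

    return connect


# Two failures followed by a success, with a budget of five attempts.
print(retry(make_flaky_connect(2), max_attempts=5))
# A value < 1 means keep trying until the connection succeeds.
print(retry(make_flaky_connect(2), max_attempts=0))
```

With `max_attempts=1` (the default) a single failure is re-raised immediately, so callers that expect a slow device may prefer a larger value.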

write ¤

write(
    message: bytes | str,
    *,
    data: Sequence1D | None = None,
    dtype: MessageDataType = "<f",
    fmt: MessageFormat | None = "ieee"
) -> int

Write a message to the equipment.

Parameters:

Name Type Description Default
message bytes | str

The message to write to the equipment.

required
data Sequence1D | None

The data to append to message. See to_bytes for more details.

None
dtype MessageDataType

The data type to use to convert each element in data to bytes. Ignored if data is None. See to_bytes for more details.

'<f'
fmt MessageFormat | None

The format to use to convert data to bytes. Ignored if data is None. See to_bytes for more details.

'ieee'

Returns:

Type Description
int

The number of bytes written.

Source code in src/msl/equipment/interfaces/message_based.py
def write(
    self,
    message: bytes | str,
    *,
    data: Sequence1D | None = None,
    dtype: MessageDataType = "<f",
    fmt: MessageFormat | None = "ieee",
) -> int:
    """Write a message to the equipment.

    Args:
        message: The message to write to the equipment.
        data: The data to append to `message`. See [to_bytes][msl.equipment.utils.to_bytes]
            for more details.
        dtype: The data type to use to convert each element in `data` to bytes. Ignored
            if `data` is `None`. See [to_bytes][msl.equipment.utils.to_bytes] for more details.
        fmt: The format to use to convert `data` to bytes. Ignored if `data` is `None`.
            See [to_bytes][msl.equipment.utils.to_bytes] for more details.

    Returns:
        The number of bytes written.
    """
    if not isinstance(message, bytes):
        message = message.encode(encoding=self._encoding)

    if data is not None:
        message += to_bytes(data, fmt=fmt, dtype=dtype)

    if self._write_termination and not message.endswith(self._write_termination):
        message += self._write_termination

    logger.debug("%s.write(%r)", self, message)

    try:
        return self._write(message)
    except (serial.SerialTimeoutException, socket.timeout, TimeoutError):
        raise MSLTimeoutError(self) from None
    except Exception as e:  # noqa: BLE001
        raise MSLConnectionError(self, str(e)) from None
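
The order in which `write` assembles the outgoing bytes (encode, append data, append termination) can be sketched without any hardware. This sketch assumes that `fmt="ieee"` refers to the IEEE 488.2 definite-length block format (`#<n><length><data>`); `ieee_block` and `build_message` are hypothetical helpers, and `to_bytes` remains the authority on what the library actually sends.

```python
from __future__ import annotations

import struct


def ieee_block(payload: bytes) -> bytes:
    """Wrap payload in an IEEE 488.2 definite-length block: #<n><length><data>."""
    length = str(len(payload)).encode("ascii")
    return b"#" + str(len(length)).encode("ascii") + length + payload


def build_message(
    command: bytes | str,
    values: list[float] | None = None,
    *,
    termination: bytes | None = b"\n",
    encoding: str = "utf-8",
) -> bytes:
    """Mimic the steps in write(): encode, append data, append termination."""
    message = command if isinstance(command, bytes) else command.encode(encoding)
    if values is not None:
        # "<f" is little-endian single precision, the default dtype of write()
        message += ieee_block(struct.pack(f"<{len(values)}f", *values))
    if termination and not message.endswith(termination):
        message += termination
    return message


print(build_message("DATA ", [1.0, 2.0]))
print(build_message("*RST\n"))  # termination is not appended twice
```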

VXI11 ¤

VXI11(equipment: Equipment)

Bases: MessageBased

Base class for the VXI-11 communication protocol.

Parameters:

Name Type Description Default
equipment Equipment

An Equipment instance.

required

A Connection instance supports the following properties for the VXI-11 communication protocol, as well as the properties defined in MessageBased.

Connection Properties:

Name Type Description
buffer_size int

The maximum number of bytes to read at a time. Default: 4096

lock_timeout float

The timeout (in seconds) to wait for a lock (0 means wait forever). Default: 0

port int

The port to use instead of calling the RPC Port Mapper function.

Source code in src/msl/equipment/interfaces/vxi11.py
def __init__(self, equipment: Equipment) -> None:
    """Base class for the [VXI-11](http://www.vxibus.org/specifications.html) communication protocol.

    Args:
        equipment: An [Equipment][] instance.

    A [Connection][msl.equipment.schema.Connection] instance supports the following _properties_
    for the [VXI-11](http://www.vxibus.org/specifications.html) communication protocol, as well
    as the _properties_ defined in [MessageBased][msl.equipment.interfaces.message_based.MessageBased].

    Attributes: Connection Properties:
        buffer_size (int): The maximum number of bytes to read at a time. _Default: `4096`_
        lock_timeout (float): The timeout (in seconds) to wait for a lock (0 means wait forever). _Default: `0`_
        port (int): The port to use instead of calling the RPC Port Mapper function.
    """
    # the following must be defined before calling super()
    self._core_client: CoreClient | None = None
    self._abort_client: AsyncClient | None = None
    self._lock_timeout: float = 0  # updated in lock_timeout.setter
    super().__init__(equipment)

    assert equipment.connection is not None  # noqa: S101
    info = parse_vxi_address(equipment.connection.address)
    if info is None:
        msg = f"Invalid VXI-11 address {equipment.connection.address!r}"
        raise ValueError(msg)

    self._info: ParsedVXI11Address = info

    props = equipment.connection.properties
    self._buffer_size: int = props.get("buffer_size", 4096)
    self._core_port: int = props.get("port", -1)  # updated in _connect if -1
    self._abort_port: int = -1  # updated in _connect
    self._max_recv_size: int = -1  # updated in _connect
    self._link_id: int = -1  # updated in _connect
    self._io_timeout_ms: int = -1  # updated in _set_interface_timeout
    self._lock_timeout_ms: int = -1  # updated in lock_timeout.setter
    self.lock_timeout = props.get("lock_timeout", 0)

    # A non-empty read_termination value is applied by default in
    # MessageBased if the user did not specify one. Set it back
    # to None if a read-termination character was not explicitly specified.
    if "read_termination" not in props and "termination" not in props:
        self.read_termination = None  # pyright: ignore[reportUnannotatedClassAttribute]

    # VXI-11 does not support write-termination characters
    self.write_termination = None  # pyright: ignore[reportUnannotatedClassAttribute]

    self._connect()
    self._set_interface_timeout()
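
How the constructor turns connection properties into settings can be summarised with a plain dictionary. This is an illustrative sketch only: `resolve_vxi11_settings` is a hypothetical function, and the precedence of `read_termination` over `termination` is an assumption, since that logic lives in MessageBased.

```python
from __future__ import annotations

from typing import Any


def resolve_vxi11_settings(props: dict[str, Any]) -> dict[str, Any]:
    """Resolve VXI-11 settings from connection properties (illustrative only)."""
    # Assumption: an explicit read_termination wins over the generic termination.
    read_termination = props.get("read_termination", props.get("termination"))
    return {
        "buffer_size": props.get("buffer_size", 4096),
        "lock_timeout": props.get("lock_timeout", 0),
        # -1 is the placeholder that the constructor above updates in _connect
        "port": props.get("port", -1),
        # None unless the user asked for a read-termination character
        "read_termination": read_termination,
        # VXI-11 does not support write-termination characters
        "write_termination": None,
    }


print(resolve_vxi11_settings({}))
print(resolve_vxi11_settings({"buffer_size": 8192, "termination": b"\n"}))
```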

encoding property writable ¤

encoding: str

The encoding that is used for read and write operations.

equipment property ¤

equipment: Equipment

The Equipment associated with the interface.

lock_timeout property writable ¤

lock_timeout: float

The time, in seconds, to wait to acquire a lock.

Setting the value to ≤0 (or None) means wait forever.

max_read_size property writable ¤

max_read_size: int

The maximum number of bytes that can be read.

rstrip property writable ¤

rstrip: bool

Whether to remove trailing whitespace from read messages.

socket property ¤

socket: socket | None

Returns the reference to the underlying socket.

timeout property writable ¤

timeout: float | None

The timeout, in seconds, for read and write operations.

A value <0 will set the timeout to be None (blocking mode).

abort ¤

abort() -> None

Stop an in-progress request.

Source code in src/msl/equipment/interfaces/vxi11.py
def abort(self) -> None:
    """Stop an in-progress request."""
    if self._abort_client is None:
        self._abort_client = AsyncClient(self._info.host)
        self._abort_client.connect(self._abort_port, timeout=self.timeout)
    self._abort_client.device_abort(self._link_id)

clear ¤

clear() -> None

Send the clear command to the device.

Source code in src/msl/equipment/interfaces/vxi11.py
def clear(self) -> None:
    """Send the `clear` command to the device."""
    if self._core_client is None:
        raise MSLConnectionError(self, "not connected to VXI-11 device")

    self._core_client.device_clear(
        lid=self._link_id,
        flags=self._init_flag(),
        lock_timeout=self._lock_timeout_ms,
        io_timeout=self._io_timeout_ms,
    )

create_intr_chan ¤

create_intr_chan(
    host_addr: int,
    host_port: int,
    prog_num: int,
    prog_vers: int,
    prog_family: int,
) -> None

Inform the network instrument server to establish an interrupt channel.

Parameters:

Name Type Description Default
host_addr int

Address of the host servicing the interrupt.

required
host_port int

Valid port number on the client.

required
prog_num int

Program number.

required
prog_vers int

Program version number.

required
prog_family int

The underlying socket protocol family type (IPPROTO_TCP or IPPROTO_UDP).

required
Source code in src/msl/equipment/interfaces/vxi11.py
def create_intr_chan(self, host_addr: int, host_port: int, prog_num: int, prog_vers: int, prog_family: int) -> None:
    """Inform the network instrument server to establish an interrupt channel.

    Args:
        host_addr: Address of the host servicing the interrupt.
        host_port: Valid port number on the client.
        prog_num: Program number.
        prog_vers: Program version number.
        prog_family: The underlying socket protocol family type (`IPPROTO_TCP` or `IPPROTO_UDP`).
    """
    if self._core_client is None:
        raise MSLConnectionError(self, "not connected to VXI-11 device")

    self._core_client.create_intr_chan(
        host_addr=host_addr, host_port=host_port, prog_num=prog_num, prog_vers=prog_vers, prog_family=prog_family
    )
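
Because `host_addr` is an integer, an IP address string has to be converted before it is passed in. The sketch below shows one way to prepare the arguments; it assumes `host_addr` is an IPv4 address expressed as an unsigned 32-bit integer and that the interrupt program number and version are those of DEVICE_INTR in the VXI-11 specification. `intr_chan_kwargs` is a hypothetical helper, and the address and port are placeholders.

```python
import ipaddress
import socket


def intr_chan_kwargs(host_ip: str, host_port: int) -> dict:
    """Build keyword arguments for create_intr_chan (hypothetical helper)."""
    return {
        # Assumption: the IPv4 address as an unsigned 32-bit integer
        "host_addr": int(ipaddress.IPv4Address(host_ip)),
        "host_port": host_port,
        # Assumption: DEVICE_INTR program number and version (VXI-11 specification)
        "prog_num": 0x0607B1,
        "prog_vers": 1,
        "prog_family": socket.IPPROTO_TCP,
    }


kwargs = intr_chan_kwargs("192.168.1.10", 5025)
print(kwargs["host_addr"])
```

The resulting dictionary could then be unpacked into the call, for example `device.create_intr_chan(**kwargs)`, where `device` is a connected VXI11 instance.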

destroy_intr_chan ¤

destroy_intr_chan() -> None

Inform the network instrument server to close its interrupt channel.

Source code in src/msl/equipment/interfaces/vxi11.py
def destroy_intr_chan(self) -> None:
    """Inform the network instrument server to close its interrupt channel."""
    if self._core_client is None:
        raise MSLConnectionError(self, "not connected to VXI-11 device")

    self._core_client.destroy_intr_chan()

destroy_link ¤

destroy_link() -> None

Destroy the link with the device.

Source code in src/msl/equipment/interfaces/vxi11.py
def destroy_link(self) -> None:
    """Destroy the link with the device."""
    if self._core_client is None:
        raise MSLConnectionError(self, "not connected to VXI-11 device")

    self._core_client.destroy_link(self._link_id)

disconnect ¤

disconnect() -> None

Unlink and close the sockets.

Source code in src/msl/equipment/interfaces/vxi11.py
def disconnect(self) -> None:  # pyright: ignore[reportImplicitOverride]
    """Unlink and close the sockets."""
    if self._abort_client is None and self._core_client is None:
        return

    if self._abort_client is not None:
        self._abort_client.close()
        self._abort_client = None

    if self._core_client is not None:
        if self._link_id != -1:
            with contextlib.suppress(ConnectionError):
                self._core_client.destroy_link(self._link_id)
            self._link_id = -1

        self._core_client.close()
        self._core_client = None

    super().disconnect()

docmd ¤

docmd(cmd: int, value: float, fmt: str) -> bytes

Allows for a variety of commands to be executed.

Parameters:

Name Type Description Default
cmd int

An IEEE 488 command message. For example, to send the Group Execute Trigger command, GET, the value of cmd is 0x08.

required
value float

The value to use with cmd. Can be of type bool, int or float.

required
fmt str

How to format value. See format-characters for more details. Do not include the byte-order character. Network (big-endian) order is always used.

required

Returns:

Type Description
bytes

The results defined by cmd.

Source code in src/msl/equipment/interfaces/vxi11.py
def docmd(self, cmd: int, value: float, fmt: str) -> bytes:
    """Allows for a variety of commands to be executed.

    Args:
        cmd: An IEEE 488 command message. For example, to send the Group Execute Trigger
            command, _GET_, the value of `cmd` is `0x08`.
        value: The value to use with `cmd`. Can be of type [bool][], [int][] or [float][].
        fmt: How to format `value`. See [format-characters][] for more details. Do not
            include the byte-order character. Network (big-endian) order is always used.

    Returns:
        The results defined by `cmd`.
    """
    if self._core_client is None:
        raise MSLConnectionError(self, "not connected to VXI-11 device")

    # always use network (big-endian) byte order
    s = Struct("!" + fmt.lstrip("@=<>!"))
    return self._core_client.device_docmd(
        lid=self._link_id,
        flags=self._init_flag(),
        io_timeout=self._io_timeout_ms,
        lock_timeout=self._lock_timeout_ms,
        cmd=cmd,
        network_order=True,
        size=s.size,
        data=s.pack(value),
    )
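
The effect of `Struct("!" + fmt.lstrip("@=<>!"))` is easy to check on its own: any byte-order character supplied by the caller is discarded and network (big-endian) order is applied. A short sketch using only the standard library (`pack_value` is a hypothetical name):

```python
from struct import Struct


def pack_value(value: float, fmt: str) -> tuple[int, bytes]:
    """Return (size, data) the way docmd prepares them: always big-endian."""
    s = Struct("!" + fmt.lstrip("@=<>!"))
    return s.size, s.pack(value)


print(pack_value(8, "<h"))    # the "<" is stripped, so this prints (2, b'\x00\x08')
print(pack_value(True, "?"))  # prints (1, b'\x01')
print(pack_value(1.5, "f"))   # four bytes, most-significant byte first
```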

enable_sqr ¤

enable_sqr(*, state: bool, handle: bytes) -> None

Enable or disable the sending of device_intr_srq RPCs by the network instrument server.

Parameters:

Name Type Description Default
state bool

Whether to enable or disable interrupts.

required
handle bytes

Host specific data (maximum length is 40 characters).

required
Source code in src/msl/equipment/interfaces/vxi11.py
def enable_sqr(self, *, state: bool, handle: bytes) -> None:
    """Enable or disable the sending of `device_intr_srq` RPCs by the network instrument server.

    Args:
        state: Whether to enable or disable interrupts.
        handle: Host specific data (maximum length is 40 characters).
    """
    if self._core_client is None:
        raise MSLConnectionError(self, "not connected to VXI-11 device")

    self._core_client.device_enable_srq(lid=self._link_id, state=state, handle=handle)

local ¤

local() -> None

Place the device in a local state wherein all programmable local controls are enabled.

Source code in src/msl/equipment/interfaces/vxi11.py
def local(self) -> None:
    """Place the device in a local state wherein all programmable local controls are enabled."""
    if self._core_client is None:
        raise MSLConnectionError(self, "not connected to VXI-11 device")

    self._core_client.device_local(
        lid=self._link_id,
        flags=self._init_flag(),
        lock_timeout=self._lock_timeout_ms,
        io_timeout=self._io_timeout_ms,
    )

lock ¤

lock() -> None

Acquire the device's lock.

Source code in src/msl/equipment/interfaces/vxi11.py
def lock(self) -> None:
    """Acquire the device's lock."""
    if self._core_client is None:
        raise MSLConnectionError(self, "not connected to VXI-11 device")

    self._core_client.device_lock(lid=self._link_id, flags=self._init_flag(), lock_timeout=self._lock_timeout_ms)

query ¤

query(
    message: bytes | str,
    *,
    delay: float = 0.0,
    decode: Literal[True] = True,
    dtype: None = None,
    fmt: MessageFormat | None = ...,
    size: int | None = ...
) -> str
query(
    message: bytes | str,
    *,
    delay: float = 0.0,
    decode: Literal[False] = False,
    dtype: None = None,
    fmt: MessageFormat | None = ...,
    size: int | None = ...
) -> bytes
query(
    message: bytes | str,
    *,
    delay: float = 0.0,
    decode: bool = ...,
    dtype: MessageDataType = ...,
    fmt: MessageFormat | None = ...,
    size: int | None = ...
) -> NumpyArray1D
query(
    message: bytes | str,
    *,
    delay: float = 0.0,
    decode: bool = True,
    dtype: MessageDataType | None = None,
    fmt: MessageFormat | None = None,
    size: int | None = None
) -> bytes | str | NumpyArray1D

Convenience method for performing a write followed by a read.

Parameters:

Name Type Description Default
message bytes | str

The message to write to the equipment.

required
delay float

Time delay, in seconds, to wait between the write and read operations.

0.0
decode bool

Whether to decode the returned message (i.e., convert the message to a str) or keep the message as bytes. Ignored if dtype is not None.

True
dtype MessageDataType | None

The data type of the elements in the returned message. Can be any object that numpy dtype supports. See from_bytes for more details. For messages that are of scalar type (i.e., a single number) it is more efficient to not specify dtype but to pass the returned message to the int or float class to convert the message to the appropriate numeric type.

None
fmt MessageFormat | None

The format that the returned message data is in. Ignored if dtype is None. See from_bytes for more details.

None
size int | None

The number of bytes to read. Ignored if the value is None.

None

Returns:

Type Description
bytes | str | NumpyArray1D

The message from the equipment. If dtype is specified, then the message is returned as a numpy ndarray, if decode is True then the message is returned as a str, otherwise the message is returned as bytes.

Source code in src/msl/equipment/interfaces/message_based.py
def query(  # noqa: PLR0913
    self,
    message: bytes | str,
    *,
    delay: float = 0.0,
    decode: bool = True,
    dtype: MessageDataType | None = None,
    fmt: MessageFormat | None = None,
    size: int | None = None,
) -> bytes | str | NumpyArray1D:
    """Convenience method for performing a [write][msl.equipment.interfaces.message_based.MessageBased.write]
    followed by a [read][msl.equipment.interfaces.message_based.MessageBased.read].

    Args:
        message: The message to write to the equipment.
        delay: Time delay, in seconds, to wait between the _write_ and _read_ operations.
        decode: Whether to decode the returned message (i.e., convert the message to a [str][])
            or keep the message as [bytes][]. Ignored if `dtype` is not `None`.
        dtype: The data type of the elements in the returned message. Can be any object
            that numpy [dtype][numpy.dtype] supports. See [from_bytes][msl.equipment.utils.from_bytes]
            for more details. For messages that are of scalar type (i.e., a single number)
            it is more efficient to not specify `dtype` but to pass the returned message to the
            [int][] or [float][] class to convert the message to the appropriate numeric type.
        fmt: The format that the returned message data is in. Ignored if `dtype` is `None`.
             See [from_bytes][msl.equipment.utils.from_bytes] for more details.
        size: The number of bytes to read. Ignored if the value is `None`.

    Returns:
        The message from the equipment. If `dtype` is specified, then the message is
            returned as a numpy [ndarray][numpy.ndarray], if `decode` is `True` then the message
            is returned as a [str][], otherwise the message is returned as [bytes][].
    """  # noqa: D205
    _ = self.write(message)
    if delay > 0:
        time.sleep(delay)
    if dtype:
        return self.read(dtype=dtype, fmt=fmt, size=size)
    return self.read(decode=decode, size=size)
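
The write, optional delay, then read sequence can be demonstrated against an in-memory stand-in. `EchoDevice` and the free function `query` below are hypothetical and exist only for this sketch; a real interface would talk to equipment instead.

```python
from __future__ import annotations

import time


class EchoDevice:
    """Hypothetical in-memory device that echoes the last message written."""

    def __init__(self) -> None:
        self._pending = b""

    def write(self, message: bytes) -> int:
        self._pending = b"ECHO:" + message
        return len(message)

    def read(self) -> bytes:
        reply, self._pending = self._pending, b""
        return reply


def query(device: EchoDevice, message: str, *, delay: float = 0.0, decode: bool = True) -> bytes | str:
    """Write, optionally wait, then read, as the method above does."""
    device.write(message.encode("utf-8"))
    if delay > 0:
        time.sleep(delay)
    reply = device.read()
    return reply.decode("utf-8") if decode else reply


device = EchoDevice()
print(query(device, "*IDN?"))                              # str by default
print(query(device, "*IDN?", delay=0.01, decode=False))    # bytes when decode=False
```

A non-zero `delay` is mainly useful for equipment that needs time to prepare its reply before it can be read.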

read ¤

read(
    *,
    decode: Literal[True] = True,
    dtype: None = None,
    fmt: MessageFormat | None = ...,
    size: int | None = ...
) -> str
read(
    *,
    decode: Literal[False] = False,
    dtype: None = None,
    fmt: MessageFormat | None = ...,
    size: int | None = ...
) -> bytes
read(
    *,
    decode: bool = ...,
    dtype: MessageDataType = ...,
    fmt: MessageFormat | None = ...,
    size: int | None = ...
) -> NumpyArray1D
read(
    *,
    decode: bool = True,
    dtype: MessageDataType | None = None,
    fmt: MessageFormat | None = None,
    size: int | None = None
) -> bytes | str | NumpyArray1D

Read a message from the equipment.

This method will block until one of the following conditions is fulfilled:

  1. size bytes have been received — only if size is not None.
  2. the read_termination byte(s) are received — only if read_termination is not None.
  3. a timeout occurs — only if timeout is not None. If a timeout occurs, an MSLTimeoutError is raised.
  4. max_read_size bytes have been received. If the maximum number of bytes has been read, an MSLConnectionError is raised.

Tip

You may also want to set the rstrip value for the class instance.

Parameters:

Name Type Description Default
decode bool

Whether to decode the message (i.e., convert the message to a str) or keep the message as bytes. Ignored if dtype is not None.

True
dtype MessageDataType | None

The data type of the elements in the message. Can be any object that numpy dtype supports. See from_bytes for more details. For messages that are of scalar type (i.e., a single number) it is more efficient to not specify dtype but to pass the returned message to the int or float class to convert the message to the appropriate numeric type.

None
fmt MessageFormat | None

The format that the message data is in. Ignored if dtype is None. See from_bytes for more details.

None
size int | None

The number of bytes to read. Ignored if the value is None.

None

Returns:

Type Description
bytes | str | NumpyArray1D

The message from the equipment. If dtype is specified, then the message is returned as a numpy ndarray, if decode is True then the message is returned as a str, otherwise the message is returned as bytes.

Source code in src/msl/equipment/interfaces/message_based.py
def read(
    self,
    *,
    decode: bool = True,
    dtype: MessageDataType | None = None,
    fmt: MessageFormat | None = None,
    size: int | None = None,
) -> bytes | str | NumpyArray1D:
    """Read a message from the equipment.

    This method will block until one of the following conditions is fulfilled:

    1. `size` bytes have been received &mdash; only if `size` is not `None`.
    2. the [read_termination][msl.equipment.interfaces.message_based.MessageBased.read_termination]
       byte(s) is(are) received &mdash; only if
       [read_termination][msl.equipment.interfaces.message_based.MessageBased.read_termination]
       is not `None`.
    3. a timeout occurs &mdash; only if [timeout][msl.equipment.interfaces.message_based.MessageBased.timeout]
       is not `None`. If a timeout occurs, an
       [MSLTimeoutError][msl.equipment.interfaces.message_based.MSLTimeoutError] is raised.
    4. [max_read_size][msl.equipment.interfaces.message_based.MessageBased.max_read_size]
       bytes have been received. If the maximum number of bytes have been read, an
       [MSLConnectionError][msl.equipment.interfaces.message_based.MSLConnectionError] is raised.

    !!! tip
        You may also want to set the [rstrip][msl.equipment.interfaces.message_based.MessageBased.rstrip]
        value for the class instance.

    Args:
        decode: Whether to decode the message (i.e., convert the message to a [str][])
            or keep the message as [bytes][]. Ignored if `dtype` is not `None`.
        dtype: The data type of the elements in the message. Can be any object
            that numpy [dtype][numpy.dtype] supports. See [from_bytes][msl.equipment.utils.from_bytes]
            for more details. For messages that are of scalar type (i.e., a single number)
            it is more efficient to not specify `dtype` but to pass the returned message to the
            [int][] or [float][] class to convert the message to the appropriate numeric type.
        fmt: The format that the message data is in. Ignored if `dtype` is `None`.
             See [from_bytes][msl.equipment.utils.from_bytes] for more details.
        size: The number of bytes to read. Ignored if the value is `None`.

    Returns:
        The message from the equipment. If `dtype` is specified, then the message is returned
            as a numpy [ndarray][numpy.ndarray], if `decode` is `True` then the message
            is returned as a [str][], otherwise the message is returned as [bytes][].
    """
    if size is not None and size > self._max_read_size:
        msg = f"max_read_size is {self._max_read_size} bytes, requesting {size} bytes"
        raise MSLConnectionError(self, msg)

    try:
        message = self._read(size)
    except (serial.SerialTimeoutException, socket.timeout, TimeoutError):
        raise MSLTimeoutError(self) from None
    except Exception as e:  # noqa: BLE001
        msg = f"{e.__class__.__name__}: {e}"
        raise MSLConnectionError(self, msg) from None

    if size is None:
        if dtype:
            logger.debug("%s.read(dtype=%r, fmt=%r) -> %r", self, dtype, fmt, message)
        else:
            logger.debug("%s.read() -> %r", self, message)
    else:
        if len(message) != size:
            msg = f"received {len(message)} bytes, requested {size} bytes"
            raise MSLConnectionError(self, msg)
        logger.debug("%s.read(size=%s) -> %r", self, size, message)

    if self._rstrip:
        message = message.rstrip()

    if dtype:
        return from_bytes(message, fmt=fmt, dtype=dtype)

    if decode:
        return message.decode(encoding=self._encoding)

    return message
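
The final steps of `read` (strip, then decode) and the advice about scalar replies can be illustrated with plain bytes. This is a sketch under the assumption that the reply is ASCII text; `finish` is a hypothetical helper that mirrors only the `rstrip` and `decode` branches shown above.

```python
from __future__ import annotations


def finish(message: bytes, *, rstrip: bool = False, decode: bool = True, encoding: str = "utf-8") -> bytes | str:
    """Apply the rstrip and decode steps that follow a successful read."""
    if rstrip:
        message = message.rstrip()
    if decode:
        return message.decode(encoding)
    return message


# A scalar reply: convert it with float() instead of requesting a numpy array.
reply = finish(b"+1.2345E-03\r\n", rstrip=True)
value = float(reply)
print(repr(reply), value)
```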

read_stb ¤

read_stb() -> int

Read the status byte from the device.

Returns:

Type Description
int

The status byte.

Source code in src/msl/equipment/interfaces/vxi11.py
def read_stb(self) -> int:
    """Read the status byte from the device.

    Returns:
        The status byte.
    """
    if self._core_client is None:
        raise MSLConnectionError(self, "not connected to VXI-11 device")

    return self._core_client.device_readstb(
        lid=self._link_id,
        flags=self._init_flag(),
        lock_timeout=self._lock_timeout_ms,
        io_timeout=self._io_timeout_ms,
    )
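
The integer returned by `read_stb` is a bit field. A sketch of how it might be decoded follows, using the bit positions that IEEE 488.2 assigns to MAV, ESB and RQS/MSS; the remaining bits are device-specific, and `StatusByte` and `describe` are hypothetical names.

```python
from __future__ import annotations

from enum import IntFlag


class StatusByte(IntFlag):
    """Status-byte bits defined by IEEE 488.2 (other bits are device-specific)."""

    MAV = 0x10  # message available in the output queue
    ESB = 0x20  # a bit is set in the standard event status register
    RQS = 0x40  # the device requested service (read as MSS via *STB?)


def describe(stb: int) -> list[str]:
    """Return the names of the standard bits that are set in stb."""
    return [name for name, flag in StatusByte.__members__.items() if stb & flag]


print(describe(0x50))  # prints ['MAV', 'RQS']
print(describe(0x00))  # prints []
```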

reconnect ¤

reconnect(max_attempts: int = 1) -> None

Reconnect to the equipment.

Parameters:

Name Type Description Default
max_attempts int

The maximum number of attempts to try to reconnect with the equipment. If <1, keep trying until a connection is successful. If the maximum number of attempts has been reached then an exception is raised.

1
Source code in src/msl/equipment/interfaces/vxi11.py
def reconnect(self, max_attempts: int = 1) -> None:
    """Reconnect to the equipment.

    Args:
        max_attempts: The maximum number of attempts to try to reconnect with the equipment.
            If &lt;1, keep trying until a connection is successful. If the maximum number
            of attempts has been reached then an exception is raised.
    """
    attempt = 0
    while True:
        attempt += 1
        try:
            return self._connect()
        except (MSLConnectionError, MSLTimeoutError):
            if 0 < max_attempts <= attempt:
                raise

remote ¤

remote() -> None

Place the device in a remote state wherein all programmable local controls are disabled.

Source code in src/msl/equipment/interfaces/vxi11.py
def remote(self) -> None:
    """Place the device in a remote state wherein all programmable local controls are disabled."""
    if self._core_client is None:
        raise MSLConnectionError(self, "not connected to VXI-11 device")

    self._core_client.device_remote(
        lid=self._link_id,
        flags=self._init_flag(),
        lock_timeout=self._lock_timeout_ms,
        io_timeout=self._io_timeout_ms,
    )

trigger ¤

trigger() -> None

Send a trigger to the device.

Source code in src/msl/equipment/interfaces/vxi11.py
def trigger(self) -> None:
    """Send a trigger to the device."""
    if self._core_client is None:
        raise MSLConnectionError(self, "not connected to VXI-11 device")

    self._core_client.device_trigger(
        lid=self._link_id,
        flags=self._init_flag(),
        lock_timeout=self._lock_timeout_ms,
        io_timeout=self._io_timeout_ms,
    )

unlock ¤

unlock() -> None

Release the lock acquired by lock.

Source code in src/msl/equipment/interfaces/vxi11.py
def unlock(self) -> None:
    """Release the lock acquired by [lock][msl.equipment.interfaces.vxi11.VXI11.lock]."""
    if self._core_client is None:
        raise MSLConnectionError(self, "not connected to VXI-11 device")

    self._core_client.device_unlock(self._link_id)
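
Since `lock` and `unlock` must be paired, one option is to wrap them in a context manager so that the lock is released even if an operation fails. The class does not provide this; `locked` and `FakeDevice` are hypothetical and serve only to sketch the pattern.

```python
import contextlib
from typing import Iterator


class FakeDevice:
    """Hypothetical stand-in that records the calls made to it."""

    def __init__(self) -> None:
        self.calls: list = []

    def lock(self) -> None:
        self.calls.append("lock")

    def unlock(self) -> None:
        self.calls.append("unlock")

    def trigger(self) -> None:
        self.calls.append("trigger")


@contextlib.contextmanager
def locked(device: FakeDevice) -> Iterator[FakeDevice]:
    """Hold the device lock for the duration of the with block."""
    device.lock()
    try:
        yield device
    finally:
        device.unlock()  # always released, even if the body raises


device = FakeDevice()
with locked(device) as dev:
    dev.trigger()
print(device.calls)  # prints ['lock', 'trigger', 'unlock']
```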

write ¤

write(
    message: bytes | str,
    *,
    data: Sequence1D | None = None,
    dtype: MessageDataType = "<f",
    fmt: MessageFormat | None = "ieee"
) -> int

Write a message to the equipment.

Parameters:

Name Type Description Default
message bytes | str

The message to write to the equipment.

required
data Sequence1D | None

The data to append to message. See to_bytes for more details.

None
dtype MessageDataType

The data type to use to convert each element in data to bytes. Ignored if data is None. See to_bytes for more details.

'<f'
fmt MessageFormat | None

The format to use to convert data to bytes. Ignored if data is None. See to_bytes for more details.

'ieee'

Returns:

Type Description
int

The number of bytes written.

Source code in src/msl/equipment/interfaces/message_based.py
def write(
    self,
    message: bytes | str,
    *,
    data: Sequence1D | None = None,
    dtype: MessageDataType = "<f",
    fmt: MessageFormat | None = "ieee",
) -> int:
    """Write a message to the equipment.

    Args:
        message: The message to write to the equipment.
        data: The data to append to `message`. See [to_bytes][msl.equipment.utils.to_bytes]
            for more details.
        dtype: The data type to use to convert each element in `data` to bytes. Ignored
            if `data` is `None`. See [to_bytes][msl.equipment.utils.to_bytes] for more details.
        fmt: The format to use to convert `data` to bytes. Ignored if `data` is `None`.
            See [to_bytes][msl.equipment.utils.to_bytes] for more details.

    Returns:
        The number of bytes written.
    """
    if not isinstance(message, bytes):
        message = message.encode(encoding=self._encoding)

    if data is not None:
        message += to_bytes(data, fmt=fmt, dtype=dtype)

    if self._write_termination and not message.endswith(self._write_termination):
        message += self._write_termination

    logger.debug("%s.write(%r)", self, message)

    try:
        return self._write(message)
    except (serial.SerialTimeoutException, socket.timeout, TimeoutError):
        raise MSLTimeoutError(self) from None
    except Exception as e:  # noqa: BLE001
        raise MSLConnectionError(self, str(e)) from None

ZeroMQ ¤

ZeroMQ(equipment: Equipment)

Bases: MessageBased

Base class for equipment that use the ZeroMQ communication protocol.

Parameters:

Name Type Description Default
equipment Equipment

An Equipment instance.

required

A Connection instance supports the following properties for the ZeroMQ communication protocol, as well as the properties defined in MessageBased. The ZeroMQ protocol does not use termination characters, so if termination characters are specified the value is ignored and is set to None.

Connection Properties:

Name Type Description
protocol str

ZeroMQ protocol (tcp, udp, pgm, inproc, ipc) Default: tcp

socket_type int | str

ZeroMQ socket type. Default: REQ

Source code in src/msl/equipment/interfaces/zeromq.py
def __init__(self, equipment: Equipment) -> None:
    """Base class for equipment that uses the [ZeroMQ](https://zeromq.org/) communication protocol.

    Args:
        equipment: An [Equipment][] instance.

    A [Connection][msl.equipment.schema.Connection] instance supports the following _properties_
    for the [ZeroMQ](https://zeromq.org/) communication protocol, as well as the _properties_
    defined in [MessageBased][msl.equipment.interfaces.message_based.MessageBased].
    The [ZeroMQ](https://zeromq.org/) protocol does not use termination characters, so if
    termination characters are specified the value is ignored and is set to `None`.

    Attributes: Connection Properties:
        protocol (str): ZeroMQ protocol (`tcp`, `udp`, `pgm`, `inproc`, `ipc`) _Default: `tcp`_
        socket_type (int | str): ZeroMQ [socket type][zmq.SocketType]. _Default: `REQ`_
    """
    super().__init__(equipment)

    assert equipment.connection is not None  # noqa: S101

    address = parse_zmq_address(equipment.connection.address)
    if address is None:
        msg = f"Invalid ZeroMQ address {equipment.connection.address!r}"
        raise ValueError(msg)

    p = equipment.connection.properties
    socket_type = to_enum(p.get("socket_type", "REQ"), SocketType, to_upper=True)
    protocol: str = p.get("protocol", "tcp")

    # ZeroMQ does not use termination characters
    self.read_termination = None  # pyright: ignore[reportUnannotatedClassAttribute]
    self.write_termination = None  # pyright: ignore[reportUnannotatedClassAttribute]

    self._context: Context[SyncSocket] = zmq.Context()
    self._socket: SyncSocket = self._context.socket(socket_type)
    self._set_interface_timeout()
    self._set_interface_max_read_size()

    # Calling zmq.Socket.connect() does not verify the host:port value until the
    # socket is used to write/read bytes. An error raised here would be for an invalid
    # ZeroMQ addr value
    try:
        _ = self._socket.connect(f"{protocol}://{address.host}:{address.port}")
    except zmq.ZMQError as e:
        msg = f"{e.__class__.__name__}: {e}"
        raise MSLConnectionError(self, msg) from None
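
As a rough illustration of how the two connection properties feed into the endpoint string passed to the socket, the sketch below mirrors the defaults used in `__init__` with a plain dictionary. The function name `build_endpoint` and the sample host and port are hypothetical and are not part of msl-equipment. The real class converts `socket_type` to a `SocketType` enum member; here that step is only approximated by upper-casing a string name.

```python
from __future__ import annotations


def build_endpoint(properties: dict[str, object], host: str, port: int) -> tuple[str, str]:
    """Return (socket type name, endpoint) using the documented defaults.

    Hypothetical helper for illustration only; it handles string names, not integers.
    """
    # Same defaults as the documented Connection properties
    socket_type = str(properties.get("socket_type", "REQ")).upper()
    protocol = str(properties.get("protocol", "tcp"))
    return socket_type, f"{protocol}://{host}:{port}"


# No properties given: fall back to a REQ socket over tcp
default = build_endpoint({}, "127.0.0.1", 5555)

# A property overrides its default; the name is upper-cased before use
custom = build_endpoint({"socket_type": "pair"}, "127.0.0.1", 5555)
```

Because connecting does not validate the host and port, a well-formed endpoint built this way can still fail later, on the first write or read.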

encoding property writable ¤

encoding: str

The encoding that is used for read and write operations.

equipment property ¤

equipment: Equipment

The Equipment associated with the interface.

max_read_size property writable ¤

max_read_size: int

The maximum number of bytes that can be read.

rstrip property writable ¤

rstrip: bool

Whether to remove trailing whitespace from read messages.

socket property ¤

socket: SyncSocket

Returns a reference to the underlying socket.

timeout property writable ¤

timeout: float | None

The timeout, in seconds, for read and write operations.

A negative value sets the timeout to None (blocking mode).
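
The rule for negative values can be pictured with a small sketch. The helper name `normalize_timeout` is hypothetical; the library's own setter is not shown in this section and may differ in detail.

```python
from __future__ import annotations


def normalize_timeout(value: float | None) -> float | None:
    """Map a negative timeout to None (blocking mode); hypothetical helper."""
    if value is None or value < 0:
        return None
    return float(value)


blocking = normalize_timeout(-1)   # negative value, treated as "no timeout"
two_seconds = normalize_timeout(2)  # non-negative value is kept, as a float
```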

disconnect ¤

disconnect() -> None

Close the socket connection and terminate the context.

Source code in src/msl/equipment/interfaces/zeromq.py
def disconnect(self) -> None:  # pyright: ignore[reportImplicitOverride]
    """Close the socket connection and terminate the context."""
    if hasattr(self, "_socket") and not self._socket.closed:
        self._context.set(zmq.BLOCKY, 0)
        self._socket.setsockopt(zmq.LINGER, 0)
        self._socket.close()
        self._context.term()
        super().disconnect()

query ¤

query(
    message: bytes | str,
    *,
    delay: float = 0.0,
    decode: Literal[True] = True,
    dtype: None = None,
    fmt: MessageFormat | None = ...,
    size: int | None = ...
) -> str
query(
    message: bytes | str,
    *,
    delay: float = 0.0,
    decode: Literal[False] = False,
    dtype: None = None,
    fmt: MessageFormat | None = ...,
    size: int | None = ...
) -> bytes
query(
    message: bytes | str,
    *,
    delay: float = 0.0,
    decode: bool = ...,
    dtype: MessageDataType = ...,
    fmt: MessageFormat | None = ...,
    size: int | None = ...
) -> NumpyArray1D
query(
    message: bytes | str,
    *,
    delay: float = 0.0,
    decode: bool = True,
    dtype: MessageDataType | None = None,
    fmt: MessageFormat | None = None,
    size: int | None = None
) -> bytes | str | NumpyArray1D

Convenience method for performing a write followed by a read.

Parameters:

Name Type Description Default
message bytes | str

The message to write to the equipment.

required
delay float

Time delay, in seconds, to wait between the write and read operations.

0.0
decode bool

Whether to decode the returned message (i.e., convert the message to a str) or keep the message as bytes. Ignored if dtype is not None.

True
dtype MessageDataType | None

The data type of the elements in the returned message. Can be any object that numpy dtype supports. See from_bytes for more details. For messages that are of scalar type (i.e., a single number) it is more efficient to not specify dtype but to pass the returned message to the int or float class to convert the message to the appropriate numeric type.

None
fmt MessageFormat | None

The format that the returned message data is in. Ignored if dtype is None. See from_bytes for more details.

None
size int | None

The number of bytes to read. Ignored if the value is None.

None

Returns:

Type Description
bytes | str | NumpyArray1D

The message from the equipment. If dtype is specified, then the message is returned as a numpy ndarray, if decode is True then the message is returned as a str, otherwise the message is returned as bytes.

Source code in src/msl/equipment/interfaces/message_based.py
def query(  # noqa: PLR0913
    self,
    message: bytes | str,
    *,
    delay: float = 0.0,
    decode: bool = True,
    dtype: MessageDataType | None = None,
    fmt: MessageFormat | None = None,
    size: int | None = None,
) -> bytes | str | NumpyArray1D:
    """Convenience method for performing a [write][msl.equipment.interfaces.message_based.MessageBased.write]
    followed by a [read][msl.equipment.interfaces.message_based.MessageBased.read].

    Args:
        message: The message to write to the equipment.
        delay: Time delay, in seconds, to wait between the _write_ and _read_ operations.
        decode: Whether to decode the returned message (i.e., convert the message to a [str][])
            or keep the message as [bytes][]. Ignored if `dtype` is not `None`.
        dtype: The data type of the elements in the returned message. Can be any object
            that numpy [dtype][numpy.dtype] supports. See [from_bytes][msl.equipment.utils.from_bytes]
            for more details. For messages that are of scalar type (i.e., a single number)
            it is more efficient to not specify `dtype` but to pass the returned message to the
            [int][] or [float][] class to convert the message to the appropriate numeric type.
        fmt: The format that the returned message data is in. Ignored if `dtype` is `None`.
             See [from_bytes][msl.equipment.utils.from_bytes] for more details.
        size: The number of bytes to read. Ignored if the value is `None`.

    Returns:
        The message from the equipment. If `dtype` is specified, then the message is
            returned as a numpy [ndarray][numpy.ndarray], if `decode` is `True` then the message
            is returned as a [str][], otherwise the message is returned as [bytes][].
    """  # noqa: D205
    _ = self.write(message)
    if delay > 0:
        time.sleep(delay)
    if dtype:
        return self.read(dtype=dtype, fmt=fmt, size=size)
    return self.read(decode=decode, size=size)
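
To make the write, delay, read sequence concrete, here is a minimal sketch that uses an in-memory stand-in instead of real equipment. The `EchoDevice` class is hypothetical and only imitates the shape of the methods documented here; it simply returns whatever was last written. The final line follows the advice given for `dtype`: a scalar reply is converted with `float` rather than by requesting an array.

```python
from __future__ import annotations

import time


class EchoDevice:
    """Hypothetical stand-in for an interface; replies with the last message written."""

    def __init__(self) -> None:
        self._pending = b""

    def write(self, message: bytes | str) -> int:
        # Encode text before "sending", as the documented write method does
        if not isinstance(message, bytes):
            message = message.encode("utf-8")
        self._pending = message
        return len(message)

    def read(self, *, decode: bool = True) -> bytes | str:
        reply, self._pending = self._pending, b""
        return reply.decode("utf-8") if decode else reply

    def query(self, message: bytes | str, *, delay: float = 0.0, decode: bool = True) -> bytes | str:
        # Write, optionally wait, then read
        _ = self.write(message)
        if delay > 0:
            time.sleep(delay)
        return self.read(decode=decode)


device = EchoDevice()
text = device.query("1.25")               # decoded reply (str)
raw = device.query("1.25", decode=False)  # undecoded reply (bytes)
value = float(text)                       # scalar reply converted without dtype
```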

read ¤

read(
    *,
    decode: Literal[True] = True,
    dtype: None = None,
    fmt: MessageFormat | None = ...,
    size: int | None = ...
) -> str
read(
    *,
    decode: Literal[False] = False,
    dtype: None = None,
    fmt: MessageFormat | None = ...,
    size: int | None = ...
) -> bytes
read(
    *,
    decode: bool = ...,
    dtype: MessageDataType = ...,
    fmt: MessageFormat | None = ...,
    size: int | None = ...
) -> NumpyArray1D
read(
    *,
    decode: bool = True,
    dtype: MessageDataType | None = None,
    fmt: MessageFormat | None = None,
    size: int | None = None
) -> bytes | str | NumpyArray1D

Read a message from the equipment.

This method will block until one of the following conditions is fulfilled:

  1. size bytes have been received — only if size is not None.
  2. the read_termination byte(s) are received — only if read_termination is not None.
  3. a timeout occurs — only if timeout is not None. If a timeout occurs, an MSLTimeoutError is raised.
  4. max_read_size bytes have been received. If the maximum number of bytes has been read, an MSLConnectionError is raised.

Tip

You may also want to set the rstrip value for the class instance.

Parameters:

Name Type Description Default
decode bool

Whether to decode the message (i.e., convert the message to a str) or keep the message as bytes. Ignored if dtype is not None.

True
dtype MessageDataType | None

The data type of the elements in the message. Can be any object that numpy dtype supports. See from_bytes for more details. For messages that are of scalar type (i.e., a single number) it is more efficient to not specify dtype but to pass the returned message to the int or float class to convert the message to the appropriate numeric type.

None
fmt MessageFormat | None

The format that the message data is in. Ignored if dtype is None. See from_bytes for more details.

None
size int | None

The number of bytes to read. Ignored if the value is None.

None

Returns:

Type Description
bytes | str | NumpyArray1D

The message from the equipment. If dtype is specified, then the message is returned as a numpy ndarray, if decode is True then the message is returned as a str, otherwise the message is returned as bytes.

Source code in src/msl/equipment/interfaces/message_based.py
def read(
    self,
    *,
    decode: bool = True,
    dtype: MessageDataType | None = None,
    fmt: MessageFormat | None = None,
    size: int | None = None,
) -> bytes | str | NumpyArray1D:
    """Read a message from the equipment.

    This method will block until one of the following conditions is fulfilled:

    1. `size` bytes have been received &mdash; only if `size` is not `None`.
    2. the [read_termination][msl.equipment.interfaces.message_based.MessageBased.read_termination]
       byte(s) are received &mdash; only if
       [read_termination][msl.equipment.interfaces.message_based.MessageBased.read_termination]
       is not `None`.
    3. a timeout occurs &mdash; only if [timeout][msl.equipment.interfaces.message_based.MessageBased.timeout]
       is not `None`. If a timeout occurs, an
       [MSLTimeoutError][msl.equipment.interfaces.message_based.MSLTimeoutError] is raised.
    4. [max_read_size][msl.equipment.interfaces.message_based.MessageBased.max_read_size]
       bytes have been received. If the maximum number of bytes has been read, an
       [MSLConnectionError][msl.equipment.interfaces.message_based.MSLConnectionError] is raised.

    !!! tip
        You may also want to set the [rstrip][msl.equipment.interfaces.message_based.MessageBased.rstrip]
        value for the class instance.

    Args:
        decode: Whether to decode the message (i.e., convert the message to a [str][])
            or keep the message as [bytes][]. Ignored if `dtype` is not `None`.
        dtype: The data type of the elements in the message. Can be any object
            that numpy [dtype][numpy.dtype] supports. See [from_bytes][msl.equipment.utils.from_bytes]
            for more details. For messages that are of scalar type (i.e., a single number)
            it is more efficient to not specify `dtype` but to pass the returned message to the
            [int][] or [float][] class to convert the message to the appropriate numeric type.
        fmt: The format that the message data is in. Ignored if `dtype` is `None`.
             See [from_bytes][msl.equipment.utils.from_bytes] for more details.
        size: The number of bytes to read. Ignored if the value is `None`.

    Returns:
        The message from the equipment. If `dtype` is specified, then the message is returned
            as a numpy [ndarray][numpy.ndarray], if `decode` is `True` then the message
            is returned as a [str][], otherwise the message is returned as [bytes][].
    """
    if size is not None and size > self._max_read_size:
        msg = f"max_read_size is {self._max_read_size} bytes, requesting {size} bytes"
        raise MSLConnectionError(self, msg)

    try:
        message = self._read(size)
    except (serial.SerialTimeoutException, socket.timeout, TimeoutError):
        raise MSLTimeoutError(self) from None
    except Exception as e:  # noqa: BLE001
        msg = f"{e.__class__.__name__}: {e}"
        raise MSLConnectionError(self, msg) from None

    if size is None:
        if dtype:
            logger.debug("%s.read(dtype=%r, fmt=%r) -> %r", self, dtype, fmt, message)
        else:
            logger.debug("%s.read() -> %r", self, message)
    else:
        if len(message) != size:
            msg = f"received {len(message)} bytes, requested {size} bytes"
            raise MSLConnectionError(self, msg)
        logger.debug("%s.read(size=%s) -> %r", self, size, message)

    if self._rstrip:
        message = message.rstrip()

    if dtype:
        return from_bytes(message, fmt=fmt, dtype=dtype)

    if decode:
        return message.decode(encoding=self._encoding)

    return message
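
The four stopping conditions listed for `read` can be sketched with a byte-at-a-time loop over an in-memory stream. This is an illustration under stated assumptions, not the library's `_read` implementation, which is interface-specific and not shown here. The helper name `read_until` is hypothetical, an exhausted stream stands in for a timeout, and `ValueError` stands in for `MSLConnectionError`.

```python
from __future__ import annotations

import io


def read_until(
    stream: io.BytesIO,
    *,
    size: int | None = None,
    termination: bytes | None = b"\n",
    max_read_size: int = 1024,
) -> bytes:
    """Read one byte at a time until a stop condition is met (hypothetical helper)."""
    if size is not None and size > max_read_size:
        msg = f"max_read_size is {max_read_size} bytes, requesting {size} bytes"
        raise ValueError(msg)

    message = bytearray()
    while True:
        # 1. the requested number of bytes has been received
        if size is not None and len(message) == size:
            return bytes(message)
        # 2. the termination byte(s) have been received
        if termination and message.endswith(termination):
            return bytes(message)
        # 4. the maximum number of bytes has been received
        if len(message) >= max_read_size:
            raise ValueError("max_read_size bytes received")
        byte = stream.read(1)
        if not byte:
            # 3. an exhausted stream stands in for a timeout
            raise TimeoutError("no more data")
        message += byte


line = read_until(io.BytesIO(b"abc\ndef\n"))      # stops at the termination
chunk = read_until(io.BytesIO(b"abcdef"), size=2)  # stops after two bytes
```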

write ¤

write(
    message: bytes | str,
    *,
    data: Sequence1D | None = None,
    dtype: MessageDataType = "<f",
    fmt: MessageFormat | None = "ieee"
) -> int

Write a message to the equipment.

Parameters:

Name Type Description Default
message bytes | str

The message to write to the equipment.

required
data Sequence1D | None

The data to append to message. See to_bytes for more details.

None
dtype MessageDataType

The data type to use to convert each element in data to bytes. Ignored if data is None. See to_bytes for more details.

'<f'
fmt MessageFormat | None

The format to use to convert data to bytes. Ignored if data is None. See to_bytes for more details.

'ieee'

Returns:

Type Description
int

The number of bytes written.

Source code in src/msl/equipment/interfaces/message_based.py
def write(
    self,
    message: bytes | str,
    *,
    data: Sequence1D | None = None,
    dtype: MessageDataType = "<f",
    fmt: MessageFormat | None = "ieee",
) -> int:
    """Write a message to the equipment.

    Args:
        message: The message to write to the equipment.
        data: The data to append to `message`. See [to_bytes][msl.equipment.utils.to_bytes]
            for more details.
        dtype: The data type to use to convert each element in `data` to bytes. Ignored
            if `data` is `None`. See [to_bytes][msl.equipment.utils.to_bytes] for more details.
        fmt: The format to use to convert `data` to bytes. Ignored if `data` is `None`.
            See [to_bytes][msl.equipment.utils.to_bytes] for more details.

    Returns:
        The number of bytes written.
    """
    if not isinstance(message, bytes):
        message = message.encode(encoding=self._encoding)

    if data is not None:
        message += to_bytes(data, fmt=fmt, dtype=dtype)

    if self._write_termination and not message.endswith(self._write_termination):
        message += self._write_termination

    logger.debug("%s.write(%r)", self, message)

    try:
        return self._write(message)
    except (serial.SerialTimeoutException, socket.timeout, TimeoutError):
        raise MSLTimeoutError(self) from None
    except Exception as e:  # noqa: BLE001
        raise MSLConnectionError(self, str(e)) from None
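
The way `write` assembles the outgoing bytes can be tried in isolation. The sketch below copies the encoding and termination steps from the source above into a standalone function; the name `build_message` and the default termination `b"\r\n"` are assumptions made for the example, and the `data` conversion through `to_bytes` is left out.

```python
from __future__ import annotations


def build_message(
    message: bytes | str,
    *,
    termination: bytes | None = b"\r\n",
    encoding: str = "utf-8",
) -> bytes:
    """Encode a message and append the termination once (hypothetical helper)."""
    if not isinstance(message, bytes):
        message = message.encode(encoding=encoding)
    # The termination is only appended if the message does not already end with it
    if termination and not message.endswith(termination):
        message += termination
    return message


from_text = build_message("*IDN?")
already_terminated = build_message(b"*IDN?\r\n")
no_termination = build_message("*IDN?", termination=None)
```

For a ZeroMQ interface the termination is always None, so the message is sent exactly as given.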