
VXI11¤

VXI11 (MessageBased) ¤

VXI11(equipment: Equipment)

Base class for the VXI-11 communication protocol.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| equipment | Equipment | An Equipment instance. | required |

A Connection instance supports the following properties for the VXI-11 communication protocol, as well as the properties defined in MessageBased.

Connection Properties:

| Name | Type | Description |
| --- | --- | --- |
| buffer_size | int | The maximum number of bytes to read at a time. Default: 4096 |
| lock_timeout | float | The timeout (in seconds) to wait for a lock (0 means wait forever). Default: 0 |
| port | int | The port to use instead of calling the RPC Port Mapper function. |

Source code in src/msl/equipment/interfaces/vxi11.py
def __init__(self, equipment: Equipment) -> None:
    """Base class for the [VXI-11](http://www.vxibus.org/specifications.html) communication protocol.

    Args:
        equipment: An [Equipment][] instance.

    A [Connection][msl.equipment.schema.Connection] instance supports the following _properties_
    for the [VXI-11](http://www.vxibus.org/specifications.html) communication protocol, as well
    as the _properties_ defined in [MessageBased][msl.equipment.interfaces.message_based.MessageBased].

    Attributes: Connection Properties:
        buffer_size (int): The maximum number of bytes to read at a time. _Default: `4096`_
        lock_timeout (float): The timeout (in seconds) to wait for a lock (0 means wait forever). _Default: `0`_
        port (int): The port to use instead of calling the RPC Port Mapper function.
    """
    # the following must be defined before calling super()
    self._core_client: CoreClient | None = None
    self._abort_client: AsyncClient | None = None
    self._lock_timeout: float = 0  # updated in lock_timeout.setter
    super().__init__(equipment)

    assert equipment.connection is not None  # noqa: S101
    info = parse_vxi_address(equipment.connection.address)
    if info is None:
        msg = f"Invalid VXI-11 address {equipment.connection.address!r}"
        raise ValueError(msg)

    self._info: ParsedVXI11Address = info

    props = equipment.connection.properties
    self._buffer_size: int = props.get("buffer_size", 4096)
    self._core_port: int = props.get("port", -1)  # updated in _connect if -1
    self._abort_port: int = -1  # updated in _connect
    self._max_recv_size: int = -1  # updated in _connect
    self._link_id: int = -1  # updated in _connect
    self._io_timeout_ms: int = -1  # updated in _set_interface_timeout
    self._lock_timeout_ms: int = -1  # updated in lock_timeout.setter
    self.lock_timeout = props.get("lock_timeout", 0)

    # A non-empty read_termination value is applied by default in
    # MessageBased if the user did not specify one. Set it back
    # to None if a read-termination character was not explicitly specified.
    if "read_termination" not in props and "termination" not in props:
        self.read_termination = None  # pyright: ignore[reportUnannotatedClassAttribute]

    # VXI-11 does not support write-termination characters
    self.write_termination = None  # pyright: ignore[reportUnannotatedClassAttribute]

    self._connect()
    self._set_interface_timeout()
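
The way the constructor resolves the connection properties can be illustrated with a minimal sketch. The function name `resolve_vxi11_properties` and the returned dictionary layout are hypothetical and are not part of the library; only the property names and default values are taken from the source shown here.

```python
from __future__ import annotations

from typing import Any


def resolve_vxi11_properties(props: dict[str, Any]) -> dict[str, Any]:
    """Return the effective settings for a user-supplied properties mapping.

    Hypothetical helper: it only mirrors the ``props.get(...)`` lookups
    performed in the constructor.
    """
    return {
        # maximum number of bytes to read at a time
        "buffer_size": props.get("buffer_size", 4096),
        # 0 means wait forever for a lock
        "lock_timeout": props.get("lock_timeout", 0),
        # -1 is a placeholder meaning "ask the RPC Port Mapper"
        "port": props.get("port", -1),
        # read termination stays disabled unless the user asked for one
        "read_termination_requested": "read_termination" in props or "termination" in props,
    }


defaults = resolve_vxi11_properties({})
custom = resolve_vxi11_properties({"buffer_size": 8192, "port": 1024, "termination": b"\n"})
print(defaults)
print(custom)
```

Keys that are missing from the mapping fall back to the documented defaults, so an empty mapping is a valid configuration.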

encoding property writable ¤

encoding: str

The encoding that is used for read and write operations.

equipment property ¤

equipment: Equipment

The Equipment associated with the interface.

lock_timeout property writable ¤

lock_timeout: float

The time, in seconds, to wait to acquire a lock.

Setting the value to ≤0 (or None) means wait forever.

max_read_size property writable ¤

max_read_size: int

The maximum number of bytes that can be read.

rstrip property writable ¤

rstrip: bool

Whether to remove trailing whitespace from read messages.

socket property ¤

socket: socket | None

Returns the reference to the underlying socket.

timeout property writable ¤

timeout: float | None

The timeout, in seconds, for read and write operations.

A value <0 will set the timeout to be None (blocking mode).

abort ¤

abort() -> None

Stop an in-progress request.

Source code in src/msl/equipment/interfaces/vxi11.py
def abort(self) -> None:
    """Stop an in-progress request."""
    if self._abort_client is None:
        self._abort_client = AsyncClient(self._info.host)
        self._abort_client.connect(self._abort_port, timeout=self.timeout)
    self._abort_client.device_abort(self._link_id)

clear ¤

clear() -> None

Send the clear command to the device.

Source code in src/msl/equipment/interfaces/vxi11.py
def clear(self) -> None:
    """Send the `clear` command to the device."""
    if self._core_client is None:
        raise MSLConnectionError(self, "not connected to VXI-11 device")

    self._core_client.device_clear(
        lid=self._link_id,
        flags=self._init_flag(),
        lock_timeout=self._lock_timeout_ms,
        io_timeout=self._io_timeout_ms,
    )

create_intr_chan ¤

create_intr_chan(
    host_addr: int,
    host_port: int,
    prog_num: int,
    prog_vers: int,
    prog_family: int,
) -> None

Inform the network instrument server to establish an interrupt channel.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| host_addr | int | Address of the host servicing the interrupt. | required |
| host_port | int | Valid port number on the client. | required |
| prog_num | int | Program number. | required |
| prog_vers | int | Program version number. | required |
| prog_family | int | The underlying socket protocol family type (IPPROTO_TCP or IPPROTO_UDP). | required |

Source code in src/msl/equipment/interfaces/vxi11.py
def create_intr_chan(self, host_addr: int, host_port: int, prog_num: int, prog_vers: int, prog_family: int) -> None:
    """Inform the network instrument server to establish an interrupt channel.

    Args:
        host_addr: Address of the host servicing the interrupt.
        host_port: Valid port number on the client.
        prog_num: Program number.
        prog_vers: Program version number.
        prog_family: The underlying socket protocol family type (`IPPROTO_TCP` or `IPPROTO_UDP`).
    """
    if self._core_client is None:
        raise MSLConnectionError(self, "not connected to VXI-11 device")

    self._core_client.create_intr_chan(
        host_addr=host_addr, host_port=host_port, prog_num=prog_num, prog_vers=prog_vers, prog_family=prog_family
    )
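
Because `host_addr` is an integer, a dotted-quad IPv4 address has to be converted before it can be passed. The sketch below shows one way to do that with the standard library. The helper name `ipv4_to_int` and the example address are hypothetical, and whether a particular instrument server expects exactly this integer encoding is an assumption that should be checked against the instrument's documentation.

```python
import ipaddress
import socket


def ipv4_to_int(address: str) -> int:
    """Convert a dotted-quad IPv4 address to its unsigned 32-bit integer value."""
    return int(ipaddress.IPv4Address(address))


# Example address only; it does not come from the library or its documentation.
host_addr = ipv4_to_int("192.168.1.10")

# The docstring names IPPROTO_TCP and IPPROTO_UDP; both constants exist in socket.
prog_family = socket.IPPROTO_TCP

print(host_addr, prog_family)
```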

destroy_intr_chan ¤

destroy_intr_chan() -> None

Inform the network instrument server to close its interrupt channel.

Source code in src/msl/equipment/interfaces/vxi11.py
def destroy_intr_chan(self) -> None:
    """Inform the network instrument server to close its interrupt channel."""
    if self._core_client is None:
        raise MSLConnectionError(self, "not connected to VXI-11 device")

    self._core_client.destroy_intr_chan()

destroy_link ¤

destroy_link() -> None

Destroy the link with the device.

Source code in src/msl/equipment/interfaces/vxi11.py
def destroy_link(self) -> None:
    """Destroy the link with the device."""
    if self._core_client is None:
        raise MSLConnectionError(self, "not connected to VXI-11 device")

    self._core_client.destroy_link(self._link_id)

disconnect ¤

disconnect() -> None

Unlink and close the sockets.

Source code in src/msl/equipment/interfaces/vxi11.py
def disconnect(self) -> None:  # pyright: ignore[reportImplicitOverride]
    """Unlink and close the sockets."""
    if self._abort_client is None and self._core_client is None:
        return

    if self._abort_client is not None:
        self._abort_client.close()
        self._abort_client = None

    if self._core_client is not None:
        if self._link_id != -1:
            with contextlib.suppress(ConnectionError):
                self._core_client.destroy_link(self._link_id)
            self._link_id = -1

        self._core_client.close()
        self._core_client = None

    super().disconnect()

docmd ¤

docmd(cmd: int, value: float, fmt: str) -> bytes

Allows for a variety of commands to be executed.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| cmd | int | An IEEE 488 command message. For example, to send the Group Execute Trigger command, GET, the value of cmd is 0x08. | required |
| value | float | The value to use with cmd. Can be of type bool, int or float. | required |
| fmt | str | How to format value. See format-characters for more details. Do not include the byte-order character. Network (big-endian) order is always used. | required |

Returns:

| Type | Description |
| --- | --- |
| bytes | The results defined by cmd. |

Source code in src/msl/equipment/interfaces/vxi11.py
def docmd(self, cmd: int, value: float, fmt: str) -> bytes:
    """Allows for a variety of commands to be executed.

    Args:
        cmd: An IEEE 488 command message. For example, to send the Group Execute Trigger
            command, _GET_, the value of `cmd` is `0x08`.
        value: The value to use with `cmd`. Can be of type [bool][], [int][] or [float][].
        fmt: How to format `value`. See [format-characters][] for more details. Do not
            include the byte-order character. Network (big-endian) order is always used.

    Returns:
        The results defined by `cmd`.
    """
    if self._core_client is None:
        raise MSLConnectionError(self, "not connected to VXI-11 device")

    # always use network (big-endian) byte order
    s = Struct("!" + fmt.lstrip("@=<>!"))
    return self._core_client.device_docmd(
        lid=self._link_id,
        flags=self._init_flag(),
        io_timeout=self._io_timeout_ms,
        lock_timeout=self._lock_timeout_ms,
        cmd=cmd,
        network_order=True,
        size=s.size,
        data=s.pack(value),
    )
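
The byte-order handling in `docmd` can be tried in isolation. The sketch below reuses the same `Struct("!" + fmt.lstrip("@=<>!"))` expression from the source above; the wrapper name `pack_docmd_value` is hypothetical and no RPC is sent.

```python
from struct import Struct


def pack_docmd_value(value: float, fmt: str) -> bytes:
    """Pack value in network (big-endian) order.

    Any byte-order character at the start of fmt is stripped and
    replaced with "!", as in the docmd source.
    """
    packer = Struct("!" + fmt.lstrip("@=<>!"))
    return packer.pack(value)


print(pack_docmd_value(1, "h"))    # 16-bit signed integer
print(pack_docmd_value(1, "<h"))   # the little-endian prefix is ignored
print(pack_docmd_value(True, "?")) # boolean
print(pack_docmd_value(1.0, "f"))  # 32-bit float
```

Both `"h"` and `"<h"` produce the same two bytes, which illustrates why the byte-order character does not need to be supplied.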

enable_sqr ¤

enable_sqr(*, state: bool, handle: bytes) -> None

Enable or disable the sending of device_intr_srq RPCs by the network instrument server.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| state | bool | Whether to enable or disable interrupts. | required |
| handle | bytes | Host specific data (maximum length is 40 characters). | required |

Source code in src/msl/equipment/interfaces/vxi11.py
def enable_sqr(self, *, state: bool, handle: bytes) -> None:
    """Enable or disable the sending of `device_intr_srq` RPCs by the network instrument server.

    Args:
        state: Whether to enable or disable interrupts.
        handle: Host specific data (maximum length is 40 characters).
    """
    if self._core_client is None:
        raise MSLConnectionError(self, "not connected to VXI-11 device")

    self._core_client.device_enable_srq(lid=self._link_id, state=state, handle=handle)

local ¤

local() -> None

Place the device in a local state wherein all programmable local controls are enabled.

Source code in src/msl/equipment/interfaces/vxi11.py
def local(self) -> None:
    """Place the device in a local state wherein all programmable local controls are enabled."""
    if self._core_client is None:
        raise MSLConnectionError(self, "not connected to VXI-11 device")

    self._core_client.device_local(
        lid=self._link_id,
        flags=self._init_flag(),
        lock_timeout=self._lock_timeout_ms,
        io_timeout=self._io_timeout_ms,
    )

lock ¤

lock() -> None

Acquire the device's lock.

Source code in src/msl/equipment/interfaces/vxi11.py
def lock(self) -> None:
    """Acquire the device's lock."""
    if self._core_client is None:
        raise MSLConnectionError(self, "not connected to VXI-11 device")

    self._core_client.device_lock(lid=self._link_id, flags=self._init_flag(), lock_timeout=self._lock_timeout_ms)

query ¤

query(
    message: bytes | str,
    *,
    delay: float = 0.0,
    decode: Literal[True] = True,
    dtype: None = None,
    fmt: MessageFormat = ...,
    size: int | None = ...
) -> str
query(
    message: bytes | str,
    *,
    delay: float = 0.0,
    decode: Literal[False] = False,
    dtype: None = None,
    fmt: MessageFormat = ...,
    size: int | None = ...
) -> bytes
query(
    message: bytes | str,
    *,
    delay: float = 0.0,
    decode: bool = ...,
    dtype: MessageDataType = ...,
    fmt: MessageFormat = ...,
    size: int | None = ...
) -> NumpyArray1D
query(
    message: bytes | str,
    *,
    delay: float = 0.0,
    decode: bool = True,
    dtype: MessageDataType | None = None,
    fmt: MessageFormat = None,
    size: int | None = None
) -> bytes | str | NumpyArray1D

Convenience method for performing a write followed by a read.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| message | bytes \| str | The message to write to the equipment. | required |
| delay | float | Time delay, in seconds, to wait between the write and read operations. | 0.0 |
| decode | bool | Whether to decode the returned message (i.e., convert the message to a str) or keep the message as bytes. Ignored if dtype is not None. | True |
| dtype | MessageDataType \| None | The data type of the elements in the returned message. Can be any object that numpy dtype supports. For messages that are of scalar type (i.e., a single number) it is more efficient to not specify dtype but to pass the returned message to the int or float class to convert the message to the appropriate numeric type. See MessageDataType for more details. | None |
| fmt | MessageFormat | The format that the returned message data is in. Ignored if dtype is None. See MessageFormat for more details. | None |
| size | int \| None | The number of bytes to read. Ignored if the value is None. | None |

Returns:

| Type | Description |
| --- | --- |
| bytes \| str \| NumpyArray1D | The message from the equipment. If dtype is specified, the message is returned as a numpy ndarray; otherwise, if decode is True, the message is returned as a str; otherwise the message is returned as bytes. |

Source code in src/msl/equipment/interfaces/message_based.py
def query(
    self,
    message: bytes | str,
    *,
    delay: float = 0.0,
    decode: bool = True,
    dtype: MessageDataType | None = None,
    fmt: MessageFormat = None,
    size: int | None = None,
) -> bytes | str | NumpyArray1D:
    """Convenience method for performing a [write][msl.equipment.interfaces.message_based.MessageBased.write]
    followed by a [read][msl.equipment.interfaces.message_based.MessageBased.read].

    Args:
        message: The message to write to the equipment.
        delay: Time delay, in seconds, to wait between the _write_ and _read_ operations.
        decode: Whether to decode the returned message (i.e., convert the message to a [str][])
            or keep the message as [bytes][]. Ignored if `dtype` is not `None`.
        dtype: The data type of the elements in the returned message. Can be any object that numpy
            [dtype][numpy.dtype] supports. For messages that are of scalar type (i.e., a single number)
            it is more efficient to not specify `dtype` but to pass the returned message to the
            [int][] or [float][] class to convert the message to the appropriate numeric type.
            See [MessageDataType][msl.equipment._types.MessageDataType] for more details.
        fmt: The format that the returned message data is in. Ignored if `dtype` is `None`.
            See [MessageFormat][msl.equipment._types.MessageFormat] for more details.
        size: The number of bytes to read. Ignored if the value is `None`.

    Returns:
        The message from the equipment. If `dtype` is specified, then the message is
            returned as a numpy [ndarray][numpy.ndarray], if `decode` is `True` then the message
            is returned as a [str][], otherwise the message is returned as [bytes][].
    """  # noqa: D205
    _ = self.write(message)
    if delay > 0:
        time.sleep(delay)
    if dtype:
        return self.read(dtype=dtype, fmt=fmt, size=size)
    return self.read(decode=decode, size=size)
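
The write, optional delay, then read sequence can be exercised without an instrument. The sketch below uses `EchoDevice`, a hypothetical in-memory stand-in that is not part of the library; it keeps only the `delay` and `decode` arguments and omits `dtype`, `fmt` and `size`.

```python
from __future__ import annotations

import time


class EchoDevice:
    """Hypothetical in-memory stand-in for a message-based connection."""

    def __init__(self) -> None:
        self._pending = b""

    def write(self, message: bytes | str) -> int:
        data = message if isinstance(message, bytes) else message.encode()
        # Pretend the instrument prepares a reply for every command it receives.
        self._pending = b"REPLY:" + data
        return len(data)

    def read(self, *, decode: bool = True) -> bytes | str:
        reply, self._pending = self._pending, b""
        return reply.decode() if decode else reply

    def query(self, message: bytes | str, *, delay: float = 0.0, decode: bool = True) -> bytes | str:
        # Same order of operations as the query source: write, wait, read.
        _ = self.write(message)
        if delay > 0:
            time.sleep(delay)
        return self.read(decode=decode)


device = EchoDevice()
print(device.query("*IDN?"))
print(device.query("*IDN?", delay=0.01, decode=False))
```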

read ¤

read(
    *,
    decode: Literal[True] = True,
    dtype: None = None,
    fmt: MessageFormat = ...,
    size: int | None = ...
) -> str
read(
    *,
    decode: Literal[False] = False,
    dtype: None = None,
    fmt: MessageFormat = ...,
    size: int | None = ...
) -> bytes
read(
    *,
    decode: bool = ...,
    dtype: MessageDataType = ...,
    fmt: MessageFormat = ...,
    size: int | None = ...
) -> NumpyArray1D
read(
    *,
    decode: bool = True,
    dtype: MessageDataType | None = None,
    fmt: MessageFormat = None,
    size: int | None = None
) -> bytes | str | NumpyArray1D

Read a message from the equipment.

This method will block until one of the following conditions is fulfilled:

  1. size bytes have been received — only if size is not None.
  2. the read_termination byte(s) is(are) received — only if read_termination is not None.
  3. a timeout occurs — only if timeout is not None. If a timeout occurs, an MSLTimeoutError is raised.
  4. max_read_size bytes have been received. If the maximum number of bytes have been read, an MSLConnectionError is raised.

Tip

You may also want to set the rstrip value for the class instance.
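
The four stop conditions can be sketched as a loop over incoming chunks. This is an illustration only and is not how the library implements `read`: the function `read_message` is hypothetical, the built-in `TimeoutError` and `ConnectionError` stand in for MSLTimeoutError and MSLConnectionError, and a timeout is modelled as the chunk source running dry.

```python
from __future__ import annotations

from collections.abc import Iterable


def read_message(
    chunks: Iterable[bytes],
    *,
    size: int | None = None,
    termination: bytes | None = None,
    max_read_size: int = 1 << 20,
) -> bytes:
    """Accumulate chunks until one of the stop conditions is met."""
    message = bytearray()
    for chunk in chunks:
        message.extend(chunk)
        if size is not None and len(message) >= size:
            return bytes(message[:size])  # condition 1: size bytes received
        if termination is not None and message.endswith(termination):
            return bytes(message)  # condition 2: termination bytes received
        if len(message) > max_read_size:
            # condition 4: too many bytes without reaching a stop condition
            raise ConnectionError(f"exceeded max_read_size of {max_read_size} bytes")
    # condition 3: modelled here as "no more data arrived"
    raise TimeoutError("no more data arrived")


print(read_message([b"1.5", b"\n"], termination=b"\n"))
print(read_message([b"ab", b"cd"], size=3))
```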

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| decode | bool | Whether to decode the message (i.e., convert the message to a str) or keep the message as bytes. Ignored if dtype is not None. | True |
| dtype | MessageDataType \| None | The data type of the elements in the returned message. Can be any object that numpy dtype supports. For messages that are of scalar type (i.e., a single number) it is more efficient to not specify dtype but to pass the returned message to the int or float class to convert the message to the appropriate numeric type. See MessageDataType for more details. | None |
| fmt | MessageFormat | The format that the returned message data is in. Ignored if dtype is None. See MessageFormat for more details. | None |
| size | int \| None | The number of bytes to read. Ignored if the value is None. | None |

Returns:

| Type | Description |
| --- | --- |
| bytes \| str \| NumpyArray1D | The message from the equipment. If dtype is specified, the message is returned as a numpy ndarray; otherwise, if decode is True, the message is returned as a str; otherwise the message is returned as bytes. |

Source code in src/msl/equipment/interfaces/message_based.py
def read(
    self,
    *,
    decode: bool = True,
    dtype: MessageDataType | None = None,
    fmt: MessageFormat = None,
    size: int | None = None,
) -> bytes | str | NumpyArray1D:
    """Read a message from the equipment.

    This method will block until one of the following conditions is fulfilled:

    1. `size` bytes have been received &mdash; only if `size` is not `None`.
    2. the [read_termination][msl.equipment.interfaces.message_based.MessageBased.read_termination]
       byte(s) is(are) received &mdash; only if
       [read_termination][msl.equipment.interfaces.message_based.MessageBased.read_termination]
       is not `None`.
    3. a timeout occurs &mdash; only if [timeout][msl.equipment.interfaces.message_based.MessageBased.timeout]
       is not `None`. If a timeout occurs, an
       [MSLTimeoutError][msl.equipment.interfaces.message_based.MSLTimeoutError] is raised.
    4. [max_read_size][msl.equipment.interfaces.message_based.MessageBased.max_read_size]
       bytes have been received. If the maximum number of bytes have been read, an
       [MSLConnectionError][msl.equipment.interfaces.message_based.MSLConnectionError] is raised.

    !!! tip
        You may also want to set the [rstrip][msl.equipment.interfaces.message_based.MessageBased.rstrip]
        value for the class instance.

    Args:
        decode: Whether to decode the message (i.e., convert the message to a [str][])
            or keep the message as [bytes][]. Ignored if `dtype` is not `None`.
        dtype: The data type of the elements in the returned message. Can be any object that numpy
            [dtype][numpy.dtype] supports. For messages that are of scalar type (i.e., a single number)
            it is more efficient to not specify `dtype` but to pass the returned message to the
            [int][] or [float][] class to convert the message to the appropriate numeric type.
            See [MessageDataType][msl.equipment._types.MessageDataType] for more details.
        fmt: The format that the returned message data is in. Ignored if `dtype` is `None`.
            See [MessageFormat][msl.equipment._types.MessageFormat] for more details.
        size: The number of bytes to read. Ignored if the value is `None`.

    Returns:
        The message from the equipment. If `dtype` is specified, then the message is returned
            as a numpy [ndarray][numpy.ndarray], if `decode` is `True` then the message
            is returned as a [str][], otherwise the message is returned as [bytes][].
    """
    if size is not None and size > self._max_read_size:
        msg = f"max_read_size is {self._max_read_size} bytes, requesting {size} bytes"
        raise MSLConnectionError(self, msg)

    try:
        message = self._read(size)
    except (serial.SerialTimeoutException, socket.timeout, TimeoutError, USBTimeoutError):
        raise MSLTimeoutError(self) from None
    except Exception as e:  # noqa: BLE001
        msg = f"{e.__class__.__name__}: {e}"
        raise MSLConnectionError(self, msg) from None

    if size is None:
        if dtype:
            logger.debug("%s.read(dtype=%r, fmt=%r) -> %r", self, dtype, fmt, message)
        else:
            logger.debug("%s.read() -> %r", self, message)
    else:
        if len(message) != size:
            msg = f"received {len(message)} bytes, requested {size} bytes"
            raise MSLConnectionError(self, msg)
        logger.debug("%s.read(size=%s) -> %r", self, size, message)

    if self._rstrip:
        message = message.rstrip()

    if dtype:
        return from_bytes(message, fmt=fmt, dtype=dtype)

    if decode:
        return message.decode(encoding=self._encoding)

    return message

read_stb ¤

read_stb() -> int

Read the status byte from the device.

Returns:

| Type | Description |
| --- | --- |
| int | The status byte. |

Source code in src/msl/equipment/interfaces/vxi11.py
def read_stb(self) -> int:
    """Read the status byte from the device.

    Returns:
        The status byte.
    """
    if self._core_client is None:
        raise MSLConnectionError(self, "not connected to VXI-11 device")

    return self._core_client.device_readstb(
        lid=self._link_id,
        flags=self._init_flag(),
        lock_timeout=self._lock_timeout_ms,
        io_timeout=self._io_timeout_ms,
    )
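
The integer returned by `read_stb` is usually inspected bit by bit. The sketch below decodes the three bits that IEEE 488.2 defines for every device (MAV, ESB and RQS/MSS); the helper name `decode_status_byte` is hypothetical, and the remaining bits are device-specific and therefore ignored here.

```python
from __future__ import annotations


def decode_status_byte(stb: int) -> dict[str, bool]:
    """Decode the IEEE 488.2 standard bits of a status byte."""
    return {
        "MAV": bool(stb & 0x10),  # bit 4: message available
        "ESB": bool(stb & 0x20),  # bit 5: event status bit
        "RQS": bool(stb & 0x40),  # bit 6: request service (or master summary status)
    }


print(decode_status_byte(0x50))
```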

reconnect ¤

reconnect(max_attempts: int = 1) -> None

Reconnect to the equipment.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| max_attempts | int | The maximum number of attempts to try to reconnect with the equipment. If <1, keep trying until a connection is successful. If the maximum number of attempts has been reached then an exception is raised. | 1 |

Source code in src/msl/equipment/interfaces/vxi11.py
def reconnect(self, max_attempts: int = 1) -> None:
    """Reconnect to the equipment.

    Args:
        max_attempts: The maximum number of attempts to try to reconnect with the equipment.
            If &lt;1, keep trying until a connection is successful. If the maximum number
            of attempts has been reached then an exception is raised.
    """
    attempt = 0
    while True:
        attempt += 1
        try:
            return self._connect()
        except (MSLConnectionError, MSLTimeoutError):
            if 0 < max_attempts <= attempt:
                raise
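
The effect of `max_attempts` can be observed with a stand-in object. `FlakyLink` below is hypothetical and not part of the library; it copies the retry loop from the source above and substitutes the built-in `ConnectionError` for MSLConnectionError and MSLTimeoutError.

```python
class FlakyLink:
    """Hypothetical connection whose first `failures` connect attempts fail."""

    def __init__(self, failures: int) -> None:
        self.failures = failures
        self.attempts = 0

    def _connect(self) -> None:
        self.attempts += 1
        if self.attempts <= self.failures:
            raise ConnectionError("device not reachable")

    def reconnect(self, max_attempts: int = 1) -> None:
        attempt = 0
        while True:
            attempt += 1
            try:
                return self._connect()
            except ConnectionError:
                # Give up only when a positive limit has been reached;
                # a limit below 1 means keep trying.
                if 0 < max_attempts <= attempt:
                    raise


link = FlakyLink(failures=2)
link.reconnect(max_attempts=3)
print(link.attempts)
```

With two failures and a limit of three, the third attempt succeeds. Lowering the limit to two would let the second failure propagate to the caller.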

remote ¤

remote() -> None

Place the device in a remote state wherein all programmable local controls are disabled.

Source code in src/msl/equipment/interfaces/vxi11.py
def remote(self) -> None:
    """Place the device in a remote state wherein all programmable local controls are disabled."""
    if self._core_client is None:
        raise MSLConnectionError(self, "not connected to VXI-11 device")

    self._core_client.device_remote(
        lid=self._link_id,
        flags=self._init_flag(),
        lock_timeout=self._lock_timeout_ms,
        io_timeout=self._io_timeout_ms,
    )

trigger ¤

trigger() -> None

Send a trigger to the device.

Source code in src/msl/equipment/interfaces/vxi11.py
def trigger(self) -> None:
    """Send a trigger to the device."""
    if self._core_client is None:
        raise MSLConnectionError(self, "not connected to VXI-11 device")

    self._core_client.device_trigger(
        lid=self._link_id,
        flags=self._init_flag(),
        lock_timeout=self._lock_timeout_ms,
        io_timeout=self._io_timeout_ms,
    )

unlock ¤

unlock() -> None

Release the lock acquired by lock.

Source code in src/msl/equipment/interfaces/vxi11.py
def unlock(self) -> None:
    """Release the lock acquired by [lock][msl.equipment.interfaces.vxi11.VXI11.lock]."""
    if self._core_client is None:
        raise MSLConnectionError(self, "not connected to VXI-11 device")

    self._core_client.device_unlock(self._link_id)
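
Since `lock` and `unlock` are meant to be used as a pair, one possible pattern is to wrap them in a context manager so that the lock is released even if an exception occurs. The `locked` helper, the `Lockable` protocol and `FakeDevice` below are hypothetical and not provided by the library; the sketch relies only on the two documented method names.

```python
from __future__ import annotations

import contextlib
from collections.abc import Iterator
from typing import Protocol


class Lockable(Protocol):
    """Anything that exposes lock() and unlock(), such as a VXI11 instance."""

    def lock(self) -> None: ...
    def unlock(self) -> None: ...


@contextlib.contextmanager
def locked(device: Lockable) -> Iterator[None]:
    """Acquire the device lock for the duration of a with-block."""
    device.lock()
    try:
        yield
    finally:
        device.unlock()  # always released, even if the block raises


class FakeDevice:
    """Hypothetical stand-in that records the order of calls."""

    def __init__(self) -> None:
        self.events: list[str] = []

    def lock(self) -> None:
        self.events.append("lock")

    def unlock(self) -> None:
        self.events.append("unlock")


device = FakeDevice()
with locked(device):
    device.events.append("work")
print(device.events)
```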

write ¤

write(
    message: bytes | str,
    *,
    data: Sequence1D | None = None,
    dtype: MessageDataType = "<f",
    fmt: MessageFormat = "ieee"
) -> int

Write a message to the equipment.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| message | bytes \| str | The message to write to the equipment. | required |
| data | Sequence1D \| None | The data to append to message. | None |
| dtype | MessageDataType | The data type to use to convert each element in data to bytes. Ignored if data is None. See MessageDataType for more details. | '<f' |
| fmt | MessageFormat | The format to use to convert data to bytes. Ignored if data is None. See MessageFormat for more details. | 'ieee' |

Returns:

| Type | Description |
| --- | --- |
| int | The number of bytes written. |

Source code in src/msl/equipment/interfaces/message_based.py
def write(
    self,
    message: bytes | str,
    *,
    data: Sequence1D | None = None,
    dtype: MessageDataType = "<f",
    fmt: MessageFormat = "ieee",
) -> int:
    """Write a message to the equipment.

    Args:
        message: The message to write to the equipment.
        data: The data to append to `message`.
        dtype: The data type to use to convert each element in `data` to bytes. Ignored
            if `data` is `None`. See [MessageDataType][msl.equipment._types.MessageDataType]
            for more details.
        fmt: The format to use to convert `data` to bytes. Ignored if `data` is `None`.
            See [MessageFormat][msl.equipment._types.MessageFormat] for more details.

    Returns:
        The number of bytes written.
    """
    if not isinstance(message, bytes):
        message = message.encode(encoding=self._encoding)

    if data is not None:
        message += to_bytes(data, fmt=fmt, dtype=dtype)

    if self._write_termination and not message.endswith(self._write_termination):
        message += self._write_termination

    logger.debug("%s.write(%r)", self, message)

    try:
        return self._write(message)
    except (serial.SerialTimeoutException, socket.timeout, TimeoutError, USBTimeoutError):
        raise MSLTimeoutError(self) from None
    except Exception as e:  # noqa: BLE001
        raise MSLConnectionError(self, str(e)) from None
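
The message preparation performed before the bytes are sent can be sketched on its own. The function `build_message` is hypothetical; it mirrors the encoding and termination steps from the source above and leaves out the optional `data` conversion. The VXI11 constructor sets `write_termination` to None, so the `termination=None` case is the one that applies to this interface.

```python
from __future__ import annotations


def build_message(message: bytes | str, *, termination: bytes | None, encoding: str = "utf-8") -> bytes:
    """Encode a message and append the termination bytes if they are missing."""
    if not isinstance(message, bytes):
        message = message.encode(encoding=encoding)
    # An empty or None termination leaves the message unchanged.
    if termination and not message.endswith(termination):
        message += termination
    return message


print(build_message("*RST", termination=None))     # VXI-11 case: nothing appended
print(build_message("*RST", termination=b"\n"))    # termination appended
print(build_message(b"*RST\n", termination=b"\n")) # already terminated, unchanged
```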