Skip to content

USB¤

Prerequisites¤

Before communicating with a USB device, a libusb-compatible driver must be installed and the directory to the libusb library must be available on the PATH environment variable.

The following instructions are intended to be a starting point if you are having issues communicating with a USB device.

Windows¤

Download the latest libusb library from the repository.

Tip

Here, version 1.0.29 is downloaded. Update this value in the following command if there is a new release.

Invoke-WebRequest -Uri https://github.com/libusb/libusb/releases/download/v1.0.29/libusb-1.0.29.7z -OutFile libusb.7z

Use 7zip to extract the zip file. First, install 7zip if it's not already.

winget install 7zip.7zip

Then extract the library file and copy the appropriate (i.e., x86 or x64, VS or MinGW) libusb-1.0.dll file to a directory that is on your PATH environment variable. The following command extracts the 64-bit library file that was built with Visual Studio 2022 to the C:\Windows\System32 directory, but you may choose to extract a different DLL file to a different directory.

& 'C:\Program Files\7-Zip\7z' e libusb.7z -oC:\Windows\System32 VS2022\MS64\dll\libusb-1.0.dll

Finally, install the Zadig application

winget install akeo.ie.Zadig

then run Zadig (you must open a new administrative terminal to run the following command)

zadig

to install a driver (WinUSB is recommended) for the USB device — follow the Zadig User Guide.

See How to use libusb on Windows for additional information.

Debian/Ubuntu¤

Install the libusb-1.0-0 package.

sudo apt install libusb-1.0-0

To access a USB device without root privilege you should create a udev configuration file. There are many ways to configure udev, here is a typical setup.

# /etc/udev/rules.d/10-custom.rules

SUBSYSTEM=="usb", ATTR{idVendor}=="03eb", ATTR{idProduct}=="2107", GROUP="plugdev", MODE="0664"
SUBSYSTEM=="usb", ATTR{idVendor}=="0403", ATTR{idProduct}=="6001", GROUP="plugdev", MODE="0664"
SUBSYSTEM=="usb", ATTR{idVendor}=="0403", ATTR{idProduct}=="faf0", GROUP="plugdev", MODE="0664"

You need to unplug / plug back the USB device once this file has been created so that udev loads the rules for the matching device, or alternatively, inform the udev daemon about the changes.

sudo udevadm control --reload-rules
sudo udevadm trigger

With this setup, be sure to add users that want to access the USB device(s) to the plugdev group.

sudo adduser $USER plugdev

Remember that you need to log out / log in to get the above command effective, or start a subshell.

newgrp plugdev

macOS¤

Install the libusb package.

brew install libusb

Endpoint dataclass ¤

Endpoint(address: int, interface_number: int, max_packet_size: int)

Information about a USB Endpoint.

Attributes:

Name Type Description
address int

The bEndpointAddress value of the USB Endpoint.

max_packet_size int

The wMaxPacketSize value of the USB Endpoint.

interface_number int

The bInterfaceNumber value of the USB Interface that the USB Endpoint is associated with.

USB (MessageBased) ¤

USB(equipment: Equipment)

Base class for (raw) USB communication.

Parameters:

Name Type Description Default
equipment Equipment

An Equipment instance.

required

A Connection instance supports the following properties for the USB communication protocol, as well as the properties defined in MessageBased.

Connection Properties:

Name Type Description
bAlternateSetting int

The value of bAlternateSetting of the USB Interface Descriptor. Default: 0

bConfigurationValue int | None

The value of bConfigurationValue of the USB Configuration Descriptor. If None, use the first Configuration found. Default: None

buffer_size int

The maximum number of bytes to read at a time. Default: 4096

usb_backend Literal['libusb1', 'libusb0', 'openusb'] | None

The PyUSB backend library to use for the connection. If None, selects the first backend that is available. Default: None

Source code in src/msl/equipment/interfaces/usb.py
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
def __init__(self, equipment: Equipment) -> None:  # noqa: C901, PLR0915
    """Base class for (raw) USB communication.

    Args:
        equipment: An [Equipment][] instance.

    A [Connection][msl.equipment.schema.Connection] instance supports the following _properties_
    for the USB communication protocol, as well as the _properties_ defined in
    [MessageBased][msl.equipment.interfaces.message_based.MessageBased].

    Attributes: Connection Properties:
        bAlternateSetting (int): The value of `bAlternateSetting` of the USB Interface Descriptor.
            _Default: `0`_
        bConfigurationValue (int | None): The value of `bConfigurationValue` of the USB Configuration
            Descriptor. If `None`, use the first Configuration found. _Default: `None`_
        buffer_size (int): The maximum number of bytes to read at a time. _Default: `4096`_
        usb_backend (Literal["libusb1", "libusb0", "openusb"] | None): The PyUSB backend library to use
            for the connection. If `None`, selects the first backend that is available. _Default: `None`_
    """
    self._detached: bool = False
    self._interface_number: int = 0
    self._timeout_ms: int = 0
    self._byte_buffer: bytearray = bytearray()
    super().__init__(equipment)

    assert equipment.connection is not None  # noqa: S101
    parsed = parse_usb_address(equipment.connection.address)
    if parsed is None:
        msg = f"Invalid USB address {equipment.connection.address!r}"
        raise ValueError(msg)

    p = equipment.connection.properties
    self._buffer_size: int = p.get("buffer_size", 4096)

    backend = p.get("usb_backend")
    if isinstance(backend, str):
        backend = _usb_backend(backend)

    try:
        device = _find_device(parsed, backend)
    except usb.core.NoBackendError:
        msg = (
            "A PyUSB backend is not available. For tips on how to fix this issue see "
            "https://mslnz.github.io/msl-equipment/dev/api/interfaces/usb/"
        )
        raise MSLConnectionError(self, msg) from None

    if device is None:
        msg = "The USB device was not found"
        if _is_linux_and_not_sudo():
            msg += " (try running as sudo or create a udev rule)"
        raise MSLConnectionError(self, msg)

    self._device: Any = device  # usb.core.Device instance

    self._interface_number = parsed.interface_number
    configuration_value: int | None = p.get("bConfigurationValue")
    alternate_setting: int = p.get("bAlternateSetting", 0)

    # If a kernel driver is active, the device will be unable to perform I/O
    # On Windows there is no kernel so NotImplementedError is raised
    with contextlib.suppress(usb.core.USBError, NotImplementedError):
        if device.is_kernel_driver_active(self._interface_number):
            device.detach_kernel_driver(self._interface_number)
            self._detached = True

    # Only set the USB Configuration if one is not already set
    # https://libusb.sourceforge.io/api-1.0/libusb_caveats.html#configsel
    cfg = None
    with contextlib.suppress(usb.core.USBError):
        cfg = device.get_active_configuration()

    if cfg is None or (configuration_value is not None and cfg.bConfigurationValue != configuration_value):
        try:
            device.set_configuration(configuration_value)
            cfg = device.get_active_configuration()
        except ValueError:
            msg = f"Invalid configuration value {configuration_value}"
            raise MSLConnectionError(self, msg) from None
        except usb.core.USBError as e:
            msg = f"Cannot set configuration to value {configuration_value}, {e}"
            raise MSLConnectionError(self, msg) from None

    # Get the USB Interface (and maybe set the Alternate Setting)
    interface = cfg[(self._interface_number, alternate_setting)]
    alternates = list(usb.util.find_descriptor(cfg, find_all=True, bInterfaceNumber=interface.bInterfaceNumber))
    if len(alternates) > 1:
        try:
            interface.set_altsetting()
        except usb.core.USBError as e:
            msg = f"Cannot set alternate setting for {interface!r}, {e}"
            raise MSLConnectionError(self, msg) from None

    # Get the info about some of the In/Out Endpoints for the selected USB Interface
    ep = _endpoint(self, interface, usb.util.ENDPOINT_IN, usb.util.ENDPOINT_TYPE_BULK)
    assert ep is not None  # noqa: S101
    self._bulk_in: Endpoint = ep

    ep = _endpoint(self, interface, usb.util.ENDPOINT_OUT, usb.util.ENDPOINT_TYPE_BULK)
    assert ep is not None  # noqa: S101
    self._bulk_out: Endpoint = ep

    self._intr_in: Endpoint | None = _endpoint(self, interface, usb.util.ENDPOINT_IN, usb.util.ENDPOINT_TYPE_INTR)
    self._intr_out: Endpoint | None = _endpoint(self, interface, usb.util.ENDPOINT_OUT, usb.util.ENDPOINT_TYPE_INTR)

bulk_in_endpoint property ¤

bulk_in_endpoint: Endpoint

Information about the Bulk-IN endpoint.

bulk_out_endpoint property ¤

bulk_out_endpoint: Endpoint

Information about the Bulk-OUT endpoint.

device_version property ¤

device_version: int

Returns the device version (release) number.

Corresponds to the bcdDevice field in the Device Descriptor.

encoding property writable ¤

encoding: str

The encoding that is used for read and write operations.

equipment property ¤

equipment: Equipment

The Equipment associated with the interface.

intr_in_endpoint property ¤

intr_in_endpoint: Endpoint | None

Information about the Interrupt-IN endpoint.

intr_out_endpoint property ¤

intr_out_endpoint: Endpoint | None

Information about the Interrupt-OUT endpoint.

max_read_size property writable ¤

max_read_size: int

The maximum number of bytes that can be read.

read_termination property writable ¤

read_termination: bytes | None

The termination character sequence that is used for a read operation.

Reading stops when the equipment stops sending data or the read_termination character sequence is detected. If you set the read_termination to be equal to a variable of type str, it will be encoded as bytes.

rstrip property writable ¤

rstrip: bool

Whether to remove trailing whitespace from read messages.

timeout property writable ¤

timeout: float | None

The timeout, in seconds, for read and write operations.

A value <0 will set the timeout to be None (blocking mode).

write_termination property writable ¤

write_termination: bytes | None

The termination character sequence that is appended to write messages.

If you set the write_termination to be equal to a variable of type str, it will be encoded as bytes.

CtrlDirection (IntEnum) ¤

The direction of a control transfer.

Attributes:

Name Type Description
IN int

Transfer direction is from device to computer.

OUT int

Transfer direction is from computer to device.

CtrlRecipient (IntEnum) ¤

The recipient of a control transfer.

Attributes:

Name Type Description
DEVICE int

Transfer is for a Device descriptor.

INTERFACE int

Transfer is for an Interface descriptor.

ENDPOINT int

Transfer is for an Endpoint descriptor.

CtrlType (IntEnum) ¤

The type of a control transfer.

Attributes:

Name Type Description
STANDARD int

Standard type.

CLASS int

Class type.

VENDOR int

Vendor type.

build_request_type staticmethod ¤

build_request_type(
    direction: CtrlDirection, type: CtrlType, recipient: CtrlRecipient
) -> int

Build a bmRequestType field for a control request.

Parameters:

Name Type Description Default
direction CtrlDirection

Transfer direction.

required
type CtrlType

Transfer type.

required
recipient CtrlRecipient

Recipient of the transfer.

required

Returns:

Type Description
int

The request_type argument for a ctrl_transfer.

Source code in src/msl/equipment/interfaces/usb.py
471
472
473
474
475
476
477
478
479
480
481
482
483
484
@staticmethod
def build_request_type(direction: USB.CtrlDirection, type: USB.CtrlType, recipient: USB.CtrlRecipient) -> int:  # noqa: A002
    """Build a `bmRequestType` field for a control request.

    Args:
        direction: Transfer direction.
        type: Transfer type.
        recipient: Recipient of the transfer.

    Returns:
        The `request_type` argument for a [ctrl_transfer][msl.equipment.interfaces.usb.USB.ctrl_transfer].
    """
    request: int = usb.util.build_request_type(direction, type, recipient)
    return request

clear_halt ¤

clear_halt(endpoint: Endpoint) -> None

Clear the halt/stall condition for an endpoint.

Parameters:

Name Type Description Default
endpoint Endpoint

The endpoint to clear.

required
Source code in src/msl/equipment/interfaces/usb.py
496
497
498
499
500
501
502
503
504
505
506
def clear_halt(self, endpoint: Endpoint) -> None:
    """Clear the halt/stall condition for an endpoint.

    Args:
        endpoint: The endpoint to clear.
    """
    logger.debug("%s.clear_halt(0x%02X)", self, endpoint.address)
    try:
        self._device.clear_halt(endpoint.address)
    except usb.core.USBError as e:
        raise MSLConnectionError(self, str(e)) from None

ctrl_transfer ¤

ctrl_transfer(
    request_type: int,
    request: int,
    value: int = ...,
    index: int = ...,
    data_or_length: None = None,
) -> int | array[int]
ctrl_transfer(
    request_type: int,
    request: int,
    value: int = ...,
    index: int = ...,
    data_or_length: array[int] | bytes | bytearray | str = ...,
) -> int
ctrl_transfer(
    request_type: int,
    request: int,
    value: int = ...,
    index: int = ...,
    data_or_length: int = ...,
) -> array[int]
ctrl_transfer(
    request_type: int,
    request: int,
    value: int = 0,
    index: int = 0,
    data_or_length: int | array[int] | bytes | bytearray | str | None = None,
) -> int | array[int]

Perform a control transfer on Endpoint 0.

Parameters:

Name Type Description Default
request_type int

The bmRequestType field of the request. The bitmap value defines the direction (OUT or IN) of the request, the type of request and the designated recipient. See build_request_type.

required
request int

The bRequest field of the request.

required
value int

The wValue field of the request.

0
index int

The wIndex field of the request.

0
data_or_length int | array[int] | bytes | bytearray | str | None

Either the data payload for an OUT request, an array buffer to receive data for an IN request, or the number of bytes to read for an IN request.

None

Returns:

Type Description
int | array[int]

For an OUT request or if data_or_length is an array, the returned value is the number of bytes transferred. For an IN request, the returned value is the data that was read (as an array).

Source code in src/msl/equipment/interfaces/usb.py
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
def ctrl_transfer(
    self,
    request_type: int,
    request: int,
    value: int = 0,
    index: int = 0,
    data_or_length: int | array[int] | bytes | bytearray | str | None = None,
) -> int | array[int]:
    """Perform a control transfer on Endpoint 0.

    Args:
        request_type: The *bmRequestType* field of the request. The bitmap value defines the
            direction (OUT or IN) of the request, the type of request and the designated recipient.
            See [build_request_type][msl.equipment.interfaces.usb.USB.build_request_type].
        request: The *bRequest* field of the request.
        value: The *wValue* field of the request.
        index: The *wIndex* field of the request.
        data_or_length: Either the data payload for an OUT request, an [array][array.array]
            buffer to receive data for an IN request, or the number of bytes to read for an
            IN request.

    Returns:
        For an OUT request or if `data_or_length` is an [array][array.array], the returned
            value is the number of bytes transferred. For an IN request, the returned
            value is the data that was read (as an [array][array.array]).
    """
    # fmt: off
    logger.debug(
        "%s.ctrl_transfer(0x%02X, 0x%02X, 0x%04X, 0x%04X, %s, %d)",
        self, request_type, request, value, index, data_or_length, self._timeout_ms
    )
    # fmt: on

    try:
        out: int | array[int] = self._device.ctrl_transfer(
            bmRequestType=request_type,
            bRequest=request,
            wValue=value,
            wIndex=index,
            data_or_wLength=data_or_length,
            timeout=self._timeout_ms,
        )
    except usb.core.USBTimeoutError:
        raise MSLTimeoutError(self) from None
    except usb.core.USBError as e:
        raise MSLConnectionError(self, str(e)) from None
    else:
        return out

disconnect ¤

disconnect() -> None

Disconnect from the USB device.

Source code in src/msl/equipment/interfaces/usb.py
600
601
602
603
604
605
606
607
608
609
610
611
612
613
def disconnect(self) -> None:  # pyright: ignore[reportImplicitOverride]
    """Disconnect from the USB device."""
    if not hasattr(self, "_device") or self._device is None:
        return None

    if self._detached:
        with contextlib.suppress(usb.core.USBError):
            self._device.attach_kernel_driver(self._interface_number)

    with contextlib.suppress(usb.core.USBError):
        usb.util.dispose_resources(self._device)

    self._device = None
    return super().disconnect()

query ¤

query(
    message: bytes | str,
    *,
    delay: float = 0.0,
    decode: Literal[True] = True,
    dtype: None = None,
    fmt: MessageFormat = ...,
    size: int | None = ...
) -> str
query(
    message: bytes | str,
    *,
    delay: float = 0.0,
    decode: Literal[False] = False,
    dtype: None = None,
    fmt: MessageFormat = ...,
    size: int | None = ...
) -> bytes
query(
    message: bytes | str,
    *,
    delay: float = 0.0,
    decode: bool = ...,
    dtype: MessageDataType = ...,
    fmt: MessageFormat = ...,
    size: int | None = ...
) -> NumpyArray1D
query(
    message: bytes | str,
    *,
    delay: float = 0.0,
    decode: bool = True,
    dtype: MessageDataType | None = None,
    fmt: MessageFormat = None,
    size: int | None = None
) -> bytes | str | NumpyArray1D

Convenience method for performing a write followed by a read.

Parameters:

Name Type Description Default
message bytes | str

The message to write to the equipment.

required
delay float

Time delay, in seconds, to wait between the write and read operations.

0.0
decode bool

Whether to decode the returned message (i.e., convert the message to a str) or keep the message as bytes. Ignored if dtype is not None.

True
dtype MessageDataType | None

The data type of the elements in the returned message. Can be any object that numpy dtype supports. For messages that are of scalar type (i.e., a single number) it is more efficient to not specify dtype but to pass the returned message to the int or float class to convert the message to the appropriate numeric type. See MessageDataType for more details.

None
fmt MessageFormat

The format that the returned message data is in. Ignored if dtype is None. See MessageFormat for more details.

None
size int | None

The number of bytes to read. Ignored if the value is None.

None

Returns:

Type Description
bytes | str | NumpyArray1D

The message from the equipment. If dtype is specified, then the message is returned as a numpy ndarray, if decode is True then the message is returned as a str, otherwise the message is returned as bytes.

Source code in src/msl/equipment/interfaces/message_based.py
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
def query(
    self,
    message: bytes | str,
    *,
    delay: float = 0.0,
    decode: bool = True,
    dtype: MessageDataType | None = None,
    fmt: MessageFormat = None,
    size: int | None = None,
) -> bytes | str | NumpyArray1D:
    """Convenience method for performing a [write][msl.equipment.interfaces.message_based.MessageBased.write]
    followed by a [read][msl.equipment.interfaces.message_based.MessageBased.read].

    Args:
        message: The message to write to the equipment.
        delay: Time delay, in seconds, to wait between the _write_ and _read_ operations.
        decode: Whether to decode the returned message (i.e., convert the message to a [str][])
            or keep the message as [bytes][]. Ignored if `dtype` is not `None`.
        dtype: The data type of the elements in the returned message. Can be any object that numpy
            [dtype][numpy.dtype] supports. For messages that are of scalar type (i.e., a single number)
            it is more efficient to not specify `dtype` but to pass the returned message to the
            [int][] or [float][] class to convert the message to the appropriate numeric type.
            See [MessageDataType][msl.equipment._types.MessageDataType] for more details.
        fmt: The format that the returned message data is in. Ignored if `dtype` is `None`.
            See [MessageFormat][msl.equipment._types.MessageFormat] for more details.
        size: The number of bytes to read. Ignored if the value is `None`.

    Returns:
        The message from the equipment. If `dtype` is specified, then the message is
            returned as a numpy [ndarray][numpy.ndarray], if `decode` is `True` then the message
            is returned as a [str][], otherwise the message is returned as [bytes][].
    """  # noqa: D205
    _ = self.write(message)
    if delay > 0:
        time.sleep(delay)
    if dtype:
        return self.read(dtype=dtype, fmt=fmt, size=size)
    return self.read(decode=decode, size=size)

read ¤

read(
    *,
    decode: Literal[True] = True,
    dtype: None = None,
    fmt: MessageFormat = ...,
    size: int | None = ...
) -> str
read(
    *,
    decode: Literal[False] = False,
    dtype: None = None,
    fmt: MessageFormat = ...,
    size: int | None = ...
) -> bytes
read(
    *,
    decode: bool = ...,
    dtype: MessageDataType = ...,
    fmt: MessageFormat = ...,
    size: int | None = ...
) -> NumpyArray1D
read(
    *,
    decode: bool = True,
    dtype: MessageDataType | None = None,
    fmt: MessageFormat = None,
    size: int | None = None
) -> bytes | str | NumpyArray1D

Read a message from the equipment.

This method will block until one of the following conditions is fulfilled:

  1. size bytes have been received — only if size is not None.
  2. the read_termination byte(s) is(are) received — only if read_termination is not None.
  3. a timeout occurs — only if timeout is not None. If a timeout occurs, an MSLTimeoutError is raised.
  4. max_read_size bytes have been received. If the maximum number of bytes have been read, an MSLConnectionError is raised.

Tip

You may also want to set the rstrip value for the class instance.

Parameters:

Name Type Description Default
decode bool

Whether to decode the message (i.e., convert the message to a str) or keep the message as bytes. Ignored if dtype is not None.

True
dtype MessageDataType | None

The data type of the elements in the returned message. Can be any object that numpy dtype supports. For messages that are of scalar type (i.e., a single number) it is more efficient to not specify dtype but to pass the returned message to the int or float class to convert the message to the appropriate numeric type. See MessageDataType for more details.

None
fmt MessageFormat

The format that the returned message data is in. Ignored if dtype is None. See MessageFormat for more details.

None
size int | None

The number of bytes to read. Ignored if the value is None.

None

Returns:

Type Description
bytes | str | NumpyArray1D

The message from the equipment. If dtype is specified, then the message is returned as a numpy ndarray, if decode is True then the message is returned as a str, otherwise the message is returned as bytes.

Source code in src/msl/equipment/interfaces/message_based.py
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
def read(
    self,
    *,
    decode: bool = True,
    dtype: MessageDataType | None = None,
    fmt: MessageFormat = None,
    size: int | None = None,
) -> bytes | str | NumpyArray1D:
    """Read a message from the equipment.

    This method will block until one of the following conditions is fulfilled:

    1. `size` bytes have been received &mdash; only if `size` is not `None`.
    2. the [read_termination][msl.equipment.interfaces.message_based.MessageBased.read_termination]
       byte(s) is(are) received &mdash; only if
       [read_termination][msl.equipment.interfaces.message_based.MessageBased.read_termination]
       is not `None`.
    3. a timeout occurs &mdash; only if [timeout][msl.equipment.interfaces.message_based.MessageBased.timeout]
       is not `None`. If a timeout occurs, an
       [MSLTimeoutError][msl.equipment.interfaces.message_based.MSLTimeoutError] is raised.
    4. [max_read_size][msl.equipment.interfaces.message_based.MessageBased.max_read_size]
       bytes have been received. If the maximum number of bytes have been read, an
       [MSLConnectionError][msl.equipment.interfaces.message_based.MSLConnectionError] is raised.

    !!! tip
        You may also want to set the [rstrip][msl.equipment.interfaces.message_based.MessageBased.rstrip]
        value for the class instance.

    Args:
        decode: Whether to decode the message (i.e., convert the message to a [str][])
            or keep the message as [bytes][]. Ignored if `dtype` is not `None`.
        dtype: The data type of the elements in the returned message. Can be any object that numpy
            [dtype][numpy.dtype] supports. For messages that are of scalar type (i.e., a single number)
            it is more efficient to not specify `dtype` but to pass the returned message to the
            [int][] or [float][] class to convert the message to the appropriate numeric type.
            See [MessageDataType][msl.equipment._types.MessageDataType] for more details.
        fmt: The format that the returned message data is in. Ignored if `dtype` is `None`.
            See [MessageFormat][msl.equipment._types.MessageFormat] for more details.
        size: The number of bytes to read. Ignored if the value is `None`.

    Returns:
        The message from the equipment. If `dtype` is specified, then the message is returned
            as a numpy [ndarray][numpy.ndarray], if `decode` is `True` then the message
            is returned as a [str][], otherwise the message is returned as [bytes][].
    """
    if size is not None and size > self._max_read_size:
        msg = f"max_read_size is {self._max_read_size} bytes, requesting {size} bytes"
        raise MSLConnectionError(self, msg)

    try:
        message = self._read(size)
    except (serial.SerialTimeoutException, socket.timeout, TimeoutError, USBTimeoutError):
        raise MSLTimeoutError(self) from None
    except Exception as e:  # noqa: BLE001
        msg = f"{e.__class__.__name__}: {e}"
        raise MSLConnectionError(self, msg) from None

    if size is None:
        if dtype:
            logger.debug("%s.read(dtype=%r, fmt=%r) -> %r", self, dtype, fmt, message)
        else:
            logger.debug("%s.read() -> %r", self, message)
    else:
        if len(message) != size:
            msg = f"received {len(message)} bytes, requested {size} bytes"
            raise MSLConnectionError(self, msg)
        logger.debug("%s.read(size=%s) -> %r", self, size, message)

    if self._rstrip:
        message = message.rstrip()

    if dtype:
        return from_bytes(message, fmt=fmt, dtype=dtype)

    if decode:
        return message.decode(encoding=self._encoding)

    return message

reset_device ¤

reset_device() -> None

Perform a USB port reset for the device.

If your program has to call this method, the reset will cause the device state to change (e.g., register values may be reset).

Source code in src/msl/equipment/interfaces/usb.py
625
626
627
628
629
630
631
632
def reset_device(self) -> None:
    """Perform a USB port reset for the device.

    If your program has to call this method, the reset will cause the
    device state to change (e.g., register values may be reset).
    """
    logger.debug("%s.reset_device()", self)
    self._device.reset()

write ¤

write(
    message: bytes | str,
    *,
    data: Sequence1D | None = None,
    dtype: MessageDataType = "<f",
    fmt: MessageFormat = "ieee"
) -> int

Write a message to the equipment.

Parameters:

Name Type Description Default
message bytes | str

The message to write to the equipment.

required
data Sequence1D | None

The data to append to message.

None
dtype MessageDataType

The data type to use to convert each element in data to bytes. Ignored if data is None. See MessageDataType for more details.

'<f'
fmt MessageFormat

The format to use to convert data to bytes. Ignored if data is None. See MessageFormat for more details.

'ieee'

Returns:

Type Description
int

The number of bytes written.

Source code in src/msl/equipment/interfaces/message_based.py
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
def write(
    self,
    message: bytes | str,
    *,
    data: Sequence1D | None = None,
    dtype: MessageDataType = "<f",
    fmt: MessageFormat = "ieee",
) -> int:
    """Write a message to the equipment.

    Args:
        message: The message to write to the equipment.
        data: The data to append to `message`.
        dtype: The data type to use to convert each element in `data` to bytes. Ignored
            if `data` is `None`. See [MessageDataType][msl.equipment._types.MessageDataType]
            for more details.
        fmt: The format to use to convert `data` to bytes. Ignored if `data` is `None`.
            See [MessageFormat][msl.equipment._types.MessageFormat] for more details.

    Returns:
        The number of bytes written.
    """
    if not isinstance(message, bytes):
        message = message.encode(encoding=self._encoding)

    if data is not None:
        message += to_bytes(data, fmt=fmt, dtype=dtype)

    if self._write_termination and not message.endswith(self._write_termination):
        message += self._write_termination

    logger.debug("%s.write(%r)", self, message)

    try:
        return self._write(message)
    except (serial.SerialTimeoutException, socket.timeout, TimeoutError, USBTimeoutError):
        raise MSLTimeoutError(self) from None
    except Exception as e:  # noqa: BLE001
        raise MSLConnectionError(self, str(e)) from None