Skip to content

HiSLIP¤

HiSLIP (MessageBased) ¤

HiSLIP(equipment: Equipment)

Base class for the HiSLIP communication protocol.

Parameters:

Name Type Description Default
equipment Equipment

An Equipment instance.

required

A Connection instance supports the following properties for the HiSLIP communication protocol, as well as the properties defined in MessageBased.

Connection Properties:

Name Type Description
buffer_size int

The maximum number of bytes to read at a time. Default: 4096

lock_timeout float

The timeout (in seconds) to wait for a lock (0 means wait forever). Default: 0

Source code in src/msl/equipment/interfaces/hislip.py
1279
1280
1281
1282
1283
1284
1285
1286
1287
1288
1289
1290
1291
1292
1293
1294
1295
1296
1297
1298
1299
1300
1301
1302
1303
1304
1305
1306
1307
1308
1309
1310
1311
1312
1313
1314
1315
1316
1317
1318
1319
def __init__(self, equipment: Equipment) -> None:
    """Base class for the [HiSLIP] communication protocol.

    [HiSLIP]: https://www.ivifoundation.org/downloads/Protocol%20Specifications/IVI-6.1_HiSLIP-2.0-2020-04-23.pdf

    Args:
        equipment: An [Equipment][] instance.

    A [Connection][msl.equipment.schema.Connection] instance supports the following _properties_
    for the HiSLIP communication protocol, as well as the _properties_ defined in
    [MessageBased][msl.equipment.interfaces.message_based.MessageBased].

    Attributes: Connection Properties:
        buffer_size (int): The maximum number of bytes to read at a time. _Default: `4096`_
        lock_timeout (float): The timeout (in seconds) to wait for a lock (0 means wait forever). _Default: `0`_
    """
    super().__init__(equipment)

    assert equipment.connection is not None  # noqa: S101

    info = parse_hislip_address(equipment.connection.address)
    if info is None:
        msg = f"Invalid HiSLIP address {equipment.connection.address!r}"
        raise ValueError(msg)

    self._info: ParsedHiSLIPAddress = info

    # HiSLIP does not support termination characters
    self.write_termination = None  # pyright: ignore[reportUnannotatedClassAttribute]
    self.read_termination = None  # pyright: ignore[reportUnannotatedClassAttribute]

    props = equipment.connection.properties
    self._buffer_size: int = props.get("buffer_size", 4096)
    self._lock_timeout: float = props.get("lock_timeout", 0)
    self.lock_timeout = self._lock_timeout

    self._sync: SyncClient
    self._async: AsyncClient
    self._connect()
    self._set_interface_max_read_size()
    self._set_interface_timeout()

asynchronous property ¤

asynchronous: AsyncClient

The reference to the asynchronous client.

encoding property writable ¤

encoding: str

The encoding that is used for read and write operations.

equipment property ¤

equipment: Equipment

The Equipment associated with the interface.

lock_timeout property writable ¤

lock_timeout: float

The time, in seconds, to wait to acquire a lock.

Setting the value to ≤0 (or None) means wait forever.

max_read_size property writable ¤

max_read_size: int

The maximum number of bytes that can be read.

rstrip property writable ¤

rstrip: bool

Whether to remove trailing whitespace from read messages.

synchronous property ¤

synchronous: SyncClient

The reference to the synchronous client.

timeout property writable ¤

timeout: float | None

The timeout, in seconds, for read and write operations.

A value <0 will set the timeout to be None (blocking mode).

clear ¤

clear() -> None

Send the clear command to the device.

Source code in src/msl/equipment/interfaces/hislip.py
1472
1473
1474
1475
1476
1477
1478
1479
1480
1481
1482
def clear(self) -> None:
    """Send the `clear` command to the device."""
    # IVI-6.1: IVI High-Speed LAN Instrument Protocol (HiSLIP)
    # 23 April 2020 (Revision 2.0)
    # Section 6.12: Device Clear Transaction
    #
    # This Connection class does not use the asynchronous client in an
    # asynchronous manner, therefore there should not be any pending
    # requests that need to be waited on to finish
    acknowledged = self._async.async_device_clear()
    _ = self._sync.device_clear_complete(acknowledged.feature_bitmap)

disconnect ¤

disconnect() -> None

Close the connection to the HiSLIP server.

Source code in src/msl/equipment/interfaces/hislip.py
1391
1392
1393
1394
1395
1396
1397
1398
1399
1400
1401
def disconnect(self) -> None:  # pyright: ignore[reportImplicitOverride]
    """Close the connection to the HiSLIP server."""
    if not hasattr(self, "_async"):
        return

    if self._async.socket is None and self._sync.socket is None:
        return

    self._async.close()
    self._sync.close()
    super().disconnect()

lock ¤

lock(lock_string: str = '') -> bool

Acquire the device's lock.

Parameters:

Name Type Description Default
lock_string str

An ASCII string that identifies this lock. If not specified, then an exclusive lock is requested, otherwise the string indicates an identification of a shared-lock request.

''

Returns:

Type Description
bool

Whether acquiring the lock was successful.

Source code in src/msl/equipment/interfaces/hislip.py
1484
1485
1486
1487
1488
1489
1490
1491
1492
1493
1494
1495
1496
def lock(self, lock_string: str = "") -> bool:
    """Acquire the device's lock.

    Args:
        lock_string: An ASCII string that identifies this lock. If not specified, then
            an exclusive lock is requested, otherwise the string indicates an
            identification of a shared-lock request.

    Returns:
        Whether acquiring the lock was successful.
    """
    status = self._async.async_lock_request(timeout=self._lock_timeout, lock_string=lock_string)
    return status.success

lock_status ¤

lock_status() -> tuple[bool, int]

Request the lock status from the HiSLIP server.

Returns:

Type Description
tuple[bool, int]

Whether the HiSLIP server has an exclusive lock with a client and the number of HiSLIP clients that have a lock with the HiSLIP server.

Source code in src/msl/equipment/interfaces/hislip.py
1507
1508
1509
1510
1511
1512
1513
1514
1515
def lock_status(self) -> tuple[bool, int]:
    """Request the lock status from the HiSLIP server.

    Returns:
        Whether the HiSLIP server has an exclusive lock with a client and
            the number of HiSLIP clients that have a lock with the HiSLIP server.
    """
    reply = self._async.async_lock_info()
    return reply.exclusive, reply.num_locks

query ¤

query(
    message: bytes | str,
    *,
    delay: float = 0.0,
    decode: Literal[True] = True,
    dtype: None = None,
    fmt: MessageFormat = ...,
    size: int | None = ...
) -> str
query(
    message: bytes | str,
    *,
    delay: float = 0.0,
    decode: Literal[False] = False,
    dtype: None = None,
    fmt: MessageFormat = ...,
    size: int | None = ...
) -> bytes
query(
    message: bytes | str,
    *,
    delay: float = 0.0,
    decode: bool = ...,
    dtype: MessageDataType = ...,
    fmt: MessageFormat = ...,
    size: int | None = ...
) -> NumpyArray1D
query(
    message: bytes | str,
    *,
    delay: float = 0.0,
    decode: bool = True,
    dtype: MessageDataType | None = None,
    fmt: MessageFormat = None,
    size: int | None = None
) -> bytes | str | NumpyArray1D

Convenience method for performing a write followed by a read.

Parameters:

Name Type Description Default
message bytes | str

The message to write to the equipment.

required
delay float

Time delay, in seconds, to wait between the write and read operations.

0.0
decode bool

Whether to decode the returned message (i.e., convert the message to a str) or keep the message as bytes. Ignored if dtype is not None.

True
dtype MessageDataType | None

The data type of the elements in the returned message. Can be any object that numpy dtype supports. For messages that are of scalar type (i.e., a single number) it is more efficient to not specify dtype but to pass the returned message to the int or float class to convert the message to the appropriate numeric type. See MessageDataType for more details.

None
fmt MessageFormat

The format that the returned message data is in. Ignored if dtype is None. See MessageFormat for more details.

None
size int | None

The number of bytes to read. Ignored if the value is None.

None

Returns:

Type Description
bytes | str | NumpyArray1D

The message from the equipment. If dtype is specified, then the message is returned as a numpy ndarray, if decode is True then the message is returned as a str, otherwise the message is returned as bytes.

Source code in src/msl/equipment/interfaces/message_based.py
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
def query(
    self,
    message: bytes | str,
    *,
    delay: float = 0.0,
    decode: bool = True,
    dtype: MessageDataType | None = None,
    fmt: MessageFormat = None,
    size: int | None = None,
) -> bytes | str | NumpyArray1D:
    """Convenience method for performing a [write][msl.equipment.interfaces.message_based.MessageBased.write]
    followed by a [read][msl.equipment.interfaces.message_based.MessageBased.read].

    Args:
        message: The message to write to the equipment.
        delay: Time delay, in seconds, to wait between the _write_ and _read_ operations.
        decode: Whether to decode the returned message (i.e., convert the message to a [str][])
            or keep the message as [bytes][]. Ignored if `dtype` is not `None`.
        dtype: The data type of the elements in the returned message. Can be any object that numpy
            [dtype][numpy.dtype] supports. For messages that are of scalar type (i.e., a single number)
            it is more efficient to not specify `dtype` but to pass the returned message to the
            [int][] or [float][] class to convert the message to the appropriate numeric type.
            See [MessageDataType][msl.equipment._types.MessageDataType] for more details.
        fmt: The format that the returned message data is in. Ignored if `dtype` is `None`.
            See [MessageFormat][msl.equipment._types.MessageFormat] for more details.
        size: The number of bytes to read. Ignored if the value is `None`.

    Returns:
        The message from the equipment. If `dtype` is specified, then the message is
            returned as a numpy [ndarray][numpy.ndarray], if `decode` is `True` then the message
            is returned as a [str][], otherwise the message is returned as [bytes][].
    """  # noqa: D205
    _ = self.write(message)
    if delay > 0:
        time.sleep(delay)
    if dtype:
        return self.read(dtype=dtype, fmt=fmt, size=size)
    return self.read(decode=decode, size=size)

read ¤

read(
    *,
    decode: Literal[True] = True,
    dtype: None = None,
    fmt: MessageFormat = ...,
    size: int | None = ...
) -> str
read(
    *,
    decode: Literal[False] = False,
    dtype: None = None,
    fmt: MessageFormat = ...,
    size: int | None = ...
) -> bytes
read(
    *,
    decode: bool = ...,
    dtype: MessageDataType = ...,
    fmt: MessageFormat = ...,
    size: int | None = ...
) -> NumpyArray1D
read(
    *,
    decode: bool = True,
    dtype: MessageDataType | None = None,
    fmt: MessageFormat = None,
    size: int | None = None
) -> bytes | str | NumpyArray1D

Read a message from the equipment.

This method will block until one of the following conditions is fulfilled:

  1. size bytes have been received — only if size is not None.
  2. the read_termination byte(s) is(are) received — only if read_termination is not None.
  3. a timeout occurs — only if timeout is not None. If a timeout occurs, an MSLTimeoutError is raised.
  4. max_read_size bytes have been received. If the maximum number of bytes have been read, an MSLConnectionError is raised.

Tip

You may also want to set the rstrip value for the class instance.

Parameters:

Name Type Description Default
decode bool

Whether to decode the message (i.e., convert the message to a str) or keep the message as bytes. Ignored if dtype is not None.

True
dtype MessageDataType | None

The data type of the elements in the returned message. Can be any object that numpy dtype supports. For messages that are of scalar type (i.e., a single number) it is more efficient to not specify dtype but to pass the returned message to the int or float class to convert the message to the appropriate numeric type. See MessageDataType for more details.

None
fmt MessageFormat

The format that the returned message data is in. Ignored if dtype is None. See MessageFormat for more details.

None
size int | None

The number of bytes to read. Ignored if the value is None.

None

Returns:

Type Description
bytes | str | NumpyArray1D

The message from the equipment. If dtype is specified, then the message is returned as a numpy ndarray, if decode is True then the message is returned as a str, otherwise the message is returned as bytes.

Source code in src/msl/equipment/interfaces/message_based.py
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
def read(
    self,
    *,
    decode: bool = True,
    dtype: MessageDataType | None = None,
    fmt: MessageFormat = None,
    size: int | None = None,
) -> bytes | str | NumpyArray1D:
    """Read a message from the equipment.

    This method will block until one of the following conditions is fulfilled:

    1. `size` bytes have been received &mdash; only if `size` is not `None`.
    2. the [read_termination][msl.equipment.interfaces.message_based.MessageBased.read_termination]
       byte(s) is(are) received &mdash; only if
       [read_termination][msl.equipment.interfaces.message_based.MessageBased.read_termination]
       is not `None`.
    3. a timeout occurs &mdash; only if [timeout][msl.equipment.interfaces.message_based.MessageBased.timeout]
       is not `None`. If a timeout occurs, an
       [MSLTimeoutError][msl.equipment.interfaces.message_based.MSLTimeoutError] is raised.
    4. [max_read_size][msl.equipment.interfaces.message_based.MessageBased.max_read_size]
       bytes have been received. If the maximum number of bytes have been read, an
       [MSLConnectionError][msl.equipment.interfaces.message_based.MSLConnectionError] is raised.

    !!! tip
        You may also want to set the [rstrip][msl.equipment.interfaces.message_based.MessageBased.rstrip]
        value for the class instance.

    Args:
        decode: Whether to decode the message (i.e., convert the message to a [str][])
            or keep the message as [bytes][]. Ignored if `dtype` is not `None`.
        dtype: The data type of the elements in the returned message. Can be any object that numpy
            [dtype][numpy.dtype] supports. For messages that are of scalar type (i.e., a single number)
            it is more efficient to not specify `dtype` but to pass the returned message to the
            [int][] or [float][] class to convert the message to the appropriate numeric type.
            See [MessageDataType][msl.equipment._types.MessageDataType] for more details.
        fmt: The format that the returned message data is in. Ignored if `dtype` is `None`.
            See [MessageFormat][msl.equipment._types.MessageFormat] for more details.
        size: The number of bytes to read. Ignored if the value is `None`.

    Returns:
        The message from the equipment. If `dtype` is specified, then the message is returned
            as a numpy [ndarray][numpy.ndarray], if `decode` is `True` then the message
            is returned as a [str][], otherwise the message is returned as [bytes][].
    """
    if size is not None and size > self._max_read_size:
        msg = f"max_read_size is {self._max_read_size} bytes, requesting {size} bytes"
        raise MSLConnectionError(self, msg)

    try:
        message = self._read(size)
    except (serial.SerialTimeoutException, socket.timeout, TimeoutError, USBTimeoutError):
        raise MSLTimeoutError(self) from None
    except Exception as e:  # noqa: BLE001
        msg = f"{e.__class__.__name__}: {e}"
        raise MSLConnectionError(self, msg) from None

    if size is None:
        if dtype:
            logger.debug("%s.read(dtype=%r, fmt=%r) -> %r", self, dtype, fmt, message)
        else:
            logger.debug("%s.read() -> %r", self, message)
    else:
        if len(message) != size:
            msg = f"received {len(message)} bytes, requested {size} bytes"
            raise MSLConnectionError(self, msg)
        logger.debug("%s.read(size=%s) -> %r", self, size, message)

    if self._rstrip:
        message = message.rstrip()

    if dtype:
        return from_bytes(message, fmt=fmt, dtype=dtype)

    if decode:
        return message.decode(encoding=self._encoding)

    return message

read_stb ¤

read_stb() -> int

Read the status byte from the device.

Returns:

Type Description
int

The status byte.

Source code in src/msl/equipment/interfaces/hislip.py
1459
1460
1461
1462
1463
1464
1465
1466
def read_stb(self) -> int:
    """Read the status byte from the device.

    Returns:
        The status byte.
    """
    reply = self._async.async_status_query(self._sync)
    return reply.status

reconnect ¤

reconnect(max_attempts: int = 1) -> None

Reconnect to the equipment.

Parameters:

Name Type Description Default
max_attempts int

The maximum number of attempts to try to reconnect with the equipment. If <1, keep trying until a connection is successful. If the maximum number of attempts has been reached then an exception is raise.

1
Source code in src/msl/equipment/interfaces/hislip.py
1420
1421
1422
1423
1424
1425
1426
1427
1428
1429
1430
1431
1432
1433
1434
1435
def reconnect(self, max_attempts: int = 1) -> None:
    """Reconnect to the equipment.

    Args:
        max_attempts: The maximum number of attempts to try to reconnect with the equipment.
            If &lt;1, keep trying until a connection is successful. If the maximum number
            of attempts has been reached then an exception is raise.
    """
    attempt = 0
    while True:
        attempt += 1
        try:
            return self._connect()
        except (MSLConnectionError, MSLTimeoutError):
            if 0 < max_attempts <= attempt:
                raise

remote_local_control ¤

remote_local_control(request: int) -> None

Send a GPIB-like remote/local control request.

Parameters:

Name Type Description Default
request int

The request to perform.

  • 0 — Disable remote, VI_GPIB_REN_DEASSERT
  • 1 — Enable remote, VI_GPIB_REN_ASSERT
  • 2 — Disable remote and go to local, VI_GPIB_REN_DEASSERT_GTL
  • 3 — Enable Remote and go to remote, VI_GPIB_REN_ASSERT_ADDRESS
  • 4 — Enable remote and lock out local, VI_GPIB_REN_ASSERT_LLO
  • 5 — Enable remote, go to remote, and set local lockout, VI_GPIB_REN_ASSERT_ADDRESS_LLO
  • 6 — Go to local without changing REN or lockout state, VI_GPIB_REN_ADDRESS_GTL
required
Source code in src/msl/equipment/interfaces/hislip.py
1517
1518
1519
1520
1521
1522
1523
1524
1525
1526
1527
1528
1529
1530
1531
def remote_local_control(self, request: int) -> None:
    """Send a GPIB-like remote/local control request.

    Args:
        request: The request to perform.

            * 0 &mdash; Disable remote, `VI_GPIB_REN_DEASSERT`
            * 1 &mdash; Enable remote, `VI_GPIB_REN_ASSERT`
            * 2 &mdash; Disable remote and go to local, `VI_GPIB_REN_DEASSERT_GTL`
            * 3 &mdash; Enable Remote and go to remote, `VI_GPIB_REN_ASSERT_ADDRESS`
            * 4 &mdash; Enable remote and lock out local, `VI_GPIB_REN_ASSERT_LLO`
            * 5 &mdash; Enable remote, go to remote, and set local lockout, `VI_GPIB_REN_ASSERT_ADDRESS_LLO`
            * 6 &mdash; Go to local without changing REN or lockout state, `VI_GPIB_REN_ADDRESS_GTL`
    """
    _ = self._async.async_remote_local_control(request, self._sync.message_id)

trigger ¤

trigger() -> None

Send the trigger message (emulates a GPIB Group Execute Trigger event).

Source code in src/msl/equipment/interfaces/hislip.py
1468
1469
1470
def trigger(self) -> None:
    """Send the trigger message (emulates a GPIB Group Execute Trigger event)."""
    self._sync.trigger()

unlock ¤

unlock() -> bool

Release the lock acquired by lock.

Returns:

Type Description
bool

Whether releasing the lock was successful.

Source code in src/msl/equipment/interfaces/hislip.py
1498
1499
1500
1501
1502
1503
1504
1505
def unlock(self) -> bool:
    """Release the lock acquired by [lock][msl.equipment.interfaces.hislip.HiSLIP.lock].

    Returns:
        Whether releasing the lock was successful.
    """
    status = self._async.async_lock_release(self._sync.message_id)
    return status.success

write ¤

write(
    message: bytes | str,
    *,
    data: Sequence1D | None = None,
    dtype: MessageDataType = "<f",
    fmt: MessageFormat = "ieee"
) -> int

Write a message to the equipment.

Parameters:

Name Type Description Default
message bytes | str

The message to write to the equipment.

required
data Sequence1D | None

The data to append to message.

None
dtype MessageDataType

The data type to use to convert each element in data to bytes. Ignored if data is None. See MessageDataType for more details.

'<f'
fmt MessageFormat

The format to use to convert data to bytes. Ignored if data is None. See MessageFormat for more details.

'ieee'

Returns:

Type Description
int

The number of bytes written.

Source code in src/msl/equipment/interfaces/message_based.py
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
def write(
    self,
    message: bytes | str,
    *,
    data: Sequence1D | None = None,
    dtype: MessageDataType = "<f",
    fmt: MessageFormat = "ieee",
) -> int:
    """Write a message to the equipment.

    Args:
        message: The message to write to the equipment.
        data: The data to append to `message`.
        dtype: The data type to use to convert each element in `data` to bytes. Ignored
            if `data` is `None`. See [MessageDataType][msl.equipment._types.MessageDataType]
            for more details.
        fmt: The format to use to convert `data` to bytes. Ignored if `data` is `None`.
            See [MessageFormat][msl.equipment._types.MessageFormat] for more details.

    Returns:
        The number of bytes written.
    """
    if not isinstance(message, bytes):
        message = message.encode(encoding=self._encoding)

    if data is not None:
        message += to_bytes(data, fmt=fmt, dtype=dtype)

    if self._write_termination and not message.endswith(self._write_termination):
        message += self._write_termination

    logger.debug("%s.write(%r)", self, message)

    try:
        return self._write(message)
    except (serial.SerialTimeoutException, socket.timeout, TimeoutError, USBTimeoutError):
        raise MSLTimeoutError(self) from None
    except Exception as e:  # noqa: BLE001
        raise MSLConnectionError(self, str(e)) from None