Skip to content

Socket¤

Socket (MessageBased) ¤

Socket(equipment: Equipment)

Base class for equipment that is connected through a socket.

Parameters:

Name Type Description Default
equipment Equipment

An Equipment instance.

required

A Connection instance supports the following properties for the socket communication protocol, as well as the properties defined in MessageBased.

Connection Properties:

Name Type Description
buffer_size int

The maximum number of bytes to read at a time. Default: 4096

Source code in src/msl/equipment/interfaces/socket.py
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
def __init__(self, equipment: Equipment) -> None:
    """Base class for equipment that communicates over a socket.

    Args:
        equipment: An [Equipment][] instance.

    Raises:
        ValueError: If the address of the connection cannot be parsed as a socket address.

    A [Connection][msl.equipment.schema.Connection] instance supports the following _properties_
    for the socket communication protocol, as well as the _properties_ defined in
    [MessageBased][msl.equipment.interfaces.message_based.MessageBased].

    Attributes: Connection Properties:
        buffer_size (int): The maximum number of bytes to read at a time. _Default: `4096`_
    """
    super().__init__(equipment)

    connection = equipment.connection
    assert connection is not None  # noqa: S101

    address = connection.address
    parsed = parse_socket_address(address)
    if parsed is None:
        msg = f"Invalid socket address {equipment.connection.address!r}"
        raise ValueError(msg)

    self._info: ParsedSocketAddress = parsed

    # Read settings: chunk size per read and the buffer that accumulates received bytes
    self._buffer_size: int = connection.properties.get("buffer_size", 4096)
    self._byte_buffer: bytearray = bytearray()

    # An address that starts with "UDP" selects a datagram socket, anything else a stream socket
    self._is_stream: bool = not address.startswith("UDP")
    sock_type: int = socket.SOCK_STREAM if self._is_stream else socket.SOCK_DGRAM
    self._socket: socket.socket = socket.socket(family=socket.AF_INET, type=sock_type)
    self._connect()

encoding property writable ¤

encoding: str

The encoding that is used for read and write operations.

equipment property ¤

equipment: Equipment

The Equipment associated with the interface.

max_read_size property writable ¤

max_read_size: int

The maximum number of bytes that can be read.

read_termination property writable ¤

read_termination: bytes | None

The termination character sequence that is used for a read operation.

Reading stops when the equipment stops sending data or the read_termination character sequence is detected. If you set the read_termination to be equal to a variable of type str, it will be encoded as bytes.

rstrip property writable ¤

rstrip: bool

Whether to remove trailing whitespace from read messages.

socket property ¤

socket: socket

Returns a reference to the underlying socket.

timeout property writable ¤

timeout: float | None

The timeout, in seconds, for read and write operations.

A value <0 will set the timeout to be None (blocking mode).

write_termination property writable ¤

write_termination: bytes | None

The termination character sequence that is appended to write messages.

If you set the write_termination to be equal to a variable of type str, it will be encoded as bytes.

disconnect ¤

disconnect() -> None

Close the socket.

Source code in src/msl/equipment/interfaces/socket.py
131
132
133
134
135
def disconnect(self) -> None:  # pyright: ignore[reportImplicitOverride]
    """Close the socket.

    Nothing is done if the socket was never created or if it is already closed.
    """
    # `_socket` may not exist, e.g., initialisation raised before the socket was created
    if not hasattr(self, "_socket"):
        return

    # A socket that is already closed reports a file descriptor of -1
    if self._socket.fileno() == -1:
        return

    self._socket.close()
    super().disconnect()

query ¤

query(
    message: bytes | str,
    *,
    delay: float = 0.0,
    decode: Literal[True] = True,
    dtype: None = None,
    fmt: MessageFormat = ...,
    size: int | None = ...
) -> str
query(
    message: bytes | str,
    *,
    delay: float = 0.0,
    decode: Literal[False] = False,
    dtype: None = None,
    fmt: MessageFormat = ...,
    size: int | None = ...
) -> bytes
query(
    message: bytes | str,
    *,
    delay: float = 0.0,
    decode: bool = ...,
    dtype: MessageDataType = ...,
    fmt: MessageFormat = ...,
    size: int | None = ...
) -> NumpyArray1D
query(
    message: bytes | str,
    *,
    delay: float = 0.0,
    decode: bool = True,
    dtype: MessageDataType | None = None,
    fmt: MessageFormat = None,
    size: int | None = None
) -> bytes | str | NumpyArray1D

Convenience method for performing a write followed by a read.

Parameters:

Name Type Description Default
message bytes | str

The message to write to the equipment.

required
delay float

Time delay, in seconds, to wait between the write and read operations.

0.0
decode bool

Whether to decode the returned message (i.e., convert the message to a str) or keep the message as bytes. Ignored if dtype is not None.

True
dtype MessageDataType | None

The data type of the elements in the returned message. Can be any object that numpy dtype supports. For messages that are of scalar type (i.e., a single number) it is more efficient to not specify dtype but to pass the returned message to the int or float class to convert the message to the appropriate numeric type. See MessageDataType for more details.

None
fmt MessageFormat

The format that the returned message data is in. Ignored if dtype is None. See MessageFormat for more details.

None
size int | None

The number of bytes to read. Ignored if the value is None.

None

Returns:

Type Description
bytes | str | NumpyArray1D

The message from the equipment. If dtype is specified, the message is returned as a numpy ndarray; otherwise, if decode is True, the message is returned as a str; otherwise the message is returned as bytes.

Source code in src/msl/equipment/interfaces/message_based.py
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
def query(
    self,
    message: bytes | str,
    *,
    delay: float = 0.0,
    decode: bool = True,
    dtype: MessageDataType | None = None,
    fmt: MessageFormat = None,
    size: int | None = None,
) -> bytes | str | NumpyArray1D:
    """Convenience method for performing a [write][msl.equipment.interfaces.message_based.MessageBased.write]
    followed by a [read][msl.equipment.interfaces.message_based.MessageBased.read].

    Args:
        message: The message to write to the equipment.
        delay: Time delay, in seconds, to wait between the _write_ and _read_ operations.
            No delay occurs if the value is &le; 0.
        decode: Whether to decode the returned message (i.e., convert the message to a [str][])
            or keep the message as [bytes][]. Ignored if `dtype` is not `None`.
        dtype: The data type of the elements in the returned message. Can be any object that numpy
            [dtype][numpy.dtype] supports. For messages that are of scalar type (i.e., a single number)
            it is more efficient to not specify `dtype` but to pass the returned message to the
            [int][] or [float][] class to convert the message to the appropriate numeric type.
            See [MessageDataType][msl.equipment._types.MessageDataType] for more details.
        fmt: The format that the returned message data is in. Ignored if `dtype` is `None`.
            See [MessageFormat][msl.equipment._types.MessageFormat] for more details.
        size: The number of bytes to read. Ignored if the value is `None`.

    Returns:
        The message from the equipment. If `dtype` is specified, then the message is
            returned as a numpy [ndarray][numpy.ndarray], if `decode` is `True` then the message
            is returned as a [str][], otherwise the message is returned as [bytes][].
    """  # noqa: D205
    # The number of bytes written is not needed here
    _ = self.write(message)

    if delay > 0:
        time.sleep(delay)

    # Compare with None (not truthiness), as documented, so that a dtype
    # object that happens to be falsy is not silently ignored
    if dtype is not None:
        return self.read(dtype=dtype, fmt=fmt, size=size)
    return self.read(decode=decode, size=size)

read ¤

read(
    *,
    decode: Literal[True] = True,
    dtype: None = None,
    fmt: MessageFormat = ...,
    size: int | None = ...
) -> str
read(
    *,
    decode: Literal[False] = False,
    dtype: None = None,
    fmt: MessageFormat = ...,
    size: int | None = ...
) -> bytes
read(
    *,
    decode: bool = ...,
    dtype: MessageDataType = ...,
    fmt: MessageFormat = ...,
    size: int | None = ...
) -> NumpyArray1D
read(
    *,
    decode: bool = True,
    dtype: MessageDataType | None = None,
    fmt: MessageFormat = None,
    size: int | None = None
) -> bytes | str | NumpyArray1D

Read a message from the equipment.

This method will block until one of the following conditions is fulfilled:

  1. size bytes have been received — only if size is not None.
  2. the read_termination byte sequence is received — only if read_termination is not None.
  3. a timeout occurs — only if timeout is not None. If a timeout occurs, an MSLTimeoutError is raised.
  4. max_read_size bytes have been received. If the maximum number of bytes has been read, an MSLConnectionError is raised.

Tip

You may also want to set the rstrip value for the class instance.

Parameters:

Name Type Description Default
decode bool

Whether to decode the message (i.e., convert the message to a str) or keep the message as bytes. Ignored if dtype is not None.

True
dtype MessageDataType | None

The data type of the elements in the returned message. Can be any object that numpy dtype supports. For messages that are of scalar type (i.e., a single number) it is more efficient to not specify dtype but to pass the returned message to the int or float class to convert the message to the appropriate numeric type. See MessageDataType for more details.

None
fmt MessageFormat

The format that the returned message data is in. Ignored if dtype is None. See MessageFormat for more details.

None
size int | None

The number of bytes to read. Ignored if the value is None.

None

Returns:

Type Description
bytes | str | NumpyArray1D

The message from the equipment. If dtype is specified, the message is returned as a numpy ndarray; otherwise, if decode is True, the message is returned as a str; otherwise the message is returned as bytes.

Source code in src/msl/equipment/interfaces/message_based.py
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
def read(
    self,
    *,
    decode: bool = True,
    dtype: MessageDataType | None = None,
    fmt: MessageFormat = None,
    size: int | None = None,
) -> bytes | str | NumpyArray1D:
    """Read a message from the equipment.

    This method will block until one of the following conditions is fulfilled:

    1. `size` bytes have been received &mdash; only if `size` is not `None`.
    2. the [read_termination][msl.equipment.interfaces.message_based.MessageBased.read_termination]
       byte sequence is received &mdash; only if
       [read_termination][msl.equipment.interfaces.message_based.MessageBased.read_termination]
       is not `None`.
    3. a timeout occurs &mdash; only if [timeout][msl.equipment.interfaces.message_based.MessageBased.timeout]
       is not `None`. If a timeout occurs, an
       [MSLTimeoutError][msl.equipment.interfaces.message_based.MSLTimeoutError] is raised.
    4. [max_read_size][msl.equipment.interfaces.message_based.MessageBased.max_read_size]
       bytes have been received. If the maximum number of bytes has been read, an
       [MSLConnectionError][msl.equipment.interfaces.message_based.MSLConnectionError] is raised.

    !!! tip
        You may also want to set the [rstrip][msl.equipment.interfaces.message_based.MessageBased.rstrip]
        value for the class instance.

    Args:
        decode: Whether to decode the message (i.e., convert the message to a [str][])
            or keep the message as [bytes][]. Ignored if `dtype` is not `None`.
        dtype: The data type of the elements in the returned message. Can be any object that numpy
            [dtype][numpy.dtype] supports. For messages that are of scalar type (i.e., a single number)
            it is more efficient to not specify `dtype` but to pass the returned message to the
            [int][] or [float][] class to convert the message to the appropriate numeric type.
            See [MessageDataType][msl.equipment._types.MessageDataType] for more details.
        fmt: The format that the returned message data is in. Ignored if `dtype` is `None`.
            See [MessageFormat][msl.equipment._types.MessageFormat] for more details.
        size: The number of bytes to read. Ignored if the value is `None`.

    Returns:
        The message from the equipment. If `dtype` is specified, then the message is returned
            as a numpy [ndarray][numpy.ndarray], if `decode` is `True` then the message
            is returned as a [str][], otherwise the message is returned as [bytes][].

    Raises:
        MSLConnectionError: If `size` is larger than
            [max_read_size][msl.equipment.interfaces.message_based.MessageBased.max_read_size],
            if the number of bytes received is not equal to `size`, or if any
            non-timeout error occurs while reading.
        MSLTimeoutError: If a timeout occurs while reading.
    """
    # Reject an impossible request before performing any I/O
    if size is not None and size > self._max_read_size:
        msg = f"max_read_size is {self._max_read_size} bytes, requesting {size} bytes"
        raise MSLConnectionError(self, msg)

    # Translate every backend-specific error into one of the two MSL exception types
    try:
        message = self._read(size)
    except (serial.SerialTimeoutException, socket.timeout, TimeoutError, USBTimeoutError):
        raise MSLTimeoutError(self) from None
    except Exception as e:  # noqa: BLE001
        msg = f"{e.__class__.__name__}: {e}"
        raise MSLConnectionError(self, msg) from None

    # Compare with None (not truthiness), as documented, so that a dtype
    # object that happens to be falsy is not silently ignored
    has_dtype = dtype is not None

    if size is None:
        if has_dtype:
            logger.debug("%s.read(dtype=%r, fmt=%r) -> %r", self, dtype, fmt, message)
        else:
            logger.debug("%s.read() -> %r", self, message)
    else:
        if len(message) != size:
            msg = f"received {len(message)} bytes, requested {size} bytes"
            raise MSLConnectionError(self, msg)
        logger.debug("%s.read(size=%s) -> %r", self, size, message)

    # The raw message is logged above, i.e., before trailing whitespace is removed
    if self._rstrip:
        message = message.rstrip()

    if has_dtype:
        return from_bytes(message, fmt=fmt, dtype=dtype)

    if decode:
        return message.decode(encoding=self._encoding)

    return message

reconnect ¤

reconnect(max_attempts: int = 1) -> None

Reconnect to the equipment.

Parameters:

Name Type Description Default
max_attempts int

The maximum number of attempts to try to reconnect with the equipment. If ≤ 0, keep trying until a connection is successful. If the maximum number of attempts has been reached then an exception is raised.

1
Source code in src/msl/equipment/interfaces/socket.py
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
def reconnect(self, max_attempts: int = 1) -> None:
    """Reconnect to the equipment.

    The current socket is closed and every connection attempt is made with a
    newly-created socket of the same address family and socket type.

    Args:
        max_attempts: The maximum number of attempts to try to reconnect with the equipment.
            If &le; 0, keep trying until a connection is successful. If the maximum number
            of attempts has been reached then an exception is raised.

    Raises:
        MSLConnectionError: If the last allowed attempt failed with a connection error.
        MSLTimeoutError: If the last allowed attempt timed out.
    """
    # Remember the socket parameters before the socket is closed
    family, sock_type = self._socket.family, self._socket.type

    attempt = 0
    while True:
        attempt += 1

        # The state of a socket is unspecified after connect() fails, so every
        # attempt (not only the first) gets a fresh socket
        self._socket.close()
        self._socket = socket.socket(family=family, type=sock_type)

        try:
            return self._connect()
        except (MSLConnectionError, MSLTimeoutError):
            # Re-raise only if a positive limit was given and it has been reached
            if 0 < max_attempts <= attempt:
                raise

write ¤

write(
    message: bytes | str,
    *,
    data: Sequence1D | None = None,
    dtype: MessageDataType = "<f",
    fmt: MessageFormat = "ieee"
) -> int

Write a message to the equipment.

Parameters:

Name Type Description Default
message bytes | str

The message to write to the equipment.

required
data Sequence1D | None

The data to append to message.

None
dtype MessageDataType

The data type to use to convert each element in data to bytes. Ignored if data is None. See MessageDataType for more details.

'<f'
fmt MessageFormat

The format to use to convert data to bytes. Ignored if data is None. See MessageFormat for more details.

'ieee'

Returns:

Type Description
int

The number of bytes written.

Source code in src/msl/equipment/interfaces/message_based.py
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
def write(
    self,
    message: bytes | str,
    *,
    data: Sequence1D | None = None,
    dtype: MessageDataType = "<f",
    fmt: MessageFormat = "ieee",
) -> int:
    """Write a message to the equipment.

    The bytes that are sent consist of `message` (encoded, if it is not already [bytes][]),
    followed by `data` converted to bytes (if specified), followed by the write-termination
    sequence (if one is set and the message does not already end with it).

    Args:
        message: The message to write to the equipment.
        data: The data to append to `message`.
        dtype: The data type to use to convert each element in `data` to bytes. Ignored
            if `data` is `None`. See [MessageDataType][msl.equipment._types.MessageDataType]
            for more details.
        fmt: The format to use to convert `data` to bytes. Ignored if `data` is `None`.
            See [MessageFormat][msl.equipment._types.MessageFormat] for more details.

    Returns:
        The number of bytes written.
    """
    payload = message if isinstance(message, bytes) else message.encode(encoding=self._encoding)

    if data is not None:
        payload += to_bytes(data, fmt=fmt, dtype=dtype)

    # Append the termination sequence only if it is not already at the end of the payload
    termination = self._write_termination
    if termination and not payload.endswith(termination):
        payload += termination

    logger.debug("%s.write(%r)", self, payload)

    # Translate every backend-specific error into one of the two MSL exception types
    try:
        return self._write(payload)
    except (serial.SerialTimeoutException, socket.timeout, TimeoutError, USBTimeoutError):
        raise MSLTimeoutError(self) from None
    except Exception as e:  # noqa: BLE001
        raise MSLConnectionError(self, str(e)) from None