Source code for QInstrument.lib.QSerialInterface

from __future__ import annotations

import logging

from qtpy import QtCore
from qtpy.QtSerialPort import QSerialPort


logger = logging.getLogger(__name__)


[docs] class QSerialInterface(QSerialPort): '''Serial port wrapper providing framed I/O for instrument communication. Wraps ``QSerialPort`` to provide raw serial I/O with custom end-of-line handling for :meth:`transmit` and :meth:`receive`. Intended to run in a dedicated worker thread owned by :class:`QSerialInstrument`; port discovery and device identification are handled by the instrument layer, not here. Parameters ---------- portName : str Name of the serial port to open on construction, without the system-dependent path prefix (e.g. ``'ttyUSB0'``, ``'COM1'``). Pass an empty string (default) to skip opening on construction. eol : bytes | str End-of-line sequence appended to outgoing strings by :meth:`transmit` and used as the read terminator by :meth:`receive`. Default: ``''`` (no terminator). timeout : int | None Milliseconds to wait for incoming data before giving up. Default: ``None``, which is treated as ``100`` ms. baudRate : QSerialPort.BaudRate | None Port baud rate. Uses the ``QSerialPort`` default if ``None``. dataBits : QSerialPort.DataBits | None Number of data bits. Uses the ``QSerialPort`` default if ``None``. stopBits : QSerialPort.StopBits | None Number of stop bits. Uses the ``QSerialPort`` default if ``None``. parity : QSerialPort.Parity | None Parity mode. Uses the ``QSerialPort`` default if ``None``. flowControl : QSerialPort.FlowControl | None Flow control mode. Uses the ``QSerialPort`` default if ``None``. Attributes ---------- eol : bytes End-of-line sequence used for read/write termination. timeout : int Read timeout in milliseconds. BaudRate : type Alias for ``QSerialPort.BaudRate``. Use as ``QSerialInstrument.BaudRate.Baud9600`` in ``comm`` dicts. DataBits : type Alias for ``QSerialPort.DataBits``. StopBits : type Alias for ``QSerialPort.StopBits``. Parity : type Alias for ``QSerialPort.Parity``. FlowControl : type Alias for ``QSerialPort.FlowControl``. Examples -------- >>> iface = QSerialInterface(eol='\\n') >>> iface.open('ttyUSB0') ''' BaudRate = QSerialPort.BaudRate DataBits = QSerialPort.DataBits StopBits = QSerialPort.StopBits Parity = QSerialPort.Parity FlowControl = QSerialPort.FlowControl def __init__(self, portName: str = '', eol: bytes | str = '', timeout: int | None = None, baudRate: QSerialPort.BaudRate | None = None, dataBits: QSerialPort.DataBits | None = None, stopBits: QSerialPort.StopBits | None = None, parity: QSerialPort.Parity | None = None, flowControl: QSerialPort.FlowControl | None = None, **kwargs) -> None: super().__init__(**kwargs) if baudRate is not None: # PyQt6 enums expose .value; PyQt5 enums need int() baud = (baudRate.value if hasattr(baudRate, 'value') else int(baudRate)) self.setBaudRate(baud) if dataBits is not None: self.setDataBits(dataBits) if stopBits is not None: self.setStopBits(stopBits) if parity is not None: self.setParity(parity) if flowControl is not None: self.setFlowControl(flowControl) self.eol = eol if isinstance(eol, bytes) else eol.encode() self.timeout = timeout or 100 self.open(portName)
[docs] def open(self, portName: str) -> bool: '''Open the serial port for read/write access. Parameters ---------- portName : str Name of the serial port device file, without the system-dependent path prefix. Examples: ``'ttyUSB0'``, ``'COM1'``. Returns ------- bool ``True`` if the port was opened successfully. ''' if not portName: return False self.setPortName(portName) if not super().open(QSerialPort.OpenModeFlag.ReadWrite): logger.debug(f'Could not open {portName}') return False self.clear() return True
[docs] def transmit(self, data: str | bytes) -> None: '''Transmit data to the instrument. Strings are encoded to bytes and appended with :attr:`eol` before transmission. Raw ``bytes`` are written as-is without appending :attr:`eol`. Parameters ---------- data : str | bytes Data to transmit. Pass a ``str`` for normal ASCII commands; pass ``bytes`` for binary payloads that must not be modified. ''' if not self.isOpen(): logger.warning('Cannot send data: device is not open.') return if isinstance(data, str): data = data.encode() + self.eol self.write(data) self.flush() logger.debug(f'sent: {data}')
[docs] def receive(self, eol: str | bytes | None = None, raw: bool = False) -> str | bytes: '''Read from the serial interface until the end-of-line sequence. Reads available data into a buffer until :attr:`eol` is found or the read times out. The EOL bytes are stripped from the returned value. Intended to run in a dedicated worker thread (see :class:`QInstrumentWidget`), where blocking the thread with :meth:`waitForReadyRead` is correct and prevents reentrancy. Parameters ---------- eol : str | bytes | None End-of-line sequence to match. Defaults to the instance :attr:`eol` attribute. raw : bool If ``True``, return the response as ``bytes``. If ``False``, decode and return as ``str``. Default: ``False``. Returns ------- str | bytes Data received from the instrument, with the EOL sequence stripped. Returns an empty value on timeout. ''' if eol is not None: eol = eol.encode() if isinstance(eol, str) else eol else: eol = self.eol buffer = b'' while True: if not self.bytesAvailable(): if not self.waitForReadyRead(self.timeout): logger.debug('Timeout waiting for response') break buffer += bytes(self.readAll()) if eol and eol in buffer: buffer = buffer[:buffer.index(eol)] break return buffer if raw else buffer.decode('utf-8', errors='replace')
[docs] def readn(self, n: int = 1) -> bytes: '''Receive exactly n bytes from the instrument. Parameters ---------- n : int Number of bytes to read. Default: ``1``. Returns ------- bytes Data received from the instrument. May be shorter than ``n`` if the read times out. ''' if not self.isOpen(): logger.warning('Cannot read data: device is not open.') return b'' buffer = b'' while len(buffer) < n: if not self.bytesAvailable(): if not self.waitForReadyRead(self.timeout): logger.warning('Timeout waiting for response') break buffer += bytes(self.readAll()) return buffer[:n]
[docs] def sendbreak(self, duration: int = 250) -> None: '''Send a break signal to the instrument. Some instruments use a serial break to reset their communication state after a desynchronisation (e.g. a timeout caused by a dropped response). This method is not currently called anywhere; it is retained as a building block for future error-recovery logic. A complete recovery strategy would: detect failure (timeout in :meth:`receive`, ``None`` from :meth:`QSerialInstrument.getValue`), flush buffers, call :meth:`sendbreak`, and retry before surfacing a persistent failure to the caller. Parameters ---------- duration : int Duration of the break state in milliseconds. Valid range: [1, 500]. Default: ``250``. ''' if not self.isOpen(): logger.warning('Cannot send break: port is not open.') return self.setBreakEnabled(True) QtCore.QTimer.singleShot(duration, lambda: self.setBreakEnabled(False))
__all__ = ['QSerialInterface']