Source code for QInstrument.lib.QSerialInstrument

import logging

from qtpy.QtSerialPort import QSerialPortInfo
from QInstrument.lib.QAbstractInstrument import QAbstractInstrument
from QInstrument.lib.QSerialInterface import QSerialInterface


logger = logging.getLogger(__name__)


[docs] class QSerialInstrument(QAbstractInstrument): '''Base class for instruments connected via serial ports. Extends :class:`QAbstractInstrument` with a serial transport layer and command-response communication helpers. Holds a :class:`QSerialInterface` by composition and delegates raw I/O through it. Concrete instrument classes inherit from this, declare a :attr:`comm` class attribute with serial parameters, and override :meth:`identify` to verify the connected device. The full communication API available to concrete instruments is: - :meth:`transmit` — send a command with no response expected - :meth:`handshake` — send a command and return the raw response - :meth:`expect` — send a command and test the response string - :meth:`getValue` — send a command and return a typed value All four methods are defined here. A future transport subclass (e.g. ``QGPIBInstrument``) would provide the same API over a different physical layer. Attributes ---------- comm : dict Serial parameters passed to :class:`QSerialInterface` on construction. Subclasses define this as a class attribute using the enum aliases re-exported here (e.g. ``baudRate=QSerialInstrument.BaudRate.Baud9600``). BaudRate : type Alias for ``QSerialPort.BaudRate``. DataBits : type Alias for ``QSerialPort.DataBits``. StopBits : type Alias for ``QSerialPort.StopBits``. Parity : type Alias for ``QSerialPort.Parity``. FlowControl : type Alias for ``QSerialPort.FlowControl``. ''' # Re-export serial enum types for convenient access in subclasses BaudRate = QSerialInterface.BaudRate DataBits = QSerialInterface.DataBits StopBits = QSerialInterface.StopBits Parity = QSerialInterface.Parity FlowControl = QSerialInterface.FlowControl comm: dict = {} def __init__(self, portName: str | None = None, **kwargs) -> None: super().__init__() args = self.comm | kwargs self._interface = QSerialInterface(parent=self, **args) if portName: self.open(portName) def __repr__(self) -> str: name = self.__class__.__name__ if self.isOpen(): port = self._interface.portName() else: port = 'not connected' return f'{name}({port})'
[docs] def identify(self) -> bool: '''Return True if the connected device is the expected instrument. The base implementation always returns ``True``. Subclasses should override this to query the device and verify its identity. Returns ------- bool ``True`` if the device is recognized, ``False`` otherwise. ''' return True
[docs] def open(self, portName: str) -> bool: '''Open a specific serial port and verify the connected device. Opens *portName* via the interface, then calls :meth:`identify`. Closes the port and returns ``False`` if identification fails. Parameters ---------- portName : str Serial port name without the system path prefix (e.g. ``'ttyUSB0'``, ``'COM1'``). Returns ------- bool ``True`` if the port is open and the device identified. ''' if not self._interface.open(portName): return False if not self.identify(): logger.debug(f'Device on {portName} is not ' f'{self.__class__.__name__}') self._interface.close() return self._interface.isOpen()
[docs] def isOpen(self) -> bool: '''Return True if the serial interface is currently open.''' return self._interface.isOpen()
[docs] def close(self) -> None: '''Close the serial interface.''' self._interface.close()
[docs] def find(self) -> 'QSerialInstrument': '''Scan all available serial ports to locate the instrument. Calls :meth:`open` on each port returned by ``QSerialPortInfo.availablePorts()`` until one succeeds. Returns ------- QSerialInstrument The instance itself, whether or not a device was found. Call :meth:`isOpen` to check the result. ''' for port in QSerialPortInfo.availablePorts(): portName = port.portName() logger.debug(f'Trying {portName}') if self.open(portName): break else: logger.error(f'Could not find {self.__class__.__name__}') return self
[docs] def transmit(self, data: str | bytes) -> None: '''Transmit data to the instrument via the serial interface. Parameters ---------- data : str | bytes Data to send. See :meth:`QSerialInterface.transmit`. ''' self._interface.transmit(data)
[docs] def receive(self, **kwargs) -> str | bytes: '''Read a response from the instrument via the serial interface. Parameters ---------- **kwargs Passed through to :meth:`QSerialInterface.receive`. Returns ------- str | bytes Response from the instrument. ''' return self._interface.receive(**kwargs)
[docs] def handshake(self, data: str, **kwargs) -> str: '''Transmit a command and return the instrument's response. Parameters ---------- data : str Command string to send to the instrument. **kwargs : Passed through to :meth:`receive`. Returns ------- str Stripped response string from the instrument. ''' self.transmit(data) return self.receive(**kwargs).strip()
[docs] def expect(self, query: str, response: str, **kwargs) -> bool: '''Return True if the instrument's response contains *response*. Parameters ---------- query : str Command string to send to the instrument. response : str Substring expected in the instrument's reply. **kwargs : Passed through to :meth:`receive`. Returns ------- bool ``True`` if *response* appears in the instrument's reply. ''' return response in self.handshake(query, **kwargs)
[docs] def getValue(self, query: str, dtype: type = float ) -> QAbstractInstrument.PropertyValue | None: '''Query the instrument and return a typed value. Parameters ---------- query : str Command string that elicits a single-value response. dtype : type, optional Converts the response string to the desired type. Default: ``float``. Returns ------- PropertyValue or None Value converted by *dtype*, or ``None`` if conversion fails. ''' response = self.handshake(query) try: value = dtype(response) except (ValueError, TypeError): value = None return value
[docs] @classmethod def example(cls, portname: str | None = None) -> None: '''Connect to an instrument and print its current settings. Creates a ``QCoreApplication``, opens the instrument on *portname* (or auto-detects it with :meth:`find` when *portname* is ``None``), then prints the instrument repr. Intended to be run from ``__main__`` in each instrument module: .. code-block:: python if __name__ == '__main__': QMyInstrument.example() Parameters ---------- portname : str | None, optional Serial port to open (e.g. ``'/dev/ttyUSB0'``). If ``None``, all available ports are scanned via :meth:`find`. ''' from qtpy.QtCore import QCoreApplication QCoreApplication.instance() or QCoreApplication([]) instrument = cls().find() if portname is None else cls(portname) if not instrument.isOpen(): print(f'{cls.__name__}: instrument not found.') return print(instrument)
__all__ = ['QSerialInstrument']