"""
This module is adapted from classes that were extracted from nplab_.
It is a serial instrument class to simplify the process of interfacing with
equipment that talks on a serial port. The idea is that your instrument can
subclass :class:`ExtensibleSerialInstrument` and provide methods to control the
hardware, which will mostly consist of `self.query()` commands. It also has
options for adding OptionalModules so that you don't have to have multiple
classes for lots of instruments with different configurations.
The :class:`QueriedProperty` class is a convenient shorthand to create a property
that is read and/or set with a single serial query (i.e. a write followed by a read).
"""
# SPDX-License-Identifier: GPL-3.0-only
# For copyright and authorship information, see the Git history at:
# https://gitlab.com/openflexure/sangaboard/pysangaboard
import logging
import os
import re
import threading
import time
import serial
from serial.tools import list_ports, list_ports_common
from .testing import DummySerialDevice
LOGGER = logging.getLogger(__name__)
[docs]
def list_serial_ports():
    """List available serial ports, including a workaround for Raspberry Pi.

    The built-in list of com ports misses the RPi hardware serial port.
    This workaround explicitly checks for it, and includes it if present.

    Returns:
        A list of ``ListPortInfo`` objects as produced by
        ``serial.tools.list_ports.comports()``, with an extra entry for
        ``/dev/ttyS0`` when that device exists and is not already listed.
    """
    rpi_port = "/dev/ttyS0"
    # Copy into a list so we can append even if comports() hands back
    # some other kind of iterable.
    ports = list(list_ports.comports())
    # Don't add the port twice, otherwise autodetection would probe it twice.
    already_listed = any(info.device == rpi_port for info in ports)
    if os.path.exists(rpi_port) and not already_listed:
        ports.append(list_ports_common.ListPortInfo(rpi_port))
    return ports
[docs]
class ExtensibleSerialInstrument(object):
"""
An instrument that communicates by sending strings back and forth over serial
This base class provides commonly-used mechanisms that support the use of
serial instruments. Most interactions with this class involve
a call to the `query` method. This writes a message and returns the reply.
This has been hacked together from the nplab_ MessageBusInstrument and
SerialInstrument classes.
**Threading Notes**
The message bus protocol includes a property, `communications_lock`. All
commands that use the communications bus should be protected by this lock.
It's also permissible to use it to protect sequences of calls to the bus
that must be atomic (e.g. a multi-part exchange of messages). However, try
not to hold it too long - or odd things might happen if other threads are
blocked for a long time. The lock is reentrant so there's no issue with
acquiring it twice.
"""
# All messages to or from the instrument end with this character.
termination_character = "\n"
# If multi-line responses are recieved, they must end with this string
termination_line = None
ignore_echo = False
port_settings = {}
# If multiple different settings are possible, this list ensures the are all tried
# during autodetection
port_settings_list = None
def __init__(self, port=None, **kwargs):
"""
Set up the serial port and so on.
"""
LOGGER.info("Updating ESI port settings")
LOGGER.debug(kwargs)
self.port_settings.update(kwargs)
LOGGER.info("Opening ESI connection to port {}".format(port))
self.open(port, False) # Eventually this shouldn't rely on init...
LOGGER.info("Opened ESI connection to port {}".format(port))
[docs]
def open(self, port=None, quiet=True):
"""Open communications with the serial port.
If no port is specified, it will attempt to autodetect. If quiet=True
then we don't warn when ports are opened multiple times.
"""
with self.communications_lock:
if hasattr(self, "_ser") and self._ser.isOpen():
if not quiet:
LOGGER.warning("Attempted to open an already-open port!")
return
if port is None:
port = self.find_port()
if port is None:
raise FileNotFoundError(
"We don't have a serial port to open, meaning you "
"didn't specify a valid port. Are you sure the "
"instrument is connected?"
)
LOGGER.info("Creating serial.Serial instance...")
if isinstance(port, DummySerialDevice):
self._ser = port
else:
self._ser = serial.Serial(port, **self.port_settings)
LOGGER.info(f"Created {self._ser}")
if not self.test_communications():
raise IOError(
"The instrument doesn't seem to be responding. Did you specify"
" the right port?"
)
[docs]
def close(self):
"""Cleanly close the device. Includes proper logging statements."""
LOGGER.debug("Closing serial connection")
with self.communications_lock:
try:
self._ser.close()
except Exception as e:
LOGGER.warning("The serial port didn't close cleanly: {}".format(e))
LOGGER.debug("Connection closed")
def __del__(self):
"""Emergency close the device. Try to avoid having to use this."""
if hasattr(self, "_ser") and self._ser.isOpen():
with self.communications_lock:
print(
"Closing an open serial communication has been triggered by "
"garbage collection!\n"
"Please close this device more sensibly in future."
)
try:
self._ser.close()
except Exception as e:
print("The serial port didn't close cleanly: {}".format(e))
def __enter__(self):
return self
def __exit__(self, _type, value, traceback):
"""Cleanly close down the instrument at end of a with block."""
self.close()
[docs]
def write(self, query_string):
"""Write a string to the serial port"""
with self.communications_lock:
assert self._ser.isOpen(), (
"Attempted to write to the serial port before it was opened. "
"Perhaps you need to call the 'open' method first?"
)
LOGGER.debug("Encoding message...")
data = query_string + self.termination_character
data = data.encode()
LOGGER.debug(f"Writing message: {data}")
self._ser.write(data)
LOGGER.debug("Write successful")
[docs]
def readline(self, timeout=None):
"""Read one line from the serial port."""
self._ser.timeout = timeout
with self.communications_lock:
return (
self._ser.readline()
.decode("utf8")
.replace(self.termination_character, "\n")
)
_communications_lock = None
@property
def communications_lock(self):
"""A lock object used to protect access to the communications bus"""
# This requires initialisation but our init method won't be called - so
# the property initialises it on first use.
if self._communications_lock is None:
self._communications_lock = threading.RLock()
return self._communications_lock
[docs]
def read_multiline(self, termination_line=None, timeout=None):
"""Read one line from the underlying bus. Must be overriden.
This should not need to be reimplemented unless there's a more efficient
way of reading multiple lines than multiple calls to readline()."""
with self.communications_lock:
if termination_line is None:
termination_line = self.termination_line
assert isinstance(termination_line, str), (
"If you perform a multiline query, you must specify a termination line "
"either through the termination_line keyword argument or the "
"termination_line property of the NPSerialInstrument."
)
response = ""
last_line = "dummy"
# read until we get the termination line.
while termination_line not in last_line and len(last_line) > 0:
last_line = self.readline(timeout)
response += last_line
return response
[docs]
def query(self, query_string, multiline=False, termination_line=None, timeout=None):
"""
Write a string to the stage controller and return its response.
It will block until a response is received. The multiline and termination_line
commands will keep reading until a termination phrase is reached.
"""
with self.communications_lock:
self.flush_input_buffer()
LOGGER.debug(f"Writing query: {query_string}")
self.write(query_string)
LOGGER.debug("Waiting for query response...")
if self.ignore_echo: # Needs Implementing for a multiline read!
first_line = self.readline(timeout).strip()
if first_line == query_string:
return self.readline(timeout).strip()
else:
LOGGER.info("This command did not echo!!!")
return first_line
if termination_line is not None:
multiline = True
if multiline:
return self.read_multiline(termination_line)
else:
# question: should we strip the final newline?
line = self.readline(timeout).strip()
return line
[docs]
def parsed_query(
self,
query_string,
response_string=r"%d",
re_flags=0,
parse_function=None,
**kwargs,
):
"""
Perform a query, returning a parsed form of the response.
First query the instrument with the given query string, then compare
the response against a template. The template may contain text and
placeholders (e.g. %i and %f for integer and floating point values
respectively). Regular expressions are also allowed - each group is
considered as one item to be parsed. However, currently it's not
supported to use both % placeholders and regular expressions at the
same time.
If placeholders %i, %f, etc. are used, the returned values are
automatically converted to integer or floating point, otherwise you
must specify a parsing function (applied to all groups) or a list of
parsing functions (applied to each group in turn).
"""
response_regex = response_string
def no_parse(x):
"""A null parse function that simply returns the input."""
return x
# tuples of (regex matching placeholder, regex to replace it with, parse
# function)
placeholders = [
(r"%c", r".", no_parse),
# TODO support %cn where n is a number of chars
(r"%(\d+)c", r".{\1}", no_parse),
(r"%d", r"[-+]?\\d+", int),
(r"%[eEfg]", r"[-+]?(?:\\d+(?:\.\\d*)?|\.\\d+)(?:[eE][-+]?\\d+)?", float),
# 0=autodetect base
(r"%i", r"[-+]?(?:0[xX][\\dA-Fa-f]+|0[0-7]*|\\d+)", lambda x: int(x, 0)),
# 8 means octal
(r"%o", r"[-+]?[0-7]+", lambda x: int(x, 8)),
(r"%s", r"\\s+", no_parse),
(r"%u", r"\\d+", int),
# 16 forces hexadecimal
(r"%[xX]", r"[-+]?(?:0[xX])?[\\dA-Fa-f]+", lambda x: int(x, 16)),
]
matched_placeholders = []
for placeholder, regex, parse_fun in placeholders:
# substitute regex for placeholder
response_regex = re.sub(placeholder, "(" + regex + ")", response_regex)
# save the positions of the placeholders
matched_placeholders.extend(
[
(parse_fun, m.start())
for m in re.finditer(placeholder, response_string)
]
)
if parse_function is None:
parse_function = [
f for f, s in sorted(matched_placeholders, key=lambda m: m[1])
] # order parse functions by their occurrence in the original string
if not hasattr(parse_function, "__iter__"):
parse_function = [parse_function] # make sure it's a list.
reply = self.query(query_string, **kwargs) # do the query
# if match this could be because another response entered the buffer between
# write and read. Sleep for short while then check if something is now in the
# buffer, while the buffer is not empty repeat regex
waited = False
res = re.search(response_regex, reply, flags=re_flags)
while res is None:
if not waited:
time.sleep(0.1)
waited = True
original_reply = reply
if self._ser.inWaiting():
reply = self.readline().strip()
res = re.search(response_regex, reply, flags=re_flags)
if res is not None:
LOGGER.warning(
"Query suceeded after initially receieving unmatched response "
f"('{original_reply}') to '{query_string}'. "
f"Match pattern /{response_string}/ "
"(generated regex /{response_regex}/)"
)
else:
raise ValueError(
"Stage response to '{query_string}' ('{original_reply}') wasn't "
"matched by /{response_string}/ "
"(generated regex /{response_regex}/)"
)
try:
parsed_result = [
f(g) for f, g in zip(parse_function, res.groups(), strict=True)
] # try to apply each parse function to its argument
if len(parsed_result) == 1:
return parsed_result[0]
else:
return parsed_result
except ValueError as e:
LOGGER.info("Matched Groups: {}".format(res.groups()))
LOGGER.info("Parsing Functions {}:".format(parse_function))
raise ValueError(
f"Stage response to {query_string} ('{reply}') couldn't be parsed by "
"the supplied function"
) from e
[docs]
def int_query(self, query_string, **kwargs):
"""Perform a query and return the result(s) as integer(s) (see parsedQuery)"""
return self.parsed_query(query_string, "%d", **kwargs)
[docs]
def float_query(self, query_string, **kwargs):
"""Perform a query and return the result(s) as float(s) (see parsedQuery)"""
return self.parsed_query(query_string, "%f", **kwargs)
[docs]
def test_communications(self):
"""Check if the device is available on the current port.
This should be overridden by subclasses. Assume the port has been
successfully opened and the settings are as defined by self.port_settings.
Usually this function sends a command and checks for a known reply."""
with self.communications_lock:
return True
[docs]
def find_port(self):
"""Find the first serial port where we can establish communications.
Iterate through the available serial ports and query them to see
if our instrument is there. If a list of settings is provided in
`self.port_settings_list` we will try each configuration in turn.
"""
print("Auto-scanning ports")
if not self.port_settings_list:
self.port_settings_list = [self.port_settings]
with self.communications_lock:
success = False
# Loop through serial ports, apparently 256 is the limit?!
for port_name, _, _ in list_serial_ports():
for port_settings in self.port_settings_list:
self.port_settings = port_settings
try:
LOGGER.debug(
f"Trying port {port_name} with settings {port_settings}"
)
self.open(port_name)
success = True
LOGGER.info("Success!")
except Exception:
LOGGER.debug(
f"Failed to connect to port {port_name} with settings "
f"{port_settings}"
)
finally:
self.close()
if success:
# again, make sure this happens *after* closing the port
break
if success:
# again, make sure this happens *after* closing the port
break
if success:
return port_name
else:
return None
[docs]
class OptionalModule(object):
    """This allows a `ExtensibleSerialInstrument` to have optional features.

    OptionalModule is designed as a base class
    for interfacing with optional modules which may or may not be included with
    the serial instrument, and can be added or removed at run-time.

    Arguments:
        :available:
            whether the module is present (must be a bool)
        :parent:
            the object stored as ``_parent``; `QueriedProperty` uses it as the
            instrument to talk to
        :module_type:
            a string naming the kind of module
        :model:
            a string naming the model; only checked and stored when the module
            is available, otherwise ``model`` is None
    """

    def __init__(
        self, available, parent=None, module_type="Undefined", model="Generic"
    ):
        assert isinstance(available, bool), (
            "Option module availablity should be a boolean not a {}".format(
                type(available)
            )
        )
        self._available = available
        self._parent = parent
        assert isinstance(module_type, str), (
            "Option module type should be a string not a {}".format(type(module_type))
        )
        self.module_type = module_type
        if available:
            assert isinstance(model, str), (
                "Option module model should be a string not a {}".format(type(model))
            )
            self.model = model
        else:
            self.model = None

    @property
    def available(self):
        """Whether the module is available (read-only)."""
        return self._available

    def confirm_available(self):
        """Check if module is available.

        Raises AssertionError if not available.
        """
        # Raise explicitly instead of using an assert statement: this check
        # gates access to the hardware, so it must survive `python -O`.
        if not self._available:
            raise AssertionError(
                'No "{}" supported on firmware'.format(self.module_type)
            )

    def describe(self):
        """Consistently spaced description for listing modules"""
        # Unavailable modules have model=None, which can't be concatenated.
        model = self.model if self.model is not None else "not available"
        return self.module_type.ljust(25) + "- " + model
[docs]
class QueriedProperty(object):
    """A Property interface that reads and writes from the instrument on the bus.

    This returns a property-like (i.e. a descriptor) object. You can use it
    in a class definition just like a property. The property it creates will
    interact with the instrument over the communication bus to set and retrieve
    its value. It uses calls to `ExtensibleSerialInstrument.parsed_query` (or
    `query`, for "bool"/"raw"/unset response strings) to get the value, and
    `ExtensibleSerialInstrument.write` to set it.

    `QueriedProperty` can be used to define properties on a `ExtensibleSerialInstrument`
    or an `OptionalModule` (in which case the methods of the module's parent
    object will be used).

    Arguments:
        :get_cmd:
            the string sent to the instrument to obtain the value
        :set_cmd:
            the string used to set the value (use {} or % placeholders)
        :validate:
            a list of allowable values
        :valrange:
            a maximum and minimum value
        :fdel:
            a function to call when it's deleted
        :doc:
            the docstring
        :response_string:
            supply a % code (as you would for response_string in a
            ``ExtensibleSerialInstrument.parsed_query``), or one of the
            shorthands "int", "float", "bool" or "raw"
        :ack_writes:
            set to "readline" to discard a line of input after writing.
    """

    # Shorthand names accepted for `response_string`, and what they expand to.
    _MAGIC_RESPONSE_STRINGS = {"float": r"%f", "int": r"%d"}
    # Replies (stripped, lower-cased) that a "bool" property reads as False.
    _FALSE_REPLIES = frozenset({"", "0", "false", "off", "no"})
    # Substrings that mark `set_cmd` as a str.format template.
    _FORMAT_MARKERS = ("{}", "{0", "{:")

    def __init__(
        self,
        get_cmd=None,
        set_cmd=None,
        validate=None,
        valrange=None,
        fdel=None,
        doc=None,
        response_string=None,
        ack_writes="no",
    ):
        self.response_string = response_string
        self.get_cmd = get_cmd
        self.set_cmd = set_cmd
        self.validate = validate
        self.valrange = valrange
        self.fdel = fdel
        self.ack_writes = ack_writes
        self.__doc__ = doc

    @staticmethod
    def _resolve_instrument(obj):
        """Return the instrument to talk to for the object owning the property."""
        if isinstance(obj, OptionalModule):
            obj.confirm_available()
            obj = obj._parent
        assert isinstance(obj, ExtensibleSerialInstrument), (
            "QueriedProperty needs an ExtensibleSerialInstrument, or an "
            "OptionalModule whose parent is one"
        )
        return obj

    @classmethod
    def _parse_bool(cls, reply):
        """Interpret a reply string as a boolean.

        `bool()` on a string only tests for emptiness, so "0" would read as
        True. Treat the usual "false" spellings as False instead.
        """
        return reply.strip().lower() not in cls._FALSE_REPLIES

    def _format_set_message(self, value):
        """Substitute `value` into `set_cmd` using {} or % placeholders."""
        message = self.set_cmd
        if any(marker in message for marker in self._FORMAT_MARKERS):
            return message.format(value)
        if "%" in message:
            return message % value
        return message

    # TODO: standardise the return (single value only vs parsed result), consider bool
    def __get__(self, obj, objtype=None):
        """Query the instrument and return the (parsed) value.

        Accessing the property on the class returns the descriptor itself.
        Raises AttributeError if no `get_cmd` was given.
        """
        # Class-level access. Check this before swapping a module for its
        # parent, so a module without a parent fails loudly below instead of
        # quietly returning the descriptor.
        if obj is None:
            return self
        instrument = self._resolve_instrument(obj)
        if self.get_cmd is None:
            raise AttributeError("unreadable attribute")
        # Allow certain "magic" values to set the response string
        response_string = self._MAGIC_RESPONSE_STRINGS.get(
            self.response_string, self.response_string
        )
        if response_string in ("bool", "raw", None):
            value = instrument.query(self.get_cmd)
            if response_string == "bool":
                value = self._parse_bool(value)
            return value
        return instrument.parsed_query(self.get_cmd, response_string)

    def __set__(self, obj, value):
        """Validate `value` and write the formatted `set_cmd` to the instrument.

        Raises AttributeError if no `set_cmd` was given, and ValueError if the
        value fails the `validate` or `valrange` checks.
        """
        instrument = self._resolve_instrument(obj)
        if self.set_cmd is None:
            raise AttributeError("can't set attribute")
        if self.validate is not None and value not in self.validate:
            raise ValueError(
                f"invalid value supplied - value must be one of {self.validate}"
            )
        if self.valrange:
            low, high = self.valrange
            if not low <= value <= high:
                raise ValueError(
                    f"invalid value supplied - value must be in the range {low}-{high}"
                )
        message = self._format_set_message(value)
        # Hold the (reentrant) lock across the write and its acknowledgement so
        # another thread's query can't consume the acknowledgement line.
        with instrument.communications_lock:
            instrument.write(message)
            if self.ack_writes == "readline":
                instrument.readline()

    def __delete__(self, obj):
        """Call `fdel(obj)`; raise AttributeError if no `fdel` was given."""
        if self.fdel is None:
            raise AttributeError("can't delete attribute")
        self.fdel(obj)