"""
Sangaboard module.
This Python code allows control of the Sangaboard
"""
# SPDX-License-Identifier: GPL-3.0-only
# For copyright and authorship information, see the Git history at:
# https://gitlab.com/openflexure/sangaboard/pysangaboard
import logging
import re
import time
from typing import List, Union
import semantic_version
from serial import EIGHTBITS, PARITY_NONE, STOPBITS_ONE
from .extensible_serial_instrument import (
ExtensibleSerialInstrument,
OptionalModule,
QueriedProperty,
)
LOGGER = logging.getLogger(__name__)
[docs]
class Sangaboard(ExtensibleSerialInstrument):
"""Class managing serial communications with a Sangaboard
The `Sangaboard` class handles setting up communications with the sangaboard,
wraps the various serial commands in Python methods, and provides iterators and
context managers to simplify opening/closing the hardware connection and some
other tasks like conducting a linear scan.
Arguments to the constructor are passed to the constructor of
:class:`Sangaboard.extensible_serial_instrument.ExtensibleSerialInstrument`,
most likely the only one necessary is `port` which should be set to the serial port
you will use to communicate with the motor controller.
This class can be used as a context manager, i.e. it's encouraged to use it as::
with Sangaboard() as sb:
sb.move_rel([1000,0,0])
In that case, the serial port will automatically be closed at the end of the block,
even if an error occurs. Otherwise, be sure to call the
:meth:`~.ExtensibleSerialInstrument.close()` method to release the serial port.
"""
valid_firmwares = [(1, 0), (0, 5), (0, 4)]
"""List of valid and deprecated firmwares, for communication checks.
Each is a tuple of ``(major version, minor version)``. So v1.0 is ``(1, 0)``.
"""
deprecated_firmwares = [(0, 4)]
"""Firmwares that are valid for communication but throw deprecation wanrings."""
# These are the settings for the sangaboards serial port, and can usually be left
# as default.
port_settings = {
"baudrate": 115_200,
"bytesize": EIGHTBITS,
"parity": PARITY_NONE,
"stopbits": STOPBITS_ONE,
}
# Since Sangaboard v4, we need to also try using a lower baud rate when we
# autodetect.
port_settings_list = [
{
"baudrate": 115_200,
"bytesize": EIGHTBITS,
"parity": PARITY_NONE,
"stopbits": STOPBITS_ONE,
},
{
"baudrate": 115_200,
"bytesize": EIGHTBITS,
"parity": PARITY_NONE,
"stopbits": STOPBITS_ONE,
},
]
# position, step time and ramp time are get/set using simple serial commands.
position = QueriedProperty(
get_cmd="p?",
response_string=r"%d %d %d",
doc="Get the position of the axes as a tuple of 3 integers.",
)
step_time = QueriedProperty(
get_cmd="dt?",
set_cmd="dt %d",
response_string="minimum step delay %d",
doc=(
"Get or set the minimum time between steps of the motors in microseconds."
"\n\nThe step time is ``1000000/max speed`` in steps/second. It is saved "
"to EEPROM on the sangaboard, so it will be persistent even if the motor "
"controller is turned off."
),
)
ramp_time = QueriedProperty(
get_cmd="ramp_time?",
set_cmd="ramp_time %d",
response_string="ramp time %d",
doc=(
"Get or set the acceleration time in microseconds.\n\n"
"The motors will accelerate/decelerate between stationary and maximum "
"speed over `ramp_time` microseconds. Zero means the motor runs at full "
"speed initially, with no accleration control. Small moves may last less "
"than `2*ramp_time`, in which case the acceleration will be the same, but "
"the motor will never reach full speed. It is saved to EEPROM on the "
"sangaboard, so it will be persistent even if the motor controller is "
"turned off."
),
)
# The names of the sangaboard's axes. NB this also defines the number of axes
axis_names = ("x", "y", "z")
# Once initialised, `firmware` is a string that identifies the firmware version
firmware = None
supported_light_sensors = ["TSL2591", "ADS1115"]
def __init__(self, port=None, timeout: int = 2, **kwargs):
"""Create a sangaboard object.
Arguments are passed to the constructor of
:class:`Sangaboard.extensible_serial_instrument.ExtensibleSerialInstrument`,
most likely the only one necessary is `port` which should be set to the serial
port you will use to communicate with the motor controller. That's the first
argument so it doesn't need to be named.
Note that the list of port settings is only used when autodetecting a port.
If you want to use an alternative baud rate, for example, it can be passed as
an additional keyword argument.
"""
# Initialise basic serial instrument with specified
LOGGER.info(f"Initialising ExtensibleSerialInstrument on port {port}")
ExtensibleSerialInstrument.__init__(self, port, **kwargs)
try:
# Make absolutely sure that whatever port we're on is valid
LOGGER.info("Checking valid firmware...")
self.check_valid_firmware()
# Bit messy: Defining all valid modules as not available, then overwriting
# with available information if available.
LOGGER.info("Loading modules...")
self.light_sensor = LightSensor(False)
for module in self.list_modules():
module_type = module.split(":")[0].strip()
module_model = module.split(":")[1].strip()
if module_type.startswith("Light Sensor"):
if module_model in self.supported_light_sensors:
self.light_sensor = LightSensor(
True, parent=self, model=module_model
)
else:
LOGGER.warning(
'Light sensor model "%s" not recognised.' % (module_model)
)
elif module_type.startswith("Endstops"):
self.endstops = Endstops(True, parent=self, model=module_model)
elif module_type.startswith("Illumination"):
self.illumination = Illumination(True, parent=self)
else:
LOGGER.warning(
'Module type "{}" not recognised.'.format(module_type)
)
self.board = self.query("board", timeout=timeout).rstrip()
if self.version_tuple[0] == 1:
# Enable v0.5-compatible behaviour on newer firmware
LOGGER.info(
"Enabling backwards-compatible behaviour on new Sangaboard firmware"
)
self.query("blocking_moves true")
except Exception as e:
# If an error occurred while setting up (e.g. because the board isn't
# connected or something) make sure we close the serial port cleanly
# (otherwise it hangs open).
self.close()
LOGGER.error(e)
LOGGER.warning(
"You may need to update the firmware running on the Sangaboard."
)
raise e
[docs]
def test_communications(self):
"""
Overrides superclass, used in self.open(), and port scanning
"""
LOGGER.info("Testing communication to SangaBoard")
return self.check_valid_firmware()
[docs]
def check_valid_firmware(self):
LOGGER.info("Running firmware checks...")
# Request firmware version from the board
self.firmware = self.query("version", timeout=2).rstrip()
if not self.firmware:
LOGGER.warning("No firmware version string was returned.")
return False
LOGGER.info("Firmware response: {}".format(self.firmware))
# Check for valid firmware string. Version group is matched later with
# semantic_version
match = re.match(
r"(Sangaboard Firmware|OpenFlexure Motor Board) v(.*)\s*$", self.firmware
)
if not match:
LOGGER.warning(
f"Version string '{self.firmware}' was not recognised as a supported "
"version."
)
return False
self.firmware_version = match.group(2)
try:
# We use "coerce" to allow two-element versions from old firmware
# otherwise "0.5" is not a valid version string.
parsed_version = semantic_version.Version.coerce(self.firmware_version)
self.parsed_firmware_version = parsed_version
except ValueError:
LOGGER.warning(
f"Could not parse version string '{self.firmware_version}' as a "
"semantic version."
)
return False
self.version_tuple = (parsed_version.major, parsed_version.minor)
if parsed_version.prerelease:
LOGGER.warning(
"You are using a Sangaboard with prerelease firmware "
"'{self.firmware_version}'"
)
if self.version_tuple not in Sangaboard.valid_firmwares:
LOGGER.warning(
"This version of the Python module requires firmware v0.5 (with legacy "
"support for v0.4)\n"
f"The board returned '{self.firmware}' which was parsed as version "
f"{match.groups()}"
)
return False
if self.version_tuple in Sangaboard.deprecated_firmwares:
LOGGER.warning(
"Warning this Sangaboard is using v0.4 of the firmware, this will "
"soon be removed from support! Please consider updating the Sangaboard "
"firmware."
)
return True
[docs]
def move_rel(self, displacement, axis=None):
"""Make a relative move.
displacement: integer or array/list of 3 integers
axis: None (for 3-axis moves) or one of 'x','y','z'
"""
if axis is not None:
assert axis in self.axis_names, "axis must be one of {}".format(
self.axis_names
)
self.query("mr{} {}".format(axis, int(displacement)))
else:
# TODO: assert displacement is 3 integers
self.query("mr {} {} {}".format(*list(displacement)))
[docs]
def release_motors(self):
"""De-energise the stepper motor coils"""
self.query("release")
[docs]
def zero_position(self):
"""Set the current position to zero"""
self.query("zero")
[docs]
def move_abs(self, final, **kwargs):
"""Make an absolute move to a position
Note the sangaboard only accepts relative move commands, so this first
queries the board for its position, then instructs it to make about
relative move.
"""
rel_mov = [
f_pos - i_pos for f_pos, i_pos in zip(final, self.position, strict=True)
]
return self.move_rel(rel_mov, **kwargs)
[docs]
def query(self, message, *args, **kwargs):
"""Send a message and read the response.
See ExtensibleSerialInstrument.query()
"""
time.sleep(0.001) # This is to protect the stage from us talking too fast!
return ExtensibleSerialInstrument.query(self, message, *args, **kwargs)
[docs]
def list_modules(self):
"""Return a list of strings detailing optional modules.
Each module will correspond to a string of the form ``Module Name: Model``
"""
modules = self.query(
"list_modules", multiline=True, termination_line="--END--\r\n"
).split("\r\n")[:-2]
return [str(module) for module in modules]
[docs]
def print_help(self):
"""Print the stage's built-in help message."""
print(self.query("help", multiline=True, termination_line="--END--\r\n"))
[docs]
class LightSensor(OptionalModule):
"""An optional module giving access to the light sensor.
If a light sensor is enabled in the motor controller's firmware, then
the :class:`sangaboard.Sangaboard` will gain an optional
module which is an instance of this class. It can be used to access
the light sensor (usually via the I2C bus).
"""
valid_gains = None
_valid_gains_int = None
integration_time = QueriedProperty(
get_cmd="light_sensor_integration_time?",
set_cmd="light_sensor_integration_time %d",
response_string="light sensor integration time %d ms",
doc="Get or set the integration time of the light sensor in milliseconds.",
)
intensity = QueriedProperty(
get_cmd="light_sensor_intensity?",
response_string="%d",
doc=(
"Read the current intensity measured by the light sensor (arbitrary units)."
),
)
def __init__(self, available, parent=None, model="Generic"):
super(LightSensor, self).__init__(
available, parent=parent, module_type="LightSensor", model=model
)
if available:
self.valid_gains = self.__get_gain_values()
self._valid_gains_int = [int(g) for g in self.valid_gains]
@property
def gain(self):
""" "Get or set the current gain value of the light sensor.
Valid gain values are defined in the `valid_gains` property, and should be
floating-point numbers.
"""
self.confirm_available()
gain = self._parent.query("light_sensor_gain?")
matches = re.search(r"[0-9\.]+(?=x)", gain)
assert matches is not None, 'Cannot read gain string: "{}"'.format(gain)
# gain is a float as non integer gains exist but are set with floor of value
return float(matches.group())
@gain.setter
def gain(self, val):
self.confirm_available()
assert int(val) in self._valid_gains_int, (
"Gain {} not valid must be one of: {}".format(val, self.valid_gains)
)
gain = self._parent.query("light_sensor_gain %d" % (int(val)))
matches = re.search(r"[0-9\.]+(?=x)", gain)
assert matches is not None, 'Cannot read gain string: "{}"'.format(gain)
# gain is a float as non integer gains exist but are set with floor of value
assert int(val) == int(float(matches.group())), (
'Gain of {} set, "{}" returned'.format(val, gain)
)
def __get_gain_values(self):
"""Read the allowable values for the light sensor's gain.
This function will attempt to return a list of floating-point numbers which may
be used as values of the `gain` property. If the stage returns
non-floating-point values, the list will be of strings.
"""
self.confirm_available()
gains = self._parent.query("light_sensor_gain_values?")
try:
matches = re.findall(r"[0-9\.]+(?=x)", gains)
return [float(gain) for gain in matches]
except ValueError:
# Fall back to strings if we don't get floats (unlikely)
gain_strings = gains[20:].split(", ")
return gain_strings
[docs]
class Endstops(OptionalModule):
"""An optional module for use with endstops.
If endstops are installed in the firmware the :class:`sangaboard.Sangaboard`
will gain an optional module which is an instance of this class. It can be used
to retrieve the type, state of the endstops, read and write maximum positions,
and home.
"""
installed = []
"""List of installed endstop types (min, max, soft)"""
def __init__(self, available, parent=None, model="min"):
super(Endstops, self).__init__(available, parent=parent, model="Endstops")
self.installed = model.split(" ")
status = QueriedProperty(
get_cmd="endstops?",
response_string=r"%d %d %d",
doc=(
"Get endstops status as {-1,0,1} for {min,no,max} endstop triggered for "
"each axis"
),
)
maxima = QueriedProperty(
get_cmd="max_p?",
set_cmd="max %d %d %d",
response_string="%d %d %d",
doc=(
"Vector of maximum positions, homing to max endstops will measure this, "
"can be set to a known value for use with max only and min+soft endstops"
),
)
[docs]
def home(self, direction="min", axes=("x", "y", "z")):
"""Home given/all axes in the given direction (min/max/both)
:param direction: one of {min,max,both}
:param axes: list of axes e.g. ['x','y']
"""
ax = 0
if "x" in axes:
ax += 1
if "y" in axes:
ax += 2
if "z" in axes:
ax += 4
if direction == "min" or direction == "both":
self._parent.query("home_min {}".format(ax))
if direction == "max" or direction == "both":
self._parent.query("home_max {}".format(ax))
[docs]
class Illumination(OptionalModule):
"""An optional module for illumination control.
If illumination module is built into the firmware the :class:`sangaboard.Sangaboard`
will gain an optional module which is an instance of this class.
Only one constant current channel is currently supported.
"""
def __init__(self, available, parent=None):
super(Illumination, self).__init__(available, parent=parent, model="Endstops")
self._channels = self.channels
channels = QueriedProperty(
get_cmd="led_channels?",
response_string=r"CC:%d PWM:%d",
doc="Get the number of supported CC and PWM channels",
)
pwm_frequency = QueriedProperty(
get_cmd="led_frequency?",
set_cmd="led_frequency %d",
response_string="PWM Frequency:%dHz",
doc=(
"Get or set the PWM channels frequency\n\n"
"Only supported on RP2040 based platforms. Maximum 1MHz, resolution only "
"~6bits at that frequency"
),
)
cc_led = QueriedProperty(
get_cmd="led_cc?",
set_cmd="led_cc %f",
response_string="CC LED:%f",
doc="Get or set the CC value 0-1.0\n\n"
+ "On Sangaboard v0.5 this corresponds to 0-100mA with 32 steps.",
)
[docs]
def set_pwm_led(self, channel: int, value: float):
if channel > self._channels[1]:
LOGGER.error("Requested invalid PWM channel {channel}")
raise ValueError("Invalid PWM channel {channel}")
if value < 0:
value = 0
self._parent.query(f"led_pwm {channel} {value}")
[docs]
def get_pwm_led(self, channel: int = None) -> Union[float, List[float]]:
if channel is not None and channel > self._channels[1]:
LOGGER.error("Requested invalid PWM channel {channel}")
raise ValueError("Invalid PWM channel {channel}")
return_string = "PWM:"
for _i in range(self._channels[1]):
return_string += "%f;"
pwm_values = self._parent.parsed_query(
"led_pwm?", response_string=return_string
)
return pwm_values[channel] if channel is not None else pwm_values