Source code for sangaboard.sangaboard

"""
Sangaboard module.

This Python code allows control of the Sangaboard motor controller over a serial
connection.
"""

# SPDX-License-Identifier: GPL-3.0-only
# For copyright and authorship information, see the Git history at:
# https://gitlab.com/openflexure/sangaboard/pysangaboard

import logging
import re
import time
from typing import List, Union

import semantic_version
from serial import EIGHTBITS, PARITY_NONE, STOPBITS_ONE

from .extensible_serial_instrument import (
    ExtensibleSerialInstrument,
    OptionalModule,
    QueriedProperty,
)

LOGGER = logging.getLogger(__name__)


class Sangaboard(ExtensibleSerialInstrument):
    """Class managing serial communications with a Sangaboard

    The `Sangaboard` class handles setting up communications with the sangaboard,
    wraps the various serial commands in Python methods, and provides iterators and
    context managers to simplify opening/closing the hardware connection and some
    other tasks like conducting a linear scan.

    Arguments to the constructor are passed to the constructor of
    :class:`Sangaboard.extensible_serial_instrument.ExtensibleSerialInstrument`,
    most likely the only one necessary is `port` which should be set to the serial
    port you will use to communicate with the motor controller.

    This class can be used as a context manager, i.e. it's encouraged to use it as::

        with Sangaboard() as sb:
            sb.move_rel([1000,0,0])

    In that case, the serial port will automatically be closed at the end of the
    block, even if an error occurs. Otherwise, be sure to call the
    :meth:`~.ExtensibleSerialInstrument.close()` method to release the serial port.
    """

    valid_firmwares = [(1, 0), (0, 5), (0, 4)]
    """List of valid and deprecated firmwares, for communication checks.

    Each is a tuple of ``(major version, minor version)``. So v1.0 is ``(1, 0)``.
    """

    deprecated_firmwares = [(0, 4)]
    """Firmwares that are valid for communication but throw deprecation warnings."""

    # These are the settings for the sangaboards serial port, and can usually be left
    # as default.
    port_settings = {
        "baudrate": 115_200,
        "bytesize": EIGHTBITS,
        "parity": PARITY_NONE,
        "stopbits": STOPBITS_ONE,
    }

    # Since Sangaboard v4, we need to also try using a lower baud rate when we
    # autodetect.
    # NOTE(review): both entries below use the same baud rate (115200), which does
    # not match the comment above - confirm the intended lower rate before changing.
    port_settings_list = [
        {
            "baudrate": 115_200,
            "bytesize": EIGHTBITS,
            "parity": PARITY_NONE,
            "stopbits": STOPBITS_ONE,
        },
        {
            "baudrate": 115_200,
            "bytesize": EIGHTBITS,
            "parity": PARITY_NONE,
            "stopbits": STOPBITS_ONE,
        },
    ]

    # position, step time and ramp time are get/set using simple serial commands.
    position = QueriedProperty(
        get_cmd="p?",
        response_string=r"%d %d %d",
        doc="Get the position of the axes as a tuple of 3 integers.",
    )
    step_time = QueriedProperty(
        get_cmd="dt?",
        set_cmd="dt %d",
        response_string="minimum step delay %d",
        doc=(
            "Get or set the minimum time between steps of the motors in microseconds."
            "\n\nThe step time is ``1000000/max speed`` in steps/second. It is saved "
            "to EEPROM on the sangaboard, so it will be persistent even if the motor "
            "controller is turned off."
        ),
    )
    ramp_time = QueriedProperty(
        get_cmd="ramp_time?",
        set_cmd="ramp_time %d",
        response_string="ramp time %d",
        doc=(
            "Get or set the acceleration time in microseconds.\n\n"
            "The motors will accelerate/decelerate between stationary and maximum "
            "speed over `ramp_time` microseconds. Zero means the motor runs at full "
            "speed initially, with no acceleration control. Small moves may last less "
            "than `2*ramp_time`, in which case the acceleration will be the same, but "
            "the motor will never reach full speed. It is saved to EEPROM on the "
            "sangaboard, so it will be persistent even if the motor controller is "
            "turned off."
        ),
    )

    # The names of the sangaboard's axes. NB this also defines the number of axes
    axis_names = ("x", "y", "z")

    # Once initialised, `firmware` is a string that identifies the firmware version
    firmware = None

    supported_light_sensors = ["TSL2591", "ADS1115"]

    def __init__(self, port=None, timeout: int = 2, **kwargs):
        """Create a sangaboard object.

        Arguments are passed to the constructor of
        :class:`Sangaboard.extensible_serial_instrument.ExtensibleSerialInstrument`,
        most likely the only one necessary is `port` which should be set to the
        serial port you will use to communicate with the motor controller. That's
        the first argument so it doesn't need to be named.

        Note that the list of port settings is only used when autodetecting a port.
        If you want to use an alternative baud rate, for example, it can be passed
        as an additional keyword argument.

        :param port: serial port to open, forwarded to the superclass constructor.
        :param timeout: timeout passed to the ``board`` query made during set-up.
        :param kwargs: forwarded unchanged to the superclass constructor.
        :raises Exception: any error raised during set-up is logged and re-raised
            after the serial port has been closed.
        """
        LOGGER.info(f"Initialising ExtensibleSerialInstrument on port {port}")
        ExtensibleSerialInstrument.__init__(self, port, **kwargs)
        try:
            # Make absolutely sure that whatever port we're on is valid
            # NOTE(review): the boolean result is not acted upon here; a failed
            # check only surfaces later (e.g. when `version_tuple` is missing).
            LOGGER.info("Checking valid firmware...")
            self.check_valid_firmware()

            LOGGER.info("Loading modules...")
            self._load_optional_modules()

            self.board = self.query("board", timeout=timeout).rstrip()
            if self.version_tuple[0] == 1:
                # Enable v0.5-compatible behaviour on newer firmware
                LOGGER.info(
                    "Enabling backwards-compatible behaviour on new Sangaboard firmware"
                )
                self.query("blocking_moves true")
        except Exception as e:
            # If an error occurred while setting up (e.g. because the board isn't
            # connected or something) make sure we close the serial port cleanly
            # (otherwise it hangs open).
            self.close()
            LOGGER.error(e)
            LOGGER.warning(
                "You may need to update the firmware running on the Sangaboard."
            )
            # Bare raise keeps the original traceback intact.
            raise

    def _load_optional_modules(self):
        """Create optional-module attributes from the board's module list.

        The light sensor is always defined (as unavailable) and then overwritten if
        the board reports a supported model. ``endstops`` and ``illumination`` are
        only set when the board reports them.
        """
        self.light_sensor = LightSensor(False)
        for module in self.list_modules():
            parts = module.split(":")
            module_type = parts[0].strip()
            # Tolerate entries without a "Name: Model" separator instead of
            # failing with an IndexError.
            module_model = parts[1].strip() if len(parts) > 1 else ""

            if module_type.startswith("Light Sensor"):
                if module_model in self.supported_light_sensors:
                    self.light_sensor = LightSensor(
                        True, parent=self, model=module_model
                    )
                else:
                    LOGGER.warning(
                        'Light sensor model "%s" not recognised.', module_model
                    )
            elif module_type.startswith("Endstops"):
                self.endstops = Endstops(True, parent=self, model=module_model)
            elif module_type.startswith("Illumination"):
                self.illumination = Illumination(True, parent=self)
            else:
                LOGGER.warning('Module type "%s" not recognised.', module_type)

    def test_communications(self):
        """Overrides superclass, used in self.open(), and port scanning.

        :return: the result of :meth:`check_valid_firmware`.
        """
        LOGGER.info("Testing communication to SangaBoard")
        return self.check_valid_firmware()

    def check_valid_firmware(self):
        """Query the firmware version and check that it is supported.

        Sets ``firmware`` and, as parsing progresses, ``firmware_version``,
        ``parsed_firmware_version`` and ``version_tuple``.

        :return: ``True`` if the version is listed in ``valid_firmwares``, otherwise
            ``False`` (a warning describing the reason is logged).
        """
        LOGGER.info("Running firmware checks...")

        # Request firmware version from the board
        self.firmware = self.query("version", timeout=2).rstrip()
        if not self.firmware:
            LOGGER.warning("No firmware version string was returned.")
            return False
        LOGGER.info("Firmware response: {}".format(self.firmware))

        # Check for valid firmware string. Version group is matched later with
        # semantic_version
        match = re.match(
            r"(Sangaboard Firmware|OpenFlexure Motor Board) v(.*)\s*$", self.firmware
        )
        if not match:
            LOGGER.warning(
                f"Version string '{self.firmware}' was not recognised as a supported "
                "version."
            )
            return False

        self.firmware_version = match.group(2)
        try:
            # We use "coerce" to allow two-element versions from old firmware
            # otherwise "0.5" is not a valid version string.
            parsed_version = semantic_version.Version.coerce(self.firmware_version)
            self.parsed_firmware_version = parsed_version
        except ValueError:
            LOGGER.warning(
                f"Could not parse version string '{self.firmware_version}' as a "
                "semantic version."
            )
            return False

        self.version_tuple = (parsed_version.major, parsed_version.minor)

        if parsed_version.prerelease:
            # The second fragment must be an f-string too, otherwise the
            # placeholder is logged literally.
            LOGGER.warning(
                "You are using a Sangaboard with prerelease firmware "
                f"'{self.firmware_version}'"
            )

        if self.version_tuple not in Sangaboard.valid_firmwares:
            # Build the list from `valid_firmwares` so the message cannot drift
            # out of date when new versions are added.
            supported = ", ".join(
                f"v{major}.{minor}" for major, minor in Sangaboard.valid_firmwares
            )
            LOGGER.warning(
                "This version of the Python module supports firmware versions "
                f"{supported}\n"
                f"The board returned '{self.firmware}' which was parsed as version "
                f"{self.version_tuple}"
            )
            return False

        if self.version_tuple in Sangaboard.deprecated_firmwares:
            LOGGER.warning(
                "Warning this Sangaboard is using v0.4 of the firmware, this will "
                "soon be removed from support! Please consider updating the Sangaboard "
                "firmware."
            )
        return True

    def move_rel(self, displacement, axis=None):
        """Make a relative move.

        :param displacement: integer (single-axis move) or a sequence with one
            integer per entry in ``axis_names`` (multi-axis move).
        :param axis: None (for 3-axis moves) or one of 'x','y','z'
        :raises AssertionError: if `axis` is given but is not in ``axis_names``.
        :raises ValueError: if a multi-axis `displacement` does not contain exactly
            one value per axis.
        """
        if axis is not None:
            # Kept as an assert so existing callers see the same exception type.
            assert axis in self.axis_names, "axis must be one of {}".format(
                self.axis_names
            )
            self.query("mr{} {}".format(axis, int(displacement)))
            return

        # Convert to int as the single-axis branch does, and reject sequences of
        # the wrong length: `str.format` would silently drop extra elements.
        steps = [int(step) for step in displacement]
        if len(steps) != len(self.axis_names):
            raise ValueError(
                "displacement must contain {} values, got {}".format(
                    len(self.axis_names), len(steps)
                )
            )
        self.query("mr {} {} {}".format(*steps))

    def release_motors(self):
        """De-energise the stepper motor coils"""
        self.query("release")

    def zero_position(self):
        """Set the current position to zero"""
        self.query("zero")

    def move_abs(self, final, **kwargs):
        """Make an absolute move to a position

        Note the sangaboard only accepts relative move commands, so this first
        queries the board for its position, then instructs it to make a relative
        move.

        :param final: target position, one value per axis. It must have the same
            length as ``position`` (enforced by ``zip(..., strict=True)``).
        :param kwargs: forwarded to :meth:`move_rel`.
        """
        rel_mov = [
            f_pos - i_pos for f_pos, i_pos in zip(final, self.position, strict=True)
        ]
        return self.move_rel(rel_mov, **kwargs)

    def query(self, message, *args, **kwargs):
        """Send a message and read the response.

        See ExtensibleSerialInstrument.query()
        """
        time.sleep(0.001)  # This is to protect the stage from us talking too fast!
        return ExtensibleSerialInstrument.query(self, message, *args, **kwargs)

    def list_modules(self):
        """Return a list of strings detailing optional modules.

        Each module will correspond to a string of the form ``Module Name: Model``
        """
        response = self.query(
            "list_modules", multiline=True, termination_line="--END--\r\n"
        )
        # The last two fragments after splitting are dropped; presumably these are
        # the terminator line and the trailing empty string - TODO confirm.
        modules = response.split("\r\n")[:-2]
        return [str(module) for module in modules]

    def print_help(self):
        """Print the stage's built-in help message."""
        print(self.query("help", multiline=True, termination_line="--END--\r\n"))
class LightSensor(OptionalModule):
    """An optional module giving access to the light sensor.

    If a light sensor is enabled in the motor controller's firmware, then the
    :class:`sangaboard.Sangaboard` will gain an optional module which is an
    instance of this class. It can be used to access the light sensor (usually
    via the I2C bus).
    """

    # Matches the numeric part of gain strings such as "16x" or "9.5x".
    _GAIN_PATTERN = re.compile(r"[0-9\.]+(?=x)")

    # Populated in __init__ when the module is available.
    valid_gains = None
    _valid_gains_int = None

    integration_time = QueriedProperty(
        get_cmd="light_sensor_integration_time?",
        set_cmd="light_sensor_integration_time %d",
        response_string="light sensor integration time %d ms",
        doc="Get or set the integration time of the light sensor in milliseconds.",
    )
    intensity = QueriedProperty(
        get_cmd="light_sensor_intensity?",
        response_string="%d",
        doc=(
            "Read the current intensity measured by the light sensor (arbitrary units)."
        ),
    )

    def __init__(self, available, parent=None, model="Generic"):
        """Set up the module and, if available, read the permitted gain values.

        :param available: whether the board reported this module.
        :param parent: object whose ``query`` method is used to talk to the board.
        :param model: light sensor model name.
        """
        super().__init__(
            available, parent=parent, module_type="LightSensor", model=model
        )
        if available:
            self.valid_gains = self.__get_gain_values()
            self._valid_gains_int = self._integer_gains(self.valid_gains)

    @staticmethod
    def _integer_gains(gains):
        """Return the integer (floored) form of every numeric entry in `gains`.

        Entries that cannot be converted (the string fallback of
        ``__get_gain_values``) are skipped rather than raising, so construction
        does not fail when the board returns unexpected gain strings.
        """
        integer_gains = []
        for gain in gains:
            try:
                integer_gains.append(int(float(gain)))
            except (TypeError, ValueError):
                LOGGER.warning('Ignoring non-numeric gain value "%s"', gain)
        return integer_gains

    @property
    def gain(self):
        """Get or set the current gain value of the light sensor.

        Valid gain values are defined in the `valid_gains` property, and should be
        floating-point numbers.
        """
        self.confirm_available()
        gain = self._parent.query("light_sensor_gain?")
        matches = self._GAIN_PATTERN.search(gain)
        assert matches is not None, 'Cannot read gain string: "{}"'.format(gain)
        # gain is a float as non integer gains exist but are set with floor of value
        return float(matches.group())

    @gain.setter
    def gain(self, val):
        self.confirm_available()
        assert int(val) in self._valid_gains_int, (
            "Gain {} not valid must be one of: {}".format(val, self.valid_gains)
        )
        gain = self._parent.query("light_sensor_gain %d" % (int(val)))
        matches = self._GAIN_PATTERN.search(gain)
        assert matches is not None, 'Cannot read gain string: "{}"'.format(gain)
        # gain is a float as non integer gains exist but are set with floor of value
        assert int(val) == int(float(matches.group())), (
            'Gain of {} set, "{}" returned'.format(val, gain)
        )

    def __get_gain_values(self):
        """Read the allowable values for the light sensor's gain.

        This function will attempt to return a list of floating-point numbers
        which may be used as values of the `gain` property. If the stage returns
        non-floating-point values, the list will be of strings.
        """
        self.confirm_available()
        gains = self._parent.query("light_sensor_gain_values?")
        try:
            return [float(gain) for gain in self._GAIN_PATTERN.findall(gains)]
        except ValueError:
            # Fall back to strings if we don't get floats (unlikely).
            # The first 20 characters are skipped; presumably a fixed response
            # prefix - TODO confirm against the firmware.
            return gains[20:].split(", ")
class Endstops(OptionalModule):
    """An optional module for use with endstops.

    If endstops are installed in the firmware the :class:`sangaboard.Sangaboard`
    will gain an optional module which is an instance of this class. It can be
    used to retrieve the type, state of the endstops, read and write maximum
    positions, and home.
    """

    installed = []
    """List of installed endstop types (min, max, soft)"""

    # Bit assigned to each axis in the mask sent with the homing commands.
    _AXIS_BITS = {"x": 1, "y": 2, "z": 4}
    _DIRECTIONS = ("min", "max", "both")

    def __init__(self, available, parent=None, model="min"):
        """Set up the module.

        :param available: whether the board reported this module.
        :param parent: object whose ``query`` method is used to talk to the board.
        :param model: space-separated endstop types, stored split in `installed`.
        """
        # Pass the module type and the real model, as LightSensor does, rather
        # than overwriting the model with the module name.
        super().__init__(
            available, parent=parent, module_type="Endstops", model=model
        )
        self.installed = model.split(" ")

    status = QueriedProperty(
        get_cmd="endstops?",
        response_string=r"%d %d %d",
        doc=(
            "Get endstops status as {-1,0,1} for {min,no,max} endstop triggered for "
            "each axis"
        ),
    )
    maxima = QueriedProperty(
        get_cmd="max_p?",
        set_cmd="max %d %d %d",
        response_string="%d %d %d",
        doc=(
            "Vector of maximum positions, homing to max endstops will measure this, "
            "can be set to a known value for use with max only and min+soft endstops"
        ),
    )

    def home(self, direction="min", axes=("x", "y", "z")):
        """Home given/all axes in the given direction (min/max/both)

        :param direction: one of {min,max,both}
        :param axes: list of axes e.g. ['x','y']
        :raises ValueError: if `direction` is not one of the accepted values
            (previously such a call silently did nothing).
        """
        if direction not in self._DIRECTIONS:
            raise ValueError(
                "direction must be one of {}, got {!r}".format(
                    self._DIRECTIONS, direction
                )
            )

        axis_mask = sum(bit for name, bit in self._AXIS_BITS.items() if name in axes)
        if direction in ("min", "both"):
            self._parent.query("home_min {}".format(axis_mask))
        if direction in ("max", "both"):
            self._parent.query("home_max {}".format(axis_mask))
class Illumination(OptionalModule):
    """An optional module for illumination control.

    If illumination module is built into the firmware the
    :class:`sangaboard.Sangaboard` will gain an optional module which is an
    instance of this class. Only one constant current channel is currently
    supported.
    """

    def __init__(self, available, parent=None):
        """Set up the module and cache the channel counts reported by the board.

        :param available: whether the board reported this module.
        :param parent: object whose ``query``/``parsed_query`` methods are used to
            talk to the board.
        """
        # The model was previously passed as "Endstops" (copy-paste slip).
        super().__init__(
            available, parent=parent, module_type="Illumination", model="Generic"
        )
        # Only talk to the board when the module exists; otherwise report no
        # channels so that every channel request is rejected.
        self._channels = self.channels if available else (0, 0)

    channels = QueriedProperty(
        get_cmd="led_channels?",
        response_string=r"CC:%d PWM:%d",
        doc="Get the number of supported CC and PWM channels",
    )
    pwm_frequency = QueriedProperty(
        get_cmd="led_frequency?",
        set_cmd="led_frequency %d",
        response_string="PWM Frequency:%dHz",
        doc=(
            "Get or set the PWM channels frequency\n\n"
            "Only supported on RP2040 based platforms. Maximum 1MHz, resolution only "
            "~6bits at that frequency"
        ),
    )
    cc_led = QueriedProperty(
        get_cmd="led_cc?",
        set_cmd="led_cc %f",
        response_string="CC LED:%f",
        doc="Get or set the CC value 0-1.0\n\n"
        + "On Sangaboard v0.5 this corresponds to 0-100mA with 32 steps.",
    )

    def _check_pwm_channel(self, channel: int) -> None:
        """Raise ``ValueError`` unless `channel` is a valid zero-based PWM index.

        ``self._channels[1]`` is the PWM channel count. ``get_pwm_led`` indexes
        the returned values with `channel`, so valid channels are
        ``0 .. count - 1``.
        """
        if not 0 <= channel < self._channels[1]:
            LOGGER.error(f"Requested invalid PWM channel {channel}")
            raise ValueError(f"Invalid PWM channel {channel}")

    def set_pwm_led(self, channel: int, value: float):
        """Set the output of one PWM LED channel.

        :param channel: zero-based PWM channel index.
        :param value: requested level; negative values are clamped to 0.
        :raises ValueError: if `channel` is not a valid PWM channel.
        """
        self._check_pwm_channel(channel)
        value = max(value, 0)
        self._parent.query(f"led_pwm {channel} {value}")

    def get_pwm_led(
        self, channel: Union[int, None] = None
    ) -> Union[float, List[float]]:
        """Read the PWM LED levels from the board.

        :param channel: zero-based PWM channel index, or ``None`` for all channels.
        :return: the value for `channel`, or all parsed values when `channel` is
            ``None``.
        :raises ValueError: if `channel` is given but is not a valid PWM channel.
        """
        if channel is not None:
            self._check_pwm_channel(channel)

        # One "%f;" field per PWM channel reported by the board.
        return_string = "PWM:" + "%f;" * self._channels[1]
        pwm_values = self._parent.parsed_query(
            "led_pwm?", response_string=return_string
        )
        return pwm_values[channel] if channel is not None else pwm_values