Source code for sangaboard.testing

"""Classes that enable unit testing by mocking the serial responses."""

import re
import time


class DummySerialDevice:
    """This class exists to allow unit testing without serial port hardware.

    It can be passed to an ``ExtensibleSerialInstrument`` in place of the
    serial port name, and will emulate a device using stored responses.  It
    works on a line-by-line basis, so it will only do anything once it gets a
    termination character (by default, newline).

    It tries to give the same interface as ``pyserial.Serial`` at least as far
    as the methods used by ``ExtensibleSerialInstrument`` go.  The ``timeout``
    property will be used to cause delays if there is no response to give -
    but infinite timeouts raise an exception rather than attempt to do
    anything clever with threads.

    You can set ``print_buffers`` to True in order to make it print the input
    and output buffers each time the "port" is written to or read from.
    """

    # String that marks the end of a message, in both directions.
    termination_character = "\n"
    # Tuple of ``(regular_expression, response_function)`` pairs, tried in order.
    responses = ()
    # Reply queued when no registered regular expression matches a message.
    unmatched_response = "error!\n"
    # Seconds ``readline`` sleeps when no line is available; falsy means "raise".
    timeout = 0
    # When True, the buffers are printed on every ``write`` and ``readline``.
    print_buffers = False
    # Bytes waiting to be read *from* the dummy port (the device's replies).
    read_buffer = b""
    # Bytes written *to* the dummy port that do not yet form a complete line.
    write_buffer = b""

    def __init__(self):
        pass

    # Ignore N802 (function name not snake case) as this is mocking a `Serial` function.
    def isOpen(self):  # noqa: N802
        """Report that the dummy port is open (it always is)."""
        return True

    def write(self, bytes_buffer):
        """Accept data being written to the port

        This is data being sent out of the serial port, and is stored in the
        ``write_buffer``.  Every complete line in the buffer (including empty
        lines and several lines arriving in a single call) is decoded as UTF-8
        and passed to ``respond_to``; a trailing partial line is kept until
        its termination character arrives.

        :param bytes_buffer: the bytes being "sent" to the device.
        """
        self.write_buffer += bytes_buffer
        if self.print_buffers:
            print("Write buffer: [[{}]]".format(self.write_buffer))
        terminator = self.termination_character.encode()
        if not terminator:
            # An empty terminator is found at index 0 forever; never split on it.
            return
        # Process messages once we've received a whole line
        while True:
            index = self.write_buffer.find(terminator)
            if index < 0:
                break
            message = self.write_buffer[:index]
            # Consume the line before responding, so a response function that
            # raises cannot leave the same message stuck at the head of the buffer.
            self.write_buffer = self.write_buffer[index + len(terminator):]
            self.respond_to(message.decode("utf8"))

    # Ignore N802 (function name not snake case) as this is mocking a `Serial` function.
    def inWaiting(self):  # noqa: N802
        """Return the number of bytes in the input buffer

        Note the input buffer is for bytes being read from the port
        """
        return len(self.read_buffer)

    # Ignore N802 (function name not snake case) as this is mocking a `Serial` function.
    def flushInput(self):  # noqa: N802
        """Flush the input buffer, so there are no previously-arrived bytes when we read a response."""
        self.read_buffer = b""

    def register_response(self, regular_expression, response_function):
        """Match a query and respond to it with a function.

        See ``respond_to`` for details of how this works.

        :param regular_expression: pattern matched against each received line.
        :param response_function: either a callable taking the match's
            ``groups()`` tuple and returning the reply string, or a plain
            string that is always returned as the reply.
        """
        if isinstance(response_function, str):
            fixed_reply = response_function
            # Wrap a constant reply so every stored response is callable.
            response_function = lambda _groups: fixed_reply  # noqa: E731
        self.responses += ((regular_expression, response_function),)

    def remove_response(self, regex_to_remove):
        """Remove a previously registered response

        The regular expression string must be exactly the same as the one you
        are trying to remove.
        """
        self.responses = tuple(
            (regex, response)
            for (regex, response) in self.responses
            if regex != regex_to_remove
        )

    def replace_response(self, regex, response_function):
        """Remove a response and replace it with another.

        Order is not preserved.
        """
        self.remove_response(regex)
        self.register_response(regex, response_function)

    def respond_to(self, message):
        """Do something in response to a message.

        Each time we receive a termination character, we attempt to match the
        message (which doesn't include the termination character) with the
        regular expressions in the ``responses`` property.  The first one that
        matches is used.  The regular expression match object's ``groups()``
        value is passed to the function, which can then use it to return its
        response.  If nothing matches, ``unmatched_response`` is queued.
        """
        for regular_expression, response_function in self.responses:
            match = re.match(regular_expression, message)
            if match:
                self.read_buffer += response_function(match.groups()).encode()
                return
        self.read_buffer += self.unmatched_response.encode()

    def readline(self):
        """Emulate reading a line from the serial port.

        :returns: the next line from ``read_buffer`` including its termination
            character, or ``b""`` after sleeping for ``timeout`` seconds when
            no complete line is available (a timed-out read yields no data).
        :raises IOError: if no complete line is available and ``timeout`` is
            falsy, because the dummy cannot block waiting for data.
        """
        if self.print_buffers:
            print("Read buffer [[{}]]".format(self.read_buffer))
        terminator = self.termination_character.encode()
        # An empty terminator can never delimit a line.
        index = self.read_buffer.find(terminator) if terminator else -1
        if index >= 0:
            end = index + len(terminator)
            line = self.read_buffer[:end]
            self.read_buffer = self.read_buffer[end:]
            return line
        if not self.timeout:
            raise IOError(
                "readline was called with no timeout, and there's nothing to read."
            )
        time.sleep(self.timeout)
        return b""

    def read(self, size=1):
        """Emulate reading some bytes from the serial port.

        :param size: number of bytes to take from the front of ``read_buffer``.
        :returns: exactly ``size`` bytes.
        :raises IOError: if fewer than ``size`` bytes are buffered, since the
            dummy cannot wait for more data to arrive.
        """
        if len(self.read_buffer) >= size:
            chunk = self.read_buffer[:size]
            self.read_buffer = self.read_buffer[size:]
            return chunk
        raise IOError("Currently this dummy class can't cope with timeouts properly")

    def open(self):
        """Open the serial port (currently does nothing)"""
        pass

    def close(self):
        """Close the serial port (currently does nothing)"""
        pass
class DummySangaboardDevice(DummySerialDevice):
    """Create a dummy port, with the minimal responses to work

    When the Sangaboard class is initialised, it checks the board, the
    version, and the optional modules.  All of those commands must therefore
    return reasonable values.
    """

    # Updated by the ``blocking_moves (false|true)`` command.
    blocking_moves = False

    def __init__(self, firmware_version="Sangaboard Firmware v0.5.1"):
        """Set up the emulated LED state and register the command responses.

        :param firmware_version: text returned (followed by CRLF) in reply to
            the ``version`` command.
        """
        super().__init__()
        self.pwm_channels_num = 2
        self.pwm_frequency = 64000
        self.pwm_vals = [0.0, 0.0, 0.0]
        self.cc_val = 0.0
        self.register_response("version", f"{firmware_version}\r\n")
        self.register_response("list_modules", "Illumination: default\r\n--END--\r\n")
        self.register_response("board", "Sangaboard v0.5\r\n")
        self.register_response("led_channels", self.pwm_channels)
        self.register_response("led_frequency\\?? ?([0-9?]+)", self.led_frequency)
        self.register_response("led_pwm\\?? ?([0-9?])? ?([0-9.]+)?", self.led_pwm)
        self.register_response("led_cc\\?? ?([0-9.?]+)?", self.led_cc)
        self.register_response("blocking_moves (false|true)", self.set_blocking_moves)
        self.print_buffers = True

    def set_blocking_moves(self, groups):
        """Store the requested blocking-moves flag and acknowledge it.

        :param groups: regex groups; ``groups[0]`` is ``"true"`` or ``"false"``.
        """
        self.blocking_moves = "true" in groups[0]
        return "done\r\n"

    def pwm_channels(self, _groups):
        """Report the channel counts: one CC channel and ``pwm_channels_num`` PWM channels."""
        return f"CC:1 PWM:{self.pwm_channels_num}\r\n"

    def led_frequency(self, groups):
        """Optionally set, then report, the PWM frequency.

        :param groups: regex groups; ``groups[0]`` is the new frequency as a
            digit string, or ``"?"`` to query without changing it.
        """
        requested = groups[0]
        if requested is not None and requested != "?" and requested != "":
            self.pwm_frequency = int(requested)
        return f"PWM Frequency:{self.pwm_frequency}Hz\r\n"

    def led_pwm(self, groups):
        """Set or report PWM channel values.

        :param groups: regex groups ``(channel, value)``.  With a numeric
            channel and a value, the value is stored and echoed back.  With a
            numeric channel but no value, the stored value for that channel is
            reported unchanged (the real firmware's reply to this form is not
            known here - TODO confirm).  With no channel, or ``"?"``, the
            first ``pwm_channels_num`` values are listed.
        :raises IndexError: if the channel index is outside ``pwm_vals``.
        """
        channel = groups[0]
        if channel is not None and channel != "?" and channel != "":
            index = int(channel)
            if groups[1] is not None:
                # The value group is optional in the regex, so only convert it
                # when present instead of failing on float(None).
                self.pwm_vals[index] = float(groups[1])
            value = self.pwm_vals[index]
            return f"PWM:{index}:{value}\r\n"
        listing = "".join(
            f"{self.pwm_vals[i]};" for i in range(self.pwm_channels_num)
        )
        return "PWM:" + listing + "\r\n"

    def led_cc(self, groups):
        """Optionally set, then report, the constant-current LED value.

        :param groups: regex groups; ``groups[0]`` is the new value, ``"?"``,
            or ``None`` when the command carries no argument.
        """
        if self.print_buffers:
            # Debug trace of the parsed arguments, shown only while buffer
            # printing is switched on (which ``__init__`` does by default).
            print(groups)
        requested = groups[0]
        if requested is not None and requested != "?" and requested != "":
            self.cc_val = float(requested)
        return f"CC LED:{self.cc_val}\r\n"