Source code for sangaboard.testing
"""Classes that enable unit testing by mocking the serial responses."""
import re
import time
[docs]
class DummySerialDevice:
    """Stand-in for a serial port, so code can be unit tested without hardware.

    An instance can be passed to an ``ExtensibleSerialInstrument`` in place of
    the serial port name, and will emulate a device using stored responses.

    It works on a line-by-line basis, so it only reacts once it has received a
    termination character (by default, newline). It tries to offer the same
    interface as ``pyserial.Serial``, at least for the methods used by
    ``ExtensibleSerialInstrument``.

    The ``timeout`` property is used to cause a delay when ``readline`` has no
    complete line to return. A falsy timeout (``0`` or ``None``) raises an
    exception instead, rather than attempting anything clever with threads.

    Set ``print_buffers`` to True to print the input and output buffers each
    time the "port" is written to or read from with ``readline``.

    Buffer naming follows the point of view of the code under test:
    ``write_buffer`` holds bytes written *to* the port that do not yet form a
    complete line, and ``read_buffer`` holds response bytes waiting to be read
    *from* the port.
    """

    # Defaults live on the class. Every one of them is immutable, so the
    # ``+=`` / assignment in the methods below rebinds the attribute on the
    # instance and never leaks state between instances.
    termination_character = "\n"
    responses = ()
    unmatched_response = "error!\n"
    timeout = 0
    print_buffers = False
    read_buffer = b""
    write_buffer = b""

    def __init__(self):
        """Create a dummy port; all state starts from the class defaults."""

    # Ignore N802 (function name not snake case) as this is mocking a `Serial` function.
    def isOpen(self):  # noqa: N802
        """Report that the port is open (the dummy port is always open)."""
        return True

    def write(self, bytes_buffer):
        """Accept data being written to the port.

        This is data being sent out of the serial port. It is appended to the
        ``write_buffer``; every complete line found there (including an empty
        one) is removed and passed to ``respond_to`` in order. Any trailing
        partial line stays buffered until its terminator arrives.

        Returns the number of bytes accepted, as ``pyserial.Serial.write`` does.
        """
        self.write_buffer += bytes_buffer
        if self.print_buffers:
            print("Write buffer: [[{}]]".format(self.write_buffer))
        terminator = self.termination_character.encode()
        # An empty terminator is "found" everywhere, so it can never delimit a
        # line; skip processing instead of looping forever.
        if terminator:
            while terminator in self.write_buffer:
                # Consume the line *before* responding, so that a failing
                # response function cannot leave the same line stuck at the
                # front of the buffer.
                line, _, self.write_buffer = self.write_buffer.partition(terminator)
                self.respond_to(line.decode("utf8"))
        return len(bytes_buffer)

    # Ignore N802 (function name not snake case) as this is mocking a `Serial` function.
    def inWaiting(self):  # noqa: N802
        """Return the number of bytes in the input buffer.

        Note the input buffer is for bytes being read from the port.
        """
        return len(self.read_buffer)

    def register_response(self, regular_expression, response_function):
        """Match a query and respond to it with a function.

        ``response_function`` is either a callable taking the match's
        ``groups()`` tuple and returning the response string, or a plain
        string that is always returned as-is. New responses are appended, so
        earlier registrations take priority. See ``respond_to`` for details.
        """
        if isinstance(response_function, str):
            response_str = response_function
            # Wrapping a fixed string in a function means ``respond_to`` only
            # ever has to deal with callables.
            response_function = lambda _groups: response_str  # noqa: E731
        self.responses += ((regular_expression, response_function),)

    def remove_response(self, regex_to_remove):
        """Remove a previously registered response.

        The regular expression string must be exactly the same as the one
        that was registered. Every entry with that expression is removed;
        nothing happens if there is none.
        """
        self.responses = tuple(
            (regex, response)
            for (regex, response) in self.responses
            if regex != regex_to_remove
        )

    def replace_response(self, regex, response_function):
        """Remove a response and replace it with another.

        Order is not preserved: the replacement is appended after all the
        other registered responses.
        """
        self.remove_response(regex)
        self.register_response(regex, response_function)

    def respond_to(self, message):
        """Do something in response to a message.

        Each time we receive a termination character, we attempt to match the
        message (which doesn't include the termination character) with the
        regular expressions in the ``responses`` property. Matching uses
        ``re.match``, i.e. it is anchored at the start of the message only.
        The first expression that matches is used.

        The match object's ``groups()`` value is passed to the response
        function, and the string it returns is appended to the read buffer.
        If nothing matches, ``unmatched_response`` is appended instead.
        """
        for regular_expression, response_function in self.responses:
            match = re.match(regular_expression, message)
            if match:
                self.read_buffer += response_function(match.groups()).encode()
                return
        self.read_buffer += self.unmatched_response.encode()

    def readline(self):
        """Emulate reading a line from the serial port.

        Returns the first buffered line, including its termination character.
        If no complete line is buffered and ``timeout`` is truthy, sleeps for
        ``timeout`` seconds and then returns whatever bytes are buffered
        (possibly ``b""``), which is what a timed-out pyserial read does.

        Raises:
            IOError: if no complete line is buffered and ``timeout`` is falsy.
        """
        if self.print_buffers:
            print("Read buffer [[{}]]".format(self.read_buffer))
        terminator = self.termination_character.encode()
        # ``find`` reports an empty needle at position 0, so guard against it.
        index = self.read_buffer.find(terminator) if terminator else -1
        if index >= 0:
            end = index + len(terminator)
            line = self.read_buffer[:end]
            self.read_buffer = self.read_buffer[end:]
            return line
        if not self.timeout:
            raise IOError(
                "readline was called with no timeout, and there's nothing to read."
            )
        time.sleep(self.timeout)
        # Nothing else can fill the buffer while we sleep (no threads), so
        # hand back the partial data just as a timed-out real port would.
        partial = self.read_buffer
        self.read_buffer = b""
        return partial

    def read(self, size=1):
        """Emulate reading some bytes from the serial port.

        Returns exactly ``size`` bytes from the front of the read buffer.

        Raises:
            IOError: if fewer than ``size`` bytes are buffered, because this
                dummy class cannot emulate waiting for more data to arrive.
        """
        if len(self.read_buffer) < size:
            raise IOError(
                "Currently this dummy class can't cope with timeouts properly"
            )
        chunk = self.read_buffer[:size]
        self.read_buffer = self.read_buffer[size:]
        return chunk

    def open(self):
        """Open the serial port (currently does nothing)"""
        pass

    def close(self):
        """Close the serial port (currently does nothing)"""
        pass
[docs]
class DummySangaboardDevice(DummySerialDevice):
    """Create a dummy port, with the minimal responses to work.

    When the Sangaboard class is initialised, it checks the board, the
    version, and the optional modules. All of those commands must therefore
    return reasonable values, so they are registered here together with a
    small emulation of the illumination (LED) commands.
    """

    blocking_moves = False

    def __init__(self, firmware_version="Sangaboard Firmware v0.5.1"):
        """Register the canned responses and set up the emulated LED state.

        ``firmware_version`` is the text returned for the ``version`` query.
        """
        super().__init__()
        self.pwm_channels_num = 2
        self.pwm_frequency = 64000
        # NOTE(review): three values are stored although only
        # ``pwm_channels_num`` (2) channels are reported - confirm whether the
        # extra slot is intentional.
        self.pwm_vals = [0.0, 0.0, 0.0]
        self.cc_val = 0.0
        self.register_response("version", f"{firmware_version}\r\n")
        self.register_response("list_modules", "Illumination: default\r\n--END--\r\n")
        self.register_response("board", "Sangaboard v0.5\r\n")
        self.register_response("led_channels", self.pwm_channels)
        self.register_response(r"led_frequency\?? ?([0-9?]+)", self.led_frequency)
        self.register_response(r"led_pwm\?? ?([0-9?])? ?([0-9.]+)?", self.led_pwm)
        self.register_response(r"led_cc\?? ?([0-9.?]+)?", self.led_cc)
        self.register_response("blocking_moves (false|true)", self.set_blocking_moves)
        # This device prints its buffers on every write/readline by default.
        self.print_buffers = True

    @staticmethod
    def _has_value(text):
        """Return True if a captured argument is a value rather than a query.

        An argument that did not match (``None``), is empty, or is just
        ``"?"`` means nothing should be set.
        """
        return text is not None and text != "?" and text != ""

    def set_blocking_moves(self, groups):
        """Store the requested ``blocking_moves`` flag and acknowledge it."""
        self.blocking_moves = "true" in groups[0]
        return "done\r\n"

    def pwm_channels(self, _groups):
        """Report the number of constant-current and PWM channels."""
        return f"CC:1 PWM:{self.pwm_channels_num}\r\n"

    def led_frequency(self, groups):
        """Optionally set, then report, the PWM frequency."""
        if self._has_value(groups[0]):
            self.pwm_frequency = int(groups[0])
        return f"PWM Frequency:{self.pwm_frequency}Hz\r\n"

    def led_pwm(self, groups):
        """Set or report PWM values.

        ``groups`` is ``(channel, value)`` from the ``led_pwm`` pattern; either
        may be ``None``. With a channel and a value, the value is stored and
        echoed back. With no channel, the values of the first
        ``pwm_channels_num`` channels are listed.
        """
        channel, value = groups[0], groups[1]
        if self._has_value(channel):
            index = int(channel)
            if value is not None:
                self.pwm_vals[index] = float(value)
            # A channel without a value used to crash on ``float(None)``.
            # NOTE(review): replying with the stored value in the same format
            # as a "set" is an assumption - confirm against the firmware.
            return f"PWM:{index}:{self.pwm_vals[index]}\r\n"
        listed = "".join(f"{val};" for val in self.pwm_vals[: self.pwm_channels_num])
        return f"PWM:{listed}\r\n"

    def led_cc(self, groups):
        """Optionally set, then report, the constant-current LED value."""
        # Debug output follows the same switch as the buffer printing.
        if self.print_buffers:
            print(groups)
        if self._has_value(groups[0]):
            self.cc_val = float(groups[0])
        return f"CC LED:{self.cc_val}\r\n"