Validating against log file example#
All communication between PLR and the outside world (the hardware) happens through the io layer. This is a layer below backends, and is responsible for sending and receiving messages to and from the hardware. Schematically,
Frontends <-> backends <-> io <-> hardware
PLR can capture all communication in the io layer, both writes and reads. The capture can later be replayed to validate that a protocol has not changed. The key assumption is that if we send the same commands to the hardware, the hardware will do the same thing. Replaying the recorded reads (from the capture file) matters because some protocols are dynamic and depend on responses from the hardware.
In this notebook, we will run a simple protocol on a robot while capturing all data passing through the io layer. We will then replay the capture file while executing the protocol again to demonstrate how validation works. Finally, we slightly modify the protocol and show that the validation fails.
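The capture-and-replay idea can be sketched independently of PLR. The snippet below is a minimal illustration, not PLR's implementation: FakeDevice, CapturingIO, ReplayIO and protocol are hypothetical names invented for this example. The capturing wrapper records every write and read; the replaying wrapper checks each write against the record and serves reads from it, so no hardware is needed.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class Command:
  action: str  # "write" or "read"
  data: str


class FakeDevice:
  """Hypothetical stand-in for hardware: answers with the last command plus a status suffix."""

  def __init__(self) -> None:
    self._last = ""

  def write(self, data: str) -> None:
    self._last = data

  def read(self) -> str:
    return self._last + "er00"


@dataclass
class CapturingIO:
  """Forwards traffic to a device and records every write and read."""
  device: FakeDevice
  log: List[Command] = field(default_factory=list)

  def write(self, data: str) -> None:
    self.log.append(Command("write", data))
    self.device.write(data)

  def read(self) -> str:
    data = self.device.read()
    self.log.append(Command("read", data))
    return data


class ReplayIO:
  """Checks writes against a recorded log and serves reads from it."""

  def __init__(self, log: List[Command]) -> None:
    self._remaining = list(log)

  def _next(self, action: str) -> Command:
    if not self._remaining:
      raise AssertionError(f"unexpected {action}: capture is exhausted")
    command = self._remaining.pop(0)
    if command.action != action:
      raise AssertionError(f"expected {command.action}, got {action}")
    return command

  def write(self, data: str) -> None:
    expected = self._next("write")
    if expected.data != data:
      raise AssertionError(f"expected {expected.data!r}, got {data!r}")

  def read(self) -> str:
    return self._next("read").data

  def finish(self) -> None:
    # Leftover commands mean the protocol stopped earlier than the recording.
    if self._remaining:
      raise AssertionError(f"{len(self._remaining)} recorded commands were never replayed")


def protocol(io, command: str) -> str:
  """A tiny 'protocol': send one command and return the response."""
  io.write(command)
  return io.read()


# First run: talk to the (fake) device and record the traffic.
capture = CapturingIO(FakeDevice())
protocol(capture, "C0RTid0001")

# Second run: no device, only the recording.
replay = ReplayIO(capture.log)
protocol(replay, "C0RTid0001")
replay.finish()
```

Serving reads from the record is what lets a dynamic protocol, one that branches on device responses, follow the same path during replay as it did on the real run.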
Defining a simple protocol#
import pylabrobot
from pylabrobot.liquid_handling import LiquidHandler
from pylabrobot.liquid_handling.backends import STAR
from pylabrobot.resources.hamilton import STARDeck
backend = STAR()
lh = LiquidHandler(backend=backend, deck=STARDeck())
from pylabrobot.resources import TIP_CAR_480_A00, HT
tip_car = TIP_CAR_480_A00(name="tip_car")
tip_car[0] = tr = HT(name="ht")
lh.deck.assign_child_resource(tip_car, rails=1)
async def simple_protocol(tips):
  await lh.setup()
  await lh.pick_up_tips(tips)
  await lh.return_tips()
  await lh.stop()
Capturing data during protocol execution#
Do a real run first, without validating against a capture file. This generates the capture file you can later compare against.
While this might seem cumbersome, it ensures you have a working protocol on real hardware before you start validating. The benefit of capture files is that whenever you change your protocol and have seen it run correctly, you can keep the capture file from that run and use it for validation. There is no need to write tests by hand.
validation_file = "./validation.json"
pylabrobot.start_capture(validation_file)
await simple_protocol(tr["A1:H1"])
pylabrobot.stop_capture()
Validation file written to validation.json
The validation file is plain JSON:
!head -n15 validation.json
{
  "version": "0.1.6",
  "commands": [
    {
      "module": "usb",
      "device_id": "[0x8af:0x8000][][]",
      "action": "write",
      "data": "C0RTid0001"
    },
    {
      "module": "usb",
      "device_id": "[0x8af:0x8000][][]",
      "action": "read",
      "data": "C0RTid0001er00/00rt0 0 0 0 0 0 0 0"
    },
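Because the capture is plain JSON, it can be inspected with the standard library. The sketch below assumes only the keys visible in the excerpt above ("version", "commands", "module", "action", "data"); the summarize helper is a hypothetical name, and the inline text is a shortened copy of the excerpt, closed off so that it parses.

```python
import json
from collections import Counter

# Shortened copy of the capture excerpt, with the list and object closed so it is valid JSON.
capture_text = """
{
  "version": "0.1.6",
  "commands": [
    {"module": "usb", "device_id": "[0x8af:0x8000][][]", "action": "write",
     "data": "C0RTid0001"},
    {"module": "usb", "device_id": "[0x8af:0x8000][][]", "action": "read",
     "data": "C0RTid0001er00/00rt0 0 0 0 0 0 0 0"}
  ]
}
"""


def summarize(capture: dict) -> Counter:
  """Count captured commands per (module, action) pair."""
  return Counter((c["module"], c["action"]) for c in capture["commands"])


capture = json.loads(capture_text)
counts = summarize(capture)
for (module, action), n in sorted(counts.items()):
  print(f"{module} {action}: {n}")
```

To inspect a real capture, load it with json.load on the opened validation.json instead of parsing the inline string.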
Replaying the capture file for validation#
To validate, call pylabrobot.validate before calling setup, passing the capture file that contains the commands to check against.
Call pylabrobot.end_validation at the end to confirm that no commands remain in the capture file. This marks the end of the validation.
pylabrobot.validate(validation_file)
await simple_protocol(tr["A1:H1"])
pylabrobot.end_validation()
Validation successful!
Failing validation#
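When validation fails, PLR uses the Needleman-Wunsch algorithm to align the expected and the actual command, prints the difference to stdout, and raises a ValidationError. Here, picking up tips from column 2 instead of column 1 changes the x-positions (xp) in the tip pick-up command: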
pylabrobot.validate(validation_file)
await simple_protocol(tr["A2:H2"])
pylabrobot.end_validation()
expected: C0TPid0009xp01179 01179 01179 01179 01179 01179 01179 01179yp1458 1368 1278 1188 1098 1008 0918 0828tm1 1 1 1 1 1 1 1tt01tp2266tz2166th2450td0
actual:   C0TPid0009xp01269 01269 01269 01269 01269 01269 01269 01269yp1458 1368 1278 1188 1098 1008 0918 0828tm1 1 1 1 1 1 1 1tt01tp2266tz2166th2450td0
                        ^^    ^^    ^^    ^^    ^^    ^^    ^^    ^^
---------------------------------------------------------------------------
ValidationError Traceback (most recent call last)
Cell In[7], line 2
1 pylabrobot.validate(validation_file)
----> 2 await simple_protocol(tr["A2:H2"])
3 pylabrobot.end_validation()
Cell In[3], line 3, in simple_protocol(tips)
1 async def simple_protocol(tips):
2 await lh.setup()
----> 3 await lh.pick_up_tips(tips)
4 await lh.return_tips()
5 await lh.stop()
File ~/retro/pylabrobot/pylabrobot/machines/machine.py:35, in need_setup_finished.<locals>.wrapper(*args, **kwargs)
33 if not self.setup_finished:
34 raise RuntimeError("The setup has not finished. See `setup`.")
---> 35 return await func(*args, **kwargs)
File ~/retro/pylabrobot/pylabrobot/liquid_handling/liquid_handler.py:467, in LiquidHandler.pick_up_tips(self, tip_spots, use_channels, offsets, **backend_kwargs)
464 (self.head[channel].commit if success else self.head[channel].rollback)()
466 # trigger callback
--> 467 self._trigger_callback(
468 "pick_up_tips",
469 liquid_handler=self,
470 operations=pickups,
471 use_channels=use_channels,
472 error=error,
473 **backend_kwargs,
474 )
File ~/retro/pylabrobot/pylabrobot/liquid_handling/liquid_handler.py:2204, in LiquidHandler._trigger_callback(self, method_name, error, *args, **kwargs)
2202 callback(self, *args, error=error, **kwargs)
2203 elif error is not None:
-> 2204 raise error
File ~/retro/pylabrobot/pylabrobot/liquid_handling/liquid_handler.py:451, in LiquidHandler.pick_up_tips(self, tip_spots, use_channels, offsets, **backend_kwargs)
449 error: Optional[Exception] = None
450 try:
--> 451 await self.backend.pick_up_tips(ops=pickups, use_channels=use_channels, **backend_kwargs)
452 except Exception as e:
453 error = e
File ~/retro/pylabrobot/pylabrobot/liquid_handling/backends/hamilton/STAR.py:1484, in STAR.pick_up_tips(self, ops, use_channels, begin_tip_pick_up_process, end_tip_pick_up_process, minimum_traverse_height_at_beginning_of_a_command, pickup_method)
1481 pickup_method = pickup_method or tip.pickup_method
1483 try:
-> 1484 return await self.pick_up_tip(
1485 x_positions=x_positions,
1486 y_positions=y_positions,
1487 tip_pattern=channels_involved,
1488 tip_type_idx=ttti,
1489 begin_tip_pick_up_process=begin_tip_pick_up_process,
1490 end_tip_pick_up_process=end_tip_pick_up_process,
1491 minimum_traverse_height_at_beginning_of_a_command=minimum_traverse_height_at_beginning_of_a_command,
1492 pickup_method=pickup_method,
1493 )
1494 except STARFirmwareError as e:
1495 if plr_e := convert_star_firmware_error_to_plr_error(e):
File ~/retro/pylabrobot/pylabrobot/liquid_handling/backends/hamilton/STAR.py:98, in need_iswap_parked.<locals>.wrapper(self, *args, **kwargs)
93 if self.iswap_installed and not self.iswap_parked:
94 await self.park_iswap(
95 minimum_traverse_height_at_beginning_of_a_command=int(self._iswap_traversal_height * 10)
96 )
---> 98 result = await method(self, *args, **kwargs)
100 return result
File ~/retro/pylabrobot/pylabrobot/liquid_handling/backends/hamilton/STAR.py:4062, in STAR.pick_up_tip(self, x_positions, y_positions, tip_pattern, tip_type_idx, begin_tip_pick_up_process, end_tip_pick_up_process, minimum_traverse_height_at_beginning_of_a_command, pickup_method)
4055 assert (
4056 0 <= end_tip_pick_up_process <= 3600
4057 ), "end_tip_pick_up_process must be between 0 and 3600"
4058 assert (
4059 0 <= minimum_traverse_height_at_beginning_of_a_command <= 3600
4060 ), "minimum_traverse_height_at_beginning_of_a_command must be between 0 and 3600"
-> 4062 return await self.send_command(
4063 module="C0",
4064 command="TP",
4065 tip_pattern=tip_pattern,
4066 read_timeout=60,
4067 xp=[f"{x:05}" for x in x_positions],
4068 yp=[f"{y:04}" for y in y_positions],
4069 tm=tip_pattern,
4070 tt=f"{tip_type_idx:02}",
4071 tp=f"{begin_tip_pick_up_process:04}",
4072 tz=f"{end_tip_pick_up_process:04}",
4073 th=f"{minimum_traverse_height_at_beginning_of_a_command:04}",
4074 td=pickup_method.value,
4075 )
File ~/retro/pylabrobot/pylabrobot/liquid_handling/backends/hamilton/base.py:247, in HamiltonLiquidHandler.send_command(self, module, command, auto_id, tip_pattern, write_timeout, read_timeout, wait, fmt, **kwargs)
222 """Send a firmware command to the Hamilton machine.
223
224 Args:
(...)
237 A dictionary containing the parsed response, or None if no response was read within `timeout`.
238 """
240 cmd, id_ = self._assemble_command(
241 module=module,
242 command=command,
(...)
245 **kwargs,
246 )
--> 247 resp = await self._write_and_read_command(
248 id_=id_,
249 cmd=cmd,
250 write_timeout=write_timeout,
251 read_timeout=read_timeout,
252 wait=wait,
253 )
254 if resp is not None and fmt is not None:
255 return self._parse_response(resp, fmt)
File ~/retro/pylabrobot/pylabrobot/liquid_handling/backends/hamilton/base.py:267, in HamiltonLiquidHandler._write_and_read_command(self, id_, cmd, write_timeout, read_timeout, wait)
258 async def _write_and_read_command(
259 self,
260 id_: Optional[int],
(...)
264 wait: bool = True,
265 ) -> Optional[str]:
266 """Write a command to the Hamilton machine and read the response."""
--> 267 self.io.write(cmd.encode(), timeout=write_timeout)
269 if not wait:
270 return None
File ~/retro/pylabrobot/pylabrobot/io/usb.py:325, in USBValidator.write(self, data, timeout)
323 if not next_command.data == data.decode("unicode_escape"):
324 align_sequences(expected=next_command.data, actual=data.decode("unicode_escape"))
--> 325 raise ValidationError("Data mismatch: difference was written to stdout.")
ValidationError: Data mismatch: difference was written to stdout.
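For readers unfamiliar with Needleman-Wunsch, the following is a minimal sketch of global sequence alignment with caret markers under the differing characters. It is not PLR's align_sequences: the function names, the scoring values (match +1, mismatch -1, gap -1) and the output layout are assumptions chosen for illustration.

```python
from typing import List, Tuple


def needleman_wunsch(a: str, b: str, match: int = 1, mismatch: int = -1,
                     gap: int = -1) -> Tuple[str, str]:
  """Globally align two strings; gaps are shown as '-'."""

  def substitution(i: int, j: int) -> int:
    return match if a[i - 1] == b[j - 1] else mismatch

  rows, cols = len(a) + 1, len(b) + 1
  score: List[List[int]] = [[0] * cols for _ in range(rows)]
  for i in range(1, rows):
    score[i][0] = i * gap
  for j in range(1, cols):
    score[0][j] = j * gap
  for i in range(1, rows):
    for j in range(1, cols):
      score[i][j] = max(
        score[i - 1][j - 1] + substitution(i, j),  # align a[i-1] with b[j-1]
        score[i - 1][j] + gap,                     # gap in b
        score[i][j - 1] + gap,                     # gap in a
      )

  # Trace back from the bottom-right corner to recover one optimal alignment.
  aligned_a: List[str] = []
  aligned_b: List[str] = []
  i, j = len(a), len(b)
  while i > 0 or j > 0:
    if i > 0 and j > 0 and score[i][j] == score[i - 1][j - 1] + substitution(i, j):
      aligned_a.append(a[i - 1])
      aligned_b.append(b[j - 1])
      i -= 1
      j -= 1
    elif i > 0 and score[i][j] == score[i - 1][j] + gap:
      aligned_a.append(a[i - 1])
      aligned_b.append("-")
      i -= 1
    else:
      aligned_a.append("-")
      aligned_b.append(b[j - 1])
      j -= 1
  return "".join(reversed(aligned_a)), "".join(reversed(aligned_b))


def mark_differences(expected: str, actual: str) -> str:
  """Return the two aligned strings plus a line of carets under mismatches."""
  aligned_expected, aligned_actual = needleman_wunsch(expected, actual)
  markers = "".join(
    " " if x == y else "^" for x, y in zip(aligned_expected, aligned_actual)
  )
  return (
    f"expected: {aligned_expected}\n"
    f"actual:   {aligned_actual}\n"
    f"          {markers}"
  )


print(mark_differences("C0TPid0009xp01179 01179", "C0TPid0009xp01269 01269"))
```

Alignment, rather than a character-by-character comparison, keeps the markers meaningful when a command gains or loses characters, since everything after an insertion would otherwise be reported as different.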