Validating against log file example#

All communication between PLR and the outside world (the hardware) happens through the io layer. This is a layer below backends, and is responsible for sending and receiving messages to and from the hardware. Schematically,

Frontends <-> backends <-> io <-> hardware

PLR supports capturing all communication in the io layer, both write and read commands. This can later be played back to validate that a protocol has not changed. The key here is that if we send the same commands to the hardware, the hardware will do the same thing. “Reading” data (from the capture file) is useful because some protocols are dynamic and depend on responses from the hardware.

In this notebook, we will run a simple protocol on a robot while capturing all data passing through the io layer. We will then replay the capture file while executing the protocol again to demonstrate how validation works. Finally, we slightly modify the protocol and show that the validation fails.

Defining a simple protocol#

import pylabrobot
from pylabrobot.liquid_handling import LiquidHandler
from pylabrobot.liquid_handling.backends import STAR
from pylabrobot.resources.hamilton import STARDeck

backend = STAR()
lh = LiquidHandler(backend=backend, deck=STARDeck())
from pylabrobot.resources import TIP_CAR_480_A00, HT
tip_car = TIP_CAR_480_A00(name="tip_car")
tip_car[0] = tr = HT(name="ht")
lh.deck.assign_child_resource(tip_car, rails=1)
async def simple_protocol(tips):
  await lh.setup()
  await lh.pick_up_tips(tips)
  await lh.return_tips()
  await lh.stop()

Capturing data during protocol execution#

Do a real run first, without validating against a capture file. This will generate the capture file you can later compare against.

While it might seem cumbersome, this actually ensures you have a real working protocol before doing validation. The benefit of using capture files is whenever you change your protocol and have seen it run, you can just grab the capture file and use it for validation. No need to manually write tests.

validation_file = "./validation.json"
pylabrobot.start_capture(validation_file)
await simple_protocol(tr["A1:H1"])
pylabrobot.stop_capture()
Validation file written to validation.json

The validation file is just json:

!head -n15 validation.json
{
  "version": "0.1.6",
  "commands": [
    {
      "module": "usb",
      "device_id": "[0x8af:0x8000][][]",
      "action": "write",
      "data": "C0RTid0001"
    },
    {
      "module": "usb",
      "device_id": "[0x8af:0x8000][][]",
      "action": "read",
      "data": "C0RTid0001er00/00rt0 0 0 0 0 0 0 0"
    },

Replaying the capture file for validation#

On validation, before calling setup, run pylabrobot.validate to enable the validation. Pass a capture file that contains the commands we should check against.

Call pylabrobot.end_validation at the end to make sure there are no remaining commands in the capture file. This marks the end of the validation.

pylabrobot.validate(validation_file)
await simple_protocol(tr["A1:H1"])
pylabrobot.end_validation()
Validation successful!

Failing validation#

When validation is not successful, we use the Needleman-Wunsch algorithm to find the difference between the expected and the actual output.

pylabrobot.validate(validation_file)
await simple_protocol(tr["A2:H2"])
pylabrobot.end_validation()
expected: C0TPid0009xp01179 01179 01179 01179 01179 01179 01179 01179yp1458 1368 1278 1188 1098 1008 0918 0828tm1 1 1 1 1 1 1 1tt01tp2266tz2166th2450td0
actual:   C0TPid0009xp01269 01269 01269 01269 01269 01269 01269 01269yp1458 1368 1278 1188 1098 1008 0918 0828tm1 1 1 1 1 1 1 1tt01tp2266tz2166th2450td0
                        ^^    ^^    ^^    ^^    ^^    ^^    ^^    ^^                                                                                    
---------------------------------------------------------------------------
ValidationError                           Traceback (most recent call last)
Cell In[7], line 2
      1 pylabrobot.validate(validation_file)
----> 2 await simple_protocol(tr["A2:H2"])
      3 pylabrobot.end_validation()

Cell In[3], line 3, in simple_protocol(tips)
      1 async def simple_protocol(tips):
      2   await lh.setup()
----> 3   await lh.pick_up_tips(tips)
      4   await lh.return_tips()
      5   await lh.stop()

File ~/retro/pylabrobot/pylabrobot/machines/machine.py:35, in need_setup_finished.<locals>.wrapper(*args, **kwargs)
     33 if not self.setup_finished:
     34   raise RuntimeError("The setup has not finished. See `setup`.")
---> 35 return await func(*args, **kwargs)

File ~/retro/pylabrobot/pylabrobot/liquid_handling/liquid_handler.py:467, in LiquidHandler.pick_up_tips(self, tip_spots, use_channels, offsets, **backend_kwargs)
    464   (self.head[channel].commit if success else self.head[channel].rollback)()
    466 # trigger callback
--> 467 self._trigger_callback(
    468   "pick_up_tips",
    469   liquid_handler=self,
    470   operations=pickups,
    471   use_channels=use_channels,
    472   error=error,
    473   **backend_kwargs,
    474 )

File ~/retro/pylabrobot/pylabrobot/liquid_handling/liquid_handler.py:2204, in LiquidHandler._trigger_callback(self, method_name, error, *args, **kwargs)
   2202   callback(self, *args, error=error, **kwargs)
   2203 elif error is not None:
-> 2204   raise error

File ~/retro/pylabrobot/pylabrobot/liquid_handling/liquid_handler.py:451, in LiquidHandler.pick_up_tips(self, tip_spots, use_channels, offsets, **backend_kwargs)
    449 error: Optional[Exception] = None
    450 try:
--> 451   await self.backend.pick_up_tips(ops=pickups, use_channels=use_channels, **backend_kwargs)
    452 except Exception as e:
    453   error = e

File ~/retro/pylabrobot/pylabrobot/liquid_handling/backends/hamilton/STAR.py:1484, in STAR.pick_up_tips(self, ops, use_channels, begin_tip_pick_up_process, end_tip_pick_up_process, minimum_traverse_height_at_beginning_of_a_command, pickup_method)
   1481 pickup_method = pickup_method or tip.pickup_method
   1483 try:
-> 1484   return await self.pick_up_tip(
   1485     x_positions=x_positions,
   1486     y_positions=y_positions,
   1487     tip_pattern=channels_involved,
   1488     tip_type_idx=ttti,
   1489     begin_tip_pick_up_process=begin_tip_pick_up_process,
   1490     end_tip_pick_up_process=end_tip_pick_up_process,
   1491     minimum_traverse_height_at_beginning_of_a_command=minimum_traverse_height_at_beginning_of_a_command,
   1492     pickup_method=pickup_method,
   1493   )
   1494 except STARFirmwareError as e:
   1495   if plr_e := convert_star_firmware_error_to_plr_error(e):

File ~/retro/pylabrobot/pylabrobot/liquid_handling/backends/hamilton/STAR.py:98, in need_iswap_parked.<locals>.wrapper(self, *args, **kwargs)
     93 if self.iswap_installed and not self.iswap_parked:
     94   await self.park_iswap(
     95     minimum_traverse_height_at_beginning_of_a_command=int(self._iswap_traversal_height * 10)
     96   )
---> 98 result = await method(self, *args, **kwargs)
    100 return result

File ~/retro/pylabrobot/pylabrobot/liquid_handling/backends/hamilton/STAR.py:4062, in STAR.pick_up_tip(self, x_positions, y_positions, tip_pattern, tip_type_idx, begin_tip_pick_up_process, end_tip_pick_up_process, minimum_traverse_height_at_beginning_of_a_command, pickup_method)
   4055 assert (
   4056   0 <= end_tip_pick_up_process <= 3600
   4057 ), "end_tip_pick_up_process must be between 0 and 3600"
   4058 assert (
   4059   0 <= minimum_traverse_height_at_beginning_of_a_command <= 3600
   4060 ), "minimum_traverse_height_at_beginning_of_a_command must be between 0 and 3600"
-> 4062 return await self.send_command(
   4063   module="C0",
   4064   command="TP",
   4065   tip_pattern=tip_pattern,
   4066   read_timeout=60,
   4067   xp=[f"{x:05}" for x in x_positions],
   4068   yp=[f"{y:04}" for y in y_positions],
   4069   tm=tip_pattern,
   4070   tt=f"{tip_type_idx:02}",
   4071   tp=f"{begin_tip_pick_up_process:04}",
   4072   tz=f"{end_tip_pick_up_process:04}",
   4073   th=f"{minimum_traverse_height_at_beginning_of_a_command:04}",
   4074   td=pickup_method.value,
   4075 )

File ~/retro/pylabrobot/pylabrobot/liquid_handling/backends/hamilton/base.py:247, in HamiltonLiquidHandler.send_command(self, module, command, auto_id, tip_pattern, write_timeout, read_timeout, wait, fmt, **kwargs)
    222 """Send a firmware command to the Hamilton machine.
    223 
    224 Args:
   (...)
    237   A dictionary containing the parsed response, or None if no response was read within `timeout`.
    238 """
    240 cmd, id_ = self._assemble_command(
    241   module=module,
    242   command=command,
   (...)
    245   **kwargs,
    246 )
--> 247 resp = await self._write_and_read_command(
    248   id_=id_,
    249   cmd=cmd,
    250   write_timeout=write_timeout,
    251   read_timeout=read_timeout,
    252   wait=wait,
    253 )
    254 if resp is not None and fmt is not None:
    255   return self._parse_response(resp, fmt)

File ~/retro/pylabrobot/pylabrobot/liquid_handling/backends/hamilton/base.py:267, in HamiltonLiquidHandler._write_and_read_command(self, id_, cmd, write_timeout, read_timeout, wait)
    258 async def _write_and_read_command(
    259   self,
    260   id_: Optional[int],
   (...)
    264   wait: bool = True,
    265 ) -> Optional[str]:
    266   """Write a command to the Hamilton machine and read the response."""
--> 267   self.io.write(cmd.encode(), timeout=write_timeout)
    269   if not wait:
    270     return None

File ~/retro/pylabrobot/pylabrobot/io/usb.py:325, in USBValidator.write(self, data, timeout)
    323 if not next_command.data == data.decode("unicode_escape"):
    324   align_sequences(expected=next_command.data, actual=data.decode("unicode_escape"))
--> 325   raise ValidationError("Data mismatch: difference was written to stdout.")

ValidationError: Data mismatch: difference was written to stdout.