library.robot_driver

Robot driver module for OT-2 simulation control and computer vision integration.

This module provides the RobotDriver class for controlling the OT-2 robot in simulation using either PID or RL controllers, and the get_coord_from_plate function for processing specimen images through the complete computer vision pipeline to extract root tip coordinates.

The RobotDriver handles:

  • Moving the robot to target positions
  • Dispensing inoculum at root tip locations
  • Supporting both PID and RL control strategies
  • Settling detection and tolerance checking

The get_coord_from_plate function integrates:

  • U-Net inference for shoot and root segmentation
  • Shoot mask cleaning and validation
  • Root mask edge repair
  • Complete root-shoot matching pipeline
  • Coordinate conversion to robot frame
Typical Usage:
>>> from robot_driver import RobotDriver, get_coord_from_plate
>>> 
>>> # Get coordinates from specimen image
>>> df = get_coord_from_plate(
...     plate_image_path='path/to/image.png',
...     shoot_model='models/shoot_unet.h5',
...     root_model='models/root_unet.h5'
... )
>>> 
>>> # Create robot driver with PID controller
>>> driver = RobotDriver(sim, pid_controller)
>>> 
>>> # Move to each endpoint and dispense
>>> for idx, row in df.iterrows():
...     if not np.isnan(row['endpoint_robot_x']):
...         target = [row['endpoint_robot_x'], row['endpoint_robot_y'], 
...                  row['endpoint_robot_z'], 1]
...         driver.move_to(target, dispense_pause=60)
  1"""Robot driver module for OT-2 simulation control and computer vision integration.
  2
  3This module provides the RobotDriver class for controlling the OT-2 robot in simulation
  4using either PID or RL controllers, and the get_coord_from_plate function for processing
  5specimen images through the complete computer vision pipeline to extract root tip coordinates.
  6
  7The RobotDriver handles:
  8- Moving the robot to target positions
  9- Dispensing inoculum at root tip locations
 10- Supporting both PID and RL control strategies
 11- Settling detection and tolerance checking
 12
 13The get_coord_from_plate function integrates:
 14- U-Net inference for shoot and root segmentation
 15- Shoot mask cleaning and validation
 16- Root mask edge repair
 17- Complete root-shoot matching pipeline
 18- Coordinate conversion to robot frame
 19
 20Typical Usage:
 21    >>> from robot_driver import RobotDriver, get_coord_from_plate
 22    >>> 
 23    >>> # Get coordinates from specimen image
 24    >>> df = get_coord_from_plate(
 25    ...     plate_image_path='path/to/image.png',
 26    ...     shoot_model='models/shoot_unet.h5',
 27    ...     root_model='models/root_unet.h5'
 28    ... )
 29    >>> 
 30    >>> # Create robot driver with PID controller
 31    >>> driver = RobotDriver(sim, pid_controller)
 32    >>> 
 33    >>> # Move to each endpoint and dispense
 34    >>> for idx, row in df.iterrows():
 35    ...     if not np.isnan(row['endpoint_robot_x']):
 36    ...         target = [row['endpoint_robot_x'], row['endpoint_robot_y'], 
 37    ...                  row['endpoint_robot_z'], 1]
 38    ...         driver.move_to(target, dispense_pause=60)
 39"""
 40
 41import numpy as np
 42import random
 43import tempfile
 44from pathlib import Path
 45from library.robot_control import *
 46from library.mask_processing import repair_root_mask_edges
 47from library.run_inference import run_inference
 48from library.shoot_mask_cleaning import calculate_global_y_stats, clean_shoot_mask_pipeline
 49from library.root_shoot_matching import RootMatchingConfig, match_roots_to_shoots_complete
 50
 51
 52# Global Y-pixel statistics from inference batch
 53
 54GLOBAL_STATS = {'global_mean': 522.6373561098932,
 55 'global_std': 40.64363903376843,
 56 'all_stats': [{'mean': 522.4183627153725, 'std': 67.58658539471621},
 57  {'mean': 452.95398262798005, 'std': 41.56525140223841},
 58  {'mean': 490.8661926308985, 'std': 27.346885868291096},
 59  {'mean': 491.92876883767013, 'std': 24.893401245590812},
 60  {'mean': 551.0319193616127, 'std': 13.30130229117151},
 61  {'mean': 543.4997856836691, 'std': 20.34707684591397},
 62  {'mean': 507.7737608193482, 'std': 58.07985726549244},
 63  {'mean': 497.31604365278474, 'std': 48.721657949404666},
 64  {'mean': 482.4776291596063, 'std': 32.85790632020763},
 65  {'mean': 506.78758850478965, 'std': 20.708410447953284},
 66  {'mean': 657.317774466855, 'std': 72.20521285868364},
 67  {'mean': 510.7383455353232, 'std': 30.858226987648244},
 68  {'mean': 486.4174450352062, 'std': 77.22634688922165},
 69  {'mean': 536.5708980097133, 'std': 43.147085974995804},
 70  {'mean': 476.0206638821719, 'std': 21.836197778799253},
 71  {'mean': 517.2567251461988, 'std': 19.93044627424427},
 72  {'mean': 638.9460841916085, 'std': 72.86684596583322},
 73  {'mean': 537.351987845024, 'std': 19.926337738115237},
 74  {'mean': 522.4358079821378, 'std': 58.824106143078836}],
 75 'all_means': [522.4183627153725,
 76  452.95398262798005,
 77  490.8661926308985,
 78  491.92876883767013,
 79  551.0319193616127,
 80  543.4997856836691,
 81  507.7737608193482,
 82  497.31604365278474,
 83  482.4776291596063,
 84  506.78758850478965,
 85  657.317774466855,
 86  510.7383455353232,
 87  486.4174450352062,
 88  536.5708980097133,
 89  476.0206638821719,
 90  517.2567251461988,
 91  638.9460841916085,
 92  537.351987845024,
 93  522.4358079821378],
 94 'all_stds': [67.58658539471621,
 95  41.56525140223841,
 96  27.346885868291096,
 97  24.893401245590812,
 98  13.30130229117151,
 99  20.34707684591397,
100  58.07985726549244,
101  48.721657949404666,
102  32.85790632020763,
103  20.708410447953284,
104  72.20521285868364,
105  30.858226987648244,
106  77.22634688922165,
107  43.147085974995804,
108  21.836197778799253,
109  19.93044627424427,
110  72.86684596583322,
111  19.926337738115237,
112  58.824106143078836],
113 'y_min': 200,
114 'y_max': 750}
115
116
117def pause(sim, steps):
118    """Pause simulation for specified number of steps.
119    
120    Sends zero-velocity commands to maintain robot position without movement.
121    
122    Args:
123        sim (object): Simulation environment instance with run() method.
124        steps (int): Number of simulation steps to pause.
125        
126    Returns:
127        None
128    """
129    sim.run([[0, 0, 0, 0]], num_steps=steps)
130
131
132class RobotDriver:
133    """Driver class for controlling OT-2 robot in simulation.
134    
135    Provides high-level control interface for moving the robot to target positions
136    and dispensing inoculum. Supports both PID and RL-based controllers with
137    automatic settling detection and position tolerance checking.
138    
139    Attributes:
140        sim (object): Simulation environment instance.
141        controller (object): Either PID controller (with axes attribute) or RL model.
142        settle_steps (int): Number of consecutive steps within tolerance to consider settled.
143        max_steps (int): Maximum steps allowed per movement attempt.
144        pause_steps (int): Default pause duration after dispensing.
145        tolerance (float): Position error threshold in meters for settling detection.
146        
147    Examples:
148        >>> # With PID controller
149        >>> from library.robot_control import PIDController
150        >>> pid = PIDController()
151        >>> driver = RobotDriver(sim, pid, settle_steps=10, tolerance=0.0005)
152        >>> 
153        >>> # Move to position and dispense
154        >>> target = [0.15, 0.10, 0.17, 1]  # [x, y, z, dispense]
155        >>> success = driver.move_to(target, dispense_pause=60)
156        >>> 
157        >>> # Move without dispensing
158        >>> target = [0.15, 0.10, 0.17, 0]
159        >>> success = driver.move_to(target)
160    """
161    
162    def __init__(self, sim, controller, settle_steps=10, max_steps=350, pause_steps=60,
163                 tolerance=0.0005):
164        """Initialize RobotDriver.
165        
166        Args:
167            sim (object): Simulation environment instance with run() method.
168            controller (object): PID controller or RL model for generating actions.
169            settle_steps (int, optional): Steps within tolerance to confirm settling. Defaults to 10.
170            max_steps (int, optional): Maximum steps per movement. Defaults to 350.
171            pause_steps (int, optional): Default pause after dispensing. Defaults to 60.
172            tolerance (float, optional): Position error threshold in meters. Defaults to 0.0005.
173        """
174        self.sim = sim
175        self.controller = controller
176        self.settle_steps = settle_steps
177        self.max_steps = max_steps
178        self.pause_steps = pause_steps
179        self.tolerance = tolerance
180    
181    def move_to(self, target, dispense_pause=60, pause_after=False):
182        """Move robot to target position with optional dispensing.
183        
184        Moves the robot end-effector to the specified 3D target position using the
185        configured controller. Optionally dispenses inoculum upon arrival and pauses
186        afterward for settling.
187        
188        Args:
189            target (list): Target as [x, y, z, dispense] where x, y, z are floats in meters
190                          and dispense is 0 (no dispense) or 1 (dispense).
191            dispense_pause (int, optional): Pause duration after dispensing in steps. Defaults to 60.
192            pause_after (bool, optional): Whether to pause after movement completes. Defaults to False.
193            
194        Returns:
195            bool: True if target was reached within tolerance, False otherwise.
196            
197        Raises:
198            ValueError: If target is not a 4-element list.
199            
200        Examples:
201            >>> # Move and dispense
202            >>> success = driver.move_to([0.15, 0.10, 0.17, 1], dispense_pause=60)
203            >>> 
204            >>> # Move only
205            >>> success = driver.move_to([0.15, 0.10, 0.17, 0])
206        """
207        # Parse target [x, y, z, dispense]
208        if len(target) != 4:
209            raise ValueError("Target must be [x, y, z, dispense] where dispense is 0 or 1")
210
211        target_pos = {'x': target[0], 'y': target[1], 'z': target[2]}
212        dispense = bool(target[3])
213
214        is_pid = self._is_pid_controller()
215
216        if is_pid:
217            success = self._move_to_pid(target_pos)
218        else:
219            success = self._move_to_rl(target_pos)
220
221        if success and dispense:
222            self._dispense(dispense_pause=dispense_pause)
223
224        if success and pause_after:
225            pause(self.sim, self.pause_steps)
226
227        return success
228
229    def move_to_with_metrics(self, target, dispense_pause=60, pause_after=False):
230        """Move robot to target with comprehensive metrics tracking.
231        
232        High-level method that moves to target, tracks steps taken, calculates
233        final position error, and handles dispensing. Automatically detects
234        controller type (PID or RL) and uses appropriate control method.
235        
236        Args:
237            target (list): Target as [x, y, z, dispense] where x, y, z are floats 
238                in meters and dispense is 0 (no dispense) or 1 (dispense).
239            dispense_pause (int, optional): Pause duration after dispensing in steps.
240                Defaults to 60.
241            pause_after (bool, optional): Whether to pause after movement completes.
242                Defaults to False.
243                
244        Returns:
245            dict: Metrics dictionary containing:
246                - 'success' (bool): Whether target was reached within tolerance
247                - 'steps' (int): Number of steps taken
248                - 'target' (dict): Target coordinates {'x', 'y', 'z'}
249                - 'final_position' (dict): Final position {'x', 'y', 'z'}
250                - 'error_euclidean' (float): 3D Euclidean distance error in meters
251                - 'error_x' (float): X-axis error in meters
252                - 'error_y' (float): Y-axis error in meters
253                - 'error_z' (float): Z-axis error in meters
254                - 'dispensed' (bool): Whether dispensing occurred
255                
256        Raises:
257            ValueError: If target is not a 4-element list.
258            
259        Example:
260            >>> target = [0.15, 0.10, 0.17, 1]
261            >>> metrics = driver.move_to_with_metrics(target)
262            >>> print(f"Success: {metrics['success']}, Steps: {metrics['steps']}")
263            >>> print(f"Error: {metrics['error_euclidean']*1000:.3f}mm")
264        """
265        if len(target) != 4:
266            raise ValueError("Target must be [x, y, z, dispense]")
267        
268        target_dict = {'x': target[0], 'y': target[1], 'z': target[2]}
269        dispense = target[3]
270        
271        # Move to target using appropriate controller
272        if self._is_pid_controller():
273            success, steps = self._move_to_pid_metrics(target_dict, tolerance=self.tolerance)
274        else:
275            success, steps = self._move_to_rl_metrics(target_dict, tolerance=self.tolerance)
276        
277        # Get final position
278        final_pos = get_position(self.sim)
279        final_dict = {'x': final_pos[0], 'y': final_pos[1], 'z': final_pos[2]}
280        
281        # Calculate errors
282        error_x = target_dict['x'] - final_pos[0]
283        error_y = target_dict['y'] - final_pos[1]
284        error_z = target_dict['z'] - final_pos[2]
285        error_euclidean = np.sqrt(error_x**2 + error_y**2 + error_z**2)
286        
287        # Handle dispensing
288        dispensed = False
289        if dispense == 1 and success:
290            self.sim.dispense()
291            pause(self.sim, dispense_pause)
292            dispensed = True
293        
294        if pause_after:
295            pause(self.sim, self.pause_steps)
296        
297        # Build metrics dictionary
298        metrics = {
299            'success': success,
300            'steps': steps,
301            'target': target_dict.copy(),
302            'final_position': final_dict,
303            'error_euclidean': float(error_euclidean),
304            'error_x': float(error_x),
305            'error_y': float(error_y),
306            'error_z': float(error_z),
307            'dispensed': dispensed
308        }
309        
310        return metrics
311
312
313    def _dispense(self, dispense_pause=60):
314        """Dispense a single drop with variable pause.
315        
316        Args:
317            dispense_pause (int, optional): Pause duration in simulation steps. Defaults to 60.
318            
319        Returns:
320            None
321        """
322        dispense = [[0, 0, 0, 1]]
323        pause = [[0, 0, 0, 0]]
324        self.sim.run(dispense, num_steps=1)
325        self.sim.run(pause, num_steps=dispense_pause)
326
327    def _is_pid_controller(self):
328        """Check if controller is PID or RL model.
329        
330        Returns:
331            bool: True if controller is PID (has axes attribute), False for RL.
332        """
333        return hasattr(self.controller, 'axes')
334
335    def _move_to_pid(self, target):
336        """Move to target using PID controller with settling detection.
337        
338        Args:
339            target (dict): Target position with keys 'x', 'y', 'z' in meters.
340            
341        Returns:
342            bool: True if target reached and settled, False if max_steps exceeded.
343        """
344        pid_controller = self.controller
345        for axis in ['x', 'y', 'z']:
346            pid_controller.axes[axis].setpoint = target[axis]
347
348        settled_count = 0 
349        
350        for step in range(self.max_steps):
351            current_pos = get_position(self.sim)
352            current_dict = {'x': float(current_pos[0]), 'y': float(current_pos[1]), 'z': float(current_pos[2])}
353
354            error_x = target['x'] - current_pos[0]
355            error_y = target['y'] - current_pos[1]
356            error_z = target['z'] - current_pos[2]
357            total_error = (error_x**2 + error_y**2 + error_z**2)**0.5
358
359            velocities = pid_controller(current_dict, 0.01)
360            
361            # Check both position error and velocity magnitude
362            velocity_magnitude = (velocities['x']**2 + velocities['y']**2 + velocities['z']**2)**0.5
363            
364            if total_error <= self.tolerance and velocity_magnitude <= 0.01:
365                settled_count += 1
366                if settled_count >= self.settle_steps:
367                    print(f'target reached and settled at step {step}')
368                    return True
369            else:
370                settled_count = 0
371        
372            actions = [[velocities['x'], velocities['y'], velocities['z'], 0]]
373            self.sim.run(actions, 1)
374        
375        return False
376    
377    def _move_to_pid_metrics(self, target, tolerance=0.001, velocity_threshold=0.01):
378        """
379        Move to target using PID controller with settling detection and step tracking.
380        
381        Args:
382            target (dict): Target position with keys 'x', 'y', 'z' in meters.
383            tolerance (float, optional): Distance threshold in meters. Defaults to 0.001.
384            velocity_threshold (float, optional): Maximum velocity magnitude for settling.
385                Defaults to 0.01 m/s.
386                
387        Returns:
388            tuple: (success, steps) where success is bool and steps is int.
389        """
390        pid_controller = self.controller
391        
392        # Set setpoint for each axis
393        for axis in ['x', 'y', 'z']:
394            pid_controller.axes[axis].setpoint = target[axis]
395
396        settled_count = 0 
397        steps = 0
398        
399        for step in range(self.max_steps):
400            steps += 1
401            
402            current_pos = get_position(self.sim)
403            current_dict = {'x': float(current_pos[0]), 'y': float(current_pos[1]), 'z': float(current_pos[2])}
404
405            error_x = target['x'] - current_pos[0]
406            error_y = target['y'] - current_pos[1]
407            error_z = target['z'] - current_pos[2]
408            total_error = (error_x**2 + error_y**2 + error_z**2)**0.5
409
410            velocities = pid_controller(current_dict, 0.01)
411            
412            # Check both position error and velocity magnitude
413            velocity_magnitude = (velocities['x']**2 + velocities['y']**2 + velocities['z']**2)**0.5
414            
415            if total_error <= self.tolerance and velocity_magnitude <= velocity_threshold:
416                settled_count += 1
417                if settled_count >= self.settle_steps:
418                    print(f'target reached and settled at step {step}')
419                    return True, steps
420            else:
421                settled_count = 0
422        
423            actions = [[velocities['x'], velocities['y'], velocities['z'], 0]]
424            self.sim.run(actions, 1)
425        
426        return False, steps
427
428
429    def _move_to_rl(self, target, tolerance=0.002, velocity_threshold=0.04):
430        """
431        Move to target using RL controller with single crossing detection.
432        
433        Uses single crossing approach: stops commanding on first crossing into
434        tolerance with sufficiently low velocity. More appropriate for systems
435        that naturally hold position when commands stop.
436        
437        Args:
438            target (dict): Target position with keys 'x', 'y', 'z' in meters.
439            tolerance (float, optional): Distance threshold in meters. Defaults to 0.002 (2mm).
440            velocity_threshold (float, optional): Maximum velocity magnitude for success.
441                Defaults to 0.04 m/s (from testing: 92% success at 2mm).
442                
443        Returns:
444            bool: True if target reached with low velocity, False if max_steps exceeded.
445            
446        Notes:
447            - Based on comprehensive testing showing 92% success at 2mm with 0.04 m/s threshold
448            - Uses workspace bounds that must match training configuration
449            - Model outputs normalized actions [-1, 1] scaled to velocities [-2, 2] m/s
450        """
451        # Workspace bounds (must match training exactly)
452        WORKSPACE_LOW = np.array([-0.1871, -0.1706, 0.1700], dtype=np.float32)
453        WORKSPACE_HIGH = np.array([0.2532, 0.2197, 0.2897], dtype=np.float32)
454        
455        def normalize_position(position):
456            """Normalize position from workspace bounds to [-1, 1]."""
457            position = np.asarray(position, dtype=np.float32)
458            normalized = 2.0 * (position - WORKSPACE_LOW) / (WORKSPACE_HIGH - WORKSPACE_LOW) - 1.0
459            return normalized.astype(np.float32)
460        
461        def scale_action_to_velocity(action, max_velocity=2.0):
462            """Scale normalized action [-1, 1] to velocity commands."""
463            action = np.asarray(action, dtype=np.float32)
464            velocity = action * max_velocity
465            return velocity.astype(np.float32)
466        
467        # Convert target dict to array
468        goal_position = np.array([target['x'], target['y'], target['z']], dtype=np.float32)
469        goal_normalized = normalize_position(goal_position)
470        
471        for step in range(self.max_steps):
472            # Get current position
473            current_pos = np.array(get_position(self.sim), dtype=np.float32)
474            current_normalized = normalize_position(current_pos)
475            
476            # Build observation [current_x, current_y, current_z, goal_x, goal_y, goal_z]
477            obs = np.concatenate([current_normalized, goal_normalized], dtype=np.float32)
478            
479            # Predict action (use deterministic=False for deployment)
480            action, _states = self.controller.predict(obs, deterministic=False)
481            
482            # Scale to velocity
483            velocity = scale_action_to_velocity(action)
484            
485            # Calculate distance and velocity magnitude
486            distance = np.linalg.norm(current_pos - goal_position)
487            velocity_magnitude = np.linalg.norm(velocity)
488            
489            # Single crossing check - stop on first crossing with low velocity
490            if distance <= tolerance and velocity_magnitude <= velocity_threshold:
491                print(f'target reached at step {step}, distance: {distance*1000:.3f}mm, velocity: {velocity_magnitude:.4f}m/s')
492                return True
493            
494            # Send velocity command to simulation
495            actions = [[float(velocity[0]), float(velocity[1]), float(velocity[2]), 0]]
496            self.sim.run(actions, num_steps=1)
497        
498        # Max steps reached
499        final_pos = np.array(get_position(self.sim), dtype=np.float32)
500        final_distance = float(np.linalg.norm(final_pos - goal_position))
501        print(f'max steps reached, final distance: {final_distance*1000:.3f}mm')
502        return False      
503
504    def _move_to_rl_metrics(self, target, tolerance=0.002, velocity_threshold=0.04):
505        """
506        Move to target using RL controller with single crossing detection and step tracking.
507        
508        Uses single crossing approach: stops commanding on first crossing into
509        tolerance with sufficiently low velocity. More appropriate for systems
510        that naturally hold position when commands stop.
511        
512        Args:
513            target (dict): Target position with keys 'x', 'y', 'z' in meters.
514            tolerance (float, optional): Distance threshold in meters. Defaults to 0.002 (2mm).
515            velocity_threshold (float, optional): Maximum velocity magnitude for success.
516                Defaults to 0.04 m/s (from testing: 92% success at 2mm).
517                
518        Returns:
519            tuple: (success, steps) where success is bool and steps is int.
520            
521        Notes:
522            - Based on comprehensive testing showing 92% success at 2mm with 0.04 m/s threshold
523            - Uses workspace bounds that must match training configuration
524            - Model outputs normalized actions [-1, 1] scaled to velocities [-2, 2] m/s
525        """
526        # Workspace bounds (must match training exactly)
527        WORKSPACE_LOW = np.array([-0.1871, -0.1706, 0.1700], dtype=np.float32)
528        WORKSPACE_HIGH = np.array([0.2532, 0.2197, 0.2897], dtype=np.float32)
529        
530        def normalize_position(position):
531            """Normalize position from workspace bounds to [-1, 1]."""
532            position = np.asarray(position, dtype=np.float32)
533            normalized = 2.0 * (position - WORKSPACE_LOW) / (WORKSPACE_HIGH - WORKSPACE_LOW) - 1.0
534            return normalized.astype(np.float32)
535        
536        def scale_action_to_velocity(action, max_velocity=2.0):
537            """Scale normalized action [-1, 1] to velocity commands."""
538            action = np.asarray(action, dtype=np.float32)
539            velocity = action * max_velocity
540            return velocity.astype(np.float32)
541        
542        # Convert target dict to array
543        goal_position = np.array([target['x'], target['y'], target['z']], dtype=np.float32)
544        goal_normalized = normalize_position(goal_position)
545        
546        steps = 0
547        
548        for step in range(self.max_steps):
549            steps += 1
550            
551            # Get current position
552            current_pos = np.array(get_position(self.sim), dtype=np.float32)
553            current_normalized = normalize_position(current_pos)
554            
555            # Build observation [current_x, current_y, current_z, goal_x, goal_y, goal_z]
556            obs = np.concatenate([current_normalized, goal_normalized], dtype=np.float32)
557            
558            # Predict action (use deterministic=False for deployment)
559            action, _states = self.controller.predict(obs, deterministic=False)
560            
561            # Scale to velocity
562            velocity = scale_action_to_velocity(action)
563            
564            # Calculate distance and velocity magnitude
565            distance = np.linalg.norm(current_pos - goal_position)
566            velocity_magnitude = np.linalg.norm(velocity)
567            
568            # Single crossing check - stop on first crossing with low velocity
569            if distance <= tolerance and velocity_magnitude <= velocity_threshold:
570                print(f'target reached at step {step}, distance: {distance*1000:.3f}mm, velocity: {velocity_magnitude:.4f}m/s')
571                return True, steps
572            
573            # Send velocity command to simulation
574            actions = [[float(velocity[0]), float(velocity[1]), float(velocity[2]), 0]]
575            self.sim.run(actions, num_steps=1)
576        
577        # Max steps reached
578        final_pos = np.array(get_position(self.sim), dtype=np.float32)
579        final_distance = float(np.linalg.norm(final_pos - goal_position))
580        print(f'max steps reached, final distance: {final_distance*1000:.3f}mm')
581        return False, steps
582
583def get_coord_from_plate(plate_image_path, shoot_model, root_model, temp_path=None):
584    """Extract root tip coordinates from specimen image using complete CV pipeline.
585    
586    Runs the full computer vision workflow: U-Net inference for segmentation,
587    mask cleaning and repair, root-shoot matching, and coordinate conversion to
588    robot frame. Returns DataFrame with measurements and coordinates for all 5 plants.
589    
590    Args:
591        plate_image_path (str or Path): Path to specimen plate image.
592        shoot_model (str or Path): Path to trained shoot segmentation U-Net model (.h5).
593        root_model (str or Path): Path to trained root segmentation U-Net model (.h5).
594        temp_path (str or Path, optional): Directory for intermediate outputs. 
595            Creates temporary directory if None. Defaults to None.
596            
597    Returns:
598        pd.DataFrame: DataFrame with 5 rows (one per plant) containing:
599            - plant_order: 1-5 (left to right)
600            - Plant ID: Image identifier
601            - Length (px): Root length in pixels
602            - top_node_x/y: Pixel coordinates of root connection point
603            - endpoint_x/y: Pixel coordinates of root tip
604            - top_node_robot_x/y/z: Robot coordinates of connection point (meters)
605            - endpoint_robot_x/y/z: Robot coordinates of root tip (meters)
606            
607    Examples:
608        >>> df = get_coord_from_plate(
609        ...     'specimen_01.png',
610        ...     'models/shoot_model.h5',
611        ...     'models/root_model.h5'
612        ... )
613        >>> print(df[['plant_order', 'Length (px)', 'endpoint_robot_x', 'endpoint_robot_y']])
614        
615    Notes:
616        - Uses pre-computed GLOBAL_STATS for shoot mask quality validation
617        - Applies edge repair to root masks for improved skeleton quality
618        - Returns NaN coordinates for ungerminated seeds (no detected root)
619        - All robot coordinates are in meters relative to simulation origin
620    """
621    config = RootMatchingConfig()
622    if not temp_path:
623        temp_path = tempfile.mkdtemp()
624    else:
625        temp_path = Path(temp_path)
626
627    root_model = Path(root_model)
628    shoot_model = Path(shoot_model)
629
630    images = [str(plate_image_path)]
631
632    # Run root inference
633    mask_type = "root"
634    n_processed_root, output_dir_root = run_inference(
635        model_path=root_model,
636        image_paths=images,
637        output_dir=temp_path,
638        mask_type=mask_type,
639        verbose=False
640    )
641
642    # Run shoot inference
643    mask_type = "shoot"
644    n_processed_shoot, output_dir_shoot = run_inference(
645        model_path=shoot_model,
646        image_paths=images,
647        output_dir=temp_path,
648        mask_type=mask_type,
649        verbose=False
650    )
651
652    # Get inference outputs
653    root_inference_path = next(output_dir_root.glob('*.png'), None)
654    shoot_inference_path = next(output_dir_shoot.glob('*.png'), None)
655
656    assert root_inference_path, "Root inference failed - no output mask found"
657    assert shoot_inference_path, "Shoot inference failed - no output mask found"
658
659    # Clean shoot mask
660    results = clean_shoot_mask_pipeline(
661        shoot_inference_path, 
662        GLOBAL_STATS, 
663        quality_check_threshold=2.0
664    )
665    cleaned_shoot_mask = results['cleaned_mask']
666    
667    # Repair root mask
668    repaired_root_mask = repair_root_mask_edges(root_inference_path)
669
670    # Run complete matching pipeline
671    df = match_roots_to_shoots_complete(
672        shoot_mask=cleaned_shoot_mask,
673        root_mask=repaired_root_mask,
674        image_path=plate_image_path,
675        config=config,
676        return_dataframe=True,
677        sample_idx=0,
678        verbose=True
679    )
680
681    return df
GLOBAL_STATS = {'global_mean': 522.6373561098932, 'global_std': 40.64363903376843, 'all_stats': [{'mean': 522.4183627153725, 'std': 67.58658539471621}, {'mean': 452.95398262798005, 'std': 41.56525140223841}, {'mean': 490.8661926308985, 'std': 27.346885868291096}, {'mean': 491.92876883767013, 'std': 24.893401245590812}, {'mean': 551.0319193616127, 'std': 13.30130229117151}, {'mean': 543.4997856836691, 'std': 20.34707684591397}, {'mean': 507.7737608193482, 'std': 58.07985726549244}, {'mean': 497.31604365278474, 'std': 48.721657949404666}, {'mean': 482.4776291596063, 'std': 32.85790632020763}, {'mean': 506.78758850478965, 'std': 20.708410447953284}, {'mean': 657.317774466855, 'std': 72.20521285868364}, {'mean': 510.7383455353232, 'std': 30.858226987648244}, {'mean': 486.4174450352062, 'std': 77.22634688922165}, {'mean': 536.5708980097133, 'std': 43.147085974995804}, {'mean': 476.0206638821719, 'std': 21.836197778799253}, {'mean': 517.2567251461988, 'std': 19.93044627424427}, {'mean': 638.9460841916085, 'std': 72.86684596583322}, {'mean': 537.351987845024, 'std': 19.926337738115237}, {'mean': 522.4358079821378, 'std': 58.824106143078836}], 'all_means': [522.4183627153725, 452.95398262798005, 490.8661926308985, 491.92876883767013, 551.0319193616127, 543.4997856836691, 507.7737608193482, 497.31604365278474, 482.4776291596063, 506.78758850478965, 657.317774466855, 510.7383455353232, 486.4174450352062, 536.5708980097133, 476.0206638821719, 517.2567251461988, 638.9460841916085, 537.351987845024, 522.4358079821378], 'all_stds': [67.58658539471621, 41.56525140223841, 27.346885868291096, 24.893401245590812, 13.30130229117151, 20.34707684591397, 58.07985726549244, 48.721657949404666, 32.85790632020763, 20.708410447953284, 72.20521285868364, 30.858226987648244, 77.22634688922165, 43.147085974995804, 21.836197778799253, 19.93044627424427, 72.86684596583322, 19.926337738115237, 58.824106143078836], 'y_min': 200, 'y_max': 750}
def pause(sim, steps):
118def pause(sim, steps):
119    """Pause simulation for specified number of steps.
120    
121    Sends zero-velocity commands to maintain robot position without movement.
122    
123    Args:
124        sim (object): Simulation environment instance with run() method.
125        steps (int): Number of simulation steps to pause.
126        
127    Returns:
128        None
129    """
130    sim.run([[0, 0, 0, 0]], num_steps=steps)

Pause simulation for specified number of steps.

Sends zero-velocity commands to maintain robot position without movement.

Arguments:
  • sim (object): Simulation environment instance with run() method.
  • steps (int): Number of simulation steps to pause.
Returns:

None

class RobotDriver:
133class RobotDriver:
134    """Driver class for controlling OT-2 robot in simulation.
135    
136    Provides high-level control interface for moving the robot to target positions
137    and dispensing inoculum. Supports both PID and RL-based controllers with
138    automatic settling detection and position tolerance checking.
139    
140    Attributes:
141        sim (object): Simulation environment instance.
142        controller (object): Either PID controller (with axes attribute) or RL model.
143        settle_steps (int): Number of consecutive steps within tolerance to consider settled.
144        max_steps (int): Maximum steps allowed per movement attempt.
145        pause_steps (int): Default pause duration after dispensing.
146        tolerance (float): Position error threshold in meters for settling detection.
147        
148    Examples:
149        >>> # With PID controller
150        >>> from library.robot_control import PIDController
151        >>> pid = PIDController()
152        >>> driver = RobotDriver(sim, pid, settle_steps=10, tolerance=0.0005)
153        >>> 
154        >>> # Move to position and dispense
155        >>> target = [0.15, 0.10, 0.17, 1]  # [x, y, z, dispense]
156        >>> success = driver.move_to(target, dispense_pause=60)
157        >>> 
158        >>> # Move without dispensing
159        >>> target = [0.15, 0.10, 0.17, 0]
160        >>> success = driver.move_to(target)
161    """
162    
163    def __init__(self, sim, controller, settle_steps=10, max_steps=350, pause_steps=60,
164                 tolerance=0.0005):
165        """Initialize RobotDriver.
166        
167        Args:
168            sim (object): Simulation environment instance with run() method.
169            controller (object): PID controller or RL model for generating actions.
170            settle_steps (int, optional): Steps within tolerance to confirm settling. Defaults to 10.
171            max_steps (int, optional): Maximum steps per movement. Defaults to 350.
172            pause_steps (int, optional): Default pause after dispensing. Defaults to 60.
173            tolerance (float, optional): Position error threshold in meters. Defaults to 0.0005.
174        """
175        self.sim = sim
176        self.controller = controller
177        self.settle_steps = settle_steps
178        self.max_steps = max_steps
179        self.pause_steps = pause_steps
180        self.tolerance = tolerance
181    
182    def move_to(self, target, dispense_pause=60, pause_after=False):
183        """Move robot to target position with optional dispensing.
184        
185        Moves the robot end-effector to the specified 3D target position using the
186        configured controller. Optionally dispenses inoculum upon arrival and pauses
187        afterward for settling.
188        
189        Args:
190            target (list): Target as [x, y, z, dispense] where x, y, z are floats in meters
191                          and dispense is 0 (no dispense) or 1 (dispense).
192            dispense_pause (int, optional): Pause duration after dispensing in steps. Defaults to 60.
193            pause_after (bool, optional): Whether to pause after movement completes. Defaults to False.
194            
195        Returns:
196            bool: True if target was reached within tolerance, False otherwise.
197            
198        Raises:
199            ValueError: If target is not a 4-element list.
200            
201        Examples:
202            >>> # Move and dispense
203            >>> success = driver.move_to([0.15, 0.10, 0.17, 1], dispense_pause=60)
204            >>> 
205            >>> # Move only
206            >>> success = driver.move_to([0.15, 0.10, 0.17, 0])
207        """
208        # Parse target [x, y, z, dispense]
209        if len(target) != 4:
210            raise ValueError("Target must be [x, y, z, dispense] where dispense is 0 or 1")
211
212        target_pos = {'x': target[0], 'y': target[1], 'z': target[2]}
213        dispense = bool(target[3])
214
215        is_pid = self._is_pid_controller()
216
217        if is_pid:
218            success = self._move_to_pid(target_pos)
219        else:
220            success = self._move_to_rl(target_pos)
221
222        if success and dispense:
223            self._dispense(dispense_pause=dispense_pause)
224
225        if success and pause_after:
226            pause(self.sim, self.pause_steps)
227
228        return success
229
230    def move_to_with_metrics(self, target, dispense_pause=60, pause_after=False):
231        """Move robot to target with comprehensive metrics tracking.
232        
233        High-level method that moves to target, tracks steps taken, calculates
234        final position error, and handles dispensing. Automatically detects
235        controller type (PID or RL) and uses appropriate control method.
236        
237        Args:
238            target (list): Target as [x, y, z, dispense] where x, y, z are floats 
239                in meters and dispense is 0 (no dispense) or 1 (dispense).
240            dispense_pause (int, optional): Pause duration after dispensing in steps.
241                Defaults to 60.
242            pause_after (bool, optional): Whether to pause after movement completes.
243                Defaults to False.
244                
245        Returns:
246            dict: Metrics dictionary containing:
247                - 'success' (bool): Whether target was reached within tolerance
248                - 'steps' (int): Number of steps taken
249                - 'target' (dict): Target coordinates {'x', 'y', 'z'}
250                - 'final_position' (dict): Final position {'x', 'y', 'z'}
251                - 'error_euclidean' (float): 3D Euclidean distance error in meters
252                - 'error_x' (float): X-axis error in meters
253                - 'error_y' (float): Y-axis error in meters
254                - 'error_z' (float): Z-axis error in meters
255                - 'dispensed' (bool): Whether dispensing occurred
256                
257        Raises:
258            ValueError: If target is not a 4-element list.
259            
260        Example:
261            >>> target = [0.15, 0.10, 0.17, 1]
262            >>> metrics = driver.move_to_with_metrics(target)
263            >>> print(f"Success: {metrics['success']}, Steps: {metrics['steps']}")
264            >>> print(f"Error: {metrics['error_euclidean']*1000:.3f}mm")
265        """
266        if len(target) != 4:
267            raise ValueError("Target must be [x, y, z, dispense]")
268        
269        target_dict = {'x': target[0], 'y': target[1], 'z': target[2]}
270        dispense = target[3]
271        
272        # Move to target using appropriate controller
273        if self._is_pid_controller():
274            success, steps = self._move_to_pid_metrics(target_dict, tolerance=self.tolerance)
275        else:
276            success, steps = self._move_to_rl_metrics(target_dict, tolerance=self.tolerance)
277        
278        # Get final position
279        final_pos = get_position(self.sim)
280        final_dict = {'x': final_pos[0], 'y': final_pos[1], 'z': final_pos[2]}
281        
282        # Calculate errors
283        error_x = target_dict['x'] - final_pos[0]
284        error_y = target_dict['y'] - final_pos[1]
285        error_z = target_dict['z'] - final_pos[2]
286        error_euclidean = np.sqrt(error_x**2 + error_y**2 + error_z**2)
287        
288        # Handle dispensing
289        dispensed = False
290        if dispense == 1 and success:
291            self.sim.dispense()
292            pause(self.sim, dispense_pause)
293            dispensed = True
294        
295        if pause_after:
296            pause(self.sim, self.pause_steps)
297        
298        # Build metrics dictionary
299        metrics = {
300            'success': success,
301            'steps': steps,
302            'target': target_dict.copy(),
303            'final_position': final_dict,
304            'error_euclidean': float(error_euclidean),
305            'error_x': float(error_x),
306            'error_y': float(error_y),
307            'error_z': float(error_z),
308            'dispensed': dispensed
309        }
310        
311        return metrics
312
313
314    def _dispense(self, dispense_pause=60):
315        """Dispense a single drop with variable pause.
316        
317        Args:
318            dispense_pause (int, optional): Pause duration in simulation steps. Defaults to 60.
319            
320        Returns:
321            None
322        """
323        dispense = [[0, 0, 0, 1]]
324        pause = [[0, 0, 0, 0]]
325        self.sim.run(dispense, num_steps=1)
326        self.sim.run(pause, num_steps=dispense_pause)
327
328    def _is_pid_controller(self):
329        """Check if controller is PID or RL model.
330        
331        Returns:
332            bool: True if controller is PID (has axes attribute), False for RL.
333        """
334        return hasattr(self.controller, 'axes')
335
336    def _move_to_pid(self, target):
337        """Move to target using PID controller with settling detection.
338        
339        Args:
340            target (dict): Target position with keys 'x', 'y', 'z' in meters.
341            
342        Returns:
343            bool: True if target reached and settled, False if max_steps exceeded.
344        """
345        pid_controller = self.controller
346        for axis in ['x', 'y', 'z']:
347            pid_controller.axes[axis].setpoint = target[axis]
348
349        settled_count = 0 
350        
351        for step in range(self.max_steps):
352            current_pos = get_position(self.sim)
353            current_dict = {'x': float(current_pos[0]), 'y': float(current_pos[1]), 'z': float(current_pos[2])}
354
355            error_x = target['x'] - current_pos[0]
356            error_y = target['y'] - current_pos[1]
357            error_z = target['z'] - current_pos[2]
358            total_error = (error_x**2 + error_y**2 + error_z**2)**0.5
359
360            velocities = pid_controller(current_dict, 0.01)
361            
362            # Check both position error and velocity magnitude
363            velocity_magnitude = (velocities['x']**2 + velocities['y']**2 + velocities['z']**2)**0.5
364            
365            if total_error <= self.tolerance and velocity_magnitude <= 0.01:
366                settled_count += 1
367                if settled_count >= self.settle_steps:
368                    print(f'target reached and settled at step {step}')
369                    return True
370            else:
371                settled_count = 0
372        
373            actions = [[velocities['x'], velocities['y'], velocities['z'], 0]]
374            self.sim.run(actions, 1)
375        
376        return False
377    
378    def _move_to_pid_metrics(self, target, tolerance=0.001, velocity_threshold=0.01):
379        """
380        Move to target using PID controller with settling detection and step tracking.
381        
382        Args:
383            target (dict): Target position with keys 'x', 'y', 'z' in meters.
384            tolerance (float, optional): Distance threshold in meters. Defaults to 0.001.
385            velocity_threshold (float, optional): Maximum velocity magnitude for settling.
386                Defaults to 0.01 m/s.
387                
388        Returns:
389            tuple: (success, steps) where success is bool and steps is int.
390        """
391        pid_controller = self.controller
392        
393        # Set setpoint for each axis
394        for axis in ['x', 'y', 'z']:
395            pid_controller.axes[axis].setpoint = target[axis]
396
397        settled_count = 0 
398        steps = 0
399        
400        for step in range(self.max_steps):
401            steps += 1
402            
403            current_pos = get_position(self.sim)
404            current_dict = {'x': float(current_pos[0]), 'y': float(current_pos[1]), 'z': float(current_pos[2])}
405
406            error_x = target['x'] - current_pos[0]
407            error_y = target['y'] - current_pos[1]
408            error_z = target['z'] - current_pos[2]
409            total_error = (error_x**2 + error_y**2 + error_z**2)**0.5
410
411            velocities = pid_controller(current_dict, 0.01)
412            
413            # Check both position error and velocity magnitude
414            velocity_magnitude = (velocities['x']**2 + velocities['y']**2 + velocities['z']**2)**0.5
415            
416            if total_error <= self.tolerance and velocity_magnitude <= velocity_threshold:
417                settled_count += 1
418                if settled_count >= self.settle_steps:
419                    print(f'target reached and settled at step {step}')
420                    return True, steps
421            else:
422                settled_count = 0
423        
424            actions = [[velocities['x'], velocities['y'], velocities['z'], 0]]
425            self.sim.run(actions, 1)
426        
427        return False, steps
428
429
430    def _move_to_rl(self, target, tolerance=0.002, velocity_threshold=0.04):
431        """
432        Move to target using RL controller with single crossing detection.
433        
434        Uses single crossing approach: stops commanding on first crossing into
435        tolerance with sufficiently low velocity. More appropriate for systems
436        that naturally hold position when commands stop.
437        
438        Args:
439            target (dict): Target position with keys 'x', 'y', 'z' in meters.
440            tolerance (float, optional): Distance threshold in meters. Defaults to 0.002 (2mm).
441            velocity_threshold (float, optional): Maximum velocity magnitude for success.
442                Defaults to 0.04 m/s (from testing: 92% success at 2mm).
443                
444        Returns:
445            bool: True if target reached with low velocity, False if max_steps exceeded.
446            
447        Notes:
448            - Based on comprehensive testing showing 92% success at 2mm with 0.04 m/s threshold
449            - Uses workspace bounds that must match training configuration
450            - Model outputs normalized actions [-1, 1] scaled to velocities [-2, 2] m/s
451        """
452        # Workspace bounds (must match training exactly)
453        WORKSPACE_LOW = np.array([-0.1871, -0.1706, 0.1700], dtype=np.float32)
454        WORKSPACE_HIGH = np.array([0.2532, 0.2197, 0.2897], dtype=np.float32)
455        
456        def normalize_position(position):
457            """Normalize position from workspace bounds to [-1, 1]."""
458            position = np.asarray(position, dtype=np.float32)
459            normalized = 2.0 * (position - WORKSPACE_LOW) / (WORKSPACE_HIGH - WORKSPACE_LOW) - 1.0
460            return normalized.astype(np.float32)
461        
462        def scale_action_to_velocity(action, max_velocity=2.0):
463            """Scale normalized action [-1, 1] to velocity commands."""
464            action = np.asarray(action, dtype=np.float32)
465            velocity = action * max_velocity
466            return velocity.astype(np.float32)
467        
468        # Convert target dict to array
469        goal_position = np.array([target['x'], target['y'], target['z']], dtype=np.float32)
470        goal_normalized = normalize_position(goal_position)
471        
472        for step in range(self.max_steps):
473            # Get current position
474            current_pos = np.array(get_position(self.sim), dtype=np.float32)
475            current_normalized = normalize_position(current_pos)
476            
477            # Build observation [current_x, current_y, current_z, goal_x, goal_y, goal_z]
478            obs = np.concatenate([current_normalized, goal_normalized], dtype=np.float32)
479            
480            # Predict action (use deterministic=False for deployment)
481            action, _states = self.controller.predict(obs, deterministic=False)
482            
483            # Scale to velocity
484            velocity = scale_action_to_velocity(action)
485            
486            # Calculate distance and velocity magnitude
487            distance = np.linalg.norm(current_pos - goal_position)
488            velocity_magnitude = np.linalg.norm(velocity)
489            
490            # Single crossing check - stop on first crossing with low velocity
491            if distance <= tolerance and velocity_magnitude <= velocity_threshold:
492                print(f'target reached at step {step}, distance: {distance*1000:.3f}mm, velocity: {velocity_magnitude:.4f}m/s')
493                return True
494            
495            # Send velocity command to simulation
496            actions = [[float(velocity[0]), float(velocity[1]), float(velocity[2]), 0]]
497            self.sim.run(actions, num_steps=1)
498        
499        # Max steps reached
500        final_pos = np.array(get_position(self.sim), dtype=np.float32)
501        final_distance = float(np.linalg.norm(final_pos - goal_position))
502        print(f'max steps reached, final distance: {final_distance*1000:.3f}mm')
503        return False      
504
505    def _move_to_rl_metrics(self, target, tolerance=0.002, velocity_threshold=0.04):
506        """
507        Move to target using RL controller with single crossing detection and step tracking.
508        
509        Uses single crossing approach: stops commanding on first crossing into
510        tolerance with sufficiently low velocity. More appropriate for systems
511        that naturally hold position when commands stop.
512        
513        Args:
514            target (dict): Target position with keys 'x', 'y', 'z' in meters.
515            tolerance (float, optional): Distance threshold in meters. Defaults to 0.002 (2mm).
516            velocity_threshold (float, optional): Maximum velocity magnitude for success.
517                Defaults to 0.04 m/s (from testing: 92% success at 2mm).
518                
519        Returns:
520            tuple: (success, steps) where success is bool and steps is int.
521            
522        Notes:
523            - Based on comprehensive testing showing 92% success at 2mm with 0.04 m/s threshold
524            - Uses workspace bounds that must match training configuration
525            - Model outputs normalized actions [-1, 1] scaled to velocities [-2, 2] m/s
526        """
527        # Workspace bounds (must match training exactly)
528        WORKSPACE_LOW = np.array([-0.1871, -0.1706, 0.1700], dtype=np.float32)
529        WORKSPACE_HIGH = np.array([0.2532, 0.2197, 0.2897], dtype=np.float32)
530        
531        def normalize_position(position):
532            """Normalize position from workspace bounds to [-1, 1]."""
533            position = np.asarray(position, dtype=np.float32)
534            normalized = 2.0 * (position - WORKSPACE_LOW) / (WORKSPACE_HIGH - WORKSPACE_LOW) - 1.0
535            return normalized.astype(np.float32)
536        
537        def scale_action_to_velocity(action, max_velocity=2.0):
538            """Scale normalized action [-1, 1] to velocity commands."""
539            action = np.asarray(action, dtype=np.float32)
540            velocity = action * max_velocity
541            return velocity.astype(np.float32)
542        
543        # Convert target dict to array
544        goal_position = np.array([target['x'], target['y'], target['z']], dtype=np.float32)
545        goal_normalized = normalize_position(goal_position)
546        
547        steps = 0
548        
549        for step in range(self.max_steps):
550            steps += 1
551            
552            # Get current position
553            current_pos = np.array(get_position(self.sim), dtype=np.float32)
554            current_normalized = normalize_position(current_pos)
555            
556            # Build observation [current_x, current_y, current_z, goal_x, goal_y, goal_z]
557            obs = np.concatenate([current_normalized, goal_normalized], dtype=np.float32)
558            
559            # Predict action (use deterministic=False for deployment)
560            action, _states = self.controller.predict(obs, deterministic=False)
561            
562            # Scale to velocity
563            velocity = scale_action_to_velocity(action)
564            
565            # Calculate distance and velocity magnitude
566            distance = np.linalg.norm(current_pos - goal_position)
567            velocity_magnitude = np.linalg.norm(velocity)
568            
569            # Single crossing check - stop on first crossing with low velocity
570            if distance <= tolerance and velocity_magnitude <= velocity_threshold:
571                print(f'target reached at step {step}, distance: {distance*1000:.3f}mm, velocity: {velocity_magnitude:.4f}m/s')
572                return True, steps
573            
574            # Send velocity command to simulation
575            actions = [[float(velocity[0]), float(velocity[1]), float(velocity[2]), 0]]
576            self.sim.run(actions, num_steps=1)
577        
578        # Max steps reached
579        final_pos = np.array(get_position(self.sim), dtype=np.float32)
580        final_distance = float(np.linalg.norm(final_pos - goal_position))
581        print(f'max steps reached, final distance: {final_distance*1000:.3f}mm')
582        return False, steps

Driver class for controlling OT-2 robot in simulation.

Provides high-level control interface for moving the robot to target positions and dispensing inoculum. Supports both PID and RL-based controllers with automatic settling detection and position tolerance checking.

Attributes:
  • sim (object): Simulation environment instance.
  • controller (object): Either PID controller (with axes attribute) or RL model.
  • settle_steps (int): Number of consecutive steps within tolerance to consider settled.
  • max_steps (int): Maximum steps allowed per movement attempt.
  • pause_steps (int): Default pause duration after dispensing.
  • tolerance (float): Position error threshold in meters for settling detection.
Examples:
>>> # With PID controller
>>> from library.robot_control import PIDController
>>> pid = PIDController()
>>> driver = RobotDriver(sim, pid, settle_steps=10, tolerance=0.0005)
>>> 
>>> # Move to position and dispense
>>> target = [0.15, 0.10, 0.17, 1]  # [x, y, z, dispense]
>>> success = driver.move_to(target, dispense_pause=60)
>>> 
>>> # Move without dispensing
>>> target = [0.15, 0.10, 0.17, 0]
>>> success = driver.move_to(target)
RobotDriver( sim, controller, settle_steps=10, max_steps=350, pause_steps=60, tolerance=0.0005)
163    def __init__(self, sim, controller, settle_steps=10, max_steps=350, pause_steps=60,
164                 tolerance=0.0005):
165        """Initialize RobotDriver.
166        
167        Args:
168            sim (object): Simulation environment instance with run() method.
169            controller (object): PID controller or RL model for generating actions.
170            settle_steps (int, optional): Steps within tolerance to confirm settling. Defaults to 10.
171            max_steps (int, optional): Maximum steps per movement. Defaults to 350.
172            pause_steps (int, optional): Default pause after dispensing. Defaults to 60.
173            tolerance (float, optional): Position error threshold in meters. Defaults to 0.0005.
174        """
175        self.sim = sim
176        self.controller = controller
177        self.settle_steps = settle_steps
178        self.max_steps = max_steps
179        self.pause_steps = pause_steps
180        self.tolerance = tolerance

Initialize RobotDriver.

Arguments:
  • sim (object): Simulation environment instance with run() method.
  • controller (object): PID controller or RL model for generating actions.
  • settle_steps (int, optional): Steps within tolerance to confirm settling. Defaults to 10.
  • max_steps (int, optional): Maximum steps per movement. Defaults to 350.
  • pause_steps (int, optional): Default pause after dispensing. Defaults to 60.
  • tolerance (float, optional): Position error threshold in meters. Defaults to 0.0005.
sim
controller
settle_steps
max_steps
pause_steps
tolerance
def move_to(self, target, dispense_pause=60, pause_after=False):
182    def move_to(self, target, dispense_pause=60, pause_after=False):
183        """Move robot to target position with optional dispensing.
184        
185        Moves the robot end-effector to the specified 3D target position using the
186        configured controller. Optionally dispenses inoculum upon arrival and pauses
187        afterward for settling.
188        
189        Args:
190            target (list): Target as [x, y, z, dispense] where x, y, z are floats in meters
191                          and dispense is 0 (no dispense) or 1 (dispense).
192            dispense_pause (int, optional): Pause duration after dispensing in steps. Defaults to 60.
193            pause_after (bool, optional): Whether to pause after movement completes. Defaults to False.
194            
195        Returns:
196            bool: True if target was reached within tolerance, False otherwise.
197            
198        Raises:
199            ValueError: If target is not a 4-element list.
200            
201        Examples:
202            >>> # Move and dispense
203            >>> success = driver.move_to([0.15, 0.10, 0.17, 1], dispense_pause=60)
204            >>> 
205            >>> # Move only
206            >>> success = driver.move_to([0.15, 0.10, 0.17, 0])
207        """
208        # Parse target [x, y, z, dispense]
209        if len(target) != 4:
210            raise ValueError("Target must be [x, y, z, dispense] where dispense is 0 or 1")
211
212        target_pos = {'x': target[0], 'y': target[1], 'z': target[2]}
213        dispense = bool(target[3])
214
215        is_pid = self._is_pid_controller()
216
217        if is_pid:
218            success = self._move_to_pid(target_pos)
219        else:
220            success = self._move_to_rl(target_pos)
221
222        if success and dispense:
223            self._dispense(dispense_pause=dispense_pause)
224
225        if success and pause_after:
226            pause(self.sim, self.pause_steps)
227
228        return success

Move robot to target position with optional dispensing.

Moves the robot end-effector to the specified 3D target position using the configured controller. Optionally dispenses inoculum upon arrival and pauses afterward for settling.

Arguments:
  • target (list): Target as [x, y, z, dispense] where x, y, z are floats in meters and dispense is 0 (no dispense) or 1 (dispense).
  • dispense_pause (int, optional): Pause duration after dispensing in steps. Defaults to 60.
  • pause_after (bool, optional): Whether to pause after movement completes. Defaults to False.
Returns:

bool: True if target was reached within tolerance, False otherwise.

Raises:
  • ValueError: If target is not a 4-element list.
Examples:
>>> # Move and dispense
>>> success = driver.move_to([0.15, 0.10, 0.17, 1], dispense_pause=60)
>>> 
>>> # Move only
>>> success = driver.move_to([0.15, 0.10, 0.17, 0])
def move_to_with_metrics(self, target, dispense_pause=60, pause_after=False):
230    def move_to_with_metrics(self, target, dispense_pause=60, pause_after=False):
231        """Move robot to target with comprehensive metrics tracking.
232        
233        High-level method that moves to target, tracks steps taken, calculates
234        final position error, and handles dispensing. Automatically detects
235        controller type (PID or RL) and uses appropriate control method.
236        
237        Args:
238            target (list): Target as [x, y, z, dispense] where x, y, z are floats 
239                in meters and dispense is 0 (no dispense) or 1 (dispense).
240            dispense_pause (int, optional): Pause duration after dispensing in steps.
241                Defaults to 60.
242            pause_after (bool, optional): Whether to pause after movement completes.
243                Defaults to False.
244                
245        Returns:
246            dict: Metrics dictionary containing:
247                - 'success' (bool): Whether target was reached within tolerance
248                - 'steps' (int): Number of steps taken
249                - 'target' (dict): Target coordinates {'x', 'y', 'z'}
250                - 'final_position' (dict): Final position {'x', 'y', 'z'}
251                - 'error_euclidean' (float): 3D Euclidean distance error in meters
252                - 'error_x' (float): X-axis error in meters
253                - 'error_y' (float): Y-axis error in meters
254                - 'error_z' (float): Z-axis error in meters
255                - 'dispensed' (bool): Whether dispensing occurred
256                
257        Raises:
258            ValueError: If target is not a 4-element list.
259            
260        Example:
261            >>> target = [0.15, 0.10, 0.17, 1]
262            >>> metrics = driver.move_to_with_metrics(target)
263            >>> print(f"Success: {metrics['success']}, Steps: {metrics['steps']}")
264            >>> print(f"Error: {metrics['error_euclidean']*1000:.3f}mm")
265        """
266        if len(target) != 4:
267            raise ValueError("Target must be [x, y, z, dispense]")
268        
269        target_dict = {'x': target[0], 'y': target[1], 'z': target[2]}
270        dispense = target[3]
271        
272        # Move to target using appropriate controller
273        if self._is_pid_controller():
274            success, steps = self._move_to_pid_metrics(target_dict, tolerance=self.tolerance)
275        else:
276            success, steps = self._move_to_rl_metrics(target_dict, tolerance=self.tolerance)
277        
278        # Get final position
279        final_pos = get_position(self.sim)
280        final_dict = {'x': final_pos[0], 'y': final_pos[1], 'z': final_pos[2]}
281        
282        # Calculate errors
283        error_x = target_dict['x'] - final_pos[0]
284        error_y = target_dict['y'] - final_pos[1]
285        error_z = target_dict['z'] - final_pos[2]
286        error_euclidean = np.sqrt(error_x**2 + error_y**2 + error_z**2)
287        
288        # Handle dispensing
289        dispensed = False
290        if dispense == 1 and success:
291            self.sim.dispense()
292            pause(self.sim, dispense_pause)
293            dispensed = True
294        
295        if pause_after:
296            pause(self.sim, self.pause_steps)
297        
298        # Build metrics dictionary
299        metrics = {
300            'success': success,
301            'steps': steps,
302            'target': target_dict.copy(),
303            'final_position': final_dict,
304            'error_euclidean': float(error_euclidean),
305            'error_x': float(error_x),
306            'error_y': float(error_y),
307            'error_z': float(error_z),
308            'dispensed': dispensed
309        }
310        
311        return metrics

Move robot to target with comprehensive metrics tracking.

High-level method that moves to target, tracks steps taken, calculates final position error, and handles dispensing. Automatically detects controller type (PID or RL) and uses appropriate control method.

Arguments:
  • target (list): Target as [x, y, z, dispense] where x, y, z are floats in meters and dispense is 0 (no dispense) or 1 (dispense).
  • dispense_pause (int, optional): Pause duration after dispensing in steps. Defaults to 60.
  • pause_after (bool, optional): Whether to pause after movement completes. Defaults to False.
Returns:

dict: Metrics dictionary containing: - 'success' (bool): Whether target was reached within tolerance - 'steps' (int): Number of steps taken - 'target' (dict): Target coordinates {'x', 'y', 'z'} - 'final_position' (dict): Final position {'x', 'y', 'z'} - 'error_euclidean' (float): 3D Euclidean distance error in meters - 'error_x' (float): X-axis error in meters - 'error_y' (float): Y-axis error in meters - 'error_z' (float): Z-axis error in meters - 'dispensed' (bool): Whether dispensing occurred

Raises:
  • ValueError: If target is not a 4-element list.
Example:
>>> target = [0.15, 0.10, 0.17, 1]
>>> metrics = driver.move_to_with_metrics(target)
>>> print(f"Success: {metrics['success']}, Steps: {metrics['steps']}")
>>> print(f"Error: {metrics['error_euclidean']*1000:.3f}mm")
def get_coord_from_plate(plate_image_path, shoot_model, root_model, temp_path=None):
584def get_coord_from_plate(plate_image_path, shoot_model, root_model, temp_path=None):
585    """Extract root tip coordinates from specimen image using complete CV pipeline.
586    
587    Runs the full computer vision workflow: U-Net inference for segmentation,
588    mask cleaning and repair, root-shoot matching, and coordinate conversion to
589    robot frame. Returns DataFrame with measurements and coordinates for all 5 plants.
590    
591    Args:
592        plate_image_path (str or Path): Path to specimen plate image.
593        shoot_model (str or Path): Path to trained shoot segmentation U-Net model (.h5).
594        root_model (str or Path): Path to trained root segmentation U-Net model (.h5).
595        temp_path (str or Path, optional): Directory for intermediate outputs. 
596            Creates temporary directory if None. Defaults to None.
597            
598    Returns:
599        pd.DataFrame: DataFrame with 5 rows (one per plant) containing:
600            - plant_order: 1-5 (left to right)
601            - Plant ID: Image identifier
602            - Length (px): Root length in pixels
603            - top_node_x/y: Pixel coordinates of root connection point
604            - endpoint_x/y: Pixel coordinates of root tip
605            - top_node_robot_x/y/z: Robot coordinates of connection point (meters)
606            - endpoint_robot_x/y/z: Robot coordinates of root tip (meters)
607            
608    Examples:
609        >>> df = get_coord_from_plate(
610        ...     'specimen_01.png',
611        ...     'models/shoot_model.h5',
612        ...     'models/root_model.h5'
613        ... )
614        >>> print(df[['plant_order', 'Length (px)', 'endpoint_robot_x', 'endpoint_robot_y']])
615        
616    Notes:
617        - Uses pre-computed GLOBAL_STATS for shoot mask quality validation
618        - Applies edge repair to root masks for improved skeleton quality
619        - Returns NaN coordinates for ungerminated seeds (no detected root)
620        - All robot coordinates are in meters relative to simulation origin
621    """
622    config = RootMatchingConfig()
623    if not temp_path:
624        temp_path = tempfile.mkdtemp()
625    else:
626        temp_path = Path(temp_path)
627
628    root_model = Path(root_model)
629    shoot_model = Path(shoot_model)
630
631    images = [str(plate_image_path)]
632
633    # Run root inference
634    mask_type = "root"
635    n_processed_root, output_dir_root = run_inference(
636        model_path=root_model,
637        image_paths=images,
638        output_dir=temp_path,
639        mask_type=mask_type,
640        verbose=False
641    )
642
643    # Run shoot inference
644    mask_type = "shoot"
645    n_processed_shoot, output_dir_shoot = run_inference(
646        model_path=shoot_model,
647        image_paths=images,
648        output_dir=temp_path,
649        mask_type=mask_type,
650        verbose=False
651    )
652
653    # Get inference outputs
654    root_inference_path = next(output_dir_root.glob('*.png'), None)
655    shoot_inference_path = next(output_dir_shoot.glob('*.png'), None)
656
657    assert root_inference_path, "Root inference failed - no output mask found"
658    assert shoot_inference_path, "Shoot inference failed - no output mask found"
659
660    # Clean shoot mask
661    results = clean_shoot_mask_pipeline(
662        shoot_inference_path, 
663        GLOBAL_STATS, 
664        quality_check_threshold=2.0
665    )
666    cleaned_shoot_mask = results['cleaned_mask']
667    
668    # Repair root mask
669    repaired_root_mask = repair_root_mask_edges(root_inference_path)
670
671    # Run complete matching pipeline
672    df = match_roots_to_shoots_complete(
673        shoot_mask=cleaned_shoot_mask,
674        root_mask=repaired_root_mask,
675        image_path=plate_image_path,
676        config=config,
677        return_dataframe=True,
678        sample_idx=0,
679        verbose=True
680    )
681
682    return df

Extract root tip coordinates from specimen image using complete CV pipeline.

Runs the full computer vision workflow: U-Net inference for segmentation, mask cleaning and repair, root-shoot matching, and coordinate conversion to robot frame. Returns DataFrame with measurements and coordinates for all 5 plants.

Arguments:
  • plate_image_path (str or Path): Path to specimen plate image.
  • shoot_model (str or Path): Path to trained shoot segmentation U-Net model (.h5).
  • root_model (str or Path): Path to trained root segmentation U-Net model (.h5).
  • temp_path (str or Path, optional): Directory for intermediate outputs. Creates temporary directory if None. Defaults to None.
Returns:

pd.DataFrame: DataFrame with 5 rows (one per plant) containing: - plant_order: 1-5 (left to right) - Plant ID: Image identifier - Length (px): Root length in pixels - top_node_x/y: Pixel coordinates of root connection point - endpoint_x/y: Pixel coordinates of root tip - top_node_robot_x/y/z: Robot coordinates of connection point (meters) - endpoint_robot_x/y/z: Robot coordinates of root tip (meters)

Examples:
>>> df = get_coord_from_plate(
...     'specimen_01.png',
...     'models/shoot_model.h5',
...     'models/root_model.h5'
... )
>>> print(df[['plant_order', 'Length (px)', 'endpoint_robot_x', 'endpoint_robot_y']])
Notes:
  • Uses pre-computed GLOBAL_STATS for shoot mask quality validation
  • Applies edge repair to root masks for improved skeleton quality
  • Returns NaN coordinates for ungerminated seeds (no detected root)
  • All robot coordinates are in meters relative to simulation origin