library.robot_driver
Robot driver module for OT-2 simulation control and computer vision integration.
This module provides the RobotDriver class for controlling the OT-2 robot in simulation using either PID or RL controllers, and the get_coord_from_plate function for processing specimen images through the complete computer vision pipeline to extract root tip coordinates.
The RobotDriver handles:
- Moving the robot to target positions
- Dispensing inoculum at root tip locations
- Supporting both PID and RL control strategies
- Settling detection and tolerance checking
The get_coord_from_plate function integrates:
- U-Net inference for shoot and root segmentation
- Shoot mask cleaning and validation
- Root mask edge repair
- Complete root-shoot matching pipeline
- Coordinate conversion to robot frame
Typical Usage:
>>> from robot_driver import RobotDriver, get_coord_from_plate >>> >>> # Get coordinates from specimen image >>> df = get_coord_from_plate( ... plate_image_path='path/to/image.png', ... shoot_model='models/shoot_unet.h5', ... root_model='models/root_unet.h5' ... ) >>> >>> # Create robot driver with PID controller >>> driver = RobotDriver(sim, pid_controller) >>> >>> # Move to each endpoint and dispense >>> for idx, row in df.iterrows(): ... if not np.isnan(row['endpoint_robot_x']): ... target = [row['endpoint_robot_x'], row['endpoint_robot_y'], ... row['endpoint_robot_z'], 1] ... driver.move_to(target, dispense_pause=60)
1"""Robot driver module for OT-2 simulation control and computer vision integration. 2 3This module provides the RobotDriver class for controlling the OT-2 robot in simulation 4using either PID or RL controllers, and the get_coord_from_plate function for processing 5specimen images through the complete computer vision pipeline to extract root tip coordinates. 6 7The RobotDriver handles: 8- Moving the robot to target positions 9- Dispensing inoculum at root tip locations 10- Supporting both PID and RL control strategies 11- Settling detection and tolerance checking 12 13The get_coord_from_plate function integrates: 14- U-Net inference for shoot and root segmentation 15- Shoot mask cleaning and validation 16- Root mask edge repair 17- Complete root-shoot matching pipeline 18- Coordinate conversion to robot frame 19 20Typical Usage: 21 >>> from robot_driver import RobotDriver, get_coord_from_plate 22 >>> 23 >>> # Get coordinates from specimen image 24 >>> df = get_coord_from_plate( 25 ... plate_image_path='path/to/image.png', 26 ... shoot_model='models/shoot_unet.h5', 27 ... root_model='models/root_unet.h5' 28 ... ) 29 >>> 30 >>> # Create robot driver with PID controller 31 >>> driver = RobotDriver(sim, pid_controller) 32 >>> 33 >>> # Move to each endpoint and dispense 34 >>> for idx, row in df.iterrows(): 35 ... if not np.isnan(row['endpoint_robot_x']): 36 ... target = [row['endpoint_robot_x'], row['endpoint_robot_y'], 37 ... row['endpoint_robot_z'], 1] 38 ... driver.move_to(target, dispense_pause=60) 39""" 40 41import numpy as np 42import random 43import tempfile 44from pathlib import Path 45from library.robot_control import * 46from library.mask_processing import repair_root_mask_edges 47from library.run_inference import run_inference 48from library.shoot_mask_cleaning import calculate_global_y_stats, clean_shoot_mask_pipeline 49from library.root_shoot_matching import RootMatchingConfig, match_roots_to_shoots_complete 50 51 52# Global Y-pixel statistics from inference batch 53 54GLOBAL_STATS = {'global_mean': 522.6373561098932, 55 'global_std': 40.64363903376843, 56 'all_stats': [{'mean': 522.4183627153725, 'std': 67.58658539471621}, 57 {'mean': 452.95398262798005, 'std': 41.56525140223841}, 58 {'mean': 490.8661926308985, 'std': 27.346885868291096}, 59 {'mean': 491.92876883767013, 'std': 24.893401245590812}, 60 {'mean': 551.0319193616127, 'std': 13.30130229117151}, 61 {'mean': 543.4997856836691, 'std': 20.34707684591397}, 62 {'mean': 507.7737608193482, 'std': 58.07985726549244}, 63 {'mean': 497.31604365278474, 'std': 48.721657949404666}, 64 {'mean': 482.4776291596063, 'std': 32.85790632020763}, 65 {'mean': 506.78758850478965, 'std': 20.708410447953284}, 66 {'mean': 657.317774466855, 'std': 72.20521285868364}, 67 {'mean': 510.7383455353232, 'std': 30.858226987648244}, 68 {'mean': 486.4174450352062, 'std': 77.22634688922165}, 69 {'mean': 536.5708980097133, 'std': 43.147085974995804}, 70 {'mean': 476.0206638821719, 'std': 21.836197778799253}, 71 {'mean': 517.2567251461988, 'std': 19.93044627424427}, 72 {'mean': 638.9460841916085, 'std': 72.86684596583322}, 73 {'mean': 537.351987845024, 'std': 19.926337738115237}, 74 {'mean': 522.4358079821378, 'std': 58.824106143078836}], 75 'all_means': [522.4183627153725, 76 452.95398262798005, 77 490.8661926308985, 78 491.92876883767013, 79 551.0319193616127, 80 543.4997856836691, 81 507.7737608193482, 82 497.31604365278474, 83 482.4776291596063, 84 506.78758850478965, 85 657.317774466855, 86 510.7383455353232, 87 486.4174450352062, 88 536.5708980097133, 89 476.0206638821719, 90 517.2567251461988, 91 638.9460841916085, 92 537.351987845024, 93 522.4358079821378], 94 'all_stds': [67.58658539471621, 95 41.56525140223841, 96 27.346885868291096, 97 24.893401245590812, 98 13.30130229117151, 99 20.34707684591397, 100 58.07985726549244, 101 48.721657949404666, 102 32.85790632020763, 103 20.708410447953284, 104 72.20521285868364, 105 30.858226987648244, 106 77.22634688922165, 107 43.147085974995804, 108 21.836197778799253, 109 19.93044627424427, 110 72.86684596583322, 111 19.926337738115237, 112 58.824106143078836], 113 'y_min': 200, 114 'y_max': 750} 115 116 117def pause(sim, steps): 118 """Pause simulation for specified number of steps. 119 120 Sends zero-velocity commands to maintain robot position without movement. 121 122 Args: 123 sim (object): Simulation environment instance with run() method. 124 steps (int): Number of simulation steps to pause. 125 126 Returns: 127 None 128 """ 129 sim.run([[0, 0, 0, 0]], num_steps=steps) 130 131 132class RobotDriver: 133 """Driver class for controlling OT-2 robot in simulation. 134 135 Provides high-level control interface for moving the robot to target positions 136 and dispensing inoculum. Supports both PID and RL-based controllers with 137 automatic settling detection and position tolerance checking. 138 139 Attributes: 140 sim (object): Simulation environment instance. 141 controller (object): Either PID controller (with axes attribute) or RL model. 142 settle_steps (int): Number of consecutive steps within tolerance to consider settled. 143 max_steps (int): Maximum steps allowed per movement attempt. 144 pause_steps (int): Default pause duration after dispensing. 145 tolerance (float): Position error threshold in meters for settling detection. 146 147 Examples: 148 >>> # With PID controller 149 >>> from library.robot_control import PIDController 150 >>> pid = PIDController() 151 >>> driver = RobotDriver(sim, pid, settle_steps=10, tolerance=0.0005) 152 >>> 153 >>> # Move to position and dispense 154 >>> target = [0.15, 0.10, 0.17, 1] # [x, y, z, dispense] 155 >>> success = driver.move_to(target, dispense_pause=60) 156 >>> 157 >>> # Move without dispensing 158 >>> target = [0.15, 0.10, 0.17, 0] 159 >>> success = driver.move_to(target) 160 """ 161 162 def __init__(self, sim, controller, settle_steps=10, max_steps=350, pause_steps=60, 163 tolerance=0.0005): 164 """Initialize RobotDriver. 165 166 Args: 167 sim (object): Simulation environment instance with run() method. 168 controller (object): PID controller or RL model for generating actions. 169 settle_steps (int, optional): Steps within tolerance to confirm settling. Defaults to 10. 170 max_steps (int, optional): Maximum steps per movement. Defaults to 350. 171 pause_steps (int, optional): Default pause after dispensing. Defaults to 60. 172 tolerance (float, optional): Position error threshold in meters. Defaults to 0.0005. 173 """ 174 self.sim = sim 175 self.controller = controller 176 self.settle_steps = settle_steps 177 self.max_steps = max_steps 178 self.pause_steps = pause_steps 179 self.tolerance = tolerance 180 181 def move_to(self, target, dispense_pause=60, pause_after=False): 182 """Move robot to target position with optional dispensing. 183 184 Moves the robot end-effector to the specified 3D target position using the 185 configured controller. Optionally dispenses inoculum upon arrival and pauses 186 afterward for settling. 187 188 Args: 189 target (list): Target as [x, y, z, dispense] where x, y, z are floats in meters 190 and dispense is 0 (no dispense) or 1 (dispense). 191 dispense_pause (int, optional): Pause duration after dispensing in steps. Defaults to 60. 192 pause_after (bool, optional): Whether to pause after movement completes. Defaults to False. 193 194 Returns: 195 bool: True if target was reached within tolerance, False otherwise. 196 197 Raises: 198 ValueError: If target is not a 4-element list. 199 200 Examples: 201 >>> # Move and dispense 202 >>> success = driver.move_to([0.15, 0.10, 0.17, 1], dispense_pause=60) 203 >>> 204 >>> # Move only 205 >>> success = driver.move_to([0.15, 0.10, 0.17, 0]) 206 """ 207 # Parse target [x, y, z, dispense] 208 if len(target) != 4: 209 raise ValueError("Target must be [x, y, z, dispense] where dispense is 0 or 1") 210 211 target_pos = {'x': target[0], 'y': target[1], 'z': target[2]} 212 dispense = bool(target[3]) 213 214 is_pid = self._is_pid_controller() 215 216 if is_pid: 217 success = self._move_to_pid(target_pos) 218 else: 219 success = self._move_to_rl(target_pos) 220 221 if success and dispense: 222 self._dispense(dispense_pause=dispense_pause) 223 224 if success and pause_after: 225 pause(self.sim, self.pause_steps) 226 227 return success 228 229 def move_to_with_metrics(self, target, dispense_pause=60, pause_after=False): 230 """Move robot to target with comprehensive metrics tracking. 231 232 High-level method that moves to target, tracks steps taken, calculates 233 final position error, and handles dispensing. Automatically detects 234 controller type (PID or RL) and uses appropriate control method. 235 236 Args: 237 target (list): Target as [x, y, z, dispense] where x, y, z are floats 238 in meters and dispense is 0 (no dispense) or 1 (dispense). 239 dispense_pause (int, optional): Pause duration after dispensing in steps. 240 Defaults to 60. 241 pause_after (bool, optional): Whether to pause after movement completes. 242 Defaults to False. 243 244 Returns: 245 dict: Metrics dictionary containing: 246 - 'success' (bool): Whether target was reached within tolerance 247 - 'steps' (int): Number of steps taken 248 - 'target' (dict): Target coordinates {'x', 'y', 'z'} 249 - 'final_position' (dict): Final position {'x', 'y', 'z'} 250 - 'error_euclidean' (float): 3D Euclidean distance error in meters 251 - 'error_x' (float): X-axis error in meters 252 - 'error_y' (float): Y-axis error in meters 253 - 'error_z' (float): Z-axis error in meters 254 - 'dispensed' (bool): Whether dispensing occurred 255 256 Raises: 257 ValueError: If target is not a 4-element list. 258 259 Example: 260 >>> target = [0.15, 0.10, 0.17, 1] 261 >>> metrics = driver.move_to_with_metrics(target) 262 >>> print(f"Success: {metrics['success']}, Steps: {metrics['steps']}") 263 >>> print(f"Error: {metrics['error_euclidean']*1000:.3f}mm") 264 """ 265 if len(target) != 4: 266 raise ValueError("Target must be [x, y, z, dispense]") 267 268 target_dict = {'x': target[0], 'y': target[1], 'z': target[2]} 269 dispense = target[3] 270 271 # Move to target using appropriate controller 272 if self._is_pid_controller(): 273 success, steps = self._move_to_pid_metrics(target_dict, tolerance=self.tolerance) 274 else: 275 success, steps = self._move_to_rl_metrics(target_dict, tolerance=self.tolerance) 276 277 # Get final position 278 final_pos = get_position(self.sim) 279 final_dict = {'x': final_pos[0], 'y': final_pos[1], 'z': final_pos[2]} 280 281 # Calculate errors 282 error_x = target_dict['x'] - final_pos[0] 283 error_y = target_dict['y'] - final_pos[1] 284 error_z = target_dict['z'] - final_pos[2] 285 error_euclidean = np.sqrt(error_x**2 + error_y**2 + error_z**2) 286 287 # Handle dispensing 288 dispensed = False 289 if dispense == 1 and success: 290 self.sim.dispense() 291 pause(self.sim, dispense_pause) 292 dispensed = True 293 294 if pause_after: 295 pause(self.sim, self.pause_steps) 296 297 # Build metrics dictionary 298 metrics = { 299 'success': success, 300 'steps': steps, 301 'target': target_dict.copy(), 302 'final_position': final_dict, 303 'error_euclidean': float(error_euclidean), 304 'error_x': float(error_x), 305 'error_y': float(error_y), 306 'error_z': float(error_z), 307 'dispensed': dispensed 308 } 309 310 return metrics 311 312 313 def _dispense(self, dispense_pause=60): 314 """Dispense a single drop with variable pause. 315 316 Args: 317 dispense_pause (int, optional): Pause duration in simulation steps. Defaults to 60. 318 319 Returns: 320 None 321 """ 322 dispense = [[0, 0, 0, 1]] 323 pause = [[0, 0, 0, 0]] 324 self.sim.run(dispense, num_steps=1) 325 self.sim.run(pause, num_steps=dispense_pause) 326 327 def _is_pid_controller(self): 328 """Check if controller is PID or RL model. 329 330 Returns: 331 bool: True if controller is PID (has axes attribute), False for RL. 332 """ 333 return hasattr(self.controller, 'axes') 334 335 def _move_to_pid(self, target): 336 """Move to target using PID controller with settling detection. 337 338 Args: 339 target (dict): Target position with keys 'x', 'y', 'z' in meters. 340 341 Returns: 342 bool: True if target reached and settled, False if max_steps exceeded. 343 """ 344 pid_controller = self.controller 345 for axis in ['x', 'y', 'z']: 346 pid_controller.axes[axis].setpoint = target[axis] 347 348 settled_count = 0 349 350 for step in range(self.max_steps): 351 current_pos = get_position(self.sim) 352 current_dict = {'x': float(current_pos[0]), 'y': float(current_pos[1]), 'z': float(current_pos[2])} 353 354 error_x = target['x'] - current_pos[0] 355 error_y = target['y'] - current_pos[1] 356 error_z = target['z'] - current_pos[2] 357 total_error = (error_x**2 + error_y**2 + error_z**2)**0.5 358 359 velocities = pid_controller(current_dict, 0.01) 360 361 # Check both position error and velocity magnitude 362 velocity_magnitude = (velocities['x']**2 + velocities['y']**2 + velocities['z']**2)**0.5 363 364 if total_error <= self.tolerance and velocity_magnitude <= 0.01: 365 settled_count += 1 366 if settled_count >= self.settle_steps: 367 print(f'target reached and settled at step {step}') 368 return True 369 else: 370 settled_count = 0 371 372 actions = [[velocities['x'], velocities['y'], velocities['z'], 0]] 373 self.sim.run(actions, 1) 374 375 return False 376 377 def _move_to_pid_metrics(self, target, tolerance=0.001, velocity_threshold=0.01): 378 """ 379 Move to target using PID controller with settling detection and step tracking. 380 381 Args: 382 target (dict): Target position with keys 'x', 'y', 'z' in meters. 383 tolerance (float, optional): Distance threshold in meters. Defaults to 0.001. 384 velocity_threshold (float, optional): Maximum velocity magnitude for settling. 385 Defaults to 0.01 m/s. 386 387 Returns: 388 tuple: (success, steps) where success is bool and steps is int. 389 """ 390 pid_controller = self.controller 391 392 # Set setpoint for each axis 393 for axis in ['x', 'y', 'z']: 394 pid_controller.axes[axis].setpoint = target[axis] 395 396 settled_count = 0 397 steps = 0 398 399 for step in range(self.max_steps): 400 steps += 1 401 402 current_pos = get_position(self.sim) 403 current_dict = {'x': float(current_pos[0]), 'y': float(current_pos[1]), 'z': float(current_pos[2])} 404 405 error_x = target['x'] - current_pos[0] 406 error_y = target['y'] - current_pos[1] 407 error_z = target['z'] - current_pos[2] 408 total_error = (error_x**2 + error_y**2 + error_z**2)**0.5 409 410 velocities = pid_controller(current_dict, 0.01) 411 412 # Check both position error and velocity magnitude 413 velocity_magnitude = (velocities['x']**2 + velocities['y']**2 + velocities['z']**2)**0.5 414 415 if total_error <= self.tolerance and velocity_magnitude <= velocity_threshold: 416 settled_count += 1 417 if settled_count >= self.settle_steps: 418 print(f'target reached and settled at step {step}') 419 return True, steps 420 else: 421 settled_count = 0 422 423 actions = [[velocities['x'], velocities['y'], velocities['z'], 0]] 424 self.sim.run(actions, 1) 425 426 return False, steps 427 428 429 def _move_to_rl(self, target, tolerance=0.002, velocity_threshold=0.04): 430 """ 431 Move to target using RL controller with single crossing detection. 432 433 Uses single crossing approach: stops commanding on first crossing into 434 tolerance with sufficiently low velocity. More appropriate for systems 435 that naturally hold position when commands stop. 436 437 Args: 438 target (dict): Target position with keys 'x', 'y', 'z' in meters. 439 tolerance (float, optional): Distance threshold in meters. Defaults to 0.002 (2mm). 440 velocity_threshold (float, optional): Maximum velocity magnitude for success. 441 Defaults to 0.04 m/s (from testing: 92% success at 2mm). 442 443 Returns: 444 bool: True if target reached with low velocity, False if max_steps exceeded. 445 446 Notes: 447 - Based on comprehensive testing showing 92% success at 2mm with 0.04 m/s threshold 448 - Uses workspace bounds that must match training configuration 449 - Model outputs normalized actions [-1, 1] scaled to velocities [-2, 2] m/s 450 """ 451 # Workspace bounds (must match training exactly) 452 WORKSPACE_LOW = np.array([-0.1871, -0.1706, 0.1700], dtype=np.float32) 453 WORKSPACE_HIGH = np.array([0.2532, 0.2197, 0.2897], dtype=np.float32) 454 455 def normalize_position(position): 456 """Normalize position from workspace bounds to [-1, 1].""" 457 position = np.asarray(position, dtype=np.float32) 458 normalized = 2.0 * (position - WORKSPACE_LOW) / (WORKSPACE_HIGH - WORKSPACE_LOW) - 1.0 459 return normalized.astype(np.float32) 460 461 def scale_action_to_velocity(action, max_velocity=2.0): 462 """Scale normalized action [-1, 1] to velocity commands.""" 463 action = np.asarray(action, dtype=np.float32) 464 velocity = action * max_velocity 465 return velocity.astype(np.float32) 466 467 # Convert target dict to array 468 goal_position = np.array([target['x'], target['y'], target['z']], dtype=np.float32) 469 goal_normalized = normalize_position(goal_position) 470 471 for step in range(self.max_steps): 472 # Get current position 473 current_pos = np.array(get_position(self.sim), dtype=np.float32) 474 current_normalized = normalize_position(current_pos) 475 476 # Build observation [current_x, current_y, current_z, goal_x, goal_y, goal_z] 477 obs = np.concatenate([current_normalized, goal_normalized], dtype=np.float32) 478 479 # Predict action (use deterministic=False for deployment) 480 action, _states = self.controller.predict(obs, deterministic=False) 481 482 # Scale to velocity 483 velocity = scale_action_to_velocity(action) 484 485 # Calculate distance and velocity magnitude 486 distance = np.linalg.norm(current_pos - goal_position) 487 velocity_magnitude = np.linalg.norm(velocity) 488 489 # Single crossing check - stop on first crossing with low velocity 490 if distance <= tolerance and velocity_magnitude <= velocity_threshold: 491 print(f'target reached at step {step}, distance: {distance*1000:.3f}mm, velocity: {velocity_magnitude:.4f}m/s') 492 return True 493 494 # Send velocity command to simulation 495 actions = [[float(velocity[0]), float(velocity[1]), float(velocity[2]), 0]] 496 self.sim.run(actions, num_steps=1) 497 498 # Max steps reached 499 final_pos = np.array(get_position(self.sim), dtype=np.float32) 500 final_distance = float(np.linalg.norm(final_pos - goal_position)) 501 print(f'max steps reached, final distance: {final_distance*1000:.3f}mm') 502 return False 503 504 def _move_to_rl_metrics(self, target, tolerance=0.002, velocity_threshold=0.04): 505 """ 506 Move to target using RL controller with single crossing detection and step tracking. 507 508 Uses single crossing approach: stops commanding on first crossing into 509 tolerance with sufficiently low velocity. More appropriate for systems 510 that naturally hold position when commands stop. 511 512 Args: 513 target (dict): Target position with keys 'x', 'y', 'z' in meters. 514 tolerance (float, optional): Distance threshold in meters. Defaults to 0.002 (2mm). 515 velocity_threshold (float, optional): Maximum velocity magnitude for success. 516 Defaults to 0.04 m/s (from testing: 92% success at 2mm). 517 518 Returns: 519 tuple: (success, steps) where success is bool and steps is int. 520 521 Notes: 522 - Based on comprehensive testing showing 92% success at 2mm with 0.04 m/s threshold 523 - Uses workspace bounds that must match training configuration 524 - Model outputs normalized actions [-1, 1] scaled to velocities [-2, 2] m/s 525 """ 526 # Workspace bounds (must match training exactly) 527 WORKSPACE_LOW = np.array([-0.1871, -0.1706, 0.1700], dtype=np.float32) 528 WORKSPACE_HIGH = np.array([0.2532, 0.2197, 0.2897], dtype=np.float32) 529 530 def normalize_position(position): 531 """Normalize position from workspace bounds to [-1, 1].""" 532 position = np.asarray(position, dtype=np.float32) 533 normalized = 2.0 * (position - WORKSPACE_LOW) / (WORKSPACE_HIGH - WORKSPACE_LOW) - 1.0 534 return normalized.astype(np.float32) 535 536 def scale_action_to_velocity(action, max_velocity=2.0): 537 """Scale normalized action [-1, 1] to velocity commands.""" 538 action = np.asarray(action, dtype=np.float32) 539 velocity = action * max_velocity 540 return velocity.astype(np.float32) 541 542 # Convert target dict to array 543 goal_position = np.array([target['x'], target['y'], target['z']], dtype=np.float32) 544 goal_normalized = normalize_position(goal_position) 545 546 steps = 0 547 548 for step in range(self.max_steps): 549 steps += 1 550 551 # Get current position 552 current_pos = np.array(get_position(self.sim), dtype=np.float32) 553 current_normalized = normalize_position(current_pos) 554 555 # Build observation [current_x, current_y, current_z, goal_x, goal_y, goal_z] 556 obs = np.concatenate([current_normalized, goal_normalized], dtype=np.float32) 557 558 # Predict action (use deterministic=False for deployment) 559 action, _states = self.controller.predict(obs, deterministic=False) 560 561 # Scale to velocity 562 velocity = scale_action_to_velocity(action) 563 564 # Calculate distance and velocity magnitude 565 distance = np.linalg.norm(current_pos - goal_position) 566 velocity_magnitude = np.linalg.norm(velocity) 567 568 # Single crossing check - stop on first crossing with low velocity 569 if distance <= tolerance and velocity_magnitude <= velocity_threshold: 570 print(f'target reached at step {step}, distance: {distance*1000:.3f}mm, velocity: {velocity_magnitude:.4f}m/s') 571 return True, steps 572 573 # Send velocity command to simulation 574 actions = [[float(velocity[0]), float(velocity[1]), float(velocity[2]), 0]] 575 self.sim.run(actions, num_steps=1) 576 577 # Max steps reached 578 final_pos = np.array(get_position(self.sim), dtype=np.float32) 579 final_distance = float(np.linalg.norm(final_pos - goal_position)) 580 print(f'max steps reached, final distance: {final_distance*1000:.3f}mm') 581 return False, steps 582 583def get_coord_from_plate(plate_image_path, shoot_model, root_model, temp_path=None): 584 """Extract root tip coordinates from specimen image using complete CV pipeline. 585 586 Runs the full computer vision workflow: U-Net inference for segmentation, 587 mask cleaning and repair, root-shoot matching, and coordinate conversion to 588 robot frame. Returns DataFrame with measurements and coordinates for all 5 plants. 589 590 Args: 591 plate_image_path (str or Path): Path to specimen plate image. 592 shoot_model (str or Path): Path to trained shoot segmentation U-Net model (.h5). 593 root_model (str or Path): Path to trained root segmentation U-Net model (.h5). 594 temp_path (str or Path, optional): Directory for intermediate outputs. 595 Creates temporary directory if None. Defaults to None. 596 597 Returns: 598 pd.DataFrame: DataFrame with 5 rows (one per plant) containing: 599 - plant_order: 1-5 (left to right) 600 - Plant ID: Image identifier 601 - Length (px): Root length in pixels 602 - top_node_x/y: Pixel coordinates of root connection point 603 - endpoint_x/y: Pixel coordinates of root tip 604 - top_node_robot_x/y/z: Robot coordinates of connection point (meters) 605 - endpoint_robot_x/y/z: Robot coordinates of root tip (meters) 606 607 Examples: 608 >>> df = get_coord_from_plate( 609 ... 'specimen_01.png', 610 ... 'models/shoot_model.h5', 611 ... 'models/root_model.h5' 612 ... ) 613 >>> print(df[['plant_order', 'Length (px)', 'endpoint_robot_x', 'endpoint_robot_y']]) 614 615 Notes: 616 - Uses pre-computed GLOBAL_STATS for shoot mask quality validation 617 - Applies edge repair to root masks for improved skeleton quality 618 - Returns NaN coordinates for ungerminated seeds (no detected root) 619 - All robot coordinates are in meters relative to simulation origin 620 """ 621 config = RootMatchingConfig() 622 if not temp_path: 623 temp_path = tempfile.mkdtemp() 624 else: 625 temp_path = Path(temp_path) 626 627 root_model = Path(root_model) 628 shoot_model = Path(shoot_model) 629 630 images = [str(plate_image_path)] 631 632 # Run root inference 633 mask_type = "root" 634 n_processed_root, output_dir_root = run_inference( 635 model_path=root_model, 636 image_paths=images, 637 output_dir=temp_path, 638 mask_type=mask_type, 639 verbose=False 640 ) 641 642 # Run shoot inference 643 mask_type = "shoot" 644 n_processed_shoot, output_dir_shoot = run_inference( 645 model_path=shoot_model, 646 image_paths=images, 647 output_dir=temp_path, 648 mask_type=mask_type, 649 verbose=False 650 ) 651 652 # Get inference outputs 653 root_inference_path = next(output_dir_root.glob('*.png'), None) 654 shoot_inference_path = next(output_dir_shoot.glob('*.png'), None) 655 656 assert root_inference_path, "Root inference failed - no output mask found" 657 assert shoot_inference_path, "Shoot inference failed - no output mask found" 658 659 # Clean shoot mask 660 results = clean_shoot_mask_pipeline( 661 shoot_inference_path, 662 GLOBAL_STATS, 663 quality_check_threshold=2.0 664 ) 665 cleaned_shoot_mask = results['cleaned_mask'] 666 667 # Repair root mask 668 repaired_root_mask = repair_root_mask_edges(root_inference_path) 669 670 # Run complete matching pipeline 671 df = match_roots_to_shoots_complete( 672 shoot_mask=cleaned_shoot_mask, 673 root_mask=repaired_root_mask, 674 image_path=plate_image_path, 675 config=config, 676 return_dataframe=True, 677 sample_idx=0, 678 verbose=True 679 ) 680 681 return df
118def pause(sim, steps): 119 """Pause simulation for specified number of steps. 120 121 Sends zero-velocity commands to maintain robot position without movement. 122 123 Args: 124 sim (object): Simulation environment instance with run() method. 125 steps (int): Number of simulation steps to pause. 126 127 Returns: 128 None 129 """ 130 sim.run([[0, 0, 0, 0]], num_steps=steps)
Pause simulation for specified number of steps.
Sends zero-velocity commands to maintain robot position without movement.
Arguments:
- sim (object): Simulation environment instance with run() method.
- steps (int): Number of simulation steps to pause.
Returns:
None
133class RobotDriver: 134 """Driver class for controlling OT-2 robot in simulation. 135 136 Provides high-level control interface for moving the robot to target positions 137 and dispensing inoculum. Supports both PID and RL-based controllers with 138 automatic settling detection and position tolerance checking. 139 140 Attributes: 141 sim (object): Simulation environment instance. 142 controller (object): Either PID controller (with axes attribute) or RL model. 143 settle_steps (int): Number of consecutive steps within tolerance to consider settled. 144 max_steps (int): Maximum steps allowed per movement attempt. 145 pause_steps (int): Default pause duration after dispensing. 146 tolerance (float): Position error threshold in meters for settling detection. 147 148 Examples: 149 >>> # With PID controller 150 >>> from library.robot_control import PIDController 151 >>> pid = PIDController() 152 >>> driver = RobotDriver(sim, pid, settle_steps=10, tolerance=0.0005) 153 >>> 154 >>> # Move to position and dispense 155 >>> target = [0.15, 0.10, 0.17, 1] # [x, y, z, dispense] 156 >>> success = driver.move_to(target, dispense_pause=60) 157 >>> 158 >>> # Move without dispensing 159 >>> target = [0.15, 0.10, 0.17, 0] 160 >>> success = driver.move_to(target) 161 """ 162 163 def __init__(self, sim, controller, settle_steps=10, max_steps=350, pause_steps=60, 164 tolerance=0.0005): 165 """Initialize RobotDriver. 166 167 Args: 168 sim (object): Simulation environment instance with run() method. 169 controller (object): PID controller or RL model for generating actions. 170 settle_steps (int, optional): Steps within tolerance to confirm settling. Defaults to 10. 171 max_steps (int, optional): Maximum steps per movement. Defaults to 350. 172 pause_steps (int, optional): Default pause after dispensing. Defaults to 60. 173 tolerance (float, optional): Position error threshold in meters. Defaults to 0.0005. 174 """ 175 self.sim = sim 176 self.controller = controller 177 self.settle_steps = settle_steps 178 self.max_steps = max_steps 179 self.pause_steps = pause_steps 180 self.tolerance = tolerance 181 182 def move_to(self, target, dispense_pause=60, pause_after=False): 183 """Move robot to target position with optional dispensing. 184 185 Moves the robot end-effector to the specified 3D target position using the 186 configured controller. Optionally dispenses inoculum upon arrival and pauses 187 afterward for settling. 188 189 Args: 190 target (list): Target as [x, y, z, dispense] where x, y, z are floats in meters 191 and dispense is 0 (no dispense) or 1 (dispense). 192 dispense_pause (int, optional): Pause duration after dispensing in steps. Defaults to 60. 193 pause_after (bool, optional): Whether to pause after movement completes. Defaults to False. 194 195 Returns: 196 bool: True if target was reached within tolerance, False otherwise. 197 198 Raises: 199 ValueError: If target is not a 4-element list. 200 201 Examples: 202 >>> # Move and dispense 203 >>> success = driver.move_to([0.15, 0.10, 0.17, 1], dispense_pause=60) 204 >>> 205 >>> # Move only 206 >>> success = driver.move_to([0.15, 0.10, 0.17, 0]) 207 """ 208 # Parse target [x, y, z, dispense] 209 if len(target) != 4: 210 raise ValueError("Target must be [x, y, z, dispense] where dispense is 0 or 1") 211 212 target_pos = {'x': target[0], 'y': target[1], 'z': target[2]} 213 dispense = bool(target[3]) 214 215 is_pid = self._is_pid_controller() 216 217 if is_pid: 218 success = self._move_to_pid(target_pos) 219 else: 220 success = self._move_to_rl(target_pos) 221 222 if success and dispense: 223 self._dispense(dispense_pause=dispense_pause) 224 225 if success and pause_after: 226 pause(self.sim, self.pause_steps) 227 228 return success 229 230 def move_to_with_metrics(self, target, dispense_pause=60, pause_after=False): 231 """Move robot to target with comprehensive metrics tracking. 232 233 High-level method that moves to target, tracks steps taken, calculates 234 final position error, and handles dispensing. Automatically detects 235 controller type (PID or RL) and uses appropriate control method. 236 237 Args: 238 target (list): Target as [x, y, z, dispense] where x, y, z are floats 239 in meters and dispense is 0 (no dispense) or 1 (dispense). 240 dispense_pause (int, optional): Pause duration after dispensing in steps. 241 Defaults to 60. 242 pause_after (bool, optional): Whether to pause after movement completes. 243 Defaults to False. 244 245 Returns: 246 dict: Metrics dictionary containing: 247 - 'success' (bool): Whether target was reached within tolerance 248 - 'steps' (int): Number of steps taken 249 - 'target' (dict): Target coordinates {'x', 'y', 'z'} 250 - 'final_position' (dict): Final position {'x', 'y', 'z'} 251 - 'error_euclidean' (float): 3D Euclidean distance error in meters 252 - 'error_x' (float): X-axis error in meters 253 - 'error_y' (float): Y-axis error in meters 254 - 'error_z' (float): Z-axis error in meters 255 - 'dispensed' (bool): Whether dispensing occurred 256 257 Raises: 258 ValueError: If target is not a 4-element list. 259 260 Example: 261 >>> target = [0.15, 0.10, 0.17, 1] 262 >>> metrics = driver.move_to_with_metrics(target) 263 >>> print(f"Success: {metrics['success']}, Steps: {metrics['steps']}") 264 >>> print(f"Error: {metrics['error_euclidean']*1000:.3f}mm") 265 """ 266 if len(target) != 4: 267 raise ValueError("Target must be [x, y, z, dispense]") 268 269 target_dict = {'x': target[0], 'y': target[1], 'z': target[2]} 270 dispense = target[3] 271 272 # Move to target using appropriate controller 273 if self._is_pid_controller(): 274 success, steps = self._move_to_pid_metrics(target_dict, tolerance=self.tolerance) 275 else: 276 success, steps = self._move_to_rl_metrics(target_dict, tolerance=self.tolerance) 277 278 # Get final position 279 final_pos = get_position(self.sim) 280 final_dict = {'x': final_pos[0], 'y': final_pos[1], 'z': final_pos[2]} 281 282 # Calculate errors 283 error_x = target_dict['x'] - final_pos[0] 284 error_y = target_dict['y'] - final_pos[1] 285 error_z = target_dict['z'] - final_pos[2] 286 error_euclidean = np.sqrt(error_x**2 + error_y**2 + error_z**2) 287 288 # Handle dispensing 289 dispensed = False 290 if dispense == 1 and success: 291 self.sim.dispense() 292 pause(self.sim, dispense_pause) 293 dispensed = True 294 295 if pause_after: 296 pause(self.sim, self.pause_steps) 297 298 # Build metrics dictionary 299 metrics = { 300 'success': success, 301 'steps': steps, 302 'target': target_dict.copy(), 303 'final_position': final_dict, 304 'error_euclidean': float(error_euclidean), 305 'error_x': float(error_x), 306 'error_y': float(error_y), 307 'error_z': float(error_z), 308 'dispensed': dispensed 309 } 310 311 return metrics 312 313 314 def _dispense(self, dispense_pause=60): 315 """Dispense a single drop with variable pause. 316 317 Args: 318 dispense_pause (int, optional): Pause duration in simulation steps. Defaults to 60. 319 320 Returns: 321 None 322 """ 323 dispense = [[0, 0, 0, 1]] 324 pause = [[0, 0, 0, 0]] 325 self.sim.run(dispense, num_steps=1) 326 self.sim.run(pause, num_steps=dispense_pause) 327 328 def _is_pid_controller(self): 329 """Check if controller is PID or RL model. 330 331 Returns: 332 bool: True if controller is PID (has axes attribute), False for RL. 333 """ 334 return hasattr(self.controller, 'axes') 335 336 def _move_to_pid(self, target): 337 """Move to target using PID controller with settling detection. 338 339 Args: 340 target (dict): Target position with keys 'x', 'y', 'z' in meters. 341 342 Returns: 343 bool: True if target reached and settled, False if max_steps exceeded. 344 """ 345 pid_controller = self.controller 346 for axis in ['x', 'y', 'z']: 347 pid_controller.axes[axis].setpoint = target[axis] 348 349 settled_count = 0 350 351 for step in range(self.max_steps): 352 current_pos = get_position(self.sim) 353 current_dict = {'x': float(current_pos[0]), 'y': float(current_pos[1]), 'z': float(current_pos[2])} 354 355 error_x = target['x'] - current_pos[0] 356 error_y = target['y'] - current_pos[1] 357 error_z = target['z'] - current_pos[2] 358 total_error = (error_x**2 + error_y**2 + error_z**2)**0.5 359 360 velocities = pid_controller(current_dict, 0.01) 361 362 # Check both position error and velocity magnitude 363 velocity_magnitude = (velocities['x']**2 + velocities['y']**2 + velocities['z']**2)**0.5 364 365 if total_error <= self.tolerance and velocity_magnitude <= 0.01: 366 settled_count += 1 367 if settled_count >= self.settle_steps: 368 print(f'target reached and settled at step {step}') 369 return True 370 else: 371 settled_count = 0 372 373 actions = [[velocities['x'], velocities['y'], velocities['z'], 0]] 374 self.sim.run(actions, 1) 375 376 return False 377 378 def _move_to_pid_metrics(self, target, tolerance=0.001, velocity_threshold=0.01): 379 """ 380 Move to target using PID controller with settling detection and step tracking. 381 382 Args: 383 target (dict): Target position with keys 'x', 'y', 'z' in meters. 384 tolerance (float, optional): Distance threshold in meters. Defaults to 0.001. 385 velocity_threshold (float, optional): Maximum velocity magnitude for settling. 386 Defaults to 0.01 m/s. 387 388 Returns: 389 tuple: (success, steps) where success is bool and steps is int. 390 """ 391 pid_controller = self.controller 392 393 # Set setpoint for each axis 394 for axis in ['x', 'y', 'z']: 395 pid_controller.axes[axis].setpoint = target[axis] 396 397 settled_count = 0 398 steps = 0 399 400 for step in range(self.max_steps): 401 steps += 1 402 403 current_pos = get_position(self.sim) 404 current_dict = {'x': float(current_pos[0]), 'y': float(current_pos[1]), 'z': float(current_pos[2])} 405 406 error_x = target['x'] - current_pos[0] 407 error_y = target['y'] - current_pos[1] 408 error_z = target['z'] - current_pos[2] 409 total_error = (error_x**2 + error_y**2 + error_z**2)**0.5 410 411 velocities = pid_controller(current_dict, 0.01) 412 413 # Check both position error and velocity magnitude 414 velocity_magnitude = (velocities['x']**2 + velocities['y']**2 + velocities['z']**2)**0.5 415 416 if total_error <= self.tolerance and velocity_magnitude <= velocity_threshold: 417 settled_count += 1 418 if settled_count >= self.settle_steps: 419 print(f'target reached and settled at step {step}') 420 return True, steps 421 else: 422 settled_count = 0 423 424 actions = [[velocities['x'], velocities['y'], velocities['z'], 0]] 425 self.sim.run(actions, 1) 426 427 return False, steps 428 429 430 def _move_to_rl(self, target, tolerance=0.002, velocity_threshold=0.04): 431 """ 432 Move to target using RL controller with single crossing detection. 433 434 Uses single crossing approach: stops commanding on first crossing into 435 tolerance with sufficiently low velocity. More appropriate for systems 436 that naturally hold position when commands stop. 437 438 Args: 439 target (dict): Target position with keys 'x', 'y', 'z' in meters. 440 tolerance (float, optional): Distance threshold in meters. Defaults to 0.002 (2mm). 441 velocity_threshold (float, optional): Maximum velocity magnitude for success. 442 Defaults to 0.04 m/s (from testing: 92% success at 2mm). 443 444 Returns: 445 bool: True if target reached with low velocity, False if max_steps exceeded. 446 447 Notes: 448 - Based on comprehensive testing showing 92% success at 2mm with 0.04 m/s threshold 449 - Uses workspace bounds that must match training configuration 450 - Model outputs normalized actions [-1, 1] scaled to velocities [-2, 2] m/s 451 """ 452 # Workspace bounds (must match training exactly) 453 WORKSPACE_LOW = np.array([-0.1871, -0.1706, 0.1700], dtype=np.float32) 454 WORKSPACE_HIGH = np.array([0.2532, 0.2197, 0.2897], dtype=np.float32) 455 456 def normalize_position(position): 457 """Normalize position from workspace bounds to [-1, 1].""" 458 position = np.asarray(position, dtype=np.float32) 459 normalized = 2.0 * (position - WORKSPACE_LOW) / (WORKSPACE_HIGH - WORKSPACE_LOW) - 1.0 460 return normalized.astype(np.float32) 461 462 def scale_action_to_velocity(action, max_velocity=2.0): 463 """Scale normalized action [-1, 1] to velocity commands.""" 464 action = np.asarray(action, dtype=np.float32) 465 velocity = action * max_velocity 466 return velocity.astype(np.float32) 467 468 # Convert target dict to array 469 goal_position = np.array([target['x'], target['y'], target['z']], dtype=np.float32) 470 goal_normalized = normalize_position(goal_position) 471 472 for step in range(self.max_steps): 473 # Get current position 474 current_pos = np.array(get_position(self.sim), dtype=np.float32) 475 current_normalized = normalize_position(current_pos) 476 477 # Build observation [current_x, current_y, current_z, goal_x, goal_y, goal_z] 478 obs = np.concatenate([current_normalized, goal_normalized], dtype=np.float32) 479 480 # Predict action (use deterministic=False for deployment) 481 action, _states = self.controller.predict(obs, deterministic=False) 482 483 # Scale to velocity 484 velocity = scale_action_to_velocity(action) 485 486 # Calculate distance and velocity magnitude 487 distance = np.linalg.norm(current_pos - goal_position) 488 velocity_magnitude = np.linalg.norm(velocity) 489 490 # Single crossing check - stop on first crossing with low velocity 491 if distance <= tolerance and velocity_magnitude <= velocity_threshold: 492 print(f'target reached at step {step}, distance: {distance*1000:.3f}mm, velocity: {velocity_magnitude:.4f}m/s') 493 return True 494 495 # Send velocity command to simulation 496 actions = [[float(velocity[0]), float(velocity[1]), float(velocity[2]), 0]] 497 self.sim.run(actions, num_steps=1) 498 499 # Max steps reached 500 final_pos = np.array(get_position(self.sim), dtype=np.float32) 501 final_distance = float(np.linalg.norm(final_pos - goal_position)) 502 print(f'max steps reached, final distance: {final_distance*1000:.3f}mm') 503 return False 504 505 def _move_to_rl_metrics(self, target, tolerance=0.002, velocity_threshold=0.04): 506 """ 507 Move to target using RL controller with single crossing detection and step tracking. 508 509 Uses single crossing approach: stops commanding on first crossing into 510 tolerance with sufficiently low velocity. More appropriate for systems 511 that naturally hold position when commands stop. 512 513 Args: 514 target (dict): Target position with keys 'x', 'y', 'z' in meters. 515 tolerance (float, optional): Distance threshold in meters. Defaults to 0.002 (2mm). 516 velocity_threshold (float, optional): Maximum velocity magnitude for success. 517 Defaults to 0.04 m/s (from testing: 92% success at 2mm). 518 519 Returns: 520 tuple: (success, steps) where success is bool and steps is int. 521 522 Notes: 523 - Based on comprehensive testing showing 92% success at 2mm with 0.04 m/s threshold 524 - Uses workspace bounds that must match training configuration 525 - Model outputs normalized actions [-1, 1] scaled to velocities [-2, 2] m/s 526 """ 527 # Workspace bounds (must match training exactly) 528 WORKSPACE_LOW = np.array([-0.1871, -0.1706, 0.1700], dtype=np.float32) 529 WORKSPACE_HIGH = np.array([0.2532, 0.2197, 0.2897], dtype=np.float32) 530 531 def normalize_position(position): 532 """Normalize position from workspace bounds to [-1, 1].""" 533 position = np.asarray(position, dtype=np.float32) 534 normalized = 2.0 * (position - WORKSPACE_LOW) / (WORKSPACE_HIGH - WORKSPACE_LOW) - 1.0 535 return normalized.astype(np.float32) 536 537 def scale_action_to_velocity(action, max_velocity=2.0): 538 """Scale normalized action [-1, 1] to velocity commands.""" 539 action = np.asarray(action, dtype=np.float32) 540 velocity = action * max_velocity 541 return velocity.astype(np.float32) 542 543 # Convert target dict to array 544 goal_position = np.array([target['x'], target['y'], target['z']], dtype=np.float32) 545 goal_normalized = normalize_position(goal_position) 546 547 steps = 0 548 549 for step in range(self.max_steps): 550 steps += 1 551 552 # Get current position 553 current_pos = np.array(get_position(self.sim), dtype=np.float32) 554 current_normalized = normalize_position(current_pos) 555 556 # Build observation [current_x, current_y, current_z, goal_x, goal_y, goal_z] 557 obs = np.concatenate([current_normalized, goal_normalized], dtype=np.float32) 558 559 # Predict action (use deterministic=False for deployment) 560 action, _states = self.controller.predict(obs, deterministic=False) 561 562 # Scale to velocity 563 velocity = scale_action_to_velocity(action) 564 565 # Calculate distance and velocity magnitude 566 distance = np.linalg.norm(current_pos - goal_position) 567 velocity_magnitude = np.linalg.norm(velocity) 568 569 # Single crossing check - stop on first crossing with low velocity 570 if distance <= tolerance and velocity_magnitude <= velocity_threshold: 571 print(f'target reached at step {step}, distance: {distance*1000:.3f}mm, velocity: {velocity_magnitude:.4f}m/s') 572 return True, steps 573 574 # Send velocity command to simulation 575 actions = [[float(velocity[0]), float(velocity[1]), float(velocity[2]), 0]] 576 self.sim.run(actions, num_steps=1) 577 578 # Max steps reached 579 final_pos = np.array(get_position(self.sim), dtype=np.float32) 580 final_distance = float(np.linalg.norm(final_pos - goal_position)) 581 print(f'max steps reached, final distance: {final_distance*1000:.3f}mm') 582 return False, steps
Driver class for controlling OT-2 robot in simulation.
Provides high-level control interface for moving the robot to target positions and dispensing inoculum. Supports both PID and RL-based controllers with automatic settling detection and position tolerance checking.
Attributes:
- sim (object): Simulation environment instance.
- controller (object): Either PID controller (with axes attribute) or RL model.
- settle_steps (int): Number of consecutive steps within tolerance to consider settled.
- max_steps (int): Maximum steps allowed per movement attempt.
- pause_steps (int): Default pause duration after dispensing.
- tolerance (float): Position error threshold in meters for settling detection.
Examples:
>>> # With PID controller >>> from library.robot_control import PIDController >>> pid = PIDController() >>> driver = RobotDriver(sim, pid, settle_steps=10, tolerance=0.0005) >>> >>> # Move to position and dispense >>> target = [0.15, 0.10, 0.17, 1] # [x, y, z, dispense] >>> success = driver.move_to(target, dispense_pause=60) >>> >>> # Move without dispensing >>> target = [0.15, 0.10, 0.17, 0] >>> success = driver.move_to(target)
163 def __init__(self, sim, controller, settle_steps=10, max_steps=350, pause_steps=60, 164 tolerance=0.0005): 165 """Initialize RobotDriver. 166 167 Args: 168 sim (object): Simulation environment instance with run() method. 169 controller (object): PID controller or RL model for generating actions. 170 settle_steps (int, optional): Steps within tolerance to confirm settling. Defaults to 10. 171 max_steps (int, optional): Maximum steps per movement. Defaults to 350. 172 pause_steps (int, optional): Default pause after dispensing. Defaults to 60. 173 tolerance (float, optional): Position error threshold in meters. Defaults to 0.0005. 174 """ 175 self.sim = sim 176 self.controller = controller 177 self.settle_steps = settle_steps 178 self.max_steps = max_steps 179 self.pause_steps = pause_steps 180 self.tolerance = tolerance
Initialize RobotDriver.
Arguments:
- sim (object): Simulation environment instance with run() method.
- controller (object): PID controller or RL model for generating actions.
- settle_steps (int, optional): Steps within tolerance to confirm settling. Defaults to 10.
- max_steps (int, optional): Maximum steps per movement. Defaults to 350.
- pause_steps (int, optional): Default pause after dispensing. Defaults to 60.
- tolerance (float, optional): Position error threshold in meters. Defaults to 0.0005.
182 def move_to(self, target, dispense_pause=60, pause_after=False): 183 """Move robot to target position with optional dispensing. 184 185 Moves the robot end-effector to the specified 3D target position using the 186 configured controller. Optionally dispenses inoculum upon arrival and pauses 187 afterward for settling. 188 189 Args: 190 target (list): Target as [x, y, z, dispense] where x, y, z are floats in meters 191 and dispense is 0 (no dispense) or 1 (dispense). 192 dispense_pause (int, optional): Pause duration after dispensing in steps. Defaults to 60. 193 pause_after (bool, optional): Whether to pause after movement completes. Defaults to False. 194 195 Returns: 196 bool: True if target was reached within tolerance, False otherwise. 197 198 Raises: 199 ValueError: If target is not a 4-element list. 200 201 Examples: 202 >>> # Move and dispense 203 >>> success = driver.move_to([0.15, 0.10, 0.17, 1], dispense_pause=60) 204 >>> 205 >>> # Move only 206 >>> success = driver.move_to([0.15, 0.10, 0.17, 0]) 207 """ 208 # Parse target [x, y, z, dispense] 209 if len(target) != 4: 210 raise ValueError("Target must be [x, y, z, dispense] where dispense is 0 or 1") 211 212 target_pos = {'x': target[0], 'y': target[1], 'z': target[2]} 213 dispense = bool(target[3]) 214 215 is_pid = self._is_pid_controller() 216 217 if is_pid: 218 success = self._move_to_pid(target_pos) 219 else: 220 success = self._move_to_rl(target_pos) 221 222 if success and dispense: 223 self._dispense(dispense_pause=dispense_pause) 224 225 if success and pause_after: 226 pause(self.sim, self.pause_steps) 227 228 return success
Move robot to target position with optional dispensing.
Moves the robot end-effector to the specified 3D target position using the configured controller. Optionally dispenses inoculum upon arrival and pauses afterward for settling.
Arguments:
- target (list): Target as [x, y, z, dispense] where x, y, z are floats in meters and dispense is 0 (no dispense) or 1 (dispense).
- dispense_pause (int, optional): Pause duration after dispensing in steps. Defaults to 60.
- pause_after (bool, optional): Whether to pause after movement completes. Defaults to False.
Returns:
bool: True if target was reached within tolerance, False otherwise.
Raises:
- ValueError: If target is not a 4-element list.
Examples:
>>> # Move and dispense >>> success = driver.move_to([0.15, 0.10, 0.17, 1], dispense_pause=60) >>> >>> # Move only >>> success = driver.move_to([0.15, 0.10, 0.17, 0])
230 def move_to_with_metrics(self, target, dispense_pause=60, pause_after=False): 231 """Move robot to target with comprehensive metrics tracking. 232 233 High-level method that moves to target, tracks steps taken, calculates 234 final position error, and handles dispensing. Automatically detects 235 controller type (PID or RL) and uses appropriate control method. 236 237 Args: 238 target (list): Target as [x, y, z, dispense] where x, y, z are floats 239 in meters and dispense is 0 (no dispense) or 1 (dispense). 240 dispense_pause (int, optional): Pause duration after dispensing in steps. 241 Defaults to 60. 242 pause_after (bool, optional): Whether to pause after movement completes. 243 Defaults to False. 244 245 Returns: 246 dict: Metrics dictionary containing: 247 - 'success' (bool): Whether target was reached within tolerance 248 - 'steps' (int): Number of steps taken 249 - 'target' (dict): Target coordinates {'x', 'y', 'z'} 250 - 'final_position' (dict): Final position {'x', 'y', 'z'} 251 - 'error_euclidean' (float): 3D Euclidean distance error in meters 252 - 'error_x' (float): X-axis error in meters 253 - 'error_y' (float): Y-axis error in meters 254 - 'error_z' (float): Z-axis error in meters 255 - 'dispensed' (bool): Whether dispensing occurred 256 257 Raises: 258 ValueError: If target is not a 4-element list. 259 260 Example: 261 >>> target = [0.15, 0.10, 0.17, 1] 262 >>> metrics = driver.move_to_with_metrics(target) 263 >>> print(f"Success: {metrics['success']}, Steps: {metrics['steps']}") 264 >>> print(f"Error: {metrics['error_euclidean']*1000:.3f}mm") 265 """ 266 if len(target) != 4: 267 raise ValueError("Target must be [x, y, z, dispense]") 268 269 target_dict = {'x': target[0], 'y': target[1], 'z': target[2]} 270 dispense = target[3] 271 272 # Move to target using appropriate controller 273 if self._is_pid_controller(): 274 success, steps = self._move_to_pid_metrics(target_dict, tolerance=self.tolerance) 275 else: 276 success, steps = self._move_to_rl_metrics(target_dict, tolerance=self.tolerance) 277 278 # Get final position 279 final_pos = get_position(self.sim) 280 final_dict = {'x': final_pos[0], 'y': final_pos[1], 'z': final_pos[2]} 281 282 # Calculate errors 283 error_x = target_dict['x'] - final_pos[0] 284 error_y = target_dict['y'] - final_pos[1] 285 error_z = target_dict['z'] - final_pos[2] 286 error_euclidean = np.sqrt(error_x**2 + error_y**2 + error_z**2) 287 288 # Handle dispensing 289 dispensed = False 290 if dispense == 1 and success: 291 self.sim.dispense() 292 pause(self.sim, dispense_pause) 293 dispensed = True 294 295 if pause_after: 296 pause(self.sim, self.pause_steps) 297 298 # Build metrics dictionary 299 metrics = { 300 'success': success, 301 'steps': steps, 302 'target': target_dict.copy(), 303 'final_position': final_dict, 304 'error_euclidean': float(error_euclidean), 305 'error_x': float(error_x), 306 'error_y': float(error_y), 307 'error_z': float(error_z), 308 'dispensed': dispensed 309 } 310 311 return metrics
Move robot to target with comprehensive metrics tracking.
High-level method that moves to target, tracks steps taken, calculates final position error, and handles dispensing. Automatically detects controller type (PID or RL) and uses appropriate control method.
Arguments:
- target (list): Target as [x, y, z, dispense] where x, y, z are floats in meters and dispense is 0 (no dispense) or 1 (dispense).
- dispense_pause (int, optional): Pause duration after dispensing in steps. Defaults to 60.
- pause_after (bool, optional): Whether to pause after movement completes. Defaults to False.
Returns:
dict: Metrics dictionary containing: - 'success' (bool): Whether target was reached within tolerance - 'steps' (int): Number of steps taken - 'target' (dict): Target coordinates {'x', 'y', 'z'} - 'final_position' (dict): Final position {'x', 'y', 'z'} - 'error_euclidean' (float): 3D Euclidean distance error in meters - 'error_x' (float): X-axis error in meters - 'error_y' (float): Y-axis error in meters - 'error_z' (float): Z-axis error in meters - 'dispensed' (bool): Whether dispensing occurred
Raises:
- ValueError: If target is not a 4-element list.
Example:
>>> target = [0.15, 0.10, 0.17, 1] >>> metrics = driver.move_to_with_metrics(target) >>> print(f"Success: {metrics['success']}, Steps: {metrics['steps']}") >>> print(f"Error: {metrics['error_euclidean']*1000:.3f}mm")
584def get_coord_from_plate(plate_image_path, shoot_model, root_model, temp_path=None): 585 """Extract root tip coordinates from specimen image using complete CV pipeline. 586 587 Runs the full computer vision workflow: U-Net inference for segmentation, 588 mask cleaning and repair, root-shoot matching, and coordinate conversion to 589 robot frame. Returns DataFrame with measurements and coordinates for all 5 plants. 590 591 Args: 592 plate_image_path (str or Path): Path to specimen plate image. 593 shoot_model (str or Path): Path to trained shoot segmentation U-Net model (.h5). 594 root_model (str or Path): Path to trained root segmentation U-Net model (.h5). 595 temp_path (str or Path, optional): Directory for intermediate outputs. 596 Creates temporary directory if None. Defaults to None. 597 598 Returns: 599 pd.DataFrame: DataFrame with 5 rows (one per plant) containing: 600 - plant_order: 1-5 (left to right) 601 - Plant ID: Image identifier 602 - Length (px): Root length in pixels 603 - top_node_x/y: Pixel coordinates of root connection point 604 - endpoint_x/y: Pixel coordinates of root tip 605 - top_node_robot_x/y/z: Robot coordinates of connection point (meters) 606 - endpoint_robot_x/y/z: Robot coordinates of root tip (meters) 607 608 Examples: 609 >>> df = get_coord_from_plate( 610 ... 'specimen_01.png', 611 ... 'models/shoot_model.h5', 612 ... 'models/root_model.h5' 613 ... ) 614 >>> print(df[['plant_order', 'Length (px)', 'endpoint_robot_x', 'endpoint_robot_y']]) 615 616 Notes: 617 - Uses pre-computed GLOBAL_STATS for shoot mask quality validation 618 - Applies edge repair to root masks for improved skeleton quality 619 - Returns NaN coordinates for ungerminated seeds (no detected root) 620 - All robot coordinates are in meters relative to simulation origin 621 """ 622 config = RootMatchingConfig() 623 if not temp_path: 624 temp_path = tempfile.mkdtemp() 625 else: 626 temp_path = Path(temp_path) 627 628 root_model = Path(root_model) 629 shoot_model = Path(shoot_model) 630 631 images = [str(plate_image_path)] 632 633 # Run root inference 634 mask_type = "root" 635 n_processed_root, output_dir_root = run_inference( 636 model_path=root_model, 637 image_paths=images, 638 output_dir=temp_path, 639 mask_type=mask_type, 640 verbose=False 641 ) 642 643 # Run shoot inference 644 mask_type = "shoot" 645 n_processed_shoot, output_dir_shoot = run_inference( 646 model_path=shoot_model, 647 image_paths=images, 648 output_dir=temp_path, 649 mask_type=mask_type, 650 verbose=False 651 ) 652 653 # Get inference outputs 654 root_inference_path = next(output_dir_root.glob('*.png'), None) 655 shoot_inference_path = next(output_dir_shoot.glob('*.png'), None) 656 657 assert root_inference_path, "Root inference failed - no output mask found" 658 assert shoot_inference_path, "Shoot inference failed - no output mask found" 659 660 # Clean shoot mask 661 results = clean_shoot_mask_pipeline( 662 shoot_inference_path, 663 GLOBAL_STATS, 664 quality_check_threshold=2.0 665 ) 666 cleaned_shoot_mask = results['cleaned_mask'] 667 668 # Repair root mask 669 repaired_root_mask = repair_root_mask_edges(root_inference_path) 670 671 # Run complete matching pipeline 672 df = match_roots_to_shoots_complete( 673 shoot_mask=cleaned_shoot_mask, 674 root_mask=repaired_root_mask, 675 image_path=plate_image_path, 676 config=config, 677 return_dataframe=True, 678 sample_idx=0, 679 verbose=True 680 ) 681 682 return df
Extract root tip coordinates from specimen image using complete CV pipeline.
Runs the full computer vision workflow: U-Net inference for segmentation, mask cleaning and repair, root-shoot matching, and coordinate conversion to robot frame. Returns DataFrame with measurements and coordinates for all 5 plants.
Arguments:
- plate_image_path (str or Path): Path to specimen plate image.
- shoot_model (str or Path): Path to trained shoot segmentation U-Net model (.h5).
- root_model (str or Path): Path to trained root segmentation U-Net model (.h5).
- temp_path (str or Path, optional): Directory for intermediate outputs. Creates temporary directory if None. Defaults to None.
Returns:
pd.DataFrame: DataFrame with 5 rows (one per plant) containing: - plant_order: 1-5 (left to right) - Plant ID: Image identifier - Length (px): Root length in pixels - top_node_x/y: Pixel coordinates of root connection point - endpoint_x/y: Pixel coordinates of root tip - top_node_robot_x/y/z: Robot coordinates of connection point (meters) - endpoint_robot_x/y/z: Robot coordinates of root tip (meters)
Examples:
>>> df = get_coord_from_plate( ... 'specimen_01.png', ... 'models/shoot_model.h5', ... 'models/root_model.h5' ... ) >>> print(df[['plant_order', 'Length (px)', 'endpoint_robot_x', 'endpoint_robot_y']])
Notes:
- Uses pre-computed GLOBAL_STATS for shoot mask quality validation
- Applies edge repair to root masks for improved skeleton quality
- Returns NaN coordinates for ungerminated seeds (no detected root)
- All robot coordinates are in meters relative to simulation origin