library.simulation
PyBullet simulation package for OT-2 gantry robot with specimen plates.
This package provides a simulation environment for liquid handling robotics with support for multiple agents, droplet physics, and collision detection.
"""
PyBullet simulation package for OT-2 gantry robot with specimen plates.

This package provides a simulation environment for liquid handling robotics
with support for multiple agents, droplet physics, and collision detection.
"""

# Re-export the single public entry point of the package.
from .sim_class import Simulation

# Explicit public API: only the Simulation class is part of the contract.
__all__ = ['Simulation']
class Simulation:
    """PyBullet simulation of one or more OT-2 gantry robots with specimen plates.

    Robots are placed on a square grid, each pinned to the world with a fixed
    constraint and paired with a textured specimen plate. Droplets are simulated
    as small spheres released at the pipette tip: a droplet that touches a
    specimen is frozen in place and recorded in ``droplet_positions``; a droplet
    that touches a robot is removed from the world.
    """

    def __init__(self, num_agents, render=True, rgb_array=False):
        """Connect to PyBullet, pick a random plate texture, and build the scene.

        Args:
            num_agents: number of robots to create in a square grid.
            render: if True connect with p.GUI, otherwise headless p.DIRECT.
            rgb_array: if True, run() captures an RGB frame into
                ``self.current_frame`` every step.
        """
        self.render = render
        self.rgb_array = rgb_array
        mode = p.GUI if render else p.DIRECT
        self.physicsClient = p.connect(mode)
        # Hide the default GUI side panels.
        p.configureDebugVisualizer(p.COV_ENABLE_GUI, 0)
        p.setAdditionalSearchPath(pybullet_data.getDataPath())
        p.setGravity(0, 0, -10)

        # textures/ holds plate textures; textures/_plates holds the matching
        # plate images. Both listings are sorted so the same index in each is
        # assumed to refer to the same plate -- the directories must stay in
        # sync (TODO confirm this invariant is enforced elsewhere).
        self.current_dir = os.path.dirname(os.path.abspath(__file__))
        textures_dir = os.path.join(self.current_dir, "textures")
        plates_dir = os.path.join(textures_dir, "_plates")
        # Only PNG files: excludes sub-directories and system files.
        texture_list = sorted([f for f in os.listdir(textures_dir) if f.endswith('.png')])
        plates_list = sorted(os.listdir(plates_dir))
        random_texture = random.choice(texture_list)
        random_texture_index = texture_list.index(random_texture)
        self.plate_image_path = os.path.join(plates_dir, plates_list[random_texture_index])
        self.textureId = p.loadTexture(os.path.join(textures_dir, random_texture))

        # Camera zoom and target scale with the size of the robot grid.
        cameraDistance = 1.1 * (math.ceil((num_agents) ** 0.3))
        cameraYaw = 90     # rotation around the vertical axis (degrees)
        cameraPitch = -35  # rotation around the horizontal axis (degrees)
        cameraTargetPosition = [-0.2, -(math.ceil(num_agents ** 0.5) / 2) + 0.5, 0.1]
        p.resetDebugVisualizerCamera(cameraDistance, cameraYaw, cameraPitch,
                                     cameraTargetPosition)

        self.baseplaneId = p.loadURDF("plane.urdf")

        # Offset from the (joint-corrected) robot base to the pipette tip.
        self.pipette_offset = [0.073, 0.0895, 0.0895]
        # 'robotId_<id>' -> pipette position recorded at creation time.
        self.pipette_positions = {}
        # Droplet bookkeeping, initialised before create_robots() so drop()
        # can never see a missing attribute.
        self.sphereIds = []          # bodies of live droplet spheres
        self.droplet_positions = {}  # 'specimenId_<id>' -> [frozen droplet positions]
        self.dropped = False         # True once at least one droplet was released

        self.create_robots(num_agents)

    def create_robots(self, num_agents):
        """Create ``num_agents`` robots plus specimen plates on a square grid."""
        spacing = 1  # grid spacing between robots
        grid_size = math.ceil(num_agents ** 0.5)

        self.robotIds = []
        self.specimenIds = []
        agent_count = 0  # number of agents placed so far

        for i in range(grid_size):
            for j in range(grid_size):
                if agent_count >= num_agents:
                    break  # last grid row may be partial
                position = [-spacing * i, -spacing * j, 0.03]
                robotId = p.loadURDF(
                    os.path.join(self.current_dir, "ot_2_simulation_v6.urdf"),
                    position, [0, 0, 0, 1],
                    flags=p.URDF_USE_INERTIA_FROM_FILE)
                start_position, start_orientation = p.getBasePositionAndOrientation(robotId)
                # Pin the robot base to the world so it cannot drift.
                p.createConstraint(parentBodyUniqueId=robotId,
                                   parentLinkIndex=-1,
                                   childBodyUniqueId=-1,
                                   childLinkIndex=-1,
                                   jointType=p.JOINT_FIXED,
                                   jointAxis=[0, 0, 0],
                                   parentFramePosition=[0, 0, 0],
                                   childFramePosition=start_position,
                                   childFrameOrientation=start_orientation)

                # Specimen plate sits at a fixed offset on the robot's deck,
                # rotated 90 degrees about z.
                offset = [0.18275 - 0.00005, 0.163 - 0.026, 0.057]
                position_with_offset = [position[0] + offset[0],
                                        position[1] + offset[1],
                                        position[2] + offset[2]]
                rotate_90 = p.getQuaternionFromEuler([0, 0, -math.pi / 2])
                planeId = p.loadURDF(os.path.join(self.current_dir, "custom.urdf"),
                                     position_with_offset, rotate_90)
                # A robot never collides with its own specimen plate.
                p.setCollisionFilterPair(robotId, planeId, -1, -1, enableCollision=0)
                spec_position, spec_orientation = p.getBasePositionAndOrientation(planeId)
                # Pin the specimen plate to the world as well.
                p.createConstraint(parentBodyUniqueId=planeId,
                                   parentLinkIndex=-1,
                                   childBodyUniqueId=-1,
                                   childLinkIndex=-1,
                                   jointType=p.JOINT_FIXED,
                                   jointAxis=[0, 0, 0],
                                   parentFramePosition=[0, 0, 0],
                                   childFramePosition=spec_position,
                                   childFrameOrientation=spec_orientation)
                # Apply the randomly chosen texture to the plate.
                p.changeVisualShape(planeId, -1, textureUniqueId=self.textureId)

                self.robotIds.append(robotId)
                self.specimenIds.append(planeId)
                agent_count += 1

                # Record the initial pipette position for this robot.
                self.pipette_positions[f'robotId_{robotId}'] = \
                    self.get_pipette_position(robotId)

    def get_pipette_position(self, robotId):
        """Return the [x, y, z] world position of the robot's pipette tip."""
        robot_position = list(p.getBasePositionAndOrientation(robotId)[0])
        joint_states = p.getJointStates(robotId, [0, 1, 2])
        # Gantry joints 0/1 move the head along -x/-y; joint 2 along +z
        # (matching the sign conventions used by apply_actions()).
        robot_position[0] -= joint_states[0][0]
        robot_position[1] -= joint_states[1][0]
        robot_position[2] += joint_states[2][0]
        return [robot_position[0] + self.pipette_offset[0],
                robot_position[1] + self.pipette_offset[1],
                robot_position[2] + self.pipette_offset[2]]

    def reset(self, num_agents=1):
        """Remove every body and rebuild the scene with ``num_agents`` robots.

        Returns:
            The state dictionary from get_states().
        """
        # Strip the textures before deleting the specimens.
        for specimenId in self.specimenIds:
            p.changeVisualShape(specimenId, -1, textureUniqueId=-1)

        # BUGFIX: the previous implementation called list.remove() on the very
        # list being iterated, which skips every other body and leaks it in
        # the physics server. Remove everything first, then reset the lists.
        for bodyId in self.robotIds + self.specimenIds + self.sphereIds:
            p.removeBody(bodyId)
        self.robotIds = []
        self.specimenIds = []
        self.sphereIds = []

        self.pipette_positions = {}
        self.droplet_positions = {}

        # create_robots() repopulates robotIds/specimenIds/pipette_positions.
        self.create_robots(num_agents)

        return self.get_states()

    def run(self, actions, num_steps=1):
        """Step the simulation ``num_steps`` times applying ``actions`` each step.

        Args:
            actions: one [vx, vy, vz, drop_flag] entry per robot.
            num_steps: number of physics steps to run.

        Returns:
            The state dictionary from get_states().
        """
        for _ in range(num_steps):
            self.apply_actions(actions)
            p.stepSimulation()

            # Freeze or remove droplets that touched a specimen or a robot.
            for specimenId, robotId in zip(self.specimenIds, self.robotIds):
                self.check_contact(robotId, specimenId)

            if self.rgb_array:
                # Fixed observer camera looking toward the scene centre.
                camera_pos = [1, 0, 1]
                camera_target = [-0.3, 0, 0]
                up_vector = [0, 0, 1]
                fov = 50
                aspect = 320 / 240
                width, height, rgbImg, depthImg, segImg = p.getCameraImage(
                    width=320, height=240,
                    viewMatrix=p.computeViewMatrix(camera_pos, camera_target, up_vector),
                    projectionMatrix=p.computeProjectionMatrixFOV(fov, aspect, 0.1, 100.0))
                self.current_frame = rgbImg  # latest RGB array

            if self.render:
                time.sleep(1. / 240.)  # slow down to roughly real time

        return self.get_states()

    def apply_actions(self, actions):
        """Velocity-control each robot; actions[i] = [vx, vy, vz, drop_flag]."""
        for i in range(len(self.robotIds)):
            robotId = self.robotIds[i]
            # Joints 0/1 run opposite to the commanded world direction.
            p.setJointMotorControl2(robotId, 0, p.VELOCITY_CONTROL,
                                    targetVelocity=-actions[i][0], force=500)
            p.setJointMotorControl2(robotId, 1, p.VELOCITY_CONTROL,
                                    targetVelocity=-actions[i][1], force=500)
            p.setJointMotorControl2(robotId, 2, p.VELOCITY_CONTROL,
                                    targetVelocity=actions[i][2], force=800)
            if actions[i][3] == 1:
                self.drop(robotId=robotId)

    def drop(self, robotId):
        """Release a droplet sphere at the pipette tip and return its position."""
        # Reuse the pipette kinematics instead of duplicating them here.
        droplet_position = self.get_pipette_position(robotId)
        # Spawn slightly below the tip so the sphere does not start in contact.
        droplet_position[2] -= 0.0015

        sphereRadius = 0.003
        sphereColor = [1, 0, 0, 0.5]  # translucent red
        visualShapeId = p.createVisualShape(shapeType=p.GEOM_SPHERE,
                                            radius=sphereRadius,
                                            rgbaColor=sphereColor)
        collision = p.createCollisionShape(shapeType=p.GEOM_SPHERE, radius=sphereRadius)
        sphereBody = p.createMultiBody(baseMass=0.1,
                                       baseVisualShapeIndex=visualShapeId,
                                       baseCollisionShapeIndex=collision)
        p.resetBasePositionAndOrientation(sphereBody, droplet_position, [0, 0, 0, 1])

        self.sphereIds.append(sphereBody)
        self.dropped = True
        # TODO: add positional noise proportional to tip height and velocity.
        return droplet_position

    def get_states(self):
        """Return per-robot joint states, base position, and pipette position."""
        states = {}
        for robotId in self.robotIds:
            raw_joint_states = p.getJointStates(robotId, [0, 1, 2])
            joint_states = {
                f'joint_{i}': {
                    'position': js[0],
                    'velocity': js[1],
                    'reaction_forces': js[2],
                    'motor_torque': js[3],
                }
                for i, js in enumerate(raw_joint_states)
            }

            robot_position = list(p.getBasePositionAndOrientation(robotId)[0])
            # Same joint sign conventions as get_pipette_position().
            robot_position[0] -= raw_joint_states[0][0]
            robot_position[1] -= raw_joint_states[1][0]
            robot_position[2] += raw_joint_states[2][0]

            pipette_position = [round(robot_position[k] + self.pipette_offset[k], 4)
                                for k in range(3)]

            states[f'robotId_{robotId}'] = {
                "joint_states": joint_states,
                "robot_position": robot_position,
                "pipette_position": pipette_position,
            }
        return states

    def check_contact(self, robotId, specimenId):
        """Freeze droplets touching the specimen; delete droplets touching the robot."""
        # BUGFIX: iterate over a copy -- the robot-contact branch removes
        # entries from self.sphereIds, and mutating a list while iterating it
        # skips the element that follows each removal.
        for sphereId in list(self.sphereIds):
            contact_points_specimen = p.getContactPoints(sphereId, specimenId)
            contact_points_robot = p.getContactPoints(sphereId, robotId)

            if contact_points_specimen:
                # Stop further collisions, then pin the droplet where it landed.
                p.setCollisionFilterPair(sphereId, specimenId, -1, -1, enableCollision=0)
                sphere_position, sphere_orientation = p.getBasePositionAndOrientation(sphereId)
                p.createConstraint(parentBodyUniqueId=sphereId,
                                   parentLinkIndex=-1,
                                   childBodyUniqueId=-1,
                                   childLinkIndex=-1,
                                   jointType=p.JOINT_FIXED,
                                   jointAxis=[0, 0, 0],
                                   parentFramePosition=[0, 0, 0],
                                   childFramePosition=sphere_position,
                                   childFrameOrientation=sphere_orientation)
                # Record where the droplet settled on this specimen.
                self.droplet_positions.setdefault(
                    f'specimenId_{specimenId}', []).append(sphere_position)

            if contact_points_robot:
                # Droplet hit the robot itself: discard it.
                p.removeBody(sphereId)
                self.sphereIds.remove(sphereId)

    def set_start_position(self, x, y, z):
        """Reset every robot's joints so the pipette tip sits at world (x, y, z)."""
        for robotId in self.robotIds:
            robot_position = p.getBasePositionAndOrientation(robotId)[0]
            # Invert the kinematics of get_pipette_position():
            #   tip_x = base_x - j0 + off_x, tip_y = base_y - j1 + off_y,
            #   tip_z = base_z + j2 + off_z.
            # BUGFIX: joints 0/1 run opposite to world x/y (see the negated
            # velocities in apply_actions()), so their values need the
            # opposite sign from the z joint.
            adjusted_x = robot_position[0] + self.pipette_offset[0] - x
            adjusted_y = robot_position[1] + self.pipette_offset[1] - y
            adjusted_z = z - robot_position[2] - self.pipette_offset[2]

            p.resetJointState(robotId, 0, targetValue=adjusted_x)
            p.resetJointState(robotId, 1, targetValue=adjusted_y)
            p.resetJointState(robotId, 2, targetValue=adjusted_z)

    def get_plate_image(self):
        """Return the file path of the plate image matching the current texture."""
        return self.plate_image_path

    def close(self):
        """Disconnect from the PyBullet physics server."""
        p.disconnect()
Simulation(num_agents, render=True, rgb_array=False)
    def __init__(self, num_agents, render=True, rgb_array=False):
        """Connect to PyBullet, choose a random plate texture, and build the scene.

        Args:
            num_agents: number of robots to create in a square grid.
            render: if True connect with p.GUI, otherwise headless p.DIRECT.
            rgb_array: if True, run() captures an RGB camera frame each step
                into ``self.current_frame``.
        """
        self.render = render
        self.rgb_array = rgb_array
        if render:
            mode = p.GUI  # graphical version
        else:
            mode = p.DIRECT  # non-graphical (headless) version
        # Set up the simulation connection.
        self.physicsClient = p.connect(mode)
        # Hide the default GUI side panels.
        p.configureDebugVisualizer(p.COV_ENABLE_GUI, 0)
        # Make PyBullet's bundled assets (e.g. plane.urdf) resolvable.
        p.setAdditionalSearchPath(pybullet_data.getDataPath())
        p.setGravity(0,0,-10)

        # textures/ holds plate textures; textures/_plates holds the plate
        # images. The sorted listings are assumed to correspond index-by-index
        # -- TODO confirm the two directories are kept in matching order.
        self.current_dir = os.path.dirname(os.path.abspath(__file__))
        textures_dir = os.path.join(self.current_dir, "textures")
        plates_dir = os.path.join(textures_dir, "_plates")

        # Only PNG files from the textures directory (excludes directories
        # and system files).
        texture_list = sorted([f for f in os.listdir(textures_dir) if f.endswith('.png')])
        plates_list = sorted(os.listdir(plates_dir))

        # Select a random texture and use the same index for the plate image.
        random_texture = random.choice(texture_list)
        random_texture_index = texture_list.index(random_texture)
        self.plate_image_path = os.path.join(plates_dir, plates_list[random_texture_index])
        self.textureId = p.loadTexture(os.path.join(textures_dir, random_texture))

        # Camera zoom and target scale with the size of the robot grid.
        cameraDistance = 1.1*(math.ceil((num_agents)**0.3))  # distance from target (zoom)
        cameraYaw = 90     # rotation around the vertical axis in degrees
        cameraPitch = -35  # rotation around the horizontal axis in degrees
        cameraTargetPosition = [-0.2, -(math.ceil(num_agents**0.5)/2)+0.5, 0.1]  # XYZ of target

        # Reset the debug camera with the parameters above.
        p.resetDebugVisualizerCamera(cameraDistance, cameraYaw, cameraPitch, cameraTargetPosition)

        self.baseplaneId = p.loadURDF("plane.urdf")

        # Offset from the (joint-corrected) robot base to the pipette tip.
        self.pipette_offset = [0.073, 0.0895, 0.0895]
        # 'robotId_<id>' -> pipette position recorded at creation time.
        self.pipette_positions = {}

        # Create the robots (populates robotIds / specimenIds).
        self.create_robots(num_agents)

        # Bodies of live droplet spheres.
        self.sphereIds = []

        # 'specimenId_<id>' -> list of droplet positions settled on that specimen.
        self.droplet_positions = {}
def
create_robots(self, num_agents):
    def create_robots(self, num_agents):
        """Create ``num_agents`` robots (each with a specimen plate) on a square grid."""
        spacing = 1  # grid spacing between robots; adjust as needed

        # Smallest square grid that fits all agents.
        grid_size = math.ceil(num_agents ** 0.5)

        self.robotIds = []
        self.specimenIds = []
        agent_count = 0  # number of agents placed so far

        for i in range(grid_size):
            for j in range(grid_size):
                if agent_count < num_agents:  # last grid row may be partial
                    # Grid position for this robot (grid grows in -x/-y).
                    position = [-spacing * i, -spacing * j, 0.03]
                    robotId = p.loadURDF(os.path.join(self.current_dir, "ot_2_simulation_v6.urdf"), position, [0,0,0,1],
                                  flags=p.URDF_USE_INERTIA_FROM_FILE)
                    start_position, start_orientation = p.getBasePositionAndOrientation(robotId)
                    # Pin the robot base to the world so it cannot drift.
                    p.createConstraint(parentBodyUniqueId=robotId,
                                       parentLinkIndex=-1,
                                       childBodyUniqueId=-1,
                                       childLinkIndex=-1,
                                       jointType=p.JOINT_FIXED,
                                       jointAxis=[0, 0, 0],
                                       parentFramePosition=[0, 0, 0],
                                       childFramePosition=start_position,
                                       childFrameOrientation=start_orientation)

                    # Load the specimen plate at a fixed offset on the deck,
                    # rotated 90 degrees about z.
                    offset = [0.18275-0.00005, 0.163-0.026, 0.057]
                    position_with_offset = [position[0] + offset[0], position[1] + offset[1], position[2] + offset[2]]
                    rotate_90 = p.getQuaternionFromEuler([0, 0, -math.pi/2])
                    planeId = p.loadURDF(os.path.join(self.current_dir, "custom.urdf"), position_with_offset, rotate_90)
                    # A robot never collides with its own specimen plate.
                    p.setCollisionFilterPair(robotId, planeId, -1, -1, enableCollision=0)
                    spec_position, spec_orientation = p.getBasePositionAndOrientation(planeId)

                    # Pin the specimen plate to the world as well.
                    p.createConstraint(parentBodyUniqueId=planeId,
                                       parentLinkIndex=-1,
                                       childBodyUniqueId=-1,
                                       childLinkIndex=-1,
                                       jointType=p.JOINT_FIXED,
                                       jointAxis=[0, 0, 0],
                                       parentFramePosition=[0, 0, 0],
                                       childFramePosition=spec_position,
                                       childFrameOrientation=spec_orientation)
                    # Apply the randomly chosen texture to the plate.
                    p.changeVisualShape(planeId, -1, textureUniqueId=self.textureId)

                    self.robotIds.append(robotId)
                    self.specimenIds.append(planeId)

                    agent_count += 1  # one more agent placed

                    # Record the initial pipette position for this robot.
                    pipette_position = self.get_pipette_position(robotId)
                    self.pipette_positions[f'robotId_{robotId}'] = pipette_position
def
get_pipette_position(self, robotId):
161 def get_pipette_position(self, robotId): 162 #get the position of the robot 163 robot_position = p.getBasePositionAndOrientation(robotId)[0] 164 robot_position = list(robot_position) 165 joint_states = p.getJointStates(robotId, [0, 1, 2]) 166 robot_position[0] -= joint_states[0][0] 167 robot_position[1] -= joint_states[1][0] 168 robot_position[2] += joint_states[2][0] 169 # x,y offset 170 x_offset = self.pipette_offset[0] 171 y_offset = self.pipette_offset[1] 172 z_offset = self.pipette_offset[2] 173 # Calculate the position of the pipette at the tip of the pipette but at the same z coordinate as the specimen 174 pipette_position = [robot_position[0]+x_offset, robot_position[1]+y_offset, robot_position[2]+z_offset] 175 return pipette_position
def
reset(self, num_agents=1):
178 def reset(self, num_agents=1): 179 # Remove the textures from the specimens 180 for specimenId in self.specimenIds: 181 p.changeVisualShape(specimenId, -1, textureUniqueId=-1) 182 183 # Remove the robots 184 for robotId in self.robotIds: 185 p.removeBody(robotId) 186 # remove the robotId from the list of robotIds 187 self.robotIds.remove(robotId) 188 189 # Remove the specimens 190 for specimenId in self.specimenIds: 191 p.removeBody(specimenId) 192 # remove the specimenId from the list of specimenIds 193 self.specimenIds.remove(specimenId) 194 195 # Remove the spheres 196 for sphereId in self.sphereIds: 197 p.removeBody(sphereId) 198 # remove the sphereId from the list of sphereIds 199 self.sphereIds.remove(sphereId) 200 201 # dictionary to keep track of the current pipette position per robot 202 self.pipette_positions = {} 203 # list of sphere ids 204 self.sphereIds = [] 205 # dictionary to keep track of the droplet positions on specimens key for specimenId, list of droplet positions 206 self.droplet_positions = {} 207 208 # Create the robots 209 self.create_robots(num_agents) 210 211 return self.get_states()
def
run(self, actions, num_steps=1):
    def run(self, actions, num_steps=1):
        """Step the simulation ``num_steps`` times, applying ``actions`` each step.

        Args:
            actions: one [vx, vy, vz, drop_flag] entry per robot.
            num_steps: number of physics steps to run.

        Returns:
            The state dictionary from get_states().
        """
        # NOTE(review): start/n are leftovers from a commented-out fps counter.
        start = time.time()
        n = 100
        for i in range(num_steps):
            self.apply_actions(actions)
            p.stepSimulation()

            # Freeze or remove droplets that touched a specimen or a robot.
            for specimenId, robotId in zip(self.specimenIds, self.robotIds):
                self.check_contact(robotId, specimenId)

            if self.rgb_array:
                # Fixed observer camera looking toward the scene centre.
                camera_pos = [1, 0, 1]
                camera_target = [-0.3, 0, 0]
                up_vector = [0, 0, 1]  # Z-axis is up
                fov = 50
                aspect = 320/240  # width/height

                # Render a 320x240 frame of the scene.
                width, height, rgbImg, depthImg, segImg = p.getCameraImage(width=320, height=240, viewMatrix=p.computeViewMatrix(camera_pos, camera_target, up_vector), projectionMatrix=p.computeProjectionMatrixFOV(fov, aspect, 0.1, 100.0))

                self.current_frame = rgbImg  # latest RGB array

            if self.render:
                time.sleep(1./240.)  # slow the simulation to roughly real time

        return self.get_states()
def
apply_actions(self, actions):
266 def apply_actions(self, actions): # actions [[x,y,z,drop], [x,y,z,drop], ... 267 for i in range(len(self.robotIds)): 268 p.setJointMotorControl2(self.robotIds[i], 0, p.VELOCITY_CONTROL, targetVelocity=-actions[i][0], force=500) 269 p.setJointMotorControl2(self.robotIds[i], 1, p.VELOCITY_CONTROL, targetVelocity=-actions[i][1], force=500) 270 p.setJointMotorControl2(self.robotIds[i], 2, p.VELOCITY_CONTROL, targetVelocity=actions[i][2], force=800) 271 if actions[i][3] == 1: 272 self.drop(robotId=self.robotIds[i]) 273 #logging.info(f'drop: {i}')
def
drop(self, robotId):
276 def drop(self, robotId): 277 # Get the position of the pipette based on the x,y,z coordinates of the joints 278 #robot_position = [0, 0, 0] 279 #get the position of the robot 280 robot_position = p.getBasePositionAndOrientation(robotId)[0] 281 robot_position = list(robot_position) 282 joint_states = p.getJointStates(robotId, [0, 1, 2]) 283 robot_position[0] -= joint_states[0][0] 284 robot_position[1] -= joint_states[1][0] 285 robot_position[2] += joint_states[2][0] 286 # x,y offset 287 x_offset = self.pipette_offset[0] 288 y_offset = self.pipette_offset[1] 289 z_offset = self.pipette_offset[2]-0.0015 290 # Get the position of the specimen 291 specimen_position = p.getBasePositionAndOrientation(self.specimenIds[0])[0] 292 #logging.info(f'droplet_position: {droplet_position}') 293 # Create a sphere to represent the droplet 294 sphereRadius = 0.003 # Adjust as needed 295 sphereColor = [1, 0, 0, 0.5] # RGBA (Red in this case) 296 visualShapeId = p.createVisualShape(shapeType=p.GEOM_SPHERE, radius=sphereRadius, rgbaColor=sphereColor) 297 #add collision to the sphere 298 collision = p.createCollisionShape(shapeType=p.GEOM_SPHERE, radius=sphereRadius) 299 sphereBody = p.createMultiBody(baseMass=0.1, baseVisualShapeIndex=visualShapeId, baseCollisionShapeIndex=collision) 300 # Calculate the position of the droplet at the tip of the pipette but at the same z coordinate as the specimen 301 droplet_position = [robot_position[0]+x_offset, robot_position[1]+y_offset, robot_position[2]+z_offset] 302 #specimen_position[2] + sphereRadius+0.015/2+0.06] 303 p.resetBasePositionAndOrientation(sphereBody, droplet_position, [0, 0, 0, 1]) 304 # track the sphere id 305 self.sphereIds.append(sphereBody) 306 self.dropped = True 307 #TODO: add some randomness to the droplet position proportional to the height of the pipette above the specimen and the velocity of the pipette of the pipette 308 return droplet_position
def
get_states(self):
311 def get_states(self): 312 states = {} 313 for robotId in self.robotIds: 314 raw_joint_states = p.getJointStates(robotId, [0, 1, 2]) 315 316 # Convert joint states into a dictionary 317 joint_states = {} 318 for i, joint_state in enumerate(raw_joint_states): 319 joint_states[f'joint_{i}'] = { 320 'position': joint_state[0], 321 'velocity': joint_state[1], 322 'reaction_forces': joint_state[2], 323 'motor_torque': joint_state[3] 324 } 325 326 # Robot position 327 robot_position = p.getBasePositionAndOrientation(robotId)[0] 328 robot_position = list(robot_position) 329 330 # Adjust robot position based on joint states 331 robot_position[0] -= raw_joint_states[0][0] 332 robot_position[1] -= raw_joint_states[1][0] 333 robot_position[2] += raw_joint_states[2][0] 334 335 # Pipette position 336 pipette_position = [robot_position[0] + self.pipette_offset[0], 337 robot_position[1] + self.pipette_offset[1], 338 robot_position[2] + self.pipette_offset[2]] 339 # Round pipette position to 4 decimal places 340 pipette_position = [round(num, 4) for num in pipette_position] 341 342 # Store information in the dictionary 343 states[f'robotId_{robotId}'] = { 344 "joint_states": joint_states, 345 "robot_position": robot_position, 346 "pipette_position": pipette_position 347 } 348 349 return states
def
check_contact(self, robotId, specimenId):
352 def check_contact(self, robotId, specimenId): 353 for sphereId in self.sphereIds: 354 # Check contact with the specimen 355 contact_points_specimen = p.getContactPoints(sphereId, specimenId) 356 # Check contact with the robot 357 contact_points_robot = p.getContactPoints(sphereId, robotId) 358 359 # If contact with the specimen is detected 360 if contact_points_specimen: 361 #logging.info(f'sphereId: {sphereId}, in contact with specimen: {specimenId}') 362 # Disable collision between the sphere and the specimen 363 p.setCollisionFilterPair(sphereId, specimenId, -1, -1, enableCollision=0) 364 #logging.info(f'sphereId: {sphereId}, collision disabled') 365 # Get current position and orientation of the sphere 366 sphere_position, sphere_orientation = p.getBasePositionAndOrientation(sphereId) 367 # Fix the sphere in place relative to the world 368 p.createConstraint(parentBodyUniqueId=sphereId, 369 parentLinkIndex=-1, 370 childBodyUniqueId=-1, 371 childLinkIndex=-1, 372 jointType=p.JOINT_FIXED, 373 jointAxis=[0, 0, 0], 374 parentFramePosition=[0, 0, 0], 375 childFramePosition=sphere_position, 376 childFrameOrientation=sphere_orientation) 377 # track the final position of the sphere on the specimen by adding it to the dictionary 378 if f'specimenId_{specimenId}' in self.droplet_positions: 379 self.droplet_positions[f'specimenId_{specimenId}'].append(sphere_position) 380 else: 381 self.droplet_positions[f'specimenId_{specimenId}'] = [sphere_position] 382 383 #logging.info(f'sphereId: {sphereId}, fixed in place') 384 385 # # turn off all collisions with other spheres that are in contact with this sphere 386 # for sphereId2 in self.sphereIds: 387 # if sphereId2 != sphereId: 388 # contact_points = p.getContactPoints(sphereId, sphereId2) 389 # if contact_points: 390 # p.setCollisionFilterPair(sphereId, sphereId2, -1, -1, enableCollision=0) 391 # logging.info(f'sphereId: {sphereId}, collision disabled with sphereId2: {sphereId2}') 392 393 # If contact with the robot is 
detected 394 if contact_points_robot: 395 # Destroy the sphere 396 p.removeBody(sphereId) 397 #logging.info(f'sphereId: {sphereId}, removed') 398 # Remove the sphereId from the list of sphereIds 399 self.sphereIds.remove(sphereId) 400 # Disable collision between the sphere and the robot 401 # p.setCollisionFilterPair(sphereId, robotId, -1, -1, enableCollision=0) 402 # Get current position and orientation of the sphere 403 # sphere_position, sphere_orientation = p.getBasePositionAndOrientation(sphereId) 404 # sphere_position = list(sphere_position) 405 # sphere_position[2] += 0.001 406 # # Fix the sphere in place relative to the world 407 # p.createConstraint(parentBodyUniqueId=sphereId, 408 # parentLinkIndex=-1, 409 # childBodyUniqueId=-1, 410 # childLinkIndex=-1, 411 # jointType=p.JOINT_FIXED, 412 # jointAxis=[0, 0, 0], 413 # parentFramePosition=[0, 0, 0], 414 # childFramePosition=sphere_position, 415 # childFrameOrientation=sphere_orientation) 416 # turn off all collisions with other spheres that are in contact with this sphere 417 # for sphereId2 in self.sphereIds: 418 # if sphereId2 != sphereId: 419 # contact_points = p.getContactPoints(sphereId, sphereId2) 420 # if contact_points: 421 # p.setCollisionFilterPair(sphereId, sphereId2, -1, -1, enableCollision=0) 422 # logging.info(f'sphereId: {sphereId}, collision disabled with sphereId2: {sphereId2}')
def
set_start_position(self, x, y, z):
424 def set_start_position(self, x, y, z): 425 # Iterate through each robot and set its pipette to the start position 426 for robotId in self.robotIds: 427 # Calculate the necessary joint positions to reach the desired start position 428 # The calculation depends on the kinematic model of the robot 429 # For simplicity, let's assume a simple model where each joint moves in one axis (x, y, z) 430 # You might need to adjust this based on the actual robot kinematics 431 432 # Adjust the x, y, z values based on the robot's current position and pipette offset 433 robot_position = p.getBasePositionAndOrientation(robotId)[0] 434 adjusted_x = x - robot_position[0] - self.pipette_offset[0] 435 adjusted_y = y - robot_position[1] - self.pipette_offset[1] 436 adjusted_z = z - robot_position[2] - self.pipette_offset[2] 437 438 # Reset the joint positions/start position 439 p.resetJointState(robotId, 0, targetValue=adjusted_x) 440 p.resetJointState(robotId, 1, targetValue=adjusted_y) 441 p.resetJointState(robotId, 2, targetValue=adjusted_z)