library.simulation

PyBullet simulation package for OT-2 gantry robot with specimen plates.

This package provides a simulation environment for liquid handling robotics with support for multiple agents, droplet physics, and collision detection.

 1"""
 2PyBullet simulation package for OT-2 gantry robot with specimen plates.
 3
 4This package provides a simulation environment for liquid handling robotics
 5with support for multiple agents, droplet physics, and collision detection.
 6"""
 7
 8from .sim_class import Simulation
 9
10__all__ = ['Simulation']
class Simulation:
 12class Simulation:
 13    def __init__(self, num_agents, render=True, rgb_array=False):
 14        self.render = render
 15        self.rgb_array = rgb_array
 16        if render:
 17            mode = p.GUI # for graphical version
 18        else:
 19            mode = p.DIRECT # for non-graphical version
 20        # Set up the simulation
 21        self.physicsClient = p.connect(mode)
 22        # Hide the default GUI components
 23        p.configureDebugVisualizer(p.COV_ENABLE_GUI, 0)
 24        p.setAdditionalSearchPath(pybullet_data.getDataPath()) #optionally
 25        p.setGravity(0,0,-10)
 26        #p.setPhysicsEngineParameter(contactBreakingThreshold=0.000001)
 27        # load a texture
 28        # Get the directory where this file is located
 29        self.current_dir = os.path.dirname(os.path.abspath(__file__))
 30        textures_dir = os.path.join(self.current_dir, "textures")
 31        plates_dir = os.path.join(textures_dir, "_plates")
 32        
 33        # Get only PNG files from textures directory (exclude directories and system files)
 34        texture_list = sorted([f for f in os.listdir(textures_dir) if f.endswith('.png')])
 35        plates_list = sorted(os.listdir(plates_dir))
 36        
 37        # Select a random texture and use same index for corresponding plate image
 38        random_texture = random.choice(texture_list)
 39        random_texture_index = texture_list.index(random_texture)
 40        self.plate_image_path = os.path.join(plates_dir, plates_list[random_texture_index])
 41        self.textureId = p.loadTexture(os.path.join(textures_dir, random_texture))
 42        #print(f'textureId: {self.textureId}')
 43
 44        # Set the camera parameters
 45        cameraDistance = 1.1*(math.ceil((num_agents)**0.3)) # Distance from the target (zoom)
 46        cameraYaw = 90  # Rotation around the vertical axis in degrees
 47        cameraPitch = -35  # Rotation around the horizontal axis in degrees
 48        cameraTargetPosition = [-0.2, -(math.ceil(num_agents**0.5)/2)+0.5, 0.1]  # XYZ coordinates of the target position
 49
 50        # Reset the camera with the specified parameters
 51        p.resetDebugVisualizerCamera(cameraDistance, cameraYaw, cameraPitch, cameraTargetPosition)
 52
 53        self.baseplaneId = p.loadURDF("plane.urdf")
 54        # add collision shape to the plane
 55        #p.createCollisionShape(shapeType=p.GEOM_BOX, halfExtents=[30, 305, 0.001])
 56
 57        # define the pipette offset
 58        self.pipette_offset = [0.073, 0.0895, 0.0895]
 59        # dictionary to keep track of the current pipette position per robot
 60        self.pipette_positions = {}
 61
 62        # Create the robots
 63        self.create_robots(num_agents)
 64
 65        # list of sphere ids
 66        self.sphereIds = []
 67
 68        # dictionary to keep track of the droplet positions on specimens key for specimenId, list of droplet positions
 69        self.droplet_positions = {}
 70
 71        # Function to compute view matrix based on these parameters
 72        # def compute_camera_view(cameraDistance, cameraYaw, cameraPitch, cameraTargetPosition):
 73        #     camUpVector = (0, 0, 1)  # Up vector in Z-direction
 74        #     camForward = (1, 0, 0)  # Forward vector in X-direction
 75        #     camTargetPos = cameraTargetPosition
 76        #     camPos = p.multiplyTransforms(camTargetPos, p.getQuaternionFromEuler((0, 0, 0)), (0, 0, cameraDistance), p.getQuaternionFromEuler((cameraPitch, 0, cameraYaw)))[0]
 77        #     viewMatrix = p.computeViewMatrix(camPos, camTargetPos, camUpVector)
 78        #     return viewMatrix
 79        
 80        # Capture the image
 81        #self.view_matrix = compute_camera_view(cameraDistance, cameraYaw, cameraPitch, cameraTargetPosition)
 82        #.projection_matrix = p.computeProjectionMatrixFOV(fov=60, aspect=640/480, nearVal=0.1, farVal=100)
 83
 84    # method to create n robots in a grid pattern
 85    def create_robots(self, num_agents):
 86        spacing = 1  # Adjust the spacing as needed
 87
 88        # Calculate the grid size to fit all agents
 89        grid_size = math.ceil(num_agents ** 0.5) 
 90
 91        self.robotIds = []
 92        self.specimenIds = []
 93        agent_count = 0  # Counter for the number of placed agents
 94
 95        for i in range(grid_size):
 96            for j in range(grid_size):
 97                if agent_count < num_agents:  # Check if more agents need to be placed
 98                    # Calculate position for each robot
 99                    position = [-spacing * i, -spacing * j, 0.03]
100                    robotId = p.loadURDF(os.path.join(self.current_dir, "ot_2_simulation_v6.urdf"), position, [0,0,0,1],
101                                        flags=p.URDF_USE_INERTIA_FROM_FILE)
102                    start_position, start_orientation = p.getBasePositionAndOrientation(robotId)
103                    p.createConstraint(parentBodyUniqueId=robotId,
104                                    parentLinkIndex=-1,
105                                    childBodyUniqueId=-1,
106                                    childLinkIndex=-1,
107                                    jointType=p.JOINT_FIXED,
108                                    jointAxis=[0, 0, 0],
109                                    parentFramePosition=[0, 0, 0],
110                                    childFramePosition=start_position,
111                                    childFrameOrientation=start_orientation)
112
113                    # Create a fixed constraint between the robot and the base plane so the robot is fixed in space above the plane by its base link with an offset
114                    #p.createConstraint(self.baseplaneId, -1, robotId, -1, p.JOINT_FIXED, [0, 0, 0], position, [0, 0, 0])
115
116                    # Load the specimen with an offset
117                    offset = [0.18275-0.00005, 0.163-0.026, 0.057]
118                    position_with_offset = [position[0] + offset[0], position[1] + offset[1], position[2] + offset[2]]
119                    rotate_90 = p.getQuaternionFromEuler([0, 0, -math.pi/2])
120                    planeId = p.loadURDF(os.path.join(self.current_dir, "custom.urdf"), position_with_offset, rotate_90)#start_orientation)
121                    # Disable collision between the robot and the specimen
122                    p.setCollisionFilterPair(robotId, planeId, -1, -1, enableCollision=0)
123                    spec_position, spec_orientation = p.getBasePositionAndOrientation(planeId)
124
125                    #Constrain the specimen to the robot
126                    # p.createConstraint(parentBodyUniqueId=robotId,
127                    #                 parentLinkIndex=-1,
128                    #                 childBodyUniqueId=planeId,
129                    #                 childLinkIndex=-1,
130                    #                 jointType=p.JOINT_FIXED,
131                    #                 jointAxis=[0, 0, 0],
132                    #                 parentFramePosition=start_position,
133                    #                 #parentFrameOrientation=start_orientation,
134                    #                 childFramePosition=[0, 0, 0],
135                    #                 childFrameOrientation=[0, 0, 0, 1])
136                    #p.createConstraint(robotId, -1, planeId, -1, p.JOINT_FIXED, [0, 0, 0], offset, [0, 0, 0])
137                    p.createConstraint(parentBodyUniqueId=planeId,
138                                    parentLinkIndex=-1,
139                                    childBodyUniqueId=-1,
140                                    childLinkIndex=-1,
141                                    jointType=p.JOINT_FIXED,
142                                    jointAxis=[0, 0, 0],
143                                    parentFramePosition=[0, 0, 0],
144                                    childFramePosition=spec_position,
145                                    childFrameOrientation=spec_orientation)
146                    # Load your texture and apply it to the plane
147                    #textureId = p.loadTexture("uvmapped_dish_large_comp.png")
148                    p.changeVisualShape(planeId, -1, textureUniqueId=self.textureId)
149
150                    self.robotIds.append(robotId)
151                    self.specimenIds.append(planeId)
152
153                    agent_count += 1  # Increment the agent counter
154
155                    # calculate the pipette position
156                    pipette_position = self.get_pipette_position(robotId)
157                    # save the pipette position
158                    self.pipette_positions[f'robotId_{robotId}'] = pipette_position
159
160    # method to get the current pipette position for a robot
161    def get_pipette_position(self, robotId):
162        #get the position of the robot
163        robot_position = p.getBasePositionAndOrientation(robotId)[0]
164        robot_position = list(robot_position)
165        joint_states = p.getJointStates(robotId, [0, 1, 2])
166        robot_position[0] -= joint_states[0][0]
167        robot_position[1] -= joint_states[1][0]
168        robot_position[2] += joint_states[2][0]
169        # x,y offset
170        x_offset = self.pipette_offset[0]
171        y_offset = self.pipette_offset[1]
172        z_offset = self.pipette_offset[2]
173        # Calculate the position of the pipette at the tip of the pipette but at the same z coordinate as the specimen
174        pipette_position = [robot_position[0]+x_offset, robot_position[1]+y_offset, robot_position[2]+z_offset]
175        return pipette_position
176
177    # method to reset the simulation
178    def reset(self, num_agents=1):
179        # Remove the textures from the specimens
180        for specimenId in self.specimenIds:
181            p.changeVisualShape(specimenId, -1, textureUniqueId=-1)
182
183        # Remove the robots
184        for robotId in self.robotIds:
185            p.removeBody(robotId)
186            # remove the robotId from the list of robotIds
187            self.robotIds.remove(robotId)
188
189        # Remove the specimens
190        for specimenId in self.specimenIds:
191            p.removeBody(specimenId)
192            # remove the specimenId from the list of specimenIds
193            self.specimenIds.remove(specimenId)
194
195        # Remove the spheres
196        for sphereId in self.sphereIds:
197            p.removeBody(sphereId)
198            # remove the sphereId from the list of sphereIds
199            self.sphereIds.remove(sphereId)
200
201        # dictionary to keep track of the current pipette position per robot
202        self.pipette_positions = {}
203        # list of sphere ids
204        self.sphereIds = []
205        # dictionary to keep track of the droplet positions on specimens key for specimenId, list of droplet positions
206        self.droplet_positions = {}
207
208        # Create the robots
209        self.create_robots(num_agents)
210
211        return self.get_states()
212
213    # method to run the simulation for a specified number of steps
214    def run(self, actions, num_steps=1):
215        #self.apply_actions(actions)
216        start = time.time()
217        n = 100
218        for i in range(num_steps):
219            self.apply_actions(actions)
220            p.stepSimulation()
221
222            # reset the droplet after 20 steps
223            # if self.dropped:
224            #     self.cooldown += 1
225            #     if self.cooldown == self.cooldown_time:
226            #         self.dropped = False
227            #         self.cooldown = 0                
228
229            #compute and display the frames per second every n steps
230            # if i % n == 0:
231            #     fps = n / (time.time() - start)
232            #     start = time.time()
233            #     print(f'fps: {fps}')
234                # #print the orientation of the robot every n steps
235                # for i in range(len(self.robotIds)):
236                #     orientation = p.getBasePositionAndOrientation(self.robotIds[i])[1]
237                #     #print(f'robot {i} orientation: {orientation}')
238                #     #get the position of the link on the z axis
239                #     link_state = p.getLinkState(self.robotIds[i], 0)
240                #     print(f'robot {i} link_state: {link_state}')
241            # check contact for each robot and specimen
242            for specimenId, robotId in zip(self.specimenIds, self.robotIds):
243                #logging.info(f'checking contact for robotId: {robotId}, specimenId: {specimenId}')
244                self.check_contact(robotId, specimenId)
245
246            if self.rgb_array:
247                # Camera parameters
248                camera_pos = [1, 0, 1] # Example position
249                camera_target = [-0.3, 0, 0] # Point where the camera is looking at
250                up_vector = [0, 0, 1] # Usually the Z-axis is up
251                fov = 50 # Field of view
252                aspect = 320/240 # Aspect ratio (width/height)
253
254                # Get camera image
255                width, height, rgbImg, depthImg, segImg = p.getCameraImage(width=320, height=240, viewMatrix=p.computeViewMatrix(camera_pos, camera_target, up_vector), projectionMatrix=p.computeProjectionMatrixFOV(fov, aspect, 0.1, 100.0))
256                
257                self.current_frame = rgbImg  # RGB array
258                #print(self.current_frame)
259
260            if self.render:
261                time.sleep(1./240.) # slow down the simulation
262
263        return self.get_states()
264    
265    # method to apply actions to the robots using velocity control
266    def apply_actions(self, actions): # actions [[x,y,z,drop], [x,y,z,drop], ...
267        for i in range(len(self.robotIds)):
268            p.setJointMotorControl2(self.robotIds[i], 0, p.VELOCITY_CONTROL, targetVelocity=-actions[i][0], force=500)
269            p.setJointMotorControl2(self.robotIds[i], 1, p.VELOCITY_CONTROL, targetVelocity=-actions[i][1], force=500)
270            p.setJointMotorControl2(self.robotIds[i], 2, p.VELOCITY_CONTROL, targetVelocity=actions[i][2], force=800)
271            if actions[i][3] == 1:
272                self.drop(robotId=self.robotIds[i])
273                #logging.info(f'drop: {i}')
274
275    # method to drop a simulated droplet on the specimen from the pipette
276    def drop(self, robotId):
277        # Get the position of the pipette based on the x,y,z coordinates of the joints
278        #robot_position = [0, 0, 0]
279        #get the position of the robot
280        robot_position = p.getBasePositionAndOrientation(robotId)[0]
281        robot_position = list(robot_position)
282        joint_states = p.getJointStates(robotId, [0, 1, 2])
283        robot_position[0] -= joint_states[0][0]
284        robot_position[1] -= joint_states[1][0]
285        robot_position[2] += joint_states[2][0]
286        # x,y offset
287        x_offset = self.pipette_offset[0]
288        y_offset = self.pipette_offset[1]
289        z_offset = self.pipette_offset[2]-0.0015
290        # Get the position of the specimen
291        specimen_position = p.getBasePositionAndOrientation(self.specimenIds[0])[0]
292        #logging.info(f'droplet_position: {droplet_position}')
293        # Create a sphere to represent the droplet
294        sphereRadius = 0.003  # Adjust as needed
295        sphereColor = [1, 0, 0, 0.5]  # RGBA (Red in this case)
296        visualShapeId = p.createVisualShape(shapeType=p.GEOM_SPHERE, radius=sphereRadius, rgbaColor=sphereColor)
297        #add collision to the sphere
298        collision = p.createCollisionShape(shapeType=p.GEOM_SPHERE, radius=sphereRadius)
299        sphereBody = p.createMultiBody(baseMass=0.1, baseVisualShapeIndex=visualShapeId, baseCollisionShapeIndex=collision)
300        # Calculate the position of the droplet at the tip of the pipette but at the same z coordinate as the specimen
301        droplet_position = [robot_position[0]+x_offset, robot_position[1]+y_offset, robot_position[2]+z_offset]
302                            #specimen_position[2] + sphereRadius+0.015/2+0.06]
303        p.resetBasePositionAndOrientation(sphereBody, droplet_position, [0, 0, 0, 1])
304        # track the sphere id
305        self.sphereIds.append(sphereBody)
306        self.dropped = True
307        #TODO: add some randomness to the droplet position proportional to the height of the pipette above the specimen and the velocity of the pipette of the pipette
308        return droplet_position
309
310    # method to get the states of the robots
311    def get_states(self):
312        states = {}
313        for robotId in self.robotIds:
314            raw_joint_states = p.getJointStates(robotId, [0, 1, 2])
315
316            # Convert joint states into a dictionary
317            joint_states = {}
318            for i, joint_state in enumerate(raw_joint_states):
319                joint_states[f'joint_{i}'] = {
320                    'position': joint_state[0],
321                    'velocity': joint_state[1],
322                    'reaction_forces': joint_state[2],
323                    'motor_torque': joint_state[3]
324                }
325
326            # Robot position
327            robot_position = p.getBasePositionAndOrientation(robotId)[0]
328            robot_position = list(robot_position)
329
330            # Adjust robot position based on joint states
331            robot_position[0] -= raw_joint_states[0][0]
332            robot_position[1] -= raw_joint_states[1][0]
333            robot_position[2] += raw_joint_states[2][0]
334
335            # Pipette position
336            pipette_position = [robot_position[0] + self.pipette_offset[0],
337                                robot_position[1] + self.pipette_offset[1],
338                                robot_position[2] + self.pipette_offset[2]]
339            # Round pipette position to 4 decimal places
340            pipette_position = [round(num, 4) for num in pipette_position]
341
342            # Store information in the dictionary
343            states[f'robotId_{robotId}'] = {
344                "joint_states": joint_states,
345                "robot_position": robot_position,
346                "pipette_position": pipette_position
347            }
348
349        return states
350    
351    # method to check contact with the spheres and the specimen and robot, when contact is detected, the sphere is fixed in place and collision is disabled
352    def check_contact(self, robotId, specimenId):
353        for sphereId in self.sphereIds:
354            # Check contact with the specimen
355            contact_points_specimen = p.getContactPoints(sphereId, specimenId)
356            # Check contact with the robot
357            contact_points_robot = p.getContactPoints(sphereId, robotId)
358
359            # If contact with the specimen is detected
360            if contact_points_specimen:
361                #logging.info(f'sphereId: {sphereId}, in contact with specimen: {specimenId}')
362                # Disable collision between the sphere and the specimen
363                p.setCollisionFilterPair(sphereId, specimenId, -1, -1, enableCollision=0)
364                #logging.info(f'sphereId: {sphereId}, collision disabled')
365                # Get current position and orientation of the sphere
366                sphere_position, sphere_orientation = p.getBasePositionAndOrientation(sphereId)
367                # Fix the sphere in place relative to the world
368                p.createConstraint(parentBodyUniqueId=sphereId,
369                                    parentLinkIndex=-1,
370                                    childBodyUniqueId=-1,
371                                    childLinkIndex=-1,
372                                    jointType=p.JOINT_FIXED,
373                                    jointAxis=[0, 0, 0],
374                                    parentFramePosition=[0, 0, 0],
375                                    childFramePosition=sphere_position,
376                                    childFrameOrientation=sphere_orientation)
377                # track the final position of the sphere on the specimen by adding it to the dictionary
378                if f'specimenId_{specimenId}' in self.droplet_positions:
379                    self.droplet_positions[f'specimenId_{specimenId}'].append(sphere_position)
380                else:
381                    self.droplet_positions[f'specimenId_{specimenId}'] = [sphere_position]
382
383                #logging.info(f'sphereId: {sphereId}, fixed in place')
384
385                # # turn off all collisions with other spheres that are in contact with this sphere
386                # for sphereId2 in self.sphereIds:
387                #     if sphereId2 != sphereId:
388                #         contact_points = p.getContactPoints(sphereId, sphereId2)
389                #         if contact_points:
390                #             p.setCollisionFilterPair(sphereId, sphereId2, -1, -1, enableCollision=0)
391                #             logging.info(f'sphereId: {sphereId}, collision disabled with sphereId2: {sphereId2}')
392
393            # If contact with the robot is detected
394            if contact_points_robot:
395                # Destroy the sphere
396                p.removeBody(sphereId)
397                #logging.info(f'sphereId: {sphereId}, removed')
398                # Remove the sphereId from the list of sphereIds
399                self.sphereIds.remove(sphereId)
400                # Disable collision between the sphere and the robot
401                # p.setCollisionFilterPair(sphereId, robotId, -1, -1, enableCollision=0)
402                # Get current position and orientation of the sphere
403                # sphere_position, sphere_orientation = p.getBasePositionAndOrientation(sphereId)
404                # sphere_position = list(sphere_position)
405                # sphere_position[2] += 0.001
406                # # Fix the sphere in place relative to the world
407                # p.createConstraint(parentBodyUniqueId=sphereId,
408                #                     parentLinkIndex=-1,
409                #                     childBodyUniqueId=-1,
410                #                     childLinkIndex=-1,
411                #                     jointType=p.JOINT_FIXED,
412                #                     jointAxis=[0, 0, 0],
413                #                     parentFramePosition=[0, 0, 0],
414                #                     childFramePosition=sphere_position,
415                #                     childFrameOrientation=sphere_orientation)
416                                # turn off all collisions with other spheres that are in contact with this sphere
417                # for sphereId2 in self.sphereIds:
418                #     if sphereId2 != sphereId:
419                #         contact_points = p.getContactPoints(sphereId, sphereId2)
420                #         if contact_points:
421                #             p.setCollisionFilterPair(sphereId, sphereId2, -1, -1, enableCollision=0)
422                #             logging.info(f'sphereId: {sphereId}, collision disabled with sphereId2: {sphereId2}')
423
424    def set_start_position(self, x, y, z):
425        # Iterate through each robot and set its pipette to the start position
426        for robotId in self.robotIds:
427            # Calculate the necessary joint positions to reach the desired start position
428            # The calculation depends on the kinematic model of the robot
429            # For simplicity, let's assume a simple model where each joint moves in one axis (x, y, z)
430            # You might need to adjust this based on the actual robot kinematics
431
432            # Adjust the x, y, z values based on the robot's current position and pipette offset
433            robot_position = p.getBasePositionAndOrientation(robotId)[0]
434            adjusted_x = x - robot_position[0] - self.pipette_offset[0]
435            adjusted_y = y - robot_position[1] - self.pipette_offset[1]
436            adjusted_z = z - robot_position[2] - self.pipette_offset[2]
437
438            # Reset the joint positions/start position
439            p.resetJointState(robotId, 0, targetValue=adjusted_x)
440            p.resetJointState(robotId, 1, targetValue=adjusted_y)
441            p.resetJointState(robotId, 2, targetValue=adjusted_z)
442
443    # function to return the path of the current plate image
444    def get_plate_image(self):
445        return self.plate_image_path
446    
447    # close the simulation
448    def close(self):
449        p.disconnect()
Simulation(num_agents, render=True, rgb_array=False)
13    def __init__(self, num_agents, render=True, rgb_array=False):
14        self.render = render
15        self.rgb_array = rgb_array
16        if render:
17            mode = p.GUI # for graphical version
18        else:
19            mode = p.DIRECT # for non-graphical version
20        # Set up the simulation
21        self.physicsClient = p.connect(mode)
22        # Hide the default GUI components
23        p.configureDebugVisualizer(p.COV_ENABLE_GUI, 0)
24        p.setAdditionalSearchPath(pybullet_data.getDataPath()) #optionally
25        p.setGravity(0,0,-10)
26        #p.setPhysicsEngineParameter(contactBreakingThreshold=0.000001)
27        # load a texture
28        # Get the directory where this file is located
29        self.current_dir = os.path.dirname(os.path.abspath(__file__))
30        textures_dir = os.path.join(self.current_dir, "textures")
31        plates_dir = os.path.join(textures_dir, "_plates")
32        
33        # Get only PNG files from textures directory (exclude directories and system files)
34        texture_list = sorted([f for f in os.listdir(textures_dir) if f.endswith('.png')])
35        plates_list = sorted(os.listdir(plates_dir))
36        
37        # Select a random texture and use same index for corresponding plate image
38        random_texture = random.choice(texture_list)
39        random_texture_index = texture_list.index(random_texture)
40        self.plate_image_path = os.path.join(plates_dir, plates_list[random_texture_index])
41        self.textureId = p.loadTexture(os.path.join(textures_dir, random_texture))
42        #print(f'textureId: {self.textureId}')
43
44        # Set the camera parameters
45        cameraDistance = 1.1*(math.ceil((num_agents)**0.3)) # Distance from the target (zoom)
46        cameraYaw = 90  # Rotation around the vertical axis in degrees
47        cameraPitch = -35  # Rotation around the horizontal axis in degrees
48        cameraTargetPosition = [-0.2, -(math.ceil(num_agents**0.5)/2)+0.5, 0.1]  # XYZ coordinates of the target position
49
50        # Reset the camera with the specified parameters
51        p.resetDebugVisualizerCamera(cameraDistance, cameraYaw, cameraPitch, cameraTargetPosition)
52
53        self.baseplaneId = p.loadURDF("plane.urdf")
54        # add collision shape to the plane
55        #p.createCollisionShape(shapeType=p.GEOM_BOX, halfExtents=[30, 305, 0.001])
56
57        # define the pipette offset
58        self.pipette_offset = [0.073, 0.0895, 0.0895]
59        # dictionary to keep track of the current pipette position per robot
60        self.pipette_positions = {}
61
62        # Create the robots
63        self.create_robots(num_agents)
64
65        # list of sphere ids
66        self.sphereIds = []
67
68        # dictionary to keep track of the droplet positions on specimens key for specimenId, list of droplet positions
69        self.droplet_positions = {}
70
71        # Function to compute view matrix based on these parameters
72        # def compute_camera_view(cameraDistance, cameraYaw, cameraPitch, cameraTargetPosition):
73        #     camUpVector = (0, 0, 1)  # Up vector in Z-direction
74        #     camForward = (1, 0, 0)  # Forward vector in X-direction
75        #     camTargetPos = cameraTargetPosition
76        #     camPos = p.multiplyTransforms(camTargetPos, p.getQuaternionFromEuler((0, 0, 0)), (0, 0, cameraDistance), p.getQuaternionFromEuler((cameraPitch, 0, cameraYaw)))[0]
77        #     viewMatrix = p.computeViewMatrix(camPos, camTargetPos, camUpVector)
78        #     return viewMatrix
79        
80        # Capture the image
81        #self.view_matrix = compute_camera_view(cameraDistance, cameraYaw, cameraPitch, cameraTargetPosition)
82        #.projection_matrix = p.computeProjectionMatrixFOV(fov=60, aspect=640/480, nearVal=0.1, farVal=100)
render
rgb_array
physicsClient
current_dir
plate_image_path
textureId
baseplaneId
pipette_offset
pipette_positions
sphereIds
droplet_positions
def create_robots(self, num_agents):
 85    def create_robots(self, num_agents):
 86        spacing = 1  # Adjust the spacing as needed
 87
 88        # Calculate the grid size to fit all agents
 89        grid_size = math.ceil(num_agents ** 0.5) 
 90
 91        self.robotIds = []
 92        self.specimenIds = []
 93        agent_count = 0  # Counter for the number of placed agents
 94
 95        for i in range(grid_size):
 96            for j in range(grid_size):
 97                if agent_count < num_agents:  # Check if more agents need to be placed
 98                    # Calculate position for each robot
 99                    position = [-spacing * i, -spacing * j, 0.03]
100                    robotId = p.loadURDF(os.path.join(self.current_dir, "ot_2_simulation_v6.urdf"), position, [0,0,0,1],
101                                        flags=p.URDF_USE_INERTIA_FROM_FILE)
102                    start_position, start_orientation = p.getBasePositionAndOrientation(robotId)
103                    p.createConstraint(parentBodyUniqueId=robotId,
104                                    parentLinkIndex=-1,
105                                    childBodyUniqueId=-1,
106                                    childLinkIndex=-1,
107                                    jointType=p.JOINT_FIXED,
108                                    jointAxis=[0, 0, 0],
109                                    parentFramePosition=[0, 0, 0],
110                                    childFramePosition=start_position,
111                                    childFrameOrientation=start_orientation)
112
113                    # Create a fixed constraint between the robot and the base plane so the robot is fixed in space above the plane by its base link with an offset
114                    #p.createConstraint(self.baseplaneId, -1, robotId, -1, p.JOINT_FIXED, [0, 0, 0], position, [0, 0, 0])
115
116                    # Load the specimen with an offset
117                    offset = [0.18275-0.00005, 0.163-0.026, 0.057]
118                    position_with_offset = [position[0] + offset[0], position[1] + offset[1], position[2] + offset[2]]
119                    rotate_90 = p.getQuaternionFromEuler([0, 0, -math.pi/2])
120                    planeId = p.loadURDF(os.path.join(self.current_dir, "custom.urdf"), position_with_offset, rotate_90)#start_orientation)
121                    # Disable collision between the robot and the specimen
122                    p.setCollisionFilterPair(robotId, planeId, -1, -1, enableCollision=0)
123                    spec_position, spec_orientation = p.getBasePositionAndOrientation(planeId)
124
125                    #Constrain the specimen to the robot
126                    # p.createConstraint(parentBodyUniqueId=robotId,
127                    #                 parentLinkIndex=-1,
128                    #                 childBodyUniqueId=planeId,
129                    #                 childLinkIndex=-1,
130                    #                 jointType=p.JOINT_FIXED,
131                    #                 jointAxis=[0, 0, 0],
132                    #                 parentFramePosition=start_position,
133                    #                 #parentFrameOrientation=start_orientation,
134                    #                 childFramePosition=[0, 0, 0],
135                    #                 childFrameOrientation=[0, 0, 0, 1])
136                    #p.createConstraint(robotId, -1, planeId, -1, p.JOINT_FIXED, [0, 0, 0], offset, [0, 0, 0])
137                    p.createConstraint(parentBodyUniqueId=planeId,
138                                    parentLinkIndex=-1,
139                                    childBodyUniqueId=-1,
140                                    childLinkIndex=-1,
141                                    jointType=p.JOINT_FIXED,
142                                    jointAxis=[0, 0, 0],
143                                    parentFramePosition=[0, 0, 0],
144                                    childFramePosition=spec_position,
145                                    childFrameOrientation=spec_orientation)
146                    # Load your texture and apply it to the plane
147                    #textureId = p.loadTexture("uvmapped_dish_large_comp.png")
148                    p.changeVisualShape(planeId, -1, textureUniqueId=self.textureId)
149
150                    self.robotIds.append(robotId)
151                    self.specimenIds.append(planeId)
152
153                    agent_count += 1  # Increment the agent counter
154
155                    # calculate the pipette position
156                    pipette_position = self.get_pipette_position(robotId)
157                    # save the pipette position
158                    self.pipette_positions[f'robotId_{robotId}'] = pipette_position
def get_pipette_position(self, robotId):
161    def get_pipette_position(self, robotId):
162        #get the position of the robot
163        robot_position = p.getBasePositionAndOrientation(robotId)[0]
164        robot_position = list(robot_position)
165        joint_states = p.getJointStates(robotId, [0, 1, 2])
166        robot_position[0] -= joint_states[0][0]
167        robot_position[1] -= joint_states[1][0]
168        robot_position[2] += joint_states[2][0]
169        # x,y offset
170        x_offset = self.pipette_offset[0]
171        y_offset = self.pipette_offset[1]
172        z_offset = self.pipette_offset[2]
173        # Calculate the position of the pipette at the tip of the pipette but at the same z coordinate as the specimen
174        pipette_position = [robot_position[0]+x_offset, robot_position[1]+y_offset, robot_position[2]+z_offset]
175        return pipette_position
def reset(self, num_agents=1):
178    def reset(self, num_agents=1):
179        # Remove the textures from the specimens
180        for specimenId in self.specimenIds:
181            p.changeVisualShape(specimenId, -1, textureUniqueId=-1)
182
183        # Remove the robots
184        for robotId in self.robotIds:
185            p.removeBody(robotId)
186            # remove the robotId from the list of robotIds
187            self.robotIds.remove(robotId)
188
189        # Remove the specimens
190        for specimenId in self.specimenIds:
191            p.removeBody(specimenId)
192            # remove the specimenId from the list of specimenIds
193            self.specimenIds.remove(specimenId)
194
195        # Remove the spheres
196        for sphereId in self.sphereIds:
197            p.removeBody(sphereId)
198            # remove the sphereId from the list of sphereIds
199            self.sphereIds.remove(sphereId)
200
201        # dictionary to keep track of the current pipette position per robot
202        self.pipette_positions = {}
203        # list of sphere ids
204        self.sphereIds = []
205        # dictionary to keep track of the droplet positions on specimens key for specimenId, list of droplet positions
206        self.droplet_positions = {}
207
208        # Create the robots
209        self.create_robots(num_agents)
210
211        return self.get_states()
def run(self, actions, num_steps=1):
214    def run(self, actions, num_steps=1):
215        #self.apply_actions(actions)
216        start = time.time()
217        n = 100
218        for i in range(num_steps):
219            self.apply_actions(actions)
220            p.stepSimulation()
221
222            # reset the droplet after 20 steps
223            # if self.dropped:
224            #     self.cooldown += 1
225            #     if self.cooldown == self.cooldown_time:
226            #         self.dropped = False
227            #         self.cooldown = 0                
228
229            #compute and display the frames per second every n steps
230            # if i % n == 0:
231            #     fps = n / (time.time() - start)
232            #     start = time.time()
233            #     print(f'fps: {fps}')
234                # #print the orientation of the robot every n steps
235                # for i in range(len(self.robotIds)):
236                #     orientation = p.getBasePositionAndOrientation(self.robotIds[i])[1]
237                #     #print(f'robot {i} orientation: {orientation}')
238                #     #get the position of the link on the z axis
239                #     link_state = p.getLinkState(self.robotIds[i], 0)
240                #     print(f'robot {i} link_state: {link_state}')
241            # check contact for each robot and specimen
242            for specimenId, robotId in zip(self.specimenIds, self.robotIds):
243                #logging.info(f'checking contact for robotId: {robotId}, specimenId: {specimenId}')
244                self.check_contact(robotId, specimenId)
245
246            if self.rgb_array:
247                # Camera parameters
248                camera_pos = [1, 0, 1] # Example position
249                camera_target = [-0.3, 0, 0] # Point where the camera is looking at
250                up_vector = [0, 0, 1] # Usually the Z-axis is up
251                fov = 50 # Field of view
252                aspect = 320/240 # Aspect ratio (width/height)
253
254                # Get camera image
255                width, height, rgbImg, depthImg, segImg = p.getCameraImage(width=320, height=240, viewMatrix=p.computeViewMatrix(camera_pos, camera_target, up_vector), projectionMatrix=p.computeProjectionMatrixFOV(fov, aspect, 0.1, 100.0))
256                
257                self.current_frame = rgbImg  # RGB array
258                #print(self.current_frame)
259
260            if self.render:
261                time.sleep(1./240.) # slow down the simulation
262
263        return self.get_states()
def apply_actions(self, actions):
266    def apply_actions(self, actions): # actions [[x,y,z,drop], [x,y,z,drop], ...
267        for i in range(len(self.robotIds)):
268            p.setJointMotorControl2(self.robotIds[i], 0, p.VELOCITY_CONTROL, targetVelocity=-actions[i][0], force=500)
269            p.setJointMotorControl2(self.robotIds[i], 1, p.VELOCITY_CONTROL, targetVelocity=-actions[i][1], force=500)
270            p.setJointMotorControl2(self.robotIds[i], 2, p.VELOCITY_CONTROL, targetVelocity=actions[i][2], force=800)
271            if actions[i][3] == 1:
272                self.drop(robotId=self.robotIds[i])
273                #logging.info(f'drop: {i}')
def drop(self, robotId):
276    def drop(self, robotId):
277        # Get the position of the pipette based on the x,y,z coordinates of the joints
278        #robot_position = [0, 0, 0]
279        #get the position of the robot
280        robot_position = p.getBasePositionAndOrientation(robotId)[0]
281        robot_position = list(robot_position)
282        joint_states = p.getJointStates(robotId, [0, 1, 2])
283        robot_position[0] -= joint_states[0][0]
284        robot_position[1] -= joint_states[1][0]
285        robot_position[2] += joint_states[2][0]
286        # x,y offset
287        x_offset = self.pipette_offset[0]
288        y_offset = self.pipette_offset[1]
289        z_offset = self.pipette_offset[2]-0.0015
290        # Get the position of the specimen
291        specimen_position = p.getBasePositionAndOrientation(self.specimenIds[0])[0]
292        #logging.info(f'droplet_position: {droplet_position}')
293        # Create a sphere to represent the droplet
294        sphereRadius = 0.003  # Adjust as needed
295        sphereColor = [1, 0, 0, 0.5]  # RGBA (Red in this case)
296        visualShapeId = p.createVisualShape(shapeType=p.GEOM_SPHERE, radius=sphereRadius, rgbaColor=sphereColor)
297        #add collision to the sphere
298        collision = p.createCollisionShape(shapeType=p.GEOM_SPHERE, radius=sphereRadius)
299        sphereBody = p.createMultiBody(baseMass=0.1, baseVisualShapeIndex=visualShapeId, baseCollisionShapeIndex=collision)
300        # Calculate the position of the droplet at the tip of the pipette but at the same z coordinate as the specimen
301        droplet_position = [robot_position[0]+x_offset, robot_position[1]+y_offset, robot_position[2]+z_offset]
302                            #specimen_position[2] + sphereRadius+0.015/2+0.06]
303        p.resetBasePositionAndOrientation(sphereBody, droplet_position, [0, 0, 0, 1])
304        # track the sphere id
305        self.sphereIds.append(sphereBody)
306        self.dropped = True
307        #TODO: add some randomness to the droplet position proportional to the height of the pipette above the specimen and the velocity of the pipette of the pipette
308        return droplet_position
def get_states(self):
311    def get_states(self):
312        states = {}
313        for robotId in self.robotIds:
314            raw_joint_states = p.getJointStates(robotId, [0, 1, 2])
315
316            # Convert joint states into a dictionary
317            joint_states = {}
318            for i, joint_state in enumerate(raw_joint_states):
319                joint_states[f'joint_{i}'] = {
320                    'position': joint_state[0],
321                    'velocity': joint_state[1],
322                    'reaction_forces': joint_state[2],
323                    'motor_torque': joint_state[3]
324                }
325
326            # Robot position
327            robot_position = p.getBasePositionAndOrientation(robotId)[0]
328            robot_position = list(robot_position)
329
330            # Adjust robot position based on joint states
331            robot_position[0] -= raw_joint_states[0][0]
332            robot_position[1] -= raw_joint_states[1][0]
333            robot_position[2] += raw_joint_states[2][0]
334
335            # Pipette position
336            pipette_position = [robot_position[0] + self.pipette_offset[0],
337                                robot_position[1] + self.pipette_offset[1],
338                                robot_position[2] + self.pipette_offset[2]]
339            # Round pipette position to 4 decimal places
340            pipette_position = [round(num, 4) for num in pipette_position]
341
342            # Store information in the dictionary
343            states[f'robotId_{robotId}'] = {
344                "joint_states": joint_states,
345                "robot_position": robot_position,
346                "pipette_position": pipette_position
347            }
348
349        return states
def check_contact(self, robotId, specimenId):
352    def check_contact(self, robotId, specimenId):
353        for sphereId in self.sphereIds:
354            # Check contact with the specimen
355            contact_points_specimen = p.getContactPoints(sphereId, specimenId)
356            # Check contact with the robot
357            contact_points_robot = p.getContactPoints(sphereId, robotId)
358
359            # If contact with the specimen is detected
360            if contact_points_specimen:
361                #logging.info(f'sphereId: {sphereId}, in contact with specimen: {specimenId}')
362                # Disable collision between the sphere and the specimen
363                p.setCollisionFilterPair(sphereId, specimenId, -1, -1, enableCollision=0)
364                #logging.info(f'sphereId: {sphereId}, collision disabled')
365                # Get current position and orientation of the sphere
366                sphere_position, sphere_orientation = p.getBasePositionAndOrientation(sphereId)
367                # Fix the sphere in place relative to the world
368                p.createConstraint(parentBodyUniqueId=sphereId,
369                                    parentLinkIndex=-1,
370                                    childBodyUniqueId=-1,
371                                    childLinkIndex=-1,
372                                    jointType=p.JOINT_FIXED,
373                                    jointAxis=[0, 0, 0],
374                                    parentFramePosition=[0, 0, 0],
375                                    childFramePosition=sphere_position,
376                                    childFrameOrientation=sphere_orientation)
377                # track the final position of the sphere on the specimen by adding it to the dictionary
378                if f'specimenId_{specimenId}' in self.droplet_positions:
379                    self.droplet_positions[f'specimenId_{specimenId}'].append(sphere_position)
380                else:
381                    self.droplet_positions[f'specimenId_{specimenId}'] = [sphere_position]
382
383                #logging.info(f'sphereId: {sphereId}, fixed in place')
384
385                # # turn off all collisions with other spheres that are in contact with this sphere
386                # for sphereId2 in self.sphereIds:
387                #     if sphereId2 != sphereId:
388                #         contact_points = p.getContactPoints(sphereId, sphereId2)
389                #         if contact_points:
390                #             p.setCollisionFilterPair(sphereId, sphereId2, -1, -1, enableCollision=0)
391                #             logging.info(f'sphereId: {sphereId}, collision disabled with sphereId2: {sphereId2}')
392
393            # If contact with the robot is detected
394            if contact_points_robot:
395                # Destroy the sphere
396                p.removeBody(sphereId)
397                #logging.info(f'sphereId: {sphereId}, removed')
398                # Remove the sphereId from the list of sphereIds
399                self.sphereIds.remove(sphereId)
400                # Disable collision between the sphere and the robot
401                # p.setCollisionFilterPair(sphereId, robotId, -1, -1, enableCollision=0)
402                # Get current position and orientation of the sphere
403                # sphere_position, sphere_orientation = p.getBasePositionAndOrientation(sphereId)
404                # sphere_position = list(sphere_position)
405                # sphere_position[2] += 0.001
406                # # Fix the sphere in place relative to the world
407                # p.createConstraint(parentBodyUniqueId=sphereId,
408                #                     parentLinkIndex=-1,
409                #                     childBodyUniqueId=-1,
410                #                     childLinkIndex=-1,
411                #                     jointType=p.JOINT_FIXED,
412                #                     jointAxis=[0, 0, 0],
413                #                     parentFramePosition=[0, 0, 0],
414                #                     childFramePosition=sphere_position,
415                #                     childFrameOrientation=sphere_orientation)
416                                # turn off all collisions with other spheres that are in contact with this sphere
417                # for sphereId2 in self.sphereIds:
418                #     if sphereId2 != sphereId:
419                #         contact_points = p.getContactPoints(sphereId, sphereId2)
420                #         if contact_points:
421                #             p.setCollisionFilterPair(sphereId, sphereId2, -1, -1, enableCollision=0)
422                #             logging.info(f'sphereId: {sphereId}, collision disabled with sphereId2: {sphereId2}')
def set_start_position(self, x, y, z):
424    def set_start_position(self, x, y, z):
425        # Iterate through each robot and set its pipette to the start position
426        for robotId in self.robotIds:
427            # Calculate the necessary joint positions to reach the desired start position
428            # The calculation depends on the kinematic model of the robot
429            # For simplicity, let's assume a simple model where each joint moves in one axis (x, y, z)
430            # You might need to adjust this based on the actual robot kinematics
431
432            # Adjust the x, y, z values based on the robot's current position and pipette offset
433            robot_position = p.getBasePositionAndOrientation(robotId)[0]
434            adjusted_x = x - robot_position[0] - self.pipette_offset[0]
435            adjusted_y = y - robot_position[1] - self.pipette_offset[1]
436            adjusted_z = z - robot_position[2] - self.pipette_offset[2]
437
438            # Reset the joint positions/start position
439            p.resetJointState(robotId, 0, targetValue=adjusted_x)
440            p.resetJointState(robotId, 1, targetValue=adjusted_y)
441            p.resetJointState(robotId, 2, targetValue=adjusted_z)
def get_plate_image(self):
444    def get_plate_image(self):
445        return self.plate_image_path
def close(self):
448    def close(self):
449        p.disconnect()