library.robot_control

  1import pybullet as p
  2import imageio
  3from PIL import Image
  4from pathlib import Path
  5
  6def search_limit(sim, joint_idx, direction, max_steps=1000):
  7    """
  8    Run a joint in the positive or negative direction for a set number of steps 
  9    reporting when the position if it has stopped within the `max_steps` range.
 10
 11    Args:
 12        sim (Simulation): The simulation object
 13        joint_idx (int): integer value of joint
 14        direction (int): search in the positive or negative direction
 15        max_steps(int): Maximum steps to search for each limit
 16    
 17    Returns
 18        tuple (min_limit, max_limit) for the joint
 19    """
 20    actions = [[0, 0, 0, 0]]
 21    actions[0][joint_idx] = direction
 22    
 23    for _ in range(max_steps):
 24        sim.run(actions, num_steps=1)
 25        states = sim.get_states()
 26        robotId = list(states.keys())[0]
 27        vel = states[robotId]['joint_states'][f'joint_{joint_idx}']['velocity']
 28        
 29        if abs(vel) < 0.001:
 30            return get_position(sim)[joint_idx]
 31    
 32    return None
 33
 34
 35def find_joint_limit(sim, joint, max_steps=1000, reset=True):
 36    """
 37    Find both limits of a joint by moving positive then negative using velocity values
 38    
 39    Args:
 40        sim (Simulation): The simulation object
 41        joint (str): Which joint to test: 'x', 'y', or 'z'
 42    max_steps(int): Maximum steps to search for each limit
 43    reset(bool): Whether to reset simulation before testing
 44    stable_steps(int): Number of consecutive steps with unchanged position to consider stable
 45    
 46    Returns
 47        tuple (min_limit, max_limit) for the joint
 48    """
 49    if reset:
 50        sim.reset()
 51    
 52    joint_map = {'x': 0, 'y': 1, 'z': 2}
 53    joint_idx = joint_map[joint.lower()]
 54    
 55    
 56    max_limit = search_limit(sim, joint_idx, 1.0, max_steps)
 57    min_limit = search_limit(sim, joint_idx, -1.0, max_steps)
 58    
 59    return (min_limit, max_limit)
 60
 61def find_joint_limit_pos(sim, joint, max_steps=1000, reset=True, stable_steps=10):
 62    """
 63    Find both limits of a joint by moving positive then negative using position values
 64    
 65    Args:
 66        sim (Simulation): The simulation object
 67        joint (str): Which joint to test: 'x', 'y', or 'z'
 68    max_steps(int): Maximum steps to search for each limit
 69    reset(bool): Whether to reset simulation before testing
 70    stable_steps(int): Number of consecutive steps with unchanged position to consider stable
 71    
 72    Returns
 73        tuple (min_limit, max_limit) for the joint
 74    """
 75    if reset:
 76        sim.reset()
 77    
 78    joint_map = {'x': 0, 'y': 1, 'z': 2}
 79    joint_idx = joint_map[joint.lower()]
 80    
 81    def search_limit(direction):
 82        actions = [[0, 0, 0, 0]]
 83        actions[0][joint_idx] = direction
 84        
 85        prev_position = None
 86        stable_count = 0
 87        
 88        for _ in range(max_steps):
 89            sim.run(actions, num_steps=1)
 90            
 91            current_position = get_position(sim)[joint_idx]
 92            
 93            # Check if position hasn't changed
 94            if prev_position is not None:
 95                if abs(current_position - prev_position) < 1e-6:  # Position tolerance
 96                    stable_count += 1
 97                    if stable_count >= stable_steps:
 98                        return current_position
 99                else:
100                    stable_count = 0  # Reset counter if position changed
101            
102            prev_position = current_position
103        
104        return None
105    
106    max_limit = search_limit(1.0)
107    min_limit = search_limit(-1.0)
108    
109    return (min_limit, max_limit)
110
111def get_position(sim, robotId=None):
112    """Get the position of the pipet using 
113    
114    Args:
115        sim(simulation object)
116        robotId(str): entity id to query, None returns first entity, 'all' returns all as dict
117        
118    Returns:
119        list of positions [x, y, z] OR
120        dict of list of positions {robotId: [x, y, z]}"""
121    states = sim.get_states()
122
123    if isinstance(robotId, str):
124        if robotId == "all":
125            return  {i: states[i]['pipette_position'] for i in sorted(states.keys())}
126        
127        
128
129    if not robotId:
130        robotId = list(sorted(states.keys()))[0]
131    
132    return states.get(robotId, {}).get('pipette_position', [None, None, None])
133        
134def get_torques(sim, robotId=None):
135    """Get motor torques for robot joints.
136    
137    Args:
138        sim: Simulation instance
139        robotId (str or None): Robot ID to get torques for. 
140            - None: Returns torques for first robot
141            - "all": Returns torques for all robots as dict
142            - Specific ID: Returns torques for that robot
143    
144    Returns:
145        list or dict: List of torques [x, y, z] for single robot, 
146                     or dict mapping robot IDs to torque lists for "all"
147    """
148    states = sim.get_states()
149
150    if isinstance(robotId, str):
151        if robotId == "all":
152            data = {}
153            for id, val in states.items():
154                torques = []
155                joints = val['joint_states']
156                for j in sorted(joints.keys()):
157                    torques.append(joints[j]['motor_torque'])
158                data[id] = torques
159            return data
160
161    if not robotId:
162        robotId = list(sorted(states.keys()))[0]
163    
164    # Get torques for specific robot
165    robot_state = states.get(robotId, {})
166    joints = robot_state.get('joint_states', {})
167    torques = []
168    for j in sorted(joints.keys()):
169        torques.append(joints[j]['motor_torque'])
170    
171    return torques if torques else [None, None, None]
172
173
174def get_velocities(sim, robotId=None):
175    """
176    Get the velocity of the pipet
177    
178    Args:
179        sim(simulation object)
180        robotId(str): entity id to query, None returns first entity, 'all' returns all as dict
181        
182    Returns:
183        list of positions [x, y, z] OR
184        dict of list of velocities {robotId: [x, y, z]}
185    """    
186    states = sim.get_states()
187
188    if isinstance(robotId, str):
189        if robotId == "all":
190            return {i: [states[i]['joint_states'][j]['velocity'] 
191                       for j in sorted(states[i]['joint_states'].keys())] 
192                   for i in sorted(states.keys())}
193    
194    if not robotId:
195        robotId = list(sorted(states.keys()))[0]
196    
197    robot_state = states.get(robotId, {})
198    if 'joint_states' not in robot_state:
199        return [None, None, None]
200    
201    return [robot_state['joint_states'][j]['velocity'] 
202            for j in sorted(robot_state['joint_states'].keys())]
203
204
205def find_workspace(sim, max_steps=1000, reset=True):
206    """
207    Find the workspace limits and center point.
208    
209    Parameters:
210    -----------
211    sim : Simulation
212        The simulation object
213    max_steps : int
214        Maximum steps to search for each limit
215    reset : bool
216        Whether to reset simulation before testing
217    
218    Returns:
219    --------
220    dict
221        Dictionary containing:
222        - 'x_min', 'x_max': X axis limits
223        - 'y_min', 'y_max': Y axis limits
224        - 'z_min', 'z_max': Z axis limits
225        - 'center': (x, y, z) tuple of workspace center
226        - 'volume': workspace volume in cubic meters
227    """
228    # Find limits for each axis
229    x_limits = find_joint_limit(sim, 'x', max_steps, reset)
230    y_limits = find_joint_limit(sim, 'y', max_steps, False)
231    z_limits = find_joint_limit(sim, 'z', max_steps, False)
232    
233    # Ensure min < max for each axis
234    x_min, x_max = min(x_limits), max(x_limits)
235    y_min, y_max = min(y_limits), max(y_limits)
236    z_min, z_max = min(z_limits), max(z_limits)
237    
238    # Calculate center
239    center = (
240        (x_min + x_max) / 2.0,
241        (y_min + y_max) / 2.0,
242        (z_min + z_max) / 2.0
243    )
244    
245    # Calculate volume
246    volume = (x_max - x_min) * (y_max - y_min) * (z_max - z_min)
247    
248    return {
249        'x_min': x_min,
250        'x_max': x_max,
251        'y_min': y_min,
252        'y_max': y_max,
253        'z_min': z_min,
254        'z_max': z_max,
255        'center': center,
256        'volume': volume
257    }
258
259
260
261
262
263def record_simulation_gif(sim, action_function, filename, fps=25, 
264                          capture_every=1, width=640, height=480):
265    """
266    Record a GIF of any simulation action function.
267    
268    This is a general-purpose recorder that can wrap any function that
269    controls the simulation. The function will execute normally while
270    frames are captured in the background.
271    
272    Args:
273        sim: Simulation object (must have render=True).
274        action_function: Callable that performs actions in the simulation.
275            This function will be executed and its execution recorded.
276        filename (str or Path): Output filepath for the GIF (e.g., 'robot_motion.gif').
277        fps (int): Frames per second for the output GIF. Defaults to 25.
278        capture_every (int): Capture every Nth frame to reduce file size. 
279            E.g., capture_every=2 means capture every other frame. Defaults to 1.
280        width (int): Width of captured frames in pixels. Defaults to 640.
281        height (int): Height of captured frames in pixels. Defaults to 480.
282    
283    Returns:
284        Any: The return value from action_function (if any).
285    
286    Example:
287        >>> def move_robot():
288        >>>     for i in range(100):
289        >>>         sim.run([[0.1, 0, 0, 0]], num_steps=1)
290        >>> 
291        >>> record_simulation_gif(sim, move_robot, 'motion.gif')
292    """
293    import numpy as np
294    
295    # Check if simulation has rendering enabled
296    if not sim.render:
297        print("Error: Simulation must be created with render=True to record GIFs")
298        return action_function()
299    
300    frames = []
301    frame_count = 0
302    
303    # Create a frame capture callback
304    def capture_frame():
305        nonlocal frame_count
306        if frame_count % capture_every == 0:
307            try:
308                # Get camera image
309                img_data = p.getCameraImage(
310                    width, height,
311                    renderer=p.ER_BULLET_HARDWARE_OPENGL
312                )
313                
314                # Check if we got valid data (tuple with 5 elements)
315                if isinstance(img_data, tuple) and len(img_data) == 5:
316                    width_img, height_img, rgb, depth, seg = img_data
317                    
318                    # Convert rgb to numpy array if it isn't already
319                    if not isinstance(rgb, np.ndarray):
320                        rgb = np.array(rgb, dtype=np.uint8)
321                    
322                    # Reshape if needed (flat array to 2D image)
323                    if len(rgb.shape) == 1:
324                        rgb = rgb.reshape((height, width, 4))
325                    
326                    # Convert to PIL Image (remove alpha channel)
327                    if rgb.shape[2] == 4:
328                        frame = Image.fromarray(rgb[:, :, :3], 'RGB')
329                    else:
330                        frame = Image.fromarray(rgb, 'RGB')
331                    frames.append(frame)
332            except Exception as e:
333                print(f"Warning: Failed to capture frame {frame_count}: {e}")
334        frame_count += 1
335    
336    # Monkey-patch sim.run to capture frames
337    original_run = sim.run
338    
339    def run_with_capture(*args, **kwargs):
340        result = original_run(*args, **kwargs)
341        capture_frame()
342        return result
343    
344    # Temporarily replace sim.run
345    sim.run = run_with_capture
346    
347    try:
348        # Execute the action function
349        result = action_function()
350    finally:
351        # Restore original sim.run
352        sim.run = original_run
353    
354    # Save as GIF
355    if frames:
356        duration = 1.0 / fps  # Duration per frame in seconds
357        imageio.mimsave(filename, frames, duration=duration)
358        print(f"GIF saved to {filename} ({len(frames)} frames at {fps} fps)")
359    else:
360        print("Warning: No frames captured! Make sure your action_function calls sim.run()")
361    
362    return result
def search_limit(sim, joint_idx, direction, max_steps=1000):
 7def search_limit(sim, joint_idx, direction, max_steps=1000):
 8    """
 9    Run a joint in the positive or negative direction for a set number of steps 
10    reporting when the position if it has stopped within the `max_steps` range.
11
12    Args:
13        sim (Simulation): The simulation object
14        joint_idx (int): integer value of joint
15        direction (int): search in the positive or negative direction
16        max_steps(int): Maximum steps to search for each limit
17    
18    Returns
19        tuple (min_limit, max_limit) for the joint
20    """
21    actions = [[0, 0, 0, 0]]
22    actions[0][joint_idx] = direction
23    
24    for _ in range(max_steps):
25        sim.run(actions, num_steps=1)
26        states = sim.get_states()
27        robotId = list(states.keys())[0]
28        vel = states[robotId]['joint_states'][f'joint_{joint_idx}']['velocity']
29        
30        if abs(vel) < 0.001:
31            return get_position(sim)[joint_idx]
32    
33    return None

Run a joint in the positive or negative direction for a set number of steps reporting when the position if it has stopped within the max_steps range.

Arguments:
  • sim (Simulation): The simulation object
  • joint_idx (int): integer value of joint
  • direction (int): search in the positive or negative direction
  • max_steps(int): Maximum steps to search for each limit

Returns tuple (min_limit, max_limit) for the joint

def find_joint_limit(sim, joint, max_steps=1000, reset=True):
36def find_joint_limit(sim, joint, max_steps=1000, reset=True):
37    """
38    Find both limits of a joint by moving positive then negative using velocity values
39    
40    Args:
41        sim (Simulation): The simulation object
42        joint (str): Which joint to test: 'x', 'y', or 'z'
43    max_steps(int): Maximum steps to search for each limit
44    reset(bool): Whether to reset simulation before testing
45    stable_steps(int): Number of consecutive steps with unchanged position to consider stable
46    
47    Returns
48        tuple (min_limit, max_limit) for the joint
49    """
50    if reset:
51        sim.reset()
52    
53    joint_map = {'x': 0, 'y': 1, 'z': 2}
54    joint_idx = joint_map[joint.lower()]
55    
56    
57    max_limit = search_limit(sim, joint_idx, 1.0, max_steps)
58    min_limit = search_limit(sim, joint_idx, -1.0, max_steps)
59    
60    return (min_limit, max_limit)

Find both limits of a joint by moving positive then negative using velocity values

Arguments:
  • sim (Simulation): The simulation object
  • joint (str): Which joint to test: 'x', 'y', or 'z'

max_steps(int): Maximum steps to search for each limit reset(bool): Whether to reset simulation before testing stable_steps(int): Number of consecutive steps with unchanged position to consider stable

Returns tuple (min_limit, max_limit) for the joint

def find_joint_limit_pos(sim, joint, max_steps=1000, reset=True, stable_steps=10):
 62def find_joint_limit_pos(sim, joint, max_steps=1000, reset=True, stable_steps=10):
 63    """
 64    Find both limits of a joint by moving positive then negative using position values
 65    
 66    Args:
 67        sim (Simulation): The simulation object
 68        joint (str): Which joint to test: 'x', 'y', or 'z'
 69    max_steps(int): Maximum steps to search for each limit
 70    reset(bool): Whether to reset simulation before testing
 71    stable_steps(int): Number of consecutive steps with unchanged position to consider stable
 72    
 73    Returns
 74        tuple (min_limit, max_limit) for the joint
 75    """
 76    if reset:
 77        sim.reset()
 78    
 79    joint_map = {'x': 0, 'y': 1, 'z': 2}
 80    joint_idx = joint_map[joint.lower()]
 81    
 82    def search_limit(direction):
 83        actions = [[0, 0, 0, 0]]
 84        actions[0][joint_idx] = direction
 85        
 86        prev_position = None
 87        stable_count = 0
 88        
 89        for _ in range(max_steps):
 90            sim.run(actions, num_steps=1)
 91            
 92            current_position = get_position(sim)[joint_idx]
 93            
 94            # Check if position hasn't changed
 95            if prev_position is not None:
 96                if abs(current_position - prev_position) < 1e-6:  # Position tolerance
 97                    stable_count += 1
 98                    if stable_count >= stable_steps:
 99                        return current_position
100                else:
101                    stable_count = 0  # Reset counter if position changed
102            
103            prev_position = current_position
104        
105        return None
106    
107    max_limit = search_limit(1.0)
108    min_limit = search_limit(-1.0)
109    
110    return (min_limit, max_limit)

Find both limits of a joint by moving positive then negative using position values

Arguments:
  • sim (Simulation): The simulation object
  • joint (str): Which joint to test: 'x', 'y', or 'z'

max_steps(int): Maximum steps to search for each limit reset(bool): Whether to reset simulation before testing stable_steps(int): Number of consecutive steps with unchanged position to consider stable

Returns tuple (min_limit, max_limit) for the joint

def get_position(sim, robotId=None):
112def get_position(sim, robotId=None):
113    """Get the position of the pipet using 
114    
115    Args:
116        sim(simulation object)
117        robotId(str): entity id to query, None returns first entity, 'all' returns all as dict
118        
119    Returns:
120        list of positions [x, y, z] OR
121        dict of list of positions {robotId: [x, y, z]}"""
122    states = sim.get_states()
123
124    if isinstance(robotId, str):
125        if robotId == "all":
126            return  {i: states[i]['pipette_position'] for i in sorted(states.keys())}
127        
128        
129
130    if not robotId:
131        robotId = list(sorted(states.keys()))[0]
132    
133    return states.get(robotId, {}).get('pipette_position', [None, None, None])

Get the position of the pipet using

Arguments:
  • sim(simulation object)
  • robotId(str): entity id to query, None returns first entity, 'all' returns all as dict
Returns:

list of positions [x, y, z] OR dict of list of positions {robotId: [x, y, z]}

def get_torques(sim, robotId=None):
135def get_torques(sim, robotId=None):
136    """Get motor torques for robot joints.
137    
138    Args:
139        sim: Simulation instance
140        robotId (str or None): Robot ID to get torques for. 
141            - None: Returns torques for first robot
142            - "all": Returns torques for all robots as dict
143            - Specific ID: Returns torques for that robot
144    
145    Returns:
146        list or dict: List of torques [x, y, z] for single robot, 
147                     or dict mapping robot IDs to torque lists for "all"
148    """
149    states = sim.get_states()
150
151    if isinstance(robotId, str):
152        if robotId == "all":
153            data = {}
154            for id, val in states.items():
155                torques = []
156                joints = val['joint_states']
157                for j in sorted(joints.keys()):
158                    torques.append(joints[j]['motor_torque'])
159                data[id] = torques
160            return data
161
162    if not robotId:
163        robotId = list(sorted(states.keys()))[0]
164    
165    # Get torques for specific robot
166    robot_state = states.get(robotId, {})
167    joints = robot_state.get('joint_states', {})
168    torques = []
169    for j in sorted(joints.keys()):
170        torques.append(joints[j]['motor_torque'])
171    
172    return torques if torques else [None, None, None]

Get motor torques for robot joints.

Arguments:
  • sim: Simulation instance
  • robotId (str or None): Robot ID to get torques for.
    • None: Returns torques for first robot
    • "all": Returns torques for all robots as dict
    • Specific ID: Returns torques for that robot
Returns:

list or dict: List of torques [x, y, z] for single robot, or dict mapping robot IDs to torque lists for "all"

def get_velocities(sim, robotId=None):
175def get_velocities(sim, robotId=None):
176    """
177    Get the velocity of the pipet
178    
179    Args:
180        sim(simulation object)
181        robotId(str): entity id to query, None returns first entity, 'all' returns all as dict
182        
183    Returns:
184        list of positions [x, y, z] OR
185        dict of list of velocities {robotId: [x, y, z]}
186    """    
187    states = sim.get_states()
188
189    if isinstance(robotId, str):
190        if robotId == "all":
191            return {i: [states[i]['joint_states'][j]['velocity'] 
192                       for j in sorted(states[i]['joint_states'].keys())] 
193                   for i in sorted(states.keys())}
194    
195    if not robotId:
196        robotId = list(sorted(states.keys()))[0]
197    
198    robot_state = states.get(robotId, {})
199    if 'joint_states' not in robot_state:
200        return [None, None, None]
201    
202    return [robot_state['joint_states'][j]['velocity'] 
203            for j in sorted(robot_state['joint_states'].keys())]

Get the velocity of the pipet

Arguments:
  • sim(simulation object)
  • robotId(str): entity id to query, None returns first entity, 'all' returns all as dict
Returns:

list of positions [x, y, z] OR dict of list of velocities {robotId: [x, y, z]}

def find_workspace(sim, max_steps=1000, reset=True):
206def find_workspace(sim, max_steps=1000, reset=True):
207    """
208    Find the workspace limits and center point.
209    
210    Parameters:
211    -----------
212    sim : Simulation
213        The simulation object
214    max_steps : int
215        Maximum steps to search for each limit
216    reset : bool
217        Whether to reset simulation before testing
218    
219    Returns:
220    --------
221    dict
222        Dictionary containing:
223        - 'x_min', 'x_max': X axis limits
224        - 'y_min', 'y_max': Y axis limits
225        - 'z_min', 'z_max': Z axis limits
226        - 'center': (x, y, z) tuple of workspace center
227        - 'volume': workspace volume in cubic meters
228    """
229    # Find limits for each axis
230    x_limits = find_joint_limit(sim, 'x', max_steps, reset)
231    y_limits = find_joint_limit(sim, 'y', max_steps, False)
232    z_limits = find_joint_limit(sim, 'z', max_steps, False)
233    
234    # Ensure min < max for each axis
235    x_min, x_max = min(x_limits), max(x_limits)
236    y_min, y_max = min(y_limits), max(y_limits)
237    z_min, z_max = min(z_limits), max(z_limits)
238    
239    # Calculate center
240    center = (
241        (x_min + x_max) / 2.0,
242        (y_min + y_max) / 2.0,
243        (z_min + z_max) / 2.0
244    )
245    
246    # Calculate volume
247    volume = (x_max - x_min) * (y_max - y_min) * (z_max - z_min)
248    
249    return {
250        'x_min': x_min,
251        'x_max': x_max,
252        'y_min': y_min,
253        'y_max': y_max,
254        'z_min': z_min,
255        'z_max': z_max,
256        'center': center,
257        'volume': volume
258    }

Find the workspace limits and center point.

Parameters:

sim : Simulation The simulation object max_steps : int Maximum steps to search for each limit reset : bool Whether to reset simulation before testing

Returns:

dict Dictionary containing: - 'x_min', 'x_max': X axis limits - 'y_min', 'y_max': Y axis limits - 'z_min', 'z_max': Z axis limits - 'center': (x, y, z) tuple of workspace center - 'volume': workspace volume in cubic meters

def record_simulation_gif( sim, action_function, filename, fps=25, capture_every=1, width=640, height=480):
264def record_simulation_gif(sim, action_function, filename, fps=25, 
265                          capture_every=1, width=640, height=480):
266    """
267    Record a GIF of any simulation action function.
268    
269    This is a general-purpose recorder that can wrap any function that
270    controls the simulation. The function will execute normally while
271    frames are captured in the background.
272    
273    Args:
274        sim: Simulation object (must have render=True).
275        action_function: Callable that performs actions in the simulation.
276            This function will be executed and its execution recorded.
277        filename (str or Path): Output filepath for the GIF (e.g., 'robot_motion.gif').
278        fps (int): Frames per second for the output GIF. Defaults to 25.
279        capture_every (int): Capture every Nth frame to reduce file size. 
280            E.g., capture_every=2 means capture every other frame. Defaults to 1.
281        width (int): Width of captured frames in pixels. Defaults to 640.
282        height (int): Height of captured frames in pixels. Defaults to 480.
283    
284    Returns:
285        Any: The return value from action_function (if any).
286    
287    Example:
288        >>> def move_robot():
289        >>>     for i in range(100):
290        >>>         sim.run([[0.1, 0, 0, 0]], num_steps=1)
291        >>> 
292        >>> record_simulation_gif(sim, move_robot, 'motion.gif')
293    """
294    import numpy as np
295    
296    # Check if simulation has rendering enabled
297    if not sim.render:
298        print("Error: Simulation must be created with render=True to record GIFs")
299        return action_function()
300    
301    frames = []
302    frame_count = 0
303    
304    # Create a frame capture callback
305    def capture_frame():
306        nonlocal frame_count
307        if frame_count % capture_every == 0:
308            try:
309                # Get camera image
310                img_data = p.getCameraImage(
311                    width, height,
312                    renderer=p.ER_BULLET_HARDWARE_OPENGL
313                )
314                
315                # Check if we got valid data (tuple with 5 elements)
316                if isinstance(img_data, tuple) and len(img_data) == 5:
317                    width_img, height_img, rgb, depth, seg = img_data
318                    
319                    # Convert rgb to numpy array if it isn't already
320                    if not isinstance(rgb, np.ndarray):
321                        rgb = np.array(rgb, dtype=np.uint8)
322                    
323                    # Reshape if needed (flat array to 2D image)
324                    if len(rgb.shape) == 1:
325                        rgb = rgb.reshape((height, width, 4))
326                    
327                    # Convert to PIL Image (remove alpha channel)
328                    if rgb.shape[2] == 4:
329                        frame = Image.fromarray(rgb[:, :, :3], 'RGB')
330                    else:
331                        frame = Image.fromarray(rgb, 'RGB')
332                    frames.append(frame)
333            except Exception as e:
334                print(f"Warning: Failed to capture frame {frame_count}: {e}")
335        frame_count += 1
336    
337    # Monkey-patch sim.run to capture frames
338    original_run = sim.run
339    
340    def run_with_capture(*args, **kwargs):
341        result = original_run(*args, **kwargs)
342        capture_frame()
343        return result
344    
345    # Temporarily replace sim.run
346    sim.run = run_with_capture
347    
348    try:
349        # Execute the action function
350        result = action_function()
351    finally:
352        # Restore original sim.run
353        sim.run = original_run
354    
355    # Save as GIF
356    if frames:
357        duration = 1.0 / fps  # Duration per frame in seconds
358        imageio.mimsave(filename, frames, duration=duration)
359        print(f"GIF saved to {filename} ({len(frames)} frames at {fps} fps)")
360    else:
361        print("Warning: No frames captured! Make sure your action_function calls sim.run()")
362    
363    return result

Record a GIF of any simulation action function.

This is a general-purpose recorder that can wrap any function that controls the simulation. The function will execute normally while frames are captured in the background.

Arguments:
  • sim: Simulation object (must have render=True).
  • action_function: Callable that performs actions in the simulation. This function will be executed and its execution recorded.
  • filename (str or Path): Output filepath for the GIF (e.g., 'robot_motion.gif').
  • fps (int): Frames per second for the output GIF. Defaults to 25.
  • capture_every (int): Capture every Nth frame to reduce file size. E.g., capture_every=2 means capture every other frame. Defaults to 1.
  • width (int): Width of captured frames in pixels. Defaults to 640.
  • height (int): Height of captured frames in pixels. Defaults to 480.
Returns:

Any: The return value from action_function (if any).

Example:
>>> def move_robot():
>>>     for i in range(100):
>>>         sim.run([[0.1, 0, 0, 0]], num_steps=1)
>>> 
>>> record_simulation_gif(sim, move_robot, 'motion.gif')