# Copyright (c) 2021-2023, NVIDIA Corporation
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# 1. Redistributions of source code must retain the above copyright notice, this
# list of conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
#
# 3. Neither the name of the copyright holder nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
import os
from isaacgym import gymapi
from isaacgym import gymtorch
from rofunc.learning.RofuncRL.tasks.isaacgymenv.base.vec_task import VecTask
from rofunc.learning.RofuncRL.tasks.utils.torch_jit_utils import *
from rofunc.utils.oslab.path import get_rofunc_path
@torch.jit.script
def axisangle2quat(vec, eps=1e-6):
# type: (Tensor, float) -> Tensor
# store input shape and reshape
input_shape = vec.shape[:-1]
vec = vec.reshape(-1, 3)
# Grab angle
angle = torch.norm(vec, dim=-1, keepdim=True)
# Create return array
quat = torch.zeros(torch.prod(torch.tensor(input_shape)), 4, device=vec.device)
quat[:, 3] = 1.0
# Grab indexes where angle is not zero an convert the input to its quaternion form
idx = angle.reshape(-1) > eps
quat[idx, :] = torch.cat([
vec[idx, :] * torch.sin(angle[idx, :] / 2.0) / angle[idx, :],
torch.cos(angle[idx, :] / 2.0)
], dim=-1)
# Reshape and return output
quat = quat.reshape(list(input_shape) + [4, ])
return quat
[docs]class FrankaCubeStackTask(VecTask):
def __init__(self, cfg, rl_device, sim_device, graphics_device_id, headless, virtual_screen_capture, force_render):
self.cfg = cfg
self.max_episode_length = self.cfg["env"]["episodeLength"]
self.action_scale = self.cfg["env"]["actionScale"]
self.start_position_noise = self.cfg["env"]["startPositionNoise"]
self.start_rotation_noise = self.cfg["env"]["startRotationNoise"]
self.franka_position_noise = self.cfg["env"]["frankaPositionNoise"]
self.franka_rotation_noise = self.cfg["env"]["frankaRotationNoise"]
self.franka_dof_noise = self.cfg["env"]["frankaDofNoise"]
self.aggregate_mode = self.cfg["env"]["aggregateMode"]
# Create dicts to pass to reward function
self.reward_settings = {
"r_dist_scale": self.cfg["env"]["distRewardScale"],
"r_lift_scale": self.cfg["env"]["liftRewardScale"],
"r_align_scale": self.cfg["env"]["alignRewardScale"],
"r_stack_scale": self.cfg["env"]["stackRewardScale"],
}
# Controller type
self.control_type = self.cfg["env"]["controlType"]
assert self.control_type in {"osc", "joint_tor"}, \
"Invalid control type specified. Must be one of: {osc, joint_tor}"
# dimensions
# obs include: cubeA_pose (7) + cubeB_pos (3) + eef_pose (7) + q_gripper (2)
self.cfg["env"]["numObservations"] = 19 if self.control_type == "osc" else 26
# actions include: delta EEF if OSC (6) or joint torques (7) + bool gripper (1)
self.cfg["env"]["numActions"] = 7 if self.control_type == "osc" else 8
# Values to be filled in at runtime
self.states = {} # will be dict filled with relevant states to use for reward calculation
self.handles = {} # will be dict mapping names to relevant sim handles
self.num_dofs = None # Total number of DOFs per env
self.actions = None # Current actions to be deployed
self._init_cubeA_state = None # Initial state of cubeA for the current env
self._init_cubeB_state = None # Initial state of cubeB for the current env
self._cubeA_state = None # Current state of cubeA for the current env
self._cubeB_state = None # Current state of cubeB for the current env
self._cubeA_id = None # Actor ID corresponding to cubeA for a given env
self._cubeB_id = None # Actor ID corresponding to cubeB for a given env
# Tensor placeholders
self._root_state = None # State of root body (n_envs, 13)
self._dof_state = None # State of all joints (n_envs, n_dof)
self._q = None # Joint positions (n_envs, n_dof)
self._qd = None # Joint velocities (n_envs, n_dof)
self._rigid_body_state = None # State of all rigid bodies (n_envs, n_bodies, 13)
self._contact_forces = None # Contact forces in sim
self._eef_state = None # end effector state (at grasping point)
self._eef_lf_state = None # end effector state (at left fingertip)
self._eef_rf_state = None # end effector state (at left fingertip)
self._j_eef = None # Jacobian for end effector
self._mm = None # Mass matrix
self._arm_control = None # Tensor buffer for controlling arm
self._gripper_control = None # Tensor buffer for controlling gripper
self._pos_control = None # Position actions
self._effort_control = None # Torque actions
self._franka_effort_limits = None # Actuator effort limits for franka
self._global_indices = None # Unique indices corresponding to all envs in flattened array
self.debug_viz = self.cfg["env"]["enableDebugVis"]
self.up_axis = "z"
self.up_axis_idx = 2
super().__init__(config=self.cfg, rl_device=rl_device, sim_device=sim_device,
graphics_device_id=graphics_device_id, headless=headless,
virtual_screen_capture=virtual_screen_capture, force_render=force_render)
# Franka defaults
self.franka_default_dof_pos = to_torch(
[0, 0.1963, 0, -2.6180, 0, 2.9416, 0.7854, 0.035, 0.035], device=self.device
)
# OSC Gains
self.kp = to_torch([150.] * 6, device=self.device)
self.kd = 2 * torch.sqrt(self.kp)
self.kp_null = to_torch([10.] * 7, device=self.device)
self.kd_null = 2 * torch.sqrt(self.kp_null)
# self.cmd_limit = None # filled in later
# Set control limits
self.cmd_limit = to_torch([0.1, 0.1, 0.1, 0.5, 0.5, 0.5], device=self.device).unsqueeze(0) if \
self.control_type == "osc" else self._franka_effort_limits[:7].unsqueeze(0)
# Reset all environments
self.reset_idx(torch.arange(self.num_envs, device=self.device))
# Refresh tensors
self._refresh()
[docs] def create_sim(self):
self.sim_params.up_axis = gymapi.UP_AXIS_Z
self.sim_params.gravity.x = 0
self.sim_params.gravity.y = 0
self.sim_params.gravity.z = -9.81
self.sim = super().create_sim(
self.device_id, self.graphics_device_id, self.physics_engine, self.sim_params)
self._create_ground_plane()
self._create_envs(self.num_envs, self.cfg["env"]['envSpacing'], int(np.sqrt(self.num_envs)))
def _create_ground_plane(self):
plane_params = gymapi.PlaneParams()
plane_params.normal = gymapi.Vec3(0.0, 0.0, 1.0)
self.gym.add_ground(self.sim, plane_params)
def _create_envs(self, num_envs, spacing, num_per_row):
lower = gymapi.Vec3(-spacing, -spacing, 0.0)
upper = gymapi.Vec3(spacing, spacing, spacing)
# get rofunc path from rofunc package metadata
rofunc_path = get_rofunc_path()
asset_root = os.path.join(rofunc_path, "simulator/assets")
franka_asset_file = "urdf/franka_description/robots/franka_panda_gripper.urdf"
if "asset" in self.cfg["env"]:
franka_asset_file = self.cfg["env"]["asset"].get("assetFileNameFranka", franka_asset_file)
# load franka asset
asset_options = gymapi.AssetOptions()
asset_options.flip_visual_attachments = True
asset_options.fix_base_link = True
asset_options.collapse_fixed_joints = False
asset_options.disable_gravity = True
asset_options.thickness = 0.001
asset_options.default_dof_drive_mode = gymapi.DOF_MODE_EFFORT
asset_options.use_mesh_materials = True
franka_asset = self.gym.load_asset(self.sim, asset_root, franka_asset_file, asset_options)
franka_dof_stiffness = to_torch([0, 0, 0, 0, 0, 0, 0, 5000., 5000.], dtype=torch.float, device=self.device)
franka_dof_damping = to_torch([0, 0, 0, 0, 0, 0, 0, 1.0e2, 1.0e2], dtype=torch.float, device=self.device)
# Create table asset
table_pos = [0.0, 0.0, 1.0]
table_thickness = 0.05
table_opts = gymapi.AssetOptions()
table_opts.fix_base_link = True
table_asset = self.gym.create_box(self.sim, *[1.2, 1.2, table_thickness], table_opts)
# Create table stand asset
table_stand_height = 0.1
table_stand_pos = [-0.5, 0.0, 1.0 + table_thickness / 2 + table_stand_height / 2]
table_stand_opts = gymapi.AssetOptions()
table_stand_opts.fix_base_link = True
table_stand_asset = self.gym.create_box(self.sim, *[0.2, 0.2, table_stand_height], table_opts)
self.cubeA_size = 0.050
self.cubeB_size = 0.070
# Create cubeA asset
cubeA_opts = gymapi.AssetOptions()
cubeA_asset = self.gym.create_box(self.sim, *([self.cubeA_size] * 3), cubeA_opts)
cubeA_color = gymapi.Vec3(0.6, 0.1, 0.0)
# Create cubeB asset
cubeB_opts = gymapi.AssetOptions()
cubeB_asset = self.gym.create_box(self.sim, *([self.cubeB_size] * 3), cubeB_opts)
cubeB_color = gymapi.Vec3(0.0, 0.4, 0.1)
self.num_franka_bodies = self.gym.get_asset_rigid_body_count(franka_asset)
self.num_franka_dofs = self.gym.get_asset_dof_count(franka_asset)
print("num franka bodies: ", self.num_franka_bodies)
print("num franka dofs: ", self.num_franka_dofs)
# set franka dof properties
franka_dof_props = self.gym.get_asset_dof_properties(franka_asset)
self.franka_dof_lower_limits = []
self.franka_dof_upper_limits = []
self._franka_effort_limits = []
for i in range(self.num_franka_dofs):
franka_dof_props['driveMode'][i] = gymapi.DOF_MODE_POS if i > 6 else gymapi.DOF_MODE_EFFORT
if self.physics_engine == gymapi.SIM_PHYSX:
franka_dof_props['stiffness'][i] = franka_dof_stiffness[i]
franka_dof_props['damping'][i] = franka_dof_damping[i]
else:
franka_dof_props['stiffness'][i] = 7000.0
franka_dof_props['damping'][i] = 50.0
self.franka_dof_lower_limits.append(franka_dof_props['lower'][i])
self.franka_dof_upper_limits.append(franka_dof_props['upper'][i])
self._franka_effort_limits.append(franka_dof_props['effort'][i])
self.franka_dof_lower_limits = to_torch(self.franka_dof_lower_limits, device=self.device)
self.franka_dof_upper_limits = to_torch(self.franka_dof_upper_limits, device=self.device)
self._franka_effort_limits = to_torch(self._franka_effort_limits, device=self.device)
self.franka_dof_speed_scales = torch.ones_like(self.franka_dof_lower_limits)
self.franka_dof_speed_scales[[7, 8]] = 0.1
franka_dof_props['effort'][7] = 200
franka_dof_props['effort'][8] = 200
# Define start pose for franka
franka_start_pose = gymapi.Transform()
franka_start_pose.p = gymapi.Vec3(-0.45, 0.0, 1.0 + table_thickness / 2 + table_stand_height)
franka_start_pose.r = gymapi.Quat(0.0, 0.0, 0.0, 1.0)
# Define start pose for table
table_start_pose = gymapi.Transform()
table_start_pose.p = gymapi.Vec3(*table_pos)
table_start_pose.r = gymapi.Quat(0.0, 0.0, 0.0, 1.0)
self._table_surface_pos = np.array(table_pos) + np.array([0, 0, table_thickness / 2])
self.reward_settings["table_height"] = self._table_surface_pos[2]
# Define start pose for table stand
table_stand_start_pose = gymapi.Transform()
table_stand_start_pose.p = gymapi.Vec3(*table_stand_pos)
table_stand_start_pose.r = gymapi.Quat(0.0, 0.0, 0.0, 1.0)
# Define start pose for cubes (doesn't really matter since they're get overridden during reset() anyways)
cubeA_start_pose = gymapi.Transform()
cubeA_start_pose.p = gymapi.Vec3(-1.0, 0.0, 0.0)
cubeA_start_pose.r = gymapi.Quat(0.0, 0.0, 0.0, 1.0)
cubeB_start_pose = gymapi.Transform()
cubeB_start_pose.p = gymapi.Vec3(1.0, 0.0, 0.0)
cubeB_start_pose.r = gymapi.Quat(0.0, 0.0, 0.0, 1.0)
# compute aggregate size
num_franka_bodies = self.gym.get_asset_rigid_body_count(franka_asset)
num_franka_shapes = self.gym.get_asset_rigid_shape_count(franka_asset)
max_agg_bodies = num_franka_bodies + 4 # 1 for table, table stand, cubeA, cubeB
max_agg_shapes = num_franka_shapes + 4 # 1 for table, table stand, cubeA, cubeB
self.frankas = []
self.envs = []
# Create environments
for i in range(self.num_envs):
# create env instance
env_ptr = self.gym.create_env(self.sim, lower, upper, num_per_row)
# Create actors and define aggregate group appropriately depending on setting
# NOTE: franka should ALWAYS be loaded first in sim!
if self.aggregate_mode >= 3:
self.gym.begin_aggregate(env_ptr, max_agg_bodies, max_agg_shapes, True)
# Create franka
# Potentially randomize start pose
if self.franka_position_noise > 0:
rand_xy = self.franka_position_noise * (-1. + np.random.rand(2) * 2.0)
franka_start_pose.p = gymapi.Vec3(-0.45 + rand_xy[0], 0.0 + rand_xy[1],
1.0 + table_thickness / 2 + table_stand_height)
if self.franka_rotation_noise > 0:
rand_rot = torch.zeros(1, 3)
rand_rot[:, -1] = self.franka_rotation_noise * (-1. + np.random.rand() * 2.0)
new_quat = axisangle2quat(rand_rot).squeeze().numpy().tolist()
franka_start_pose.r = gymapi.Quat(*new_quat)
franka_actor = self.gym.create_actor(env_ptr, franka_asset, franka_start_pose, "franka", i, 0, 0)
self.gym.set_actor_dof_properties(env_ptr, franka_actor, franka_dof_props)
if self.aggregate_mode == 2:
self.gym.begin_aggregate(env_ptr, max_agg_bodies, max_agg_shapes, True)
# Create table
table_actor = self.gym.create_actor(env_ptr, table_asset, table_start_pose, "table", i, 1, 0)
table_stand_actor = self.gym.create_actor(env_ptr, table_stand_asset, table_stand_start_pose, "table_stand",
i, 1, 0)
if self.aggregate_mode == 1:
self.gym.begin_aggregate(env_ptr, max_agg_bodies, max_agg_shapes, True)
# Create cubes
self._cubeA_id = self.gym.create_actor(env_ptr, cubeA_asset, cubeA_start_pose, "cubeA", i, 2, 0)
self._cubeB_id = self.gym.create_actor(env_ptr, cubeB_asset, cubeB_start_pose, "cubeB", i, 4, 0)
# Set colors
self.gym.set_rigid_body_color(env_ptr, self._cubeA_id, 0, gymapi.MESH_VISUAL, cubeA_color)
self.gym.set_rigid_body_color(env_ptr, self._cubeB_id, 0, gymapi.MESH_VISUAL, cubeB_color)
if self.aggregate_mode > 0:
self.gym.end_aggregate(env_ptr)
# Store the created env pointers
self.envs.append(env_ptr)
self.frankas.append(franka_actor)
# Setup init state buffer
self._init_cubeA_state = torch.zeros(self.num_envs, 13, device=self.device)
self._init_cubeB_state = torch.zeros(self.num_envs, 13, device=self.device)
# Setup data
self.init_data()
[docs] def init_data(self):
# Setup sim handles
env_ptr = self.envs[0]
franka_handle = 0
self.handles = {
# Franka
"hand": self.gym.find_actor_rigid_body_handle(env_ptr, franka_handle, "panda_hand"),
"leftfinger_tip": self.gym.find_actor_rigid_body_handle(env_ptr, franka_handle, "panda_leftfinger_tip"),
"rightfinger_tip": self.gym.find_actor_rigid_body_handle(env_ptr, franka_handle, "panda_rightfinger_tip"),
"grip_site": self.gym.find_actor_rigid_body_handle(env_ptr, franka_handle, "panda_grip_site"),
# Cubes
"cubeA_body_handle": self.gym.find_actor_rigid_body_handle(self.envs[0], self._cubeA_id, "box"),
"cubeB_body_handle": self.gym.find_actor_rigid_body_handle(self.envs[0], self._cubeB_id, "box"),
}
# Get total DOFs
self.num_dofs = self.gym.get_sim_dof_count(self.sim) // self.num_envs
# Setup tensor buffers
_actor_root_state_tensor = self.gym.acquire_actor_root_state_tensor(self.sim)
_dof_state_tensor = self.gym.acquire_dof_state_tensor(self.sim)
_rigid_body_state_tensor = self.gym.acquire_rigid_body_state_tensor(self.sim)
self._root_state = gymtorch.wrap_tensor(_actor_root_state_tensor).view(self.num_envs, -1, 13)
self._dof_state = gymtorch.wrap_tensor(_dof_state_tensor).view(self.num_envs, -1, 2)
self._rigid_body_state = gymtorch.wrap_tensor(_rigid_body_state_tensor).view(self.num_envs, -1, 13)
self._q = self._dof_state[..., 0]
self._qd = self._dof_state[..., 1]
self._eef_state = self._rigid_body_state[:, self.handles["grip_site"], :]
self._eef_lf_state = self._rigid_body_state[:, self.handles["leftfinger_tip"], :]
self._eef_rf_state = self._rigid_body_state[:, self.handles["rightfinger_tip"], :]
_jacobian = self.gym.acquire_jacobian_tensor(self.sim, "franka")
jacobian = gymtorch.wrap_tensor(_jacobian)
hand_joint_index = self.gym.get_actor_joint_dict(env_ptr, franka_handle)['panda_hand_joint']
self._j_eef = jacobian[:, hand_joint_index, :, :7]
_massmatrix = self.gym.acquire_mass_matrix_tensor(self.sim, "franka")
mm = gymtorch.wrap_tensor(_massmatrix)
self._mm = mm[:, :7, :7]
self._cubeA_state = self._root_state[:, self._cubeA_id, :]
self._cubeB_state = self._root_state[:, self._cubeB_id, :]
# Initialize states
self.states.update({
"cubeA_size": torch.ones_like(self._eef_state[:, 0]) * self.cubeA_size,
"cubeB_size": torch.ones_like(self._eef_state[:, 0]) * self.cubeB_size,
})
# Initialize actions
self._pos_control = torch.zeros((self.num_envs, self.num_dofs), dtype=torch.float, device=self.device)
self._effort_control = torch.zeros_like(self._pos_control)
# Initialize control
self._arm_control = self._effort_control[:, :7]
self._gripper_control = self._pos_control[:, 7:9]
# Initialize indices
self._global_indices = torch.arange(self.num_envs * 5, dtype=torch.int32,
device=self.device).view(self.num_envs, -1)
def _update_states(self):
self.states.update({
# Franka
"q": self._q[:, :],
"q_gripper": self._q[:, -2:],
"eef_pos": self._eef_state[:, :3],
"eef_quat": self._eef_state[:, 3:7],
"eef_vel": self._eef_state[:, 7:],
"eef_lf_pos": self._eef_lf_state[:, :3],
"eef_rf_pos": self._eef_rf_state[:, :3],
# Cubes
"cubeA_quat": self._cubeA_state[:, 3:7],
"cubeA_pos": self._cubeA_state[:, :3],
"cubeA_pos_relative": self._cubeA_state[:, :3] - self._eef_state[:, :3],
"cubeB_quat": self._cubeB_state[:, 3:7],
"cubeB_pos": self._cubeB_state[:, :3],
"cubeA_to_cubeB_pos": self._cubeB_state[:, :3] - self._cubeA_state[:, :3],
})
def _refresh(self):
self.gym.refresh_actor_root_state_tensor(self.sim)
self.gym.refresh_dof_state_tensor(self.sim)
self.gym.refresh_rigid_body_state_tensor(self.sim)
self.gym.refresh_jacobian_tensors(self.sim)
self.gym.refresh_mass_matrix_tensors(self.sim)
# Refresh states
self._update_states()
[docs] def compute_reward(self, actions):
self.rew_buf[:], self.reset_buf[:] = compute_franka_reward(
self.reset_buf, self.progress_buf, self.actions, self.states, self.reward_settings, self.max_episode_length
)
[docs] def compute_observations(self):
self._refresh()
obs = ["cubeA_quat", "cubeA_pos", "cubeA_to_cubeB_pos", "eef_pos", "eef_quat"]
obs += ["q_gripper"] if self.control_type == "osc" else ["q"]
self.obs_buf = torch.cat([self.states[ob] for ob in obs], dim=-1)
maxs = {ob: torch.max(self.states[ob]).item() for ob in obs}
return self.obs_buf
[docs] def reset_idx(self, env_ids):
env_ids_int32 = env_ids.to(dtype=torch.int32)
# Reset cubes, sampling cube B first, then A
# if not self._i:
self._reset_init_cube_state(cube='B', env_ids=env_ids, check_valid=False)
self._reset_init_cube_state(cube='A', env_ids=env_ids, check_valid=True)
# self._i = True
# Write these new init states to the sim states
self._cubeA_state[env_ids] = self._init_cubeA_state[env_ids]
self._cubeB_state[env_ids] = self._init_cubeB_state[env_ids]
# Reset agent
reset_noise = torch.rand((len(env_ids), 9), device=self.device)
pos = tensor_clamp(
self.franka_default_dof_pos.unsqueeze(0) +
self.franka_dof_noise * 2.0 * (reset_noise - 0.5),
self.franka_dof_lower_limits.unsqueeze(0), self.franka_dof_upper_limits)
# Overwrite gripper init pos (no noise since these are always position controlled)
pos[:, -2:] = self.franka_default_dof_pos[-2:]
# Reset the internal obs accordingly
self._q[env_ids, :] = pos
self._qd[env_ids, :] = torch.zeros_like(self._qd[env_ids])
# Set any position control to the current position, and any vel / effort control to be 0
# NOTE: Task takes care of actually propagating these controls in sim using the SimActions API
self._pos_control[env_ids, :] = pos
self._effort_control[env_ids, :] = torch.zeros_like(pos)
# Deploy updates
multi_env_ids_int32 = self._global_indices[env_ids, 0].flatten()
self.gym.set_dof_position_target_tensor_indexed(self.sim,
gymtorch.unwrap_tensor(self._pos_control),
gymtorch.unwrap_tensor(multi_env_ids_int32),
len(multi_env_ids_int32))
self.gym.set_dof_actuation_force_tensor_indexed(self.sim,
gymtorch.unwrap_tensor(self._effort_control),
gymtorch.unwrap_tensor(multi_env_ids_int32),
len(multi_env_ids_int32))
self.gym.set_dof_state_tensor_indexed(self.sim,
gymtorch.unwrap_tensor(self._dof_state),
gymtorch.unwrap_tensor(multi_env_ids_int32),
len(multi_env_ids_int32))
# Update cube states
multi_env_ids_cubes_int32 = self._global_indices[env_ids, -2:].flatten()
self.gym.set_actor_root_state_tensor_indexed(
self.sim, gymtorch.unwrap_tensor(self._root_state),
gymtorch.unwrap_tensor(multi_env_ids_cubes_int32), len(multi_env_ids_cubes_int32))
self.progress_buf[env_ids] = 0
self.reset_buf[env_ids] = 0
def _reset_init_cube_state(self, cube, env_ids, check_valid=True):
"""
Simple method to sample @cube's position based on self.startPositionNoise and self.startRotationNoise, and
automaticlly reset the pose internally. Populates the appropriate self._init_cubeX_state
If @check_valid is True, then this will also make sure that the sampled position is not in contact with the
other cube.
Args:
cube(str): Which cube to sample location for. Either 'A' or 'B'
env_ids (tensor or None): Specific environments to reset cube for
check_valid (bool): Whether to make sure sampled position is collision-free with the other cube.
"""
# If env_ids is None, we reset all the envs
if env_ids is None:
env_ids = torch.arange(start=0, end=self.num_envs, device=self.device, dtype=torch.long)
# Initialize buffer to hold sampled values
num_resets = len(env_ids)
sampled_cube_state = torch.zeros(num_resets, 13, device=self.device)
# Get correct references depending on which one was selected
if cube.lower() == 'a':
this_cube_state_all = self._init_cubeA_state
other_cube_state = self._init_cubeB_state[env_ids, :]
cube_heights = self.states["cubeA_size"]
elif cube.lower() == 'b':
this_cube_state_all = self._init_cubeB_state
other_cube_state = self._init_cubeA_state[env_ids, :]
cube_heights = self.states["cubeA_size"]
else:
raise ValueError(f"Invalid cube specified, options are 'A' and 'B'; got: {cube}")
# Minimum cube distance for guarenteed collision-free sampling is the sum of each cube's effective radius
min_dists = (self.states["cubeA_size"] + self.states["cubeB_size"])[env_ids] * np.sqrt(2) / 2.0
# We scale the min dist by 2 so that the cubes aren't too close together
min_dists = min_dists * 2.0
# Sampling is "centered" around middle of table
centered_cube_xy_state = torch.tensor(self._table_surface_pos[:2], device=self.device, dtype=torch.float32)
# Set z value, which is fixed height
sampled_cube_state[:, 2] = self._table_surface_pos[2] + cube_heights.squeeze(-1)[env_ids] / 2
# Initialize rotation, which is no rotation (quat w = 1)
sampled_cube_state[:, 6] = 1.0
# If we're verifying valid sampling, we need to check and re-sample if any are not collision-free
# We use a simple heuristic of checking based on cubes' radius to determine if a collision would occur
if check_valid:
success = False
# Indexes corresponding to envs we're still actively sampling for
active_idx = torch.arange(num_resets, device=self.device)
num_active_idx = len(active_idx)
for i in range(100):
# Sample x y values
sampled_cube_state[active_idx, :2] = centered_cube_xy_state + \
2.0 * self.start_position_noise * (
torch.rand_like(sampled_cube_state[active_idx, :2]) - 0.5)
# Check if sampled values are valid
cube_dist = torch.linalg.norm(sampled_cube_state[:, :2] - other_cube_state[:, :2], dim=-1)
active_idx = torch.nonzero(cube_dist < min_dists, as_tuple=True)[0]
num_active_idx = len(active_idx)
# If active idx is empty, then all sampling is valid :D
if num_active_idx == 0:
success = True
break
# Make sure we succeeded at sampling
assert success, "Sampling cube locations was unsuccessful! ):"
else:
# We just directly sample
sampled_cube_state[:, :2] = centered_cube_xy_state.unsqueeze(0) + \
2.0 * self.start_position_noise * (
torch.rand(num_resets, 2, device=self.device) - 0.5)
# Sample rotation value
if self.start_rotation_noise > 0:
aa_rot = torch.zeros(num_resets, 3, device=self.device)
aa_rot[:, 2] = 2.0 * self.start_rotation_noise * (torch.rand(num_resets, device=self.device) - 0.5)
sampled_cube_state[:, 3:7] = quat_mul(axisangle2quat(aa_rot), sampled_cube_state[:, 3:7])
# Lastly, set these sampled values as the new init state
this_cube_state_all[env_ids, :] = sampled_cube_state
def _compute_osc_torques(self, dpose):
# Solve for Operational Space Control # Paper: khatib.stanford.edu/publications/pdfs/Khatib_1987_RA.pdf
# Helpful resource: studywolf.wordpress.com/2013/09/17/robot-control-4-operation-space-control/
q, qd = self._q[:, :7], self._qd[:, :7]
mm_inv = torch.inverse(self._mm)
m_eef_inv = self._j_eef @ mm_inv @ torch.transpose(self._j_eef, 1, 2)
m_eef = torch.inverse(m_eef_inv)
# Transform our cartesian action `dpose` into joint torques `u`
u = torch.transpose(self._j_eef, 1, 2) @ m_eef @ (
self.kp * dpose - self.kd * self.states["eef_vel"]).unsqueeze(-1)
# Nullspace control torques `u_null` prevents large changes in joint configuration
# They are added into the nullspace of OSC so that the end effector orientation remains constant
# roboticsproceedings.org/rss07/p31.pdf
j_eef_inv = m_eef @ self._j_eef @ mm_inv
u_null = self.kd_null * -qd + self.kp_null * (
(self.franka_default_dof_pos[:7] - q + np.pi) % (2 * np.pi) - np.pi)
u_null[:, 7:] *= 0
u_null = self._mm @ u_null.unsqueeze(-1)
u += (torch.eye(7, device=self.device).unsqueeze(0) - torch.transpose(self._j_eef, 1, 2) @ j_eef_inv) @ u_null
# Clip the values to be within valid effort range
u = tensor_clamp(u.squeeze(-1),
-self._franka_effort_limits[:7].unsqueeze(0), self._franka_effort_limits[:7].unsqueeze(0))
return u
[docs] def pre_physics_step(self, actions):
self.actions = actions.clone().to(self.device)
# Split arm and gripper command
u_arm, u_gripper = self.actions[:, :-1], self.actions[:, -1]
# print(u_arm, u_gripper)
# print(self.cmd_limit, self.action_scale)
# Control arm (scale value first)
u_arm = u_arm * self.cmd_limit / self.action_scale
if self.control_type == "osc":
u_arm = self._compute_osc_torques(dpose=u_arm)
self._arm_control[:, :] = u_arm
# Control gripper
u_fingers = torch.zeros_like(self._gripper_control)
u_fingers[:, 0] = torch.where(u_gripper >= 0.0, self.franka_dof_upper_limits[-2].item(),
self.franka_dof_lower_limits[-2].item())
u_fingers[:, 1] = torch.where(u_gripper >= 0.0, self.franka_dof_upper_limits[-1].item(),
self.franka_dof_lower_limits[-1].item())
# Write gripper command to appropriate tensor buffer
self._gripper_control[:, :] = u_fingers
# Deploy actions
self.gym.set_dof_position_target_tensor(self.sim, gymtorch.unwrap_tensor(self._pos_control))
self.gym.set_dof_actuation_force_tensor(self.sim, gymtorch.unwrap_tensor(self._effort_control))
[docs] def post_physics_step(self):
self.progress_buf += 1
env_ids = self.reset_buf.nonzero(as_tuple=False).squeeze(-1)
if len(env_ids) > 0:
self.reset_idx(env_ids)
self.compute_observations()
self.compute_reward(self.actions)
# debug viz
if self.viewer and self.debug_viz:
self.gym.clear_lines(self.viewer)
self.gym.refresh_rigid_body_state_tensor(self.sim)
# Grab relevant states to visualize
eef_pos = self.states["eef_pos"]
eef_rot = self.states["eef_quat"]
cubeA_pos = self.states["cubeA_pos"]
cubeA_rot = self.states["cubeA_quat"]
cubeB_pos = self.states["cubeB_pos"]
cubeB_rot = self.states["cubeB_quat"]
# Plot visualizations
for i in range(self.num_envs):
for pos, rot in zip((eef_pos, cubeA_pos, cubeB_pos), (eef_rot, cubeA_rot, cubeB_rot)):
px = (pos[i] + quat_apply(rot[i], to_torch([1, 0, 0], device=self.device) * 0.2)).cpu().numpy()
py = (pos[i] + quat_apply(rot[i], to_torch([0, 1, 0], device=self.device) * 0.2)).cpu().numpy()
pz = (pos[i] + quat_apply(rot[i], to_torch([0, 0, 1], device=self.device) * 0.2)).cpu().numpy()
p0 = pos[i].cpu().numpy()
self.gym.add_lines(self.viewer, self.envs[i], 1, [p0[0], p0[1], p0[2], px[0], px[1], px[2]],
[0.85, 0.1, 0.1])
self.gym.add_lines(self.viewer, self.envs[i], 1, [p0[0], p0[1], p0[2], py[0], py[1], py[2]],
[0.1, 0.85, 0.1])
self.gym.add_lines(self.viewer, self.envs[i], 1, [p0[0], p0[1], p0[2], pz[0], pz[1], pz[2]],
[0.1, 0.1, 0.85])
#####################################################################
###=========================jit functions=========================###
#####################################################################
@torch.jit.script
def compute_franka_reward(
reset_buf, progress_buf, actions, states, reward_settings, max_episode_length
):
# type: (Tensor, Tensor, Tensor, Dict[str, Tensor], Dict[str, float], float) -> Tuple[Tensor, Tensor]
# Compute per-env physical parameters
target_height = states["cubeB_size"] + states["cubeA_size"] / 2.0
cubeA_size = states["cubeA_size"]
cubeB_size = states["cubeB_size"]
# distance from hand to the cubeA
d = torch.norm(states["cubeA_pos_relative"], dim=-1)
d_lf = torch.norm(states["cubeA_pos"] - states["eef_lf_pos"], dim=-1)
d_rf = torch.norm(states["cubeA_pos"] - states["eef_rf_pos"], dim=-1)
dist_reward = 1 - torch.tanh(10.0 * (d + d_lf + d_rf) / 3)
# reward for lifting cubeA
cubeA_height = states["cubeA_pos"][:, 2] - reward_settings["table_height"]
cubeA_lifted = (cubeA_height - cubeA_size) > 0.04
lift_reward = cubeA_lifted
# how closely aligned cubeA is to cubeB (only provided if cubeA is lifted)
offset = torch.zeros_like(states["cubeA_to_cubeB_pos"])
offset[:, 2] = (cubeA_size + cubeB_size) / 2
d_ab = torch.norm(states["cubeA_to_cubeB_pos"] + offset, dim=-1)
align_reward = (1 - torch.tanh(10.0 * d_ab)) * cubeA_lifted
# Dist reward is maximum of dist and align reward
dist_reward = torch.max(dist_reward, align_reward)
# final reward for stacking successfully (only if cubeA is close to target height and corresponding location, and gripper is not grasping)
cubeA_align_cubeB = (torch.norm(states["cubeA_to_cubeB_pos"][:, :2], dim=-1) < 0.02)
cubeA_on_cubeB = torch.abs(cubeA_height - target_height) < 0.02
gripper_away_from_cubeA = (d > 0.04)
stack_reward = cubeA_align_cubeB & cubeA_on_cubeB & gripper_away_from_cubeA
# Compose rewards
# We either provide the stack reward or the align + dist reward
rewards = torch.where(
stack_reward,
reward_settings["r_stack_scale"] * stack_reward,
reward_settings["r_dist_scale"] * dist_reward + reward_settings["r_lift_scale"] * lift_reward + reward_settings[
"r_align_scale"] * align_reward,
)
# Compute resets
reset_buf = torch.where((progress_buf >= max_episode_length - 1) | (stack_reward > 0), torch.ones_like(reset_buf),
reset_buf)
return rewards, reset_buf