# Copyright (c) 2018-2021, NVIDIA Corporation
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# 1. Redistributions of source code must retain the above copyright notice, this
# list of conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
#
# 3. Neither the name of the copyright holder nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
import os
import random
import matplotlib.pyplot as plt
from PIL import Image as Im
from isaacgym import gymapi
from isaacgym import gymtorch
from rofunc.learning.RofuncRL.tasks.isaacgymenv.base.vec_task import VecTask
from rofunc.learning.RofuncRL.tasks.utils.torch_jit_utils import *
from rofunc.utils.oslab import get_rofunc_path
[docs]class ShadowHandPointCloudTask(VecTask):
def __init__(self, cfg, rl_device, sim_device, graphics_device_id, headless, virtual_screen_capture,
force_render, agent_index=[[[0, 1, 2, 3, 4, 5]], [[0, 1, 2, 3, 4, 5]]], is_multi_agent=False):
self.cfg = cfg
self.agent_index = agent_index
self.is_multi_agent = is_multi_agent
self.randomize = self.cfg["task"]["randomize"]
self.randomization_params = self.cfg["task"]["randomization_params"]
self.aggregate_mode = self.cfg["env"]["aggregateMode"]
self.dist_reward_scale = self.cfg["env"]["distRewardScale"]
self.rot_reward_scale = self.cfg["env"]["rotRewardScale"]
self.action_penalty_scale = self.cfg["env"]["actionPenaltyScale"]
self.success_tolerance = self.cfg["env"]["successTolerance"]
self.reach_goal_bonus = self.cfg["env"]["reachGoalBonus"]
self.fall_dist = self.cfg["env"]["fallDistance"]
self.fall_penalty = self.cfg["env"]["fallPenalty"]
self.rot_eps = self.cfg["env"]["rotEps"]
self.vel_obs_scale = 0.2 # scale factor of velocity based observations
self.force_torque_obs_scale = 10.0 # scale factor of velocity based observations
self.reset_position_noise = self.cfg["env"]["resetPositionNoise"]
self.reset_rotation_noise = self.cfg["env"]["resetRotationNoise"]
self.reset_dof_pos_noise = self.cfg["env"]["resetDofPosRandomInterval"]
self.reset_dof_vel_noise = self.cfg["env"]["resetDofVelRandomInterval"]
self.shadow_hand_dof_speed_scale = self.cfg["env"]["dofSpeedScale"]
self.use_relative_control = self.cfg["env"]["useRelativeControl"]
self.act_moving_average = self.cfg["env"]["actionsMovingAverage"]
self.debug_viz = self.cfg["env"]["enableDebugVis"]
self.max_episode_length = self.cfg["env"]["episodeLength"]
self.reset_time = self.cfg["env"].get("resetTime", -1.0)
self.print_success_stat = self.cfg["env"]["printNumSuccesses"]
self.max_consecutive_successes = self.cfg["env"]["maxConsecutiveSuccesses"]
self.av_factor = self.cfg["env"].get("averFactor", 0.01)
print("Averaging factor: ", self.av_factor)
control_freq_inv = self.cfg["env"].get("controlFrequencyInv", 1)
if self.reset_time > 0.0:
self.max_episode_length = int(round(self.reset_time / (control_freq_inv * self.sim_params.dt)))
print("Reset time: ", self.reset_time)
print("New episode length: ", self.max_episode_length)
self.object_type = self.cfg["env"]["objectType"]
assert self.object_type in ["block", "egg", "pen", "ycb/banana", "ycb/can", "ycb/mug", "ycb/brick"]
self.ignore_z = (self.object_type == "pen")
self.asset_files_dict = {
"block": "urdf/objects/cube_multicolor.urdf",
"egg": "mjcf/open_ai_assets/hand/egg.xml",
"pen": "mjcf/open_ai_assets/hand/pen.xml",
"ycb/banana": "urdf/ycb/011_banana/011_banana.urdf",
"ycb/can": "urdf/ycb/010_potted_meat_can/010_potted_meat_can.urdf",
"ycb/mug": "urdf/ycb/025_mug/025_mug.urdf",
"ycb/brick": "urdf/ycb/061_foam_brick/061_foam_brick.urdf"
}
# can be "openai", "full_no_vel", "full", "full_state"
self.obs_type = self.cfg["env"]["observationType"]
if not (self.obs_type in ["openai", "full_no_vel", "full", "full_state"]):
raise Exception(
"Unknown type of observations!\nobservationType should be one of: [openai, full_no_vel, full, full_state]")
print("Obs type:", self.obs_type)
self.num_obs_dict = {
"openai": 42,
"full_no_vel": 77,
"full": 157,
"full_state": 398 + 768 * 3
}
self.num_hand_obs = 72 + 95 + 20
self.up_axis = 'z'
self.fingertips = ["robot0:ffdistal", "robot0:mfdistal", "robot0:rfdistal", "robot0:lfdistal",
"robot0:thdistal"]
self.a_fingertips = ["robot1:ffdistal", "robot1:mfdistal", "robot1:rfdistal", "robot1:lfdistal",
"robot1:thdistal"]
self.hand_center = ["robot1:palm"]
self.num_fingertips = len(self.fingertips) * 2
self.use_vel_obs = False
self.fingertip_obs = True
self.asymmetric_obs = self.cfg["env"]["asymmetric_observations"]
num_states = 0
if self.asymmetric_obs:
num_states = 211
self.cfg["env"]["numObservations"] = self.num_obs_dict[self.obs_type]
self.cfg["env"]["numStates"] = num_states
if self.is_multi_agent:
self.num_agents = 2
self.cfg["env"]["numActions"] = 20
else:
self.num_agents = 1
self.cfg["env"]["numActions"] = 40
super().__init__(cfg, rl_device, sim_device, graphics_device_id, headless, virtual_screen_capture, force_render)
if self.viewer != None:
cam_pos = gymapi.Vec3(10.0, 5.0, 1.0)
cam_target = gymapi.Vec3(6.0, 5.0, 0.0)
self.gym.viewer_camera_look_at(self.viewer, None, cam_pos, cam_target)
# get gym GPU state tensors
actor_root_state_tensor = self.gym.acquire_actor_root_state_tensor(self.sim)
dof_state_tensor = self.gym.acquire_dof_state_tensor(self.sim)
rigid_body_tensor = self.gym.acquire_rigid_body_state_tensor(self.sim)
if self.obs_type == "full_state" or self.asymmetric_obs:
sensor_tensor = self.gym.acquire_force_sensor_tensor(self.sim)
self.vec_sensor_tensor = gymtorch.wrap_tensor(sensor_tensor).view(self.num_envs, self.num_fingertips * 6)
dof_force_tensor = self.gym.acquire_dof_force_tensor(self.sim)
self.dof_force_tensor = gymtorch.wrap_tensor(dof_force_tensor).view(self.num_envs,
self.num_shadow_hand_dofs * 2)
self.gym.refresh_actor_root_state_tensor(self.sim)
self.gym.refresh_dof_state_tensor(self.sim)
self.gym.refresh_rigid_body_state_tensor(self.sim)
# create some wrapper tensors for different slices
self.shadow_hand_default_dof_pos = torch.zeros(self.num_shadow_hand_dofs, dtype=torch.float, device=self.device)
self.dof_state = gymtorch.wrap_tensor(dof_state_tensor)
self.shadow_hand_dof_state = self.dof_state.view(self.num_envs, -1, 2)[:, :self.num_shadow_hand_dofs]
self.shadow_hand_dof_pos = self.shadow_hand_dof_state[..., 0]
self.shadow_hand_dof_vel = self.shadow_hand_dof_state[..., 1]
self.shadow_hand_another_dof_state = self.dof_state.view(self.num_envs, -1, 2)[:,
self.num_shadow_hand_dofs:self.num_shadow_hand_dofs * 2]
self.shadow_hand_another_dof_pos = self.shadow_hand_another_dof_state[..., 0]
self.shadow_hand_another_dof_vel = self.shadow_hand_another_dof_state[..., 1]
self.rigid_body_states = gymtorch.wrap_tensor(rigid_body_tensor).view(self.num_envs, -1, 13)
self.num_bodies = self.rigid_body_states.shape[1]
self.root_state_tensor = gymtorch.wrap_tensor(actor_root_state_tensor).view(-1, 13)
self.num_dofs = self.gym.get_sim_dof_count(self.sim) // self.num_envs
self.prev_targets = torch.zeros((self.num_envs, self.num_dofs), dtype=torch.float, device=self.device)
self.cur_targets = torch.zeros((self.num_envs, self.num_dofs), dtype=torch.float, device=self.device)
self.global_indices = torch.arange(self.num_envs * 3, dtype=torch.int32, device=self.device).view(self.num_envs,
-1)
self.x_unit_tensor = to_torch([1, 0, 0], dtype=torch.float, device=self.device).repeat((self.num_envs, 1))
self.y_unit_tensor = to_torch([0, 1, 0], dtype=torch.float, device=self.device).repeat((self.num_envs, 1))
self.z_unit_tensor = to_torch([0, 0, 1], dtype=torch.float, device=self.device).repeat((self.num_envs, 1))
self.reset_goal_buf = self.reset_buf.clone()
self.successes = torch.zeros(self.num_envs, dtype=torch.float, device=self.device)
self.consecutive_successes = torch.zeros(1, dtype=torch.float, device=self.device)
self.av_factor = to_torch(self.av_factor, dtype=torch.float, device=self.device)
self.total_successes = 0
self.total_resets = 0
[docs] def create_sim(self):
self.dt = self.sim_params.dt
self.up_axis_idx = self.set_sim_params_up_axis(self.sim_params, self.up_axis)
self.sim = super().create_sim(self.device_id, self.graphics_device_id, self.physics_engine, self.sim_params)
self._create_ground_plane()
self._create_envs(self.num_envs, self.cfg["env"]['envSpacing'], int(np.sqrt(self.num_envs)))
def _create_ground_plane(self):
plane_params = gymapi.PlaneParams()
plane_params.normal = gymapi.Vec3(0.0, 0.0, 1.0)
self.gym.add_ground(self.sim, plane_params)
def _create_envs(self, num_envs, spacing, num_per_row):
lower = gymapi.Vec3(-spacing, -spacing, 0.0)
upper = gymapi.Vec3(spacing, spacing, spacing)
# get rofunc path from rofunc package metadata
rofunc_path = get_rofunc_path()
asset_root = os.path.join(rofunc_path, "simulator/assets")
shadow_hand_asset_file = "mjcf/open_ai_assets/hand/shadow_hand.xml"
shadow_hand_another_asset_file = "mjcf/open_ai_assets/hand/shadow_hand1.xml"
if "asset" in self.cfg["env"]:
asset_root = self.cfg["env"]["asset"].get("assetRoot", asset_root)
shadow_hand_asset_file = self.cfg["env"]["asset"].get("assetFileName", shadow_hand_asset_file)
object_asset_file = self.asset_files_dict[self.object_type]
# load shadow hand_ asset
asset_options = gymapi.AssetOptions()
asset_options.flip_visual_attachments = False
asset_options.fix_base_link = True
asset_options.collapse_fixed_joints = True
asset_options.disable_gravity = True
asset_options.thickness = 0.001
asset_options.angular_damping = 0.01
if self.physics_engine == gymapi.SIM_PHYSX:
asset_options.use_physx_armature = True
asset_options.default_dof_drive_mode = gymapi.DOF_MODE_NONE
shadow_hand_asset = self.gym.load_asset(self.sim, asset_root, shadow_hand_asset_file, asset_options)
shadow_hand_another_asset = self.gym.load_asset(self.sim, asset_root, shadow_hand_another_asset_file,
asset_options)
self.num_shadow_hand_bodies = self.gym.get_asset_rigid_body_count(shadow_hand_asset)
self.num_shadow_hand_shapes = self.gym.get_asset_rigid_shape_count(shadow_hand_asset)
self.num_shadow_hand_dofs = self.gym.get_asset_dof_count(shadow_hand_asset)
self.num_shadow_hand_actuators = self.gym.get_asset_actuator_count(shadow_hand_asset)
self.num_shadow_hand_tendons = self.gym.get_asset_tendon_count(shadow_hand_asset)
print("self.num_shadow_hand_bodies: ", self.num_shadow_hand_bodies)
print("self.num_shadow_hand_shapes: ", self.num_shadow_hand_shapes)
print("self.num_shadow_hand_dofs: ", self.num_shadow_hand_dofs)
print("self.num_shadow_hand_actuators: ", self.num_shadow_hand_actuators)
print("self.num_shadow_hand_tendons: ", self.num_shadow_hand_tendons)
# tendon set up
limit_stiffness = 30
t_damping = 0.1
relevant_tendons = ["robot0:T_FFJ1c", "robot0:T_MFJ1c", "robot0:T_RFJ1c", "robot0:T_LFJ1c"]
a_relevant_tendons = ["robot1:T_FFJ1c", "robot1:T_MFJ1c", "robot1:T_RFJ1c", "robot1:T_LFJ1c"]
tendon_props = self.gym.get_asset_tendon_properties(shadow_hand_asset)
a_tendon_props = self.gym.get_asset_tendon_properties(shadow_hand_another_asset)
for i in range(self.num_shadow_hand_tendons):
for rt in relevant_tendons:
if self.gym.get_asset_tendon_name(shadow_hand_asset, i) == rt:
tendon_props[i].limit_stiffness = limit_stiffness
tendon_props[i].damping = t_damping
for rt in a_relevant_tendons:
if self.gym.get_asset_tendon_name(shadow_hand_another_asset, i) == rt:
a_tendon_props[i].limit_stiffness = limit_stiffness
a_tendon_props[i].damping = t_damping
self.gym.set_asset_tendon_properties(shadow_hand_asset, tendon_props)
self.gym.set_asset_tendon_properties(shadow_hand_another_asset, a_tendon_props)
actuated_dof_names = [self.gym.get_asset_actuator_joint_name(shadow_hand_asset, i) for i in
range(self.num_shadow_hand_actuators)]
self.actuated_dof_indices = [self.gym.find_asset_dof_index(shadow_hand_asset, name) for name in
actuated_dof_names]
# set shadow_hand dof properties
shadow_hand_dof_props = self.gym.get_asset_dof_properties(shadow_hand_asset)
shadow_hand_another_dof_props = self.gym.get_asset_dof_properties(shadow_hand_another_asset)
self.shadow_hand_dof_lower_limits = []
self.shadow_hand_dof_upper_limits = []
self.shadow_hand_dof_default_pos = []
self.shadow_hand_dof_default_vel = []
self.sensors = []
sensor_pose = gymapi.Transform()
for i in range(self.num_shadow_hand_dofs):
self.shadow_hand_dof_lower_limits.append(shadow_hand_dof_props['lower'][i])
self.shadow_hand_dof_upper_limits.append(shadow_hand_dof_props['upper'][i])
self.shadow_hand_dof_default_pos.append(0.0)
self.shadow_hand_dof_default_vel.append(0.0)
self.actuated_dof_indices = to_torch(self.actuated_dof_indices, dtype=torch.long, device=self.device)
self.shadow_hand_dof_lower_limits = to_torch(self.shadow_hand_dof_lower_limits, device=self.device)
self.shadow_hand_dof_upper_limits = to_torch(self.shadow_hand_dof_upper_limits, device=self.device)
self.shadow_hand_dof_default_pos = to_torch(self.shadow_hand_dof_default_pos, device=self.device)
self.shadow_hand_dof_default_vel = to_torch(self.shadow_hand_dof_default_vel, device=self.device)
# load manipulated object and goal assets
object_asset_options = gymapi.AssetOptions()
object_asset_options.density = 500
object_asset = self.gym.load_asset(self.sim, asset_root, object_asset_file, object_asset_options)
object_asset_options.disable_gravity = True
goal_asset = self.gym.load_asset(self.sim, asset_root, object_asset_file, object_asset_options)
shadow_hand_start_pose = gymapi.Transform()
shadow_hand_start_pose.p = gymapi.Vec3(*get_axis_params(0.5, self.up_axis_idx))
shadow_another_hand_start_pose = gymapi.Transform()
shadow_another_hand_start_pose.p = gymapi.Vec3(0, -1, 0.5)
shadow_another_hand_start_pose.r = gymapi.Quat().from_euler_zyx(0, 0, 3.1415)
object_start_pose = gymapi.Transform()
object_start_pose.p = gymapi.Vec3()
object_start_pose.p.x = shadow_hand_start_pose.p.x
pose_dy, pose_dz = -0.39, 0.04
object_start_pose.p.y = shadow_hand_start_pose.p.y + pose_dy
object_start_pose.p.z = shadow_hand_start_pose.p.z + pose_dz
if self.object_type == "pen":
object_start_pose.p.z = shadow_hand_start_pose.p.z + 0.02
self.goal_displacement = gymapi.Vec3(-0., 0.0, 0.)
self.goal_displacement_tensor = to_torch(
[self.goal_displacement.x, self.goal_displacement.y, self.goal_displacement.z], device=self.device)
goal_start_pose = gymapi.Transform()
goal_start_pose.p = object_start_pose.p + self.goal_displacement
goal_start_pose.p.z -= 0.0
# compute aggregate size
max_agg_bodies = self.num_shadow_hand_bodies * 2 + 2
max_agg_shapes = self.num_shadow_hand_shapes * 2 + 2
self.shadow_hands = []
self.envs = []
self.cameras = []
self.camera_tensors = []
self.camera_view_matrixs = []
self.camera_proj_matrixs = []
self.object_init_state = []
self.hand_start_states = []
self.hand_indices = []
self.another_hand_indices = []
self.fingertip_indices = []
self.object_indices = []
self.goal_object_indices = []
self.fingertip_handles = [self.gym.find_asset_rigid_body_index(shadow_hand_asset, name) for name in
self.fingertips]
self.fingertip_another_handles = [self.gym.find_asset_rigid_body_index(shadow_hand_another_asset, name) for name
in self.a_fingertips]
# create fingertip force sensors, if needed
if self.obs_type == "full_state" or self.asymmetric_obs:
sensor_pose = gymapi.Transform()
for ft_handle in self.fingertip_handles:
self.gym.create_asset_force_sensor(shadow_hand_asset, ft_handle, sensor_pose)
for ft_a_handle in self.fingertip_another_handles:
self.gym.create_asset_force_sensor(shadow_hand_another_asset, ft_a_handle, sensor_pose)
self.camera_props = gymapi.CameraProperties()
self.camera_props.width = 256
self.camera_props.height = 256
self.camera_props.enable_tensors = True
self.env_origin = torch.zeros((self.num_envs, 3), device=self.device, dtype=torch.float)
self.pointCloudDownsampleNum = 768
self.camera_u = torch.arange(0, self.camera_props.width, device=self.device)
self.camera_v = torch.arange(0, self.camera_props.height, device=self.device)
self.camera_v2, self.camera_u2 = torch.meshgrid(self.camera_v, self.camera_u, indexing='ij')
if True:
import open3d as o3d
from bidexhands.utils.o3dviewer import PointcloudVisualizer
self.pointCloudVisualizer = PointcloudVisualizer()
self.pointCloudVisualizerInitialized = False
self.o3d_pc = o3d.geometry.PointCloud()
else:
self.pointCloudVisualizer = None
for i in range(self.num_envs):
# create env instance
env_ptr = self.gym.create_env(
self.sim, lower, upper, num_per_row
)
if self.aggregate_mode >= 1:
self.gym.begin_aggregate(env_ptr, max_agg_bodies, max_agg_shapes, True)
# add hand - collision filter = -1 to use asset collision filters set in mjcf loader
shadow_hand_actor = self.gym.create_actor(env_ptr, shadow_hand_asset, shadow_hand_start_pose, "hand", i, -1,
0)
shadow_hand_another_actor = self.gym.create_actor(env_ptr, shadow_hand_another_asset,
shadow_another_hand_start_pose, "another_hand", i, -1, 0)
self.hand_start_states.append(
[shadow_hand_start_pose.p.x, shadow_hand_start_pose.p.y, shadow_hand_start_pose.p.z,
shadow_hand_start_pose.r.x, shadow_hand_start_pose.r.y, shadow_hand_start_pose.r.z,
shadow_hand_start_pose.r.w,
0, 0, 0, 0, 0, 0])
self.gym.set_actor_dof_properties(env_ptr, shadow_hand_actor, shadow_hand_dof_props)
hand_idx = self.gym.get_actor_index(env_ptr, shadow_hand_actor, gymapi.DOMAIN_SIM)
self.hand_indices.append(hand_idx)
self.gym.set_actor_dof_properties(env_ptr, shadow_hand_another_actor, shadow_hand_another_dof_props)
another_hand_idx = self.gym.get_actor_index(env_ptr, shadow_hand_another_actor, gymapi.DOMAIN_SIM)
self.another_hand_indices.append(another_hand_idx)
# randomize colors and textures for rigid body
num_bodies = self.gym.get_actor_rigid_body_count(env_ptr, shadow_hand_actor)
hand_rigid_body_index = [[0, 1, 2, 3], [4, 5, 6, 7], [8, 9, 10, 11], [12, 13, 14, 15], [16, 17, 18, 19, 20],
[21, 22, 23, 24, 25]]
for n in self.agent_index[0]:
colorx = random.uniform(0, 1)
colory = random.uniform(0, 1)
colorz = random.uniform(0, 1)
for m in n:
for o in hand_rigid_body_index[m]:
self.gym.set_rigid_body_color(env_ptr, shadow_hand_actor, o, gymapi.MESH_VISUAL,
gymapi.Vec3(colorx, colory, colorz))
for n in self.agent_index[1]:
colorx = random.uniform(0, 1)
colory = random.uniform(0, 1)
colorz = random.uniform(0, 1)
for m in n:
for o in hand_rigid_body_index[m]:
self.gym.set_rigid_body_color(env_ptr, shadow_hand_another_actor, o, gymapi.MESH_VISUAL,
gymapi.Vec3(colorx, colory, colorz))
# gym.set_rigid_body_texture(env, actor_handles[-1], n, gymapi.MESH_VISUAL,
# loaded_texture_handle_list[random.randint(0, len(loaded_texture_handle_list)-1)])
# create fingertip force-torque sensors
if self.obs_type == "full_state" or self.asymmetric_obs:
self.gym.enable_actor_dof_force_sensors(env_ptr, shadow_hand_actor)
self.gym.enable_actor_dof_force_sensors(env_ptr, shadow_hand_another_actor)
# add object
object_handle = self.gym.create_actor(env_ptr, object_asset, object_start_pose, "object", i, 0, 0)
self.object_init_state.append([object_start_pose.p.x, object_start_pose.p.y, object_start_pose.p.z,
object_start_pose.r.x, object_start_pose.r.y, object_start_pose.r.z,
object_start_pose.r.w,
0, 0, 0, 0, 0, 0])
object_idx = self.gym.get_actor_index(env_ptr, object_handle, gymapi.DOMAIN_SIM)
self.object_indices.append(object_idx)
# add goal object
goal_handle = self.gym.create_actor(env_ptr, goal_asset, goal_start_pose, "goal_object", i + self.num_envs,
0, 0)
goal_object_idx = self.gym.get_actor_index(env_ptr, goal_handle, gymapi.DOMAIN_SIM)
self.goal_object_indices.append(goal_object_idx)
camera_handle = self.gym.create_camera_sensor(env_ptr, self.camera_props)
self.gym.set_camera_location(camera_handle, env_ptr, gymapi.Vec3(0.25, -0.5, 0.75),
gymapi.Vec3(-0.24, -0.5, 0))
camera_tensor = self.gym.get_camera_image_gpu_tensor(self.sim, env_ptr, camera_handle, gymapi.IMAGE_DEPTH)
torch_cam_tensor = gymtorch.wrap_tensor(camera_tensor)
cam_vinv = torch.inverse(
(torch.tensor(self.gym.get_camera_view_matrix(self.sim, env_ptr, camera_handle)))).to(self.device)
cam_proj = torch.tensor(self.gym.get_camera_proj_matrix(self.sim, env_ptr, camera_handle),
device=self.device)
if self.object_type != "block":
self.gym.set_rigid_body_color(
env_ptr, object_handle, 0, gymapi.MESH_VISUAL, gymapi.Vec3(0.6, 0.72, 0.98))
self.gym.set_rigid_body_color(
env_ptr, goal_handle, 0, gymapi.MESH_VISUAL, gymapi.Vec3(0.6, 0.72, 0.98))
if self.aggregate_mode > 0:
self.gym.end_aggregate(env_ptr)
origin = self.gym.get_env_origin(env_ptr)
self.env_origin[i][0] = origin.x
self.env_origin[i][1] = origin.y
self.env_origin[i][2] = origin.z
self.camera_tensors.append(torch_cam_tensor)
self.camera_view_matrixs.append(cam_vinv)
self.camera_proj_matrixs.append(cam_proj)
self.envs.append(env_ptr)
self.shadow_hands.append(shadow_hand_actor)
self.cameras.append(camera_handle)
self.object_init_state = to_torch(self.object_init_state, device=self.device, dtype=torch.float).view(
self.num_envs, 13)
self.goal_states = self.object_init_state.clone()
self.goal_pose = self.goal_states[:, 0:7]
self.goal_pos = self.goal_states[:, 0:3]
self.goal_rot = self.goal_states[:, 3:7]
# self.goal_states[:, self.up_axis_idx] -= 0.04
self.goal_init_state = self.goal_states.clone()
self.hand_start_states = to_torch(self.hand_start_states, device=self.device).view(self.num_envs, 13)
self.fingertip_handles = to_torch(self.fingertip_handles, dtype=torch.long, device=self.device)
self.fingertip_another_handles = to_torch(self.fingertip_another_handles, dtype=torch.long, device=self.device)
self.hand_indices = to_torch(self.hand_indices, dtype=torch.long, device=self.device)
self.another_hand_indices = to_torch(self.another_hand_indices, dtype=torch.long, device=self.device)
self.object_indices = to_torch(self.object_indices, dtype=torch.long, device=self.device)
self.goal_object_indices = to_torch(self.goal_object_indices, dtype=torch.long, device=self.device)
[docs] def compute_reward(self, actions):
self.rew_buf[:], self.reset_buf[:], self.reset_goal_buf[:], self.progress_buf[:], self.successes[
:], self.consecutive_successes[
:] = compute_hand_reward(
self.rew_buf, self.reset_buf, self.reset_goal_buf, self.progress_buf, self.successes,
self.consecutive_successes,
self.max_episode_length, self.object_pos, self.object_rot, self.goal_pos, self.goal_rot,
self.dist_reward_scale, self.rot_reward_scale, self.rot_eps, self.actions, self.action_penalty_scale,
self.success_tolerance, self.reach_goal_bonus, self.fall_dist, self.fall_penalty,
self.max_consecutive_successes, self.av_factor, (self.object_type == "pen")
)
self.extras['successes'] = self.successes
self.extras['consecutive_successes'] = self.consecutive_successes
if self.print_success_stat:
self.total_resets = self.total_resets + self.reset_buf.sum()
direct_average_successes = self.total_successes + self.successes.sum()
self.total_successes = self.total_successes + (self.successes * self.reset_buf).sum()
# The direct average shows the overall result more quickly, but slightly undershoots long term
# policy performance.
print("Direct average consecutive successes = {:.1f}".format(
direct_average_successes / (self.total_resets + self.num_envs)))
if self.total_resets > 0:
print("Post-Reset average consecutive successes = {:.1f}".format(
self.total_successes / self.total_resets))
[docs] def compute_observations(self):
self.gym.refresh_dof_state_tensor(self.sim)
self.gym.refresh_actor_root_state_tensor(self.sim)
self.gym.refresh_rigid_body_state_tensor(self.sim)
self.gym.render_all_camera_sensors(self.sim)
self.gym.start_access_image_tensors(self.sim)
if self.obs_type == "full_state" or self.asymmetric_obs:
self.gym.refresh_force_sensor_tensor(self.sim)
self.gym.refresh_dof_force_tensor(self.sim)
self.object_pose = self.root_state_tensor[self.object_indices, 0:7]
self.object_pos = self.root_state_tensor[self.object_indices, 0:3]
self.object_rot = self.root_state_tensor[self.object_indices, 3:7]
self.object_linvel = self.root_state_tensor[self.object_indices, 7:10]
self.object_angvel = self.root_state_tensor[self.object_indices, 10:13]
self.goal_pose = self.goal_states[:, 0:7]
self.goal_pos = self.goal_states[:, 0:3]
self.goal_rot = self.goal_states[:, 3:7]
self.fingertip_state = self.rigid_body_states[:, self.fingertip_handles][:, :, 0:13]
self.fingertip_pos = self.rigid_body_states[:, self.fingertip_handles][:, :, 0:3]
self.fingertip_another_state = self.rigid_body_states[:, self.fingertip_another_handles][:, :, 0:13]
self.fingertip_another_pos = self.rigid_body_states[:, self.fingertip_another_handles][:, :, 0:3]
self.compute_full_state()
if self.asymmetric_obs:
self.compute_full_state(True)
[docs] def compute_full_state(self, asymm_obs=False):
num_ft_states = 13 * int(self.num_fingertips / 2) # 65
num_ft_force_torques = 6 * int(self.num_fingertips / 2) # 30
self.obs_buf[:, 0:self.num_shadow_hand_dofs] = unscale(self.shadow_hand_dof_pos,
self.shadow_hand_dof_lower_limits,
self.shadow_hand_dof_upper_limits)
self.obs_buf[:,
self.num_shadow_hand_dofs:2 * self.num_shadow_hand_dofs] = self.vel_obs_scale * self.shadow_hand_dof_vel
self.obs_buf[:,
2 * self.num_shadow_hand_dofs:3 * self.num_shadow_hand_dofs] = self.force_torque_obs_scale * self.dof_force_tensor[
:, :24]
fingertip_obs_start = 72 # 168 = 157 + 11
self.obs_buf[:, fingertip_obs_start:fingertip_obs_start + num_ft_states] = self.fingertip_state.reshape(
self.num_envs, num_ft_states)
self.obs_buf[:, fingertip_obs_start + num_ft_states:fingertip_obs_start + num_ft_states +
num_ft_force_torques] = self.force_torque_obs_scale * self.vec_sensor_tensor[
:,
:30]
action_obs_start = fingertip_obs_start + 95
self.obs_buf[:, action_obs_start:action_obs_start + 20] = self.actions[:, :20]
# another_hand
another_hand_start = action_obs_start + 20
self.obs_buf[:, another_hand_start:self.num_shadow_hand_dofs + another_hand_start] = unscale(
self.shadow_hand_another_dof_pos,
self.shadow_hand_dof_lower_limits, self.shadow_hand_dof_upper_limits)
self.obs_buf[:,
self.num_shadow_hand_dofs + another_hand_start:2 * self.num_shadow_hand_dofs + another_hand_start] = self.vel_obs_scale * self.shadow_hand_another_dof_vel
self.obs_buf[:,
2 * self.num_shadow_hand_dofs + another_hand_start:3 * self.num_shadow_hand_dofs + another_hand_start] = self.force_torque_obs_scale * self.dof_force_tensor[
:,
24:48]
fingertip_another_obs_start = another_hand_start + 72
self.obs_buf[:,
fingertip_another_obs_start:fingertip_another_obs_start + num_ft_states] = self.fingertip_another_state.reshape(
self.num_envs, num_ft_states)
self.obs_buf[:, fingertip_another_obs_start + num_ft_states:fingertip_another_obs_start + num_ft_states +
num_ft_force_torques] = self.force_torque_obs_scale * self.vec_sensor_tensor[
:,
30:]
action_another_obs_start = fingertip_another_obs_start + 95
self.obs_buf[:, action_another_obs_start:action_another_obs_start + 20] = self.actions[:, 20:]
obj_obs_start = action_another_obs_start + 20 # 144
# self.obs_buf[:, obj_obs_start:obj_obs_start + 7] = self.object_pose
# self.obs_buf[:, obj_obs_start + 7:obj_obs_start + 10] = self.object_linvel
# self.obs_buf[:, obj_obs_start + 10:obj_obs_start + 13] = self.vel_obs_scale * self.object_angvel
goal_obs_start = obj_obs_start + 13 # 157 = 144 + 13
# self.obs_buf[:, goal_obs_start:goal_obs_start + 7] = self.goal_pose
# self.obs_buf[:, goal_obs_start + 7:goal_obs_start + 11] = quat_mul(self.object_rot, quat_conjugate(self.goal_rot))
point_clouds = torch.zeros((self.num_envs, self.pointCloudDownsampleNum, 3), device=self.device)
self.camera_rgba_debug_fig = plt.figure("CAMERA_RGBD_DEBUG")
camera_rgba_image = self.camera_visulization(is_depth_image=False)
print(camera_rgba_image)
plt.imshow(camera_rgba_image)
plt.pause(1e-9)
for i in range(self.num_envs):
# Here is an example. In practice, it's better not to convert tensor from GPU to CPU
points = depth_image_to_point_cloud_GPU(self.camera_tensors[i], self.camera_view_matrixs[i],
self.camera_proj_matrixs[i], self.camera_u2, self.camera_v2,
self.camera_props.width, self.camera_props.height, 10, self.device)
if points.shape[0] > 0:
selected_points = self.sample_points(points, sample_num=self.pointCloudDownsampleNum,
sample_mathed='random')
else:
selected_points = torch.zeros((self.num_envs, self.pointCloudDownsampleNum, 3), device=self.device)
point_clouds[i] = selected_points
if self.pointCloudVisualizer != None:
import open3d as o3d
points = point_clouds[0, :, :3].cpu().numpy()
# colors = plt.get_cmap()(point_clouds[0, :, 3].cpu().numpy())
self.o3d_pc.points = o3d.utility.Vector3dVector(points)
# self.o3d_pc.colors = o3d.utility.Vector3dVector(colors[..., :3])
if self.pointCloudVisualizerInitialized == False:
self.pointCloudVisualizer.add_geometry(self.o3d_pc)
self.pointCloudVisualizerInitialized = True
else:
self.pointCloudVisualizer.update(self.o3d_pc)
self.gym.end_access_image_tensors(self.sim)
point_clouds -= self.env_origin.view(self.num_envs, 1, 3)
point_clouds_start = goal_obs_start + 11
self.obs_buf[:, point_clouds_start:].copy_(point_clouds.view(self.num_envs, self.pointCloudDownsampleNum * 3))
[docs] def reset_target_pose(self, env_ids, apply_reset=False):
rand_floats = torch_rand_float(-1.0, 1.0, (len(env_ids), 4), device=self.device)
new_rot = randomize_rotation(rand_floats[:, 0], rand_floats[:, 1], self.x_unit_tensor[env_ids],
self.y_unit_tensor[env_ids])
self.goal_states[env_ids, 0:3] = self.goal_init_state[env_ids, 0:3]
self.goal_states[env_ids, 1] -= 0.25
self.goal_states[env_ids, 3:7] = new_rot
self.root_state_tensor[self.goal_object_indices[env_ids], 0:3] = self.goal_states[env_ids,
0:3] + self.goal_displacement_tensor
self.root_state_tensor[self.goal_object_indices[env_ids], 3:7] = self.goal_states[env_ids, 3:7]
self.root_state_tensor[self.goal_object_indices[env_ids], 7:13] = torch.zeros_like(
self.root_state_tensor[self.goal_object_indices[env_ids], 7:13])
if apply_reset:
goal_object_indices = self.goal_object_indices[env_ids].to(torch.int32)
self.gym.set_actor_root_state_tensor_indexed(self.sim,
gymtorch.unwrap_tensor(self.root_state_tensor),
gymtorch.unwrap_tensor(goal_object_indices), len(env_ids))
self.reset_goal_buf[env_ids] = 0
[docs] def reset_idx(self, env_ids, goal_env_ids):
# randomization can happen only at reset time, since it can reset actor positions on GPU
if self.randomize:
self.apply_randomizations(self.randomization_params)
# generate random values
rand_floats = torch_rand_float(-1.0, 1.0, (len(env_ids), self.num_shadow_hand_dofs * 2 + 5), device=self.device)
# randomize start object poses
self.reset_target_pose(env_ids)
# reset object
self.root_state_tensor[self.object_indices[env_ids]] = self.object_init_state[env_ids].clone()
self.root_state_tensor[self.object_indices[env_ids], 0:2] = self.object_init_state[env_ids, 0:2] + \
self.reset_position_noise * rand_floats[:, 0:2]
self.root_state_tensor[self.object_indices[env_ids], self.up_axis_idx] = self.object_init_state[
env_ids, self.up_axis_idx] + \
self.reset_position_noise * rand_floats[
:,
self.up_axis_idx]
new_object_rot = randomize_rotation(rand_floats[:, 3], rand_floats[:, 4], self.x_unit_tensor[env_ids],
self.y_unit_tensor[env_ids])
if self.object_type == "pen":
rand_angle_y = torch.tensor(0.3)
new_object_rot = randomize_rotation_pen(rand_floats[:, 3], rand_floats[:, 4], rand_angle_y,
self.x_unit_tensor[env_ids], self.y_unit_tensor[env_ids],
self.z_unit_tensor[env_ids])
self.root_state_tensor[self.object_indices[env_ids], 3:7] = new_object_rot
self.root_state_tensor[self.object_indices[env_ids], 7:13] = torch.zeros_like(
self.root_state_tensor[self.object_indices[env_ids], 7:13])
object_indices = torch.unique(torch.cat([self.object_indices[env_ids],
self.goal_object_indices[env_ids],
self.goal_object_indices[goal_env_ids]]).to(torch.int32))
# self.gym.set_actor_root_state_tensor_indexed(self.sim,
# gymtorch.unwrap_tensor(self.root_state_tensor),
# gymtorch.unwrap_tensor(object_indices), len(object_indices))
# reset shadow hand
delta_max = self.shadow_hand_dof_upper_limits - self.shadow_hand_dof_default_pos
delta_min = self.shadow_hand_dof_lower_limits - self.shadow_hand_dof_default_pos
rand_delta = delta_min + (delta_max - delta_min) * rand_floats[:, 5:5 + self.num_shadow_hand_dofs]
pos = self.shadow_hand_default_dof_pos + self.reset_dof_pos_noise * rand_delta
self.shadow_hand_dof_pos[env_ids, :] = pos
self.shadow_hand_another_dof_pos[env_ids, :] = pos
self.shadow_hand_dof_vel[env_ids, :] = self.shadow_hand_dof_default_vel + \
self.reset_dof_vel_noise * rand_floats[:,
5 + self.num_shadow_hand_dofs:5 + self.num_shadow_hand_dofs * 2]
self.shadow_hand_another_dof_vel[env_ids, :] = self.shadow_hand_dof_default_vel + \
self.reset_dof_vel_noise * rand_floats[:,
5 + self.num_shadow_hand_dofs:5 + self.num_shadow_hand_dofs * 2]
self.prev_targets[env_ids, :self.num_shadow_hand_dofs] = pos
self.cur_targets[env_ids, :self.num_shadow_hand_dofs] = pos
self.prev_targets[env_ids, self.num_shadow_hand_dofs:self.num_shadow_hand_dofs * 2] = pos
self.cur_targets[env_ids, self.num_shadow_hand_dofs:self.num_shadow_hand_dofs * 2] = pos
hand_indices = self.hand_indices[env_ids].to(torch.int32)
another_hand_indices = self.another_hand_indices[env_ids].to(torch.int32)
all_hand_indices = torch.unique(torch.cat([hand_indices,
another_hand_indices]).to(torch.int32))
self.gym.set_dof_position_target_tensor_indexed(self.sim,
gymtorch.unwrap_tensor(self.prev_targets),
gymtorch.unwrap_tensor(all_hand_indices), len(all_hand_indices))
all_indices = torch.unique(torch.cat([all_hand_indices,
object_indices]).to(torch.int32))
self.gym.set_dof_state_tensor_indexed(self.sim,
gymtorch.unwrap_tensor(self.dof_state),
gymtorch.unwrap_tensor(all_hand_indices), len(all_hand_indices))
self.gym.set_actor_root_state_tensor_indexed(self.sim,
gymtorch.unwrap_tensor(self.root_state_tensor),
gymtorch.unwrap_tensor(all_indices), len(all_indices))
self.progress_buf[env_ids] = 0
self.reset_buf[env_ids] = 0
self.successes[env_ids] = 0
[docs] def pre_physics_step(self, actions):
env_ids = self.reset_buf.nonzero(as_tuple=False).squeeze(-1)
goal_env_ids = self.reset_goal_buf.nonzero(as_tuple=False).squeeze(-1)
# if only goals need reset, then call set API
if len(goal_env_ids) > 0 and len(env_ids) == 0:
self.reset_target_pose(goal_env_ids, apply_reset=True)
# if goals need reset in addition to other envs, call set API in reset()
elif len(goal_env_ids) > 0:
self.reset_target_pose(goal_env_ids)
if len(env_ids) > 0:
self.reset_idx(env_ids, goal_env_ids)
self.actions = actions.clone().to(self.device)
if self.use_relative_control:
targets = self.prev_targets[:,
self.actuated_dof_indices] + self.shadow_hand_dof_speed_scale * self.dt * self.actions
self.cur_targets[:, self.actuated_dof_indices] = tensor_clamp(targets,
self.shadow_hand_dof_lower_limits[
self.actuated_dof_indices],
self.shadow_hand_dof_upper_limits[
self.actuated_dof_indices])
else:
self.cur_targets[:, self.actuated_dof_indices] = scale(self.actions[:, :20],
self.shadow_hand_dof_lower_limits[
self.actuated_dof_indices],
self.shadow_hand_dof_upper_limits[
self.actuated_dof_indices])
self.cur_targets[:, self.actuated_dof_indices] = self.act_moving_average * self.cur_targets[:,
self.actuated_dof_indices] + (
1.0 - self.act_moving_average) * self.prev_targets[
:,
self.actuated_dof_indices]
self.cur_targets[:, self.actuated_dof_indices] = tensor_clamp(
self.cur_targets[:, self.actuated_dof_indices],
self.shadow_hand_dof_lower_limits[self.actuated_dof_indices],
self.shadow_hand_dof_upper_limits[self.actuated_dof_indices])
self.cur_targets[:, self.actuated_dof_indices + 24] = scale(self.actions[:, 20:40],
self.shadow_hand_dof_lower_limits[
self.actuated_dof_indices],
self.shadow_hand_dof_upper_limits[
self.actuated_dof_indices])
self.cur_targets[:, self.actuated_dof_indices + 24] = self.act_moving_average * self.cur_targets[:,
self.actuated_dof_indices + 24] + (
1.0 - self.act_moving_average) * self.prev_targets[
:,
self.actuated_dof_indices]
self.cur_targets[:, self.actuated_dof_indices + 24] = tensor_clamp(
self.cur_targets[:, self.actuated_dof_indices + 24],
self.shadow_hand_dof_lower_limits[self.actuated_dof_indices],
self.shadow_hand_dof_upper_limits[self.actuated_dof_indices])
# self.root_state_tensor[self.goal_object_indices, :3] = self.rigid_body_states[:, 3 + 27, 0:3] + self.z_unit_tensor * 0.055 + self.y_unit_tensor * -0.04
# self.goal_states[:, 0:3] = self.root_state_tensor[self.goal_object_indices, :3]
# self.gym.set_actor_root_state_tensor(self.sim, gymtorch.unwrap_tensor(self.root_state_tensor))
self.prev_targets[:, self.actuated_dof_indices] = self.cur_targets[:, self.actuated_dof_indices]
self.prev_targets[:, self.actuated_dof_indices + 24] = self.cur_targets[:, self.actuated_dof_indices + 24]
self.gym.set_dof_position_target_tensor(self.sim, gymtorch.unwrap_tensor(self.cur_targets))
[docs] def post_physics_step(self):
self.progress_buf += 1
self.randomize_buf += 1
self.compute_observations()
self.compute_reward(self.actions)
if self.viewer and self.debug_viz:
# draw axes on target object
self.gym.clear_lines(self.viewer)
self.gym.refresh_rigid_body_state_tensor(self.sim)
for i in range(self.num_envs):
targetx = (self.goal_pos[i] + quat_apply(self.goal_rot[i],
to_torch([1, 0, 0], device=self.device) * 0.2)).cpu().numpy()
targety = (self.goal_pos[i] + quat_apply(self.goal_rot[i],
to_torch([0, 1, 0], device=self.device) * 0.2)).cpu().numpy()
targetz = (self.goal_pos[i] + quat_apply(self.goal_rot[i],
to_torch([0, 0, 1], device=self.device) * 0.2)).cpu().numpy()
p0 = self.goal_pos[i].cpu().numpy() + self.goal_displacement_tensor.cpu().numpy()
self.gym.add_lines(self.viewer, self.envs[i], 1,
[p0[0], p0[1], p0[2], targetx[0], targetx[1], targetx[2]], [0.85, 0.1, 0.1])
self.gym.add_lines(self.viewer, self.envs[i], 1,
[p0[0], p0[1], p0[2], targety[0], targety[1], targety[2]], [0.1, 0.85, 0.1])
self.gym.add_lines(self.viewer, self.envs[i], 1,
[p0[0], p0[1], p0[2], targetz[0], targetz[1], targetz[2]], [0.1, 0.1, 0.85])
objectx = (self.object_pos[i] + quat_apply(self.object_rot[i],
to_torch([1, 0, 0], device=self.device) * 0.2)).cpu().numpy()
objecty = (self.object_pos[i] + quat_apply(self.object_rot[i],
to_torch([0, 1, 0], device=self.device) * 0.2)).cpu().numpy()
objectz = (self.object_pos[i] + quat_apply(self.object_rot[i],
to_torch([0, 0, 1], device=self.device) * 0.2)).cpu().numpy()
p0 = self.object_pos[i].cpu().numpy()
self.gym.add_lines(self.viewer, self.envs[i], 1,
[p0[0], p0[1], p0[2], objectx[0], objectx[1], objectx[2]], [0.85, 0.1, 0.1])
self.gym.add_lines(self.viewer, self.envs[i], 1,
[p0[0], p0[1], p0[2], objecty[0], objecty[1], objecty[2]], [0.1, 0.85, 0.1])
self.gym.add_lines(self.viewer, self.envs[i], 1,
[p0[0], p0[1], p0[2], objectz[0], objectz[1], objectz[2]], [0.1, 0.1, 0.85])
[docs] def camera_visulization(self, is_depth_image=False):
if is_depth_image:
camera_depth_tensor = self.gym.get_camera_image_gpu_tensor(self.sim, self.envs[0], self.cameras[0],
gymapi.IMAGE_DEPTH)
torch_depth_tensor = gymtorch.wrap_tensor(camera_depth_tensor)
torch_depth_tensor = torch.clamp(torch_depth_tensor, -1, 1)
torch_depth_tensor = scale(torch_depth_tensor, to_torch([0], dtype=torch.float, device=self.device),
to_torch([256], dtype=torch.float, device=self.device))
camera_image = torch_depth_tensor.cpu().numpy()
camera_image = Im.fromarray(camera_image)
else:
camera_rgba_tensor = self.gym.get_camera_image_gpu_tensor(self.sim, self.envs[0], self.cameras[0],
gymapi.IMAGE_COLOR)
torch_rgba_tensor = gymtorch.wrap_tensor(camera_rgba_tensor)
camera_image = torch_rgba_tensor.cpu().numpy()
camera_image = Im.fromarray(camera_image)
return camera_image
[docs] def rand_row(self, tensor, dim_needed):
row_total = tensor.shape[0]
return tensor[torch.randint(low=0, high=row_total, size=(dim_needed,)), :]
[docs] def sample_points(self, points, sample_num=1000, sample_mathed='furthest'):
eff_points = points[points[:, 2] > 0.04]
if eff_points.shape[0] < sample_num:
eff_points = points
if sample_mathed == 'random':
sampled_points = self.rand_row(eff_points, sample_num)
return sampled_points
#####################################################################
###=========================jit functions=========================###
#####################################################################
@torch.jit.script
def depth_image_to_point_cloud_GPU(camera_tensor, camera_view_matrix_inv, camera_proj_matrix, u, v, width: float,
height: float, depth_bar: float, device: torch.device):
# time1 = time.time()
depth_buffer = camera_tensor.to(device)
# Get the camera view matrix and invert it to transform points from camera to world space
vinv = camera_view_matrix_inv
# Get the camera projection matrix and get the necessary scaling
# coefficients for deprojection
proj = camera_proj_matrix
fu = 2 / proj[0, 0]
fv = 2 / proj[1, 1]
centerU = width / 2
centerV = height / 2
Z = depth_buffer
X = -(u - centerU) / width * Z * fu
Y = (v - centerV) / height * Z * fv
Z = Z.view(-1)
valid = Z > -depth_bar
X = X.view(-1)
Y = Y.view(-1)
position = torch.vstack((X, Y, Z, torch.ones(len(X), device=device)))[:, valid]
position = position.permute(1, 0)
position = position @ vinv
points = position[:, 0:3]
return points
@torch.jit.script
def compute_hand_reward(
rew_buf, reset_buf, reset_goal_buf, progress_buf, successes, consecutive_successes,
max_episode_length: float, object_pos, object_rot, target_pos, target_rot,
dist_reward_scale: float, rot_reward_scale: float, rot_eps: float,
actions, action_penalty_scale: float,
success_tolerance: float, reach_goal_bonus: float, fall_dist: float,
fall_penalty: float, max_consecutive_successes: int, av_factor: float, ignore_z_rot: bool
):
# Distance from the hand to the object
goal_dist = torch.norm(target_pos - object_pos, p=2, dim=-1)
if ignore_z_rot:
success_tolerance = 2.0 * success_tolerance
# Orientation alignment for the cube in hand and goal cube
quat_diff = quat_mul(object_rot, quat_conjugate(target_rot))
rot_dist = 2.0 * torch.asin(torch.clamp(torch.norm(quat_diff[:, 0:3], p=2, dim=-1), max=1.0))
dist_rew = goal_dist
# rot_rew = 1.0/(torch.abs(rot_dist) + rot_eps) * rot_reward_scale
action_penalty = torch.sum(actions ** 2, dim=-1)
# Total reward is: position distance + orientation alignment + action regularization + success bonus + fall penalty
reward = torch.exp(-0.2 * (dist_rew * dist_reward_scale + rot_dist))
# Find out which envs hit the goal and update successes count
goal_resets = torch.where(torch.abs(goal_dist) <= 0, torch.ones_like(reset_goal_buf), reset_goal_buf)
successes = successes + goal_resets
# Success bonus: orientation is within `success_tolerance` of goal orientation
reward = torch.where(goal_resets == 1, reward + reach_goal_bonus, reward)
# Fall penalty: distance to the goal is larger than a threashold
reward = torch.where(object_pos[:, 2] <= 0.2, reward + fall_penalty, reward)
# Check env termination conditions, including maximum success number
resets = torch.where(object_pos[:, 2] <= 0.2, torch.ones_like(reset_buf), reset_buf)
if max_consecutive_successes > 0:
# Reset progress buffer on goal envs if max_consecutive_successes > 0
progress_buf = torch.where(torch.abs(rot_dist) <= success_tolerance, torch.zeros_like(progress_buf),
progress_buf)
resets = torch.where(successes >= max_consecutive_successes, torch.ones_like(resets), resets)
resets = torch.where(progress_buf >= max_episode_length, torch.ones_like(resets), resets)
# Apply penalty for not reaching the goal
if max_consecutive_successes > 0:
reward = torch.where(progress_buf >= max_episode_length, reward + 0.5 * fall_penalty, reward)
num_resets = torch.sum(resets)
finished_cons_successes = torch.sum(successes * resets.float())
cons_successes = torch.where(num_resets > 0, av_factor * finished_cons_successes / num_resets + (
1.0 - av_factor) * consecutive_successes, consecutive_successes)
return reward, resets, goal_resets, progress_buf, successes, cons_successes
@torch.jit.script
def randomize_rotation(rand0, rand1, x_unit_tensor, y_unit_tensor):
return quat_mul(quat_from_angle_axis(rand0 * np.pi, x_unit_tensor),
quat_from_angle_axis(rand1 * np.pi, y_unit_tensor))
# @torch.jit.script
[docs]def randomize_rotation_pen(rand0, rand1, max_angle, x_unit_tensor, y_unit_tensor, z_unit_tensor):
rot = quat_mul(quat_from_angle_axis(0.5 * np.pi + rand0 * max_angle, x_unit_tensor),
quat_from_angle_axis(rand0 * np.pi, z_unit_tensor))
return rot