Source code for rofunc.learning.RofuncRL.tasks.isaacgymenv.hands.shadow_hand_catch_underarm

# Copyright (c) 2020, NVIDIA CORPORATION.  All rights reserved.
# NVIDIA CORPORATION and its licensors retain all intellectual property
# and proprietary rights in and to this software, related documentation
# and any modifications thereto.  Any use, reproduction, disclosure or
# distribution of this software and related documentation without an express
# license agreement from NVIDIA CORPORATION is strictly prohibited.

import os
import random

from PIL import Image as Im
from isaacgym import gymapi
from isaacgym import gymtorch

from rofunc.learning.RofuncRL.tasks.isaacgymenv.base.vec_task import VecTask
from rofunc.learning.RofuncRL.tasks.utils.torch_jit_utils import *
from rofunc.utils.oslab import get_rofunc_path


[docs]class ShadowHandCatchUnderarmTask(VecTask): """ This class corresponds to the Catch Underarm task. In this task, two shadow hands with palms facing upwards are controlled to pass an object from one palm to the other. What makes it more difficult than the Hand over task is that the hands' translation and rotation degrees of freedom are no longer frozen but are added into the action space """ def __init__(self, cfg, rl_device, sim_device, graphics_device_id, headless, virtual_screen_capture, force_render, agent_index=[[[0, 1, 2, 3, 4, 5]], [[0, 1, 2, 3, 4, 5]]], is_multi_agent=False): """ :param cfg: :param rl_device: :param sim_device: :param graphics_device_id: :param headless: :param virtual_screen_capture: :param force_render: :param agent_index: Specifies how to divide the agents of the hands, useful only when using a multi-agent algorithm. It contains two lists, representing the left hand and the right hand. Each list has six numbers from 0 to 5, representing the palm, middle finger, ring finger, tail finger, index finger, and thumb. Each part can be combined arbitrarily, and if placed in the same list, it means that it is divided into the same agent. The default setting is [[[0, 1, 2, 3, 4, 5]], [[0, 1, 2, 3, 4, 5]]], which means that the two whole hands are regarded as one agent respectively. :param is_multi_agent: Specifies whether it is a multi-agent environment """ self.cfg = cfg self.agent_index = agent_index self.is_multi_agent = is_multi_agent # Domain randomization configuration self.randomize = self.cfg["task"]["randomize"] self.randomization_params = self.cfg["task"]["randomization_params"] self.aggregate_mode = self.cfg["env"]["aggregateMode"] # Reward configuration self.dist_reward_scale = self.cfg["env"]["distRewardScale"] self.rot_reward_scale = self.cfg["env"]["rotRewardScale"] self.action_penalty_scale = self.cfg["env"]["actionPenaltyScale"] self.success_tolerance = self.cfg["env"]["successTolerance"] self.reach_goal_bonus = self.cfg["env"]["reachGoalBonus"] self.fall_dist = self.cfg["env"]["fallDistance"] self.fall_penalty = self.cfg["env"]["fallPenalty"] self.rot_eps = self.cfg["env"]["rotEps"] # Scale factor of velocity based observations self.vel_obs_scale = 0.2 # Scale factor of velocity based observations self.force_torque_obs_scale = 10.0 # The noise of the initial state each time the environment is reset self.reset_position_noise = self.cfg["env"]["resetPositionNoise"] self.reset_rotation_noise = self.cfg["env"]["resetRotationNoise"] self.reset_dof_pos_noise = self.cfg["env"]["resetDofPosRandomInterval"] self.reset_dof_vel_noise = self.cfg["env"]["resetDofVelRandomInterval"] # The configuration of how to control the ShadowHand (Action in RL) self.shadow_hand_dof_speed_scale = self.cfg["env"]["dofSpeedScale"] self.use_relative_control = self.cfg["env"]["useRelativeControl"] self.act_moving_average = self.cfg["env"]["actionsMovingAverage"] # Whether to enable debug mode during visualization self.debug_viz = self.cfg["env"]["enableDebugVis"] # Success and goal configuration self.max_episode_length = self.cfg["env"]["episodeLength"] self.reset_time = self.cfg["env"].get("resetTime", -1.0) self.print_success_stat = self.cfg["env"]["printNumSuccesses"] self.max_consecutive_successes = self.cfg["env"]["maxConsecutiveSuccesses"] self.av_factor = self.cfg["env"].get("averFactor", 0.01) print("Averaging factor: ", self.av_factor) # Scale factor of transition and orientation when the base of Shadowhand is not fixed self.transition_scale = self.cfg["env"]["transition_scale"] self.orientation_scale = self.cfg["env"]["orientation_scale"] # The inverser number of the control frequency control_freq_inv = self.cfg["env"].get("controlFrequencyInv", 1) if self.reset_time > 0.0: self.max_episode_length = int(round(self.reset_time / (control_freq_inv * self.sim_params.dt))) print("Reset time: ", self.reset_time) print("New episode length: ", self.max_episode_length) # Specifies the object to be manipulated, which can be an object in the sapien dataset. # Only useful in certain environments self.object_type = self.cfg["env"]["objectType"] self.ignore_z = (self.object_type == "pen") # Specify the path of the asset self.asset_files_dict = { "block": "urdf/objects/cube_multicolor.urdf", "egg": "mjcf/open_ai_assets/hand/egg.xml", "pen": "mjcf/open_ai_assets/hand/pen.xml" } if "asset" in self.cfg["env"]: self.asset_files_dict["block"] = self.cfg["env"]["asset"].get("assetFileNameBlock", self.asset_files_dict["block"]) self.asset_files_dict["egg"] = self.cfg["env"]["asset"].get("assetFileNameEgg", self.asset_files_dict["egg"]) self.asset_files_dict["pen"] = self.cfg["env"]["asset"].get("assetFileNamePen", self.asset_files_dict["pen"]) # can be "openai", "full_no_vel", "full", "full_state" self.obs_type = self.cfg["env"]["observationType"] if not (self.obs_type in ["point_cloud", "full_state"]): raise Exception( "Unknown type of observations!\nobservationType should be one of: [point_cloud, full_state]") print("Obs type:", self.obs_type) # Specify the number of action and observation in the environment self.num_point_cloud_feature_dim = 768 self.num_obs_dict = { "point_cloud": 422 + self.num_point_cloud_feature_dim * 3, "point_cloud_for_distill": 422 + self.num_point_cloud_feature_dim * 3, "full_state": 422 } self.num_hand_obs = 72 + 95 + 26 + 6 self.up_axis = 'z' # The names of the five fingertips of Shadowhand, which are used to obtain force information later self.fingertips = ["robot0:ffdistal", "robot0:mfdistal", "robot0:rfdistal", "robot0:lfdistal", "robot0:thdistal"] self.a_fingertips = ["robot1:ffdistal", "robot1:mfdistal", "robot1:rfdistal", "robot1:lfdistal", "robot1:thdistal"] self.num_fingertips = len(self.fingertips) * 2 self.use_vel_obs = False self.fingertip_obs = True self.asymmetric_obs = self.cfg["env"]["asymmetric_observations"] num_states = 0 if self.asymmetric_obs: num_states = 211 self.cfg["env"]["numObservations"] = self.num_obs_dict[self.obs_type] self.cfg["env"]["numStates"] = num_states if self.is_multi_agent: self.num_agents = 2 self.cfg["env"]["numActions"] = 26 else: self.num_agents = 1 self.cfg["env"]["numActions"] = 52 super().__init__(cfg, rl_device, sim_device, graphics_device_id, headless, virtual_screen_capture, force_render) self.camera_debug = self.cfg["env"].get("cameraDebug", False) self.point_cloud_debug = self.cfg["env"].get("pointCloudDebug", False) # Viewer settings, including the camera's initial position and viewing direction if self.viewer != None: cam_pos = gymapi.Vec3(10.0, 5.0, 1.0) cam_target = gymapi.Vec3(6.0, 5.0, 0.0) self.gym.viewer_camera_look_at(self.viewer, None, cam_pos, cam_target) # Get gym GPU state tensors actor_root_state_tensor = self.gym.acquire_actor_root_state_tensor(self.sim) dof_state_tensor = self.gym.acquire_dof_state_tensor(self.sim) rigid_body_tensor = self.gym.acquire_rigid_body_state_tensor(self.sim) sensor_tensor = self.gym.acquire_force_sensor_tensor(self.sim) self.vec_sensor_tensor = gymtorch.wrap_tensor(sensor_tensor).view(self.num_envs, self.num_fingertips * 6) dof_force_tensor = self.gym.acquire_dof_force_tensor(self.sim) self.dof_force_tensor = gymtorch.wrap_tensor(dof_force_tensor).view(self.num_envs, self.num_shadow_hand_dofs * 2) self.gym.refresh_actor_root_state_tensor(self.sim) self.gym.refresh_dof_state_tensor(self.sim) self.gym.refresh_rigid_body_state_tensor(self.sim) # Create dof state wrapper tensors for different slices self.shadow_hand_default_dof_pos = torch.zeros(self.num_shadow_hand_dofs, dtype=torch.float, device=self.device) self.dof_state = gymtorch.wrap_tensor(dof_state_tensor) self.shadow_hand_dof_state = self.dof_state.view(self.num_envs, -1, 2)[:, :self.num_shadow_hand_dofs] self.shadow_hand_dof_pos = self.shadow_hand_dof_state[..., 0] self.shadow_hand_dof_vel = self.shadow_hand_dof_state[..., 1] self.shadow_hand_another_dof_state = self.dof_state.view(self.num_envs, -1, 2)[:, self.num_shadow_hand_dofs:self.num_shadow_hand_dofs * 2] self.shadow_hand_another_dof_pos = self.shadow_hand_another_dof_state[..., 0] self.shadow_hand_another_dof_vel = self.shadow_hand_another_dof_state[..., 1] # Create rigid body state wrapper tensors for different slices self.rigid_body_states = gymtorch.wrap_tensor(rigid_body_tensor).view(self.num_envs, -1, 13) self.num_bodies = self.rigid_body_states.shape[1] self.root_state_tensor = gymtorch.wrap_tensor(actor_root_state_tensor).view(-1, 13) self.hand_positions = self.root_state_tensor[:, 0:3] self.hand_orientations = self.root_state_tensor[:, 3:7] self.hand_linvels = self.root_state_tensor[:, 7:10] self.hand_angvels = self.root_state_tensor[:, 10:13] self.saved_root_tensor = self.root_state_tensor.clone() # The total number of dofs in the environment self.num_dofs = self.gym.get_sim_dof_count(self.sim) // self.num_envs # Tensor used to control dof value in the environment self.prev_targets = torch.zeros((self.num_envs, self.num_dofs), dtype=torch.float, device=self.device) self.cur_targets = torch.zeros((self.num_envs, self.num_dofs), dtype=torch.float, device=self.device) # The total number of actors in the environment self.global_indices = torch.arange(self.num_envs * 3, dtype=torch.int32, device=self.device).view(self.num_envs, -1) # Unit tensor in x, y, z axis self.x_unit_tensor = to_torch([1, 0, 0], dtype=torch.float, device=self.device).repeat((self.num_envs, 1)) self.y_unit_tensor = to_torch([0, 1, 0], dtype=torch.float, device=self.device).repeat((self.num_envs, 1)) self.z_unit_tensor = to_torch([0, 0, 1], dtype=torch.float, device=self.device).repeat((self.num_envs, 1)) # Reset and success buffer self.reset_goal_buf = self.reset_buf.clone() self.successes = torch.zeros(self.num_envs, dtype=torch.float, device=self.device) self.consecutive_successes = torch.zeros(self.num_envs, dtype=torch.float, device=self.device) self.av_factor = to_torch(self.av_factor, dtype=torch.float, device=self.device) # Forces and torque applied to the Shadowhand's base self.apply_forces = torch.zeros((self.num_envs, self.num_bodies, 3), device=self.device, dtype=torch.float) self.apply_torque = torch.zeros((self.num_envs, self.num_bodies, 3), device=self.device, dtype=torch.float) # Total success and reset count self.total_successes = 0 self.total_resets = 0
[docs] def create_sim(self): """ Allocates which device will simulate and which device will render the scene. Defines the simulation type to be used """ self.dt = self.sim_params.dt self.up_axis_idx = self.set_sim_params_up_axis(self.sim_params, self.up_axis) self.sim = super().create_sim(self.device_id, self.graphics_device_id, self.physics_engine, self.sim_params) self._create_ground_plane() self._create_envs(self.num_envs, self.cfg["env"]['envSpacing'], int(np.sqrt(self.num_envs)))
def _create_ground_plane(self): """ Adds ground plane to simulation """ plane_params = gymapi.PlaneParams() plane_params.normal = gymapi.Vec3(0.0, 0.0, 1.0) self.gym.add_ground(self.sim, plane_params) def _create_envs(self, num_envs, spacing, num_per_row): """ Create multiple parallel isaacgym environments Args: num_envs (int): The total number of environment spacing (float): Specifies half the side length of the square area occupied by each environment num_per_row (int): Specify how many environments in a row """ lower = gymapi.Vec3(-spacing, -spacing, 0.0) upper = gymapi.Vec3(spacing, spacing, spacing) # get rofunc path from rofunc package metadata rofunc_path = get_rofunc_path() asset_root = os.path.join(rofunc_path, "simulator/assets") shadow_hand_asset_file = "mjcf/open_ai_assets/hand/shadow_hand.xml" shadow_hand_another_asset_file = "mjcf/open_ai_assets/hand/shadow_hand1.xml" if "asset" in self.cfg["env"]: asset_root = self.cfg["env"]["asset"].get("assetRoot", asset_root) shadow_hand_asset_file = self.cfg["env"]["asset"].get("assetFileName", shadow_hand_asset_file) object_asset_file = self.asset_files_dict[self.object_type] # Load Shadowhand asset asset_options = gymapi.AssetOptions() # Switch Meshes from Z-up left-handed system to Y-up Right-handed coordinate system asset_options.flip_visual_attachments = False # Set Asset base to a fixed placement upon import asset_options.fix_base_link = False # Merge links that are connected by fixed joints asset_options.collapse_fixed_joints = True # Disables gravity for asset asset_options.disable_gravity = True # Thickness of the collision shapes. Sets how far objects should come to rest from the surface of this body asset_options.thickness = 0.001 # Angular velocity damping for rigid bodies asset_options.angular_damping = 100 # Linear velocity damping for rigid bodies asset_options.linear_damping = 100 if self.physics_engine == gymapi.SIM_PHYSX: asset_options.use_physx_armature = True asset_options.default_dof_drive_mode = gymapi.DOF_MODE_NONE shadow_hand_asset = self.gym.load_asset(self.sim, asset_root, shadow_hand_asset_file, asset_options) shadow_hand_another_asset = self.gym.load_asset(self.sim, asset_root, shadow_hand_another_asset_file, asset_options) # The count of Shadowhand's attributes self.num_shadow_hand_bodies = self.gym.get_asset_rigid_body_count(shadow_hand_asset) self.num_shadow_hand_shapes = self.gym.get_asset_rigid_shape_count(shadow_hand_asset) self.num_shadow_hand_dofs = self.gym.get_asset_dof_count(shadow_hand_asset) self.num_shadow_hand_actuators = self.gym.get_asset_actuator_count(shadow_hand_asset) self.num_shadow_hand_tendons = self.gym.get_asset_tendon_count(shadow_hand_asset) print("self.num_shadow_hand_bodies: ", self.num_shadow_hand_bodies) print("self.num_shadow_hand_shapes: ", self.num_shadow_hand_shapes) print("self.num_shadow_hand_dofs: ", self.num_shadow_hand_dofs) print("self.num_shadow_hand_actuators: ", self.num_shadow_hand_actuators) print("self.num_shadow_hand_tendons: ", self.num_shadow_hand_tendons) # Tendon set up limit_stiffness = 30 t_damping = 0.1 relevant_tendons = ["robot0:T_FFJ1c", "robot0:T_MFJ1c", "robot0:T_RFJ1c", "robot0:T_LFJ1c"] a_relevant_tendons = ["robot1:T_FFJ1c", "robot1:T_MFJ1c", "robot1:T_RFJ1c", "robot1:T_LFJ1c"] tendon_props = self.gym.get_asset_tendon_properties(shadow_hand_asset) a_tendon_props = self.gym.get_asset_tendon_properties(shadow_hand_another_asset) for i in range(self.num_shadow_hand_tendons): for rt in relevant_tendons: if self.gym.get_asset_tendon_name(shadow_hand_asset, i) == rt: tendon_props[i].limit_stiffness = limit_stiffness tendon_props[i].damping = t_damping for rt in a_relevant_tendons: if self.gym.get_asset_tendon_name(shadow_hand_another_asset, i) == rt: a_tendon_props[i].limit_stiffness = limit_stiffness a_tendon_props[i].damping = t_damping self.gym.set_asset_tendon_properties(shadow_hand_asset, tendon_props) self.gym.set_asset_tendon_properties(shadow_hand_another_asset, a_tendon_props) # Specifies the index of the actuated dof actuated_dof_names = [self.gym.get_asset_actuator_joint_name(shadow_hand_asset, i) for i in range(self.num_shadow_hand_actuators)] self.actuated_dof_indices = [self.gym.find_asset_dof_index(shadow_hand_asset, name) for name in actuated_dof_names] # Set Shadowhand dof properties shadow_hand_dof_props = self.gym.get_asset_dof_properties(shadow_hand_asset) shadow_hand_another_dof_props = self.gym.get_asset_dof_properties(shadow_hand_another_asset) self.shadow_hand_dof_lower_limits = [] self.shadow_hand_dof_upper_limits = [] self.shadow_hand_dof_default_pos = [] self.shadow_hand_dof_default_vel = [] self.sensors = [] sensor_pose = gymapi.Transform() for i in range(self.num_shadow_hand_dofs): self.shadow_hand_dof_lower_limits.append(shadow_hand_dof_props['lower'][i]) self.shadow_hand_dof_upper_limits.append(shadow_hand_dof_props['upper'][i]) self.shadow_hand_dof_default_pos.append(0.0) self.shadow_hand_dof_default_vel.append(0.0) self.actuated_dof_indices = to_torch(self.actuated_dof_indices, dtype=torch.long, device=self.device) self.shadow_hand_dof_lower_limits = to_torch(self.shadow_hand_dof_lower_limits, device=self.device) self.shadow_hand_dof_upper_limits = to_torch(self.shadow_hand_dof_upper_limits, device=self.device) self.shadow_hand_dof_default_pos = to_torch(self.shadow_hand_dof_default_pos, device=self.device) self.shadow_hand_dof_default_vel = to_torch(self.shadow_hand_dof_default_vel, device=self.device) # Load manipulated object and goal assets and their initial state object_asset_options = gymapi.AssetOptions() object_asset_options.density = 500 object_asset = self.gym.load_asset(self.sim, asset_root, object_asset_file, object_asset_options) object_asset_options.disable_gravity = True goal_asset = self.gym.load_asset(self.sim, asset_root, object_asset_file, object_asset_options) shadow_hand_start_pose = gymapi.Transform() shadow_hand_start_pose.p = gymapi.Vec3(*get_axis_params(0.5, self.up_axis_idx)) shadow_another_hand_start_pose = gymapi.Transform() shadow_another_hand_start_pose.p = gymapi.Vec3(0, -1.15, 0.5) shadow_another_hand_start_pose.r = gymapi.Quat().from_euler_zyx(0, 0, 3.1415) object_start_pose = gymapi.Transform() object_start_pose.p = gymapi.Vec3() object_start_pose.p.x = shadow_hand_start_pose.p.x pose_dy, pose_dz = -0.39, 0.04 object_start_pose.p.y = shadow_hand_start_pose.p.y + pose_dy object_start_pose.p.z = shadow_hand_start_pose.p.z + pose_dz if self.object_type == "pen": object_start_pose.p.z = shadow_hand_start_pose.p.z + 0.02 self.goal_displacement = gymapi.Vec3(-0., 0.0, 0.) self.goal_displacement_tensor = to_torch( [self.goal_displacement.x, self.goal_displacement.y, self.goal_displacement.z], device=self.device) goal_start_pose = gymapi.Transform() goal_start_pose.p = object_start_pose.p goal_start_pose.p.z -= 0.0 # Compute aggregate size max_agg_bodies = self.num_shadow_hand_bodies * 2 + 2 max_agg_shapes = self.num_shadow_hand_shapes * 2 + 2 self.shadow_hands = [] self.envs = [] self.object_init_state = [] self.hand_start_states = [] self.hand_indices = [] self.another_hand_indices = [] self.fingertip_indices = [] self.object_indices = [] self.goal_object_indices = [] self.fingertip_handles = [self.gym.find_asset_rigid_body_index(shadow_hand_asset, name) for name in self.fingertips] self.fingertip_another_handles = [self.gym.find_asset_rigid_body_index(shadow_hand_another_asset, name) for name in self.a_fingertips] # Create fingertip force sensors, if needed sensor_pose = gymapi.Transform() for ft_handle in self.fingertip_handles: self.gym.create_asset_force_sensor(shadow_hand_asset, ft_handle, sensor_pose) for ft_a_handle in self.fingertip_another_handles: self.gym.create_asset_force_sensor(shadow_hand_another_asset, ft_a_handle, sensor_pose) if self.obs_type in ["point_cloud"]: self.cameras = [] self.camera_tensors = [] self.camera_view_matrixs = [] self.camera_proj_matrixs = [] self.camera_props = gymapi.CameraProperties() self.camera_props.width = 256 self.camera_props.height = 256 self.camera_props.enable_tensors = True self.env_origin = torch.zeros((self.num_envs, 3), device=self.device, dtype=torch.float) self.pointCloudDownsampleNum = 768 self.camera_u = torch.arange(0, self.camera_props.width, device=self.device) self.camera_v = torch.arange(0, self.camera_props.height, device=self.device) self.camera_v2, self.camera_u2 = torch.meshgrid(self.camera_v, self.camera_u, indexing='ij') if self.point_cloud_debug: import open3d as o3d from bidexhands.utils.o3dviewer import PointcloudVisualizer self.pointCloudVisualizer = PointcloudVisualizer() self.pointCloudVisualizerInitialized = False self.o3d_pc = o3d.geometry.PointCloud() else: self.pointCloudVisualizer = None for i in range(self.num_envs): # Create env instance env_ptr = self.gym.create_env( self.sim, lower, upper, num_per_row ) if self.aggregate_mode >= 1: self.gym.begin_aggregate(env_ptr, max_agg_bodies, max_agg_shapes, True) # Add hand - collision filter = -1 to use asset collision filters set in mjcf loader shadow_hand_actor = self.gym.create_actor(env_ptr, shadow_hand_asset, shadow_hand_start_pose, "hand", i, -1, 0) shadow_hand_another_actor = self.gym.create_actor(env_ptr, shadow_hand_another_asset, shadow_another_hand_start_pose, "another_hand", i, -1, 0) self.hand_start_states.append( [shadow_hand_start_pose.p.x, shadow_hand_start_pose.p.y, shadow_hand_start_pose.p.z, shadow_hand_start_pose.r.x, shadow_hand_start_pose.r.y, shadow_hand_start_pose.r.z, shadow_hand_start_pose.r.w, 0, 0, 0, 0, 0, 0]) self.gym.set_actor_dof_properties(env_ptr, shadow_hand_actor, shadow_hand_dof_props) hand_idx = self.gym.get_actor_index(env_ptr, shadow_hand_actor, gymapi.DOMAIN_SIM) self.hand_indices.append(hand_idx) self.gym.set_actor_dof_properties(env_ptr, shadow_hand_another_actor, shadow_hand_another_dof_props) another_hand_idx = self.gym.get_actor_index(env_ptr, shadow_hand_another_actor, gymapi.DOMAIN_SIM) self.another_hand_indices.append(another_hand_idx) # Randomize colors and textures for rigid body num_bodies = self.gym.get_actor_rigid_body_count(env_ptr, shadow_hand_actor) hand_rigid_body_index = [[0, 1, 2, 3], [4, 5, 6, 7], [8, 9, 10, 11], [12, 13, 14, 15], [16, 17, 18, 19, 20], [21, 22, 23, 24, 25]] for n in self.agent_index[0]: colorx = random.uniform(0, 1) colory = random.uniform(0, 1) colorz = random.uniform(0, 1) for m in n: for o in hand_rigid_body_index[m]: self.gym.set_rigid_body_color(env_ptr, shadow_hand_actor, o, gymapi.MESH_VISUAL, gymapi.Vec3(colorx, colory, colorz)) for n in self.agent_index[1]: colorx = random.uniform(0, 1) colory = random.uniform(0, 1) colorz = random.uniform(0, 1) for m in n: for o in hand_rigid_body_index[m]: self.gym.set_rigid_body_color(env_ptr, shadow_hand_another_actor, o, gymapi.MESH_VISUAL, gymapi.Vec3(colorx, colory, colorz)) # Create fingertip force-torque sensors self.gym.enable_actor_dof_force_sensors(env_ptr, shadow_hand_actor) self.gym.enable_actor_dof_force_sensors(env_ptr, shadow_hand_another_actor) # Add object object_handle = self.gym.create_actor(env_ptr, object_asset, object_start_pose, "object", i, 0, 0) self.object_init_state.append([object_start_pose.p.x, object_start_pose.p.y, object_start_pose.p.z, object_start_pose.r.x, object_start_pose.r.y, object_start_pose.r.z, object_start_pose.r.w, 0, 0, 0, 0, 0, 0]) object_idx = self.gym.get_actor_index(env_ptr, object_handle, gymapi.DOMAIN_SIM) self.object_indices.append(object_idx) # Add goal object goal_handle = self.gym.create_actor(env_ptr, goal_asset, goal_start_pose, "goal_object", i + self.num_envs, 0, 0) goal_object_idx = self.gym.get_actor_index(env_ptr, goal_handle, gymapi.DOMAIN_SIM) self.goal_object_indices.append(goal_object_idx) if self.object_type != "block": self.gym.set_rigid_body_color( env_ptr, object_handle, 0, gymapi.MESH_VISUAL, gymapi.Vec3(0.6, 0.72, 0.98)) self.gym.set_rigid_body_color( env_ptr, goal_handle, 0, gymapi.MESH_VISUAL, gymapi.Vec3(0.6, 0.72, 0.98)) if self.obs_type in ["point_cloud"]: camera_handle = self.gym.create_camera_sensor(env_ptr, self.camera_props) self.gym.set_camera_location(camera_handle, env_ptr, gymapi.Vec3(0.25, -0.57, 0.85), gymapi.Vec3(-0.24, -0.57, 0)) camera_tensor = self.gym.get_camera_image_gpu_tensor(self.sim, env_ptr, camera_handle, gymapi.IMAGE_DEPTH) torch_cam_tensor = gymtorch.wrap_tensor(camera_tensor) cam_vinv = torch.inverse( (torch.tensor(self.gym.get_camera_view_matrix(self.sim, env_ptr, camera_handle)))).to(self.device) cam_proj = torch.tensor(self.gym.get_camera_proj_matrix(self.sim, env_ptr, camera_handle), device=self.device) origin = self.gym.get_env_origin(env_ptr) self.env_origin[i][0] = origin.x self.env_origin[i][1] = origin.y self.env_origin[i][2] = origin.z self.camera_tensors.append(torch_cam_tensor) self.camera_view_matrixs.append(cam_vinv) self.camera_proj_matrixs.append(cam_proj) self.cameras.append(camera_handle) if self.aggregate_mode > 0: self.gym.end_aggregate(env_ptr) self.envs.append(env_ptr) self.shadow_hands.append(shadow_hand_actor) # Convert each tensor to a pytorch type self.object_init_state = to_torch(self.object_init_state, device=self.device, dtype=torch.float).view( self.num_envs, 13) self.goal_states = self.object_init_state.clone() self.goal_init_state = self.goal_states.clone() self.hand_start_states = to_torch(self.hand_start_states, device=self.device).view(self.num_envs, 13) self.fingertip_handles = to_torch(self.fingertip_handles, dtype=torch.long, device=self.device) self.fingertip_another_handles = to_torch(self.fingertip_another_handles, dtype=torch.long, device=self.device) self.hand_indices = to_torch(self.hand_indices, dtype=torch.long, device=self.device) self.another_hand_indices = to_torch(self.another_hand_indices, dtype=torch.long, device=self.device) self.object_indices = to_torch(self.object_indices, dtype=torch.long, device=self.device) self.goal_object_indices = to_torch(self.goal_object_indices, dtype=torch.long, device=self.device)
[docs] def compute_reward(self, actions): """ Compute the reward of all environment. The core function is compute_hand_reward( self.rew_buf, self.reset_buf, self.reset_goal_buf, self.progress_buf, self.successes, self.consecutive_successes, self.max_episode_length, self.object_pos, self.object_rot, self.goal_pos, self.goal_rot, self.dist_reward_scale, self.rot_reward_scale, self.rot_eps, self.actions, self.action_penalty_scale, self.success_tolerance, self.reach_goal_bonus, self.fall_dist, self.fall_penalty, self.max_consecutive_successes, self.av_factor, (self.object_type == "pen") ) , which we will introduce in detail there Args: actions (tensor): Actions of agents in the all environment """ self.rew_buf[:], self.reset_buf[:], self.reset_goal_buf[:], self.progress_buf[:], self.successes[ :], self.consecutive_successes[ :] = compute_hand_reward( self.rew_buf, self.reset_buf, self.reset_goal_buf, self.progress_buf, self.successes, self.consecutive_successes, self.max_episode_length, self.object_pos, self.object_rot, self.goal_pos, self.goal_rot, self.dist_reward_scale, self.rot_reward_scale, self.rot_eps, self.actions, self.action_penalty_scale, self.success_tolerance, self.reach_goal_bonus, self.fall_dist, self.fall_penalty, self.max_consecutive_successes, self.av_factor, (self.object_type == "pen") ) self.extras['successes'] = self.successes self.extras['consecutive_successes'] = self.consecutive_successes # Whether to print out success information if self.print_success_stat: self.total_resets = self.total_resets + self.reset_buf.sum() direct_average_successes = self.total_successes + self.successes.sum() self.total_successes = self.total_successes + (self.successes * self.reset_buf).sum() # The direct average shows the overall result more quickly, but slightly undershoots long term # policy performance. print("Direct average consecutive successes = {:.1f}".format( direct_average_successes / (self.total_resets + self.num_envs))) if self.total_resets > 0: print("Post-Reset average consecutive successes = {:.1f}".format( self.total_successes / self.total_resets))
[docs] def compute_observations(self): """ Compute the observations of all environment. The core function is self.compute_full_state(True), which we will introduce in detail there """ # Refreash gym GPU state tensors and specify the state we need self.gym.refresh_dof_state_tensor(self.sim) self.gym.refresh_actor_root_state_tensor(self.sim) self.gym.refresh_rigid_body_state_tensor(self.sim) self.gym.refresh_force_sensor_tensor(self.sim) self.gym.refresh_dof_force_tensor(self.sim) if self.obs_type in ["point_cloud"]: self.gym.render_all_camera_sensors(self.sim) self.gym.start_access_image_tensors(self.sim) self.object_pose = self.root_state_tensor[self.object_indices, 0:7] self.object_pos = self.root_state_tensor[self.object_indices, 0:3] self.object_rot = self.root_state_tensor[self.object_indices, 3:7] self.object_linvel = self.root_state_tensor[self.object_indices, 7:10] self.object_angvel = self.root_state_tensor[self.object_indices, 10:13] self.goal_pose = self.goal_states[:, 0:7] self.goal_pos = self.goal_states[:, 0:3] self.goal_rot = self.goal_states[:, 3:7] self.fingertip_state = self.rigid_body_states[:, self.fingertip_handles][:, :, 0:13] self.fingertip_pos = self.rigid_body_states[:, self.fingertip_handles][:, :, 0:3] self.fingertip_another_state = self.rigid_body_states[:, self.fingertip_another_handles][:, :, 0:13] self.fingertip_another_pos = self.rigid_body_states[:, self.fingertip_another_handles][:, :, 0:3] if self.obs_type == "full_state": self.compute_full_state() elif self.obs_type == "point_cloud": self.compute_point_cloud_observation() if self.asymmetric_obs: self.compute_full_state(True)
[docs] def compute_full_state(self, asymm_obs=False): """ Compute the observations of all environment. The observation is composed of three parts: the state values of the left and right hands, and the information of objects and target. The state values of the left and right hands were the same for each task, including hand joint and finger positions, velocity, and force information. The detail 422-dimensional observational space as shown in below: Index Description 0 - 23 right shadow hand dof position 24 - 47 right shadow hand dof velocity 48 - 71 right shadow hand dof force 72 - 136 right shadow hand fingertip pose, linear velocity, angle velocity (5 x 13) 137 - 166 right shadow hand fingertip force, torque (5 x 6) 167 - 169 right shadow hand base position 170 - 172 right shadow hand base rotation 173 - 198 right shadow hand actions 199 - 222 left shadow hand dof position 223 - 246 left shadow hand dof velocity 247 - 270 left shadow hand dof force 271 - 335 left shadow hand fingertip pose, linear velocity, angle velocity (5 x 13) 336 - 365 left shadow hand fingertip force, torque (5 x 6) 366 - 368 left shadow hand base position 369 - 371 left shadow hand base rotation 372 - 397 left shadow hand actions 398 - 404 object pose 405 - 407 object linear velocity 408 - 410 object angle velocity 411 - 417 goal pose 418 - 421 goal rot - object rot """ # Fingertip observations, state(pose and vel) + force-torque sensors num_ft_states = 13 * int(self.num_fingertips / 2) num_ft_force_torques = 6 * int(self.num_fingertips / 2) self.obs_buf[:, 0:self.num_shadow_hand_dofs] = unscale(self.shadow_hand_dof_pos, self.shadow_hand_dof_lower_limits, self.shadow_hand_dof_upper_limits) self.obs_buf[:, self.num_shadow_hand_dofs:2 * self.num_shadow_hand_dofs] = self.vel_obs_scale * self.shadow_hand_dof_vel self.obs_buf[:, 2 * self.num_shadow_hand_dofs:3 * self.num_shadow_hand_dofs] = self.force_torque_obs_scale * self.dof_force_tensor[ :, :24] fingertip_obs_start = 72 self.obs_buf[:, fingertip_obs_start:fingertip_obs_start + num_ft_states] = self.fingertip_state.reshape( self.num_envs, num_ft_states) self.obs_buf[:, fingertip_obs_start + num_ft_states:fingertip_obs_start + num_ft_states + num_ft_force_torques] = self.force_torque_obs_scale * self.vec_sensor_tensor[ :, :30] hand_pose_start = fingertip_obs_start + 95 self.obs_buf[:, hand_pose_start:hand_pose_start + 3] = self.hand_positions[self.hand_indices, :] self.obs_buf[:, hand_pose_start + 3:hand_pose_start + 4] = \ get_euler_xyz(self.hand_orientations[self.hand_indices, :])[0].unsqueeze(-1) self.obs_buf[:, hand_pose_start + 4:hand_pose_start + 5] = \ get_euler_xyz(self.hand_orientations[self.hand_indices, :])[1].unsqueeze(-1) self.obs_buf[:, hand_pose_start + 5:hand_pose_start + 6] = \ get_euler_xyz(self.hand_orientations[self.hand_indices, :])[2].unsqueeze(-1) action_obs_start = hand_pose_start + 6 self.obs_buf[:, action_obs_start:action_obs_start + 26] = self.actions[:, :26] another_hand_start = action_obs_start + 26 self.obs_buf[:, another_hand_start:self.num_shadow_hand_dofs + another_hand_start] = unscale( self.shadow_hand_another_dof_pos, self.shadow_hand_dof_lower_limits, self.shadow_hand_dof_upper_limits) self.obs_buf[:, self.num_shadow_hand_dofs + another_hand_start:2 * self.num_shadow_hand_dofs + another_hand_start] = self.vel_obs_scale * self.shadow_hand_another_dof_vel self.obs_buf[:, 2 * self.num_shadow_hand_dofs + another_hand_start:3 * self.num_shadow_hand_dofs + another_hand_start] = self.force_torque_obs_scale * self.dof_force_tensor[ :, 24:48] fingertip_another_obs_start = another_hand_start + 72 self.obs_buf[:, fingertip_another_obs_start:fingertip_another_obs_start + num_ft_states] = self.fingertip_another_state.reshape( self.num_envs, num_ft_states) self.obs_buf[:, fingertip_another_obs_start + num_ft_states:fingertip_another_obs_start + num_ft_states + num_ft_force_torques] = self.force_torque_obs_scale * self.vec_sensor_tensor[ :, 30:] hand_another_pose_start = fingertip_another_obs_start + 95 self.obs_buf[:, hand_another_pose_start:hand_another_pose_start + 3] = self.hand_positions[ self.another_hand_indices, :] self.obs_buf[:, hand_another_pose_start + 3:hand_another_pose_start + 4] = \ get_euler_xyz(self.hand_orientations[self.another_hand_indices, :])[0].unsqueeze(-1) self.obs_buf[:, hand_another_pose_start + 4:hand_another_pose_start + 5] = \ get_euler_xyz(self.hand_orientations[self.another_hand_indices, :])[1].unsqueeze(-1) self.obs_buf[:, hand_another_pose_start + 5:hand_another_pose_start + 6] = \ get_euler_xyz(self.hand_orientations[self.another_hand_indices, :])[2].unsqueeze(-1) action_another_obs_start = hand_another_pose_start + 6 self.obs_buf[:, action_another_obs_start:action_another_obs_start + 26] = self.actions[:, 26:] obj_obs_start = action_another_obs_start + 26 self.obs_buf[:, obj_obs_start:obj_obs_start + 7] = self.object_pose self.obs_buf[:, obj_obs_start + 7:obj_obs_start + 10] = self.object_linvel self.obs_buf[:, obj_obs_start + 10:obj_obs_start + 13] = self.vel_obs_scale * self.object_angvel goal_obs_start = obj_obs_start + 13 self.obs_buf[:, goal_obs_start:goal_obs_start + 7] = self.goal_pose self.obs_buf[:, goal_obs_start + 7:goal_obs_start + 11] = quat_mul(self.object_rot, quat_conjugate(self.goal_rot))
[docs] def compute_point_cloud_observation(self, collect_demonstration=False): """ Compute the observations of all environment. The observation is composed of three parts: the state values of the left and right hands, and the information of objects and target. The state values of the left and right hands were the same for each task, including hand joint and finger positions, velocity, and force information. The detail 422-dimensional observational space as shown in below: Index Description 0 - 23 right shadow hand dof position 24 - 47 right shadow hand dof velocity 48 - 71 right shadow hand dof force 72 - 136 right shadow hand fingertip pose, linear velocity, angle velocity (5 x 13) 137 - 166 right shadow hand fingertip force, torque (5 x 6) 167 - 169 right shadow hand base position 170 - 172 right shadow hand base rotation 173 - 198 right shadow hand actions 199 - 222 left shadow hand dof position 223 - 246 left shadow hand dof velocity 247 - 270 left shadow hand dof force 271 - 335 left shadow hand fingertip pose, linear velocity, angle velocity (5 x 13) 336 - 365 left shadow hand fingertip force, torque (5 x 6) 366 - 368 left shadow hand base position 369 - 371 left shadow hand base rotation 372 - 397 left shadow hand actions 398 - 404 object pose 405 - 407 object linear velocity 408 - 410 object angle velocity 411 - 417 goal pose 418 - 421 goal rot - object rot """ # Fingertip observations, state(pose and vel) + force-torque sensors num_ft_states = 13 * int(self.num_fingertips / 2) num_ft_force_torques = 6 * int(self.num_fingertips / 2) self.obs_buf[:, 0:self.num_shadow_hand_dofs] = unscale(self.shadow_hand_dof_pos, self.shadow_hand_dof_lower_limits, self.shadow_hand_dof_upper_limits) self.obs_buf[:, self.num_shadow_hand_dofs:2 * self.num_shadow_hand_dofs] = self.vel_obs_scale * self.shadow_hand_dof_vel self.obs_buf[:, 2 * self.num_shadow_hand_dofs:3 * self.num_shadow_hand_dofs] = self.force_torque_obs_scale * self.dof_force_tensor[ :, :24] fingertip_obs_start = 72 self.obs_buf[:, fingertip_obs_start:fingertip_obs_start + num_ft_states] = self.fingertip_state.reshape( self.num_envs, num_ft_states) self.obs_buf[:, fingertip_obs_start + num_ft_states:fingertip_obs_start + num_ft_states + num_ft_force_torques] = self.force_torque_obs_scale * self.vec_sensor_tensor[ :, :30] hand_pose_start = fingertip_obs_start + 95 self.obs_buf[:, hand_pose_start:hand_pose_start + 3] = self.hand_positions[self.hand_indices, :] self.obs_buf[:, hand_pose_start + 3:hand_pose_start + 4] = \ get_euler_xyz(self.hand_orientations[self.hand_indices, :])[0].unsqueeze(-1) self.obs_buf[:, hand_pose_start + 4:hand_pose_start + 5] = \ get_euler_xyz(self.hand_orientations[self.hand_indices, :])[1].unsqueeze(-1) self.obs_buf[:, hand_pose_start + 5:hand_pose_start + 6] = \ get_euler_xyz(self.hand_orientations[self.hand_indices, :])[2].unsqueeze(-1) action_obs_start = hand_pose_start + 6 self.obs_buf[:, action_obs_start:action_obs_start + 26] = self.actions[:, :26] another_hand_start = action_obs_start + 26 self.obs_buf[:, another_hand_start:self.num_shadow_hand_dofs + another_hand_start] = unscale( self.shadow_hand_another_dof_pos, self.shadow_hand_dof_lower_limits, self.shadow_hand_dof_upper_limits) self.obs_buf[:, self.num_shadow_hand_dofs + another_hand_start:2 * self.num_shadow_hand_dofs + another_hand_start] = self.vel_obs_scale * self.shadow_hand_another_dof_vel self.obs_buf[:, 2 * self.num_shadow_hand_dofs + another_hand_start:3 * self.num_shadow_hand_dofs + another_hand_start] = self.force_torque_obs_scale * self.dof_force_tensor[ :, 24:48] fingertip_another_obs_start = another_hand_start + 72 self.obs_buf[:, fingertip_another_obs_start:fingertip_another_obs_start + num_ft_states] = self.fingertip_another_state.reshape( self.num_envs, num_ft_states) self.obs_buf[:, fingertip_another_obs_start + num_ft_states:fingertip_another_obs_start + num_ft_states + num_ft_force_torques] = self.force_torque_obs_scale * self.vec_sensor_tensor[ :, 30:] hand_another_pose_start = fingertip_another_obs_start + 95 self.obs_buf[:, hand_another_pose_start:hand_another_pose_start + 3] = self.hand_positions[ self.another_hand_indices, :] self.obs_buf[:, hand_another_pose_start + 3:hand_another_pose_start + 4] = \ get_euler_xyz(self.hand_orientations[self.another_hand_indices, :])[0].unsqueeze(-1) self.obs_buf[:, hand_another_pose_start + 4:hand_another_pose_start + 5] = \ get_euler_xyz(self.hand_orientations[self.another_hand_indices, :])[1].unsqueeze(-1) self.obs_buf[:, hand_another_pose_start + 5:hand_another_pose_start + 6] = \ get_euler_xyz(self.hand_orientations[self.another_hand_indices, :])[2].unsqueeze(-1) action_another_obs_start = hand_another_pose_start + 6 self.obs_buf[:, action_another_obs_start:action_another_obs_start + 26] = self.actions[:, 26:] obj_obs_start = action_another_obs_start + 26 self.obs_buf[:, obj_obs_start:obj_obs_start + 7] = self.object_pose self.obs_buf[:, obj_obs_start + 7:obj_obs_start + 10] = self.object_linvel self.obs_buf[:, obj_obs_start + 10:obj_obs_start + 13] = self.vel_obs_scale * self.object_angvel goal_obs_start = obj_obs_start + 13 self.obs_buf[:, goal_obs_start:goal_obs_start + 7] = self.goal_pose self.obs_buf[:, goal_obs_start + 7:goal_obs_start + 11] = quat_mul(self.object_rot, quat_conjugate(self.goal_rot)) point_clouds = torch.zeros((self.num_envs, self.pointCloudDownsampleNum, 3), device=self.device) if self.camera_debug: import matplotlib.pyplot as plt self.camera_rgba_debug_fig = plt.figure("CAMERA_RGBD_DEBUG") camera_rgba_image = self.camera_visulization(is_depth_image=False) plt.imshow(camera_rgba_image) plt.pause(1e-9) for i in range(self.num_envs): # Here is an example. In practice, it's better not to convert tensor from GPU to CPU points = depth_image_to_point_cloud_GPU(self.camera_tensors[i], self.camera_view_matrixs[i], self.camera_proj_matrixs[i], self.camera_u2, self.camera_v2, self.camera_props.width, self.camera_props.height, 10, self.device) if points.shape[0] > 0: selected_points = self.sample_points(points, sample_num=self.pointCloudDownsampleNum, sample_mathed='random') else: selected_points = torch.zeros((self.num_envs, self.pointCloudDownsampleNum, 3), device=self.device) point_clouds[i] = selected_points if self.pointCloudVisualizer != None: import open3d as o3d points = point_clouds[0, :, :3].cpu().numpy() # colors = plt.get_cmap()(point_clouds[0, :, 3].cpu().numpy()) self.o3d_pc.points = o3d.utility.Vector3dVector(points) # self.o3d_pc.colors = o3d.utility.Vector3dVector(colors[..., :3]) if self.pointCloudVisualizerInitialized == False: self.pointCloudVisualizer.add_geometry(self.o3d_pc) self.pointCloudVisualizerInitialized = True else: self.pointCloudVisualizer.update(self.o3d_pc) self.gym.end_access_image_tensors(self.sim) point_clouds -= self.env_origin.view(self.num_envs, 1, 3) point_clouds_start = goal_obs_start + 11 self.obs_buf[:, point_clouds_start:].copy_(point_clouds.view(self.num_envs, self.pointCloudDownsampleNum * 3))
[docs] def reset_target_pose(self, env_ids, apply_reset=False): """ Reset and randomize the goal pose Args: env_ids (tensor): The index of the environment that needs to reset goal pose apply_reset (bool): Whether to reset the goal directly here, usually used when the same task wants to complete multiple goals """ rand_floats = torch_rand_float(-1.0, 1.0, (len(env_ids), 4), device=self.device) new_rot = randomize_rotation(rand_floats[:, 0], rand_floats[:, 1], self.x_unit_tensor[env_ids], self.y_unit_tensor[env_ids]) self.goal_states[env_ids, 0:3] = self.goal_init_state[env_ids, 0:3] self.goal_states[env_ids, 1] -= 0.4 self.goal_states[env_ids, 3:7] = new_rot self.root_state_tensor[self.goal_object_indices[env_ids], 0:3] = self.goal_states[env_ids, 0:3] + self.goal_displacement_tensor self.root_state_tensor[self.goal_object_indices[env_ids], 3:7] = self.goal_states[env_ids, 3:7] self.root_state_tensor[self.goal_object_indices[env_ids], 7:13] = torch.zeros_like( self.root_state_tensor[self.goal_object_indices[env_ids], 7:13]) if apply_reset: goal_object_indices = self.goal_object_indices[env_ids].to(torch.int32) # Sets actor root state buffer to values provided for given actor indices. Full actor root states buffer should # be provided for all actors. self.gym.set_actor_root_state_tensor_indexed(self.sim, gymtorch.unwrap_tensor(self.root_state_tensor), gymtorch.unwrap_tensor(goal_object_indices), len(env_ids)) self.reset_goal_buf[env_ids] = 0
[docs] def reset_idx(self, env_ids, goal_env_ids): """ Reset and randomize the environment Args: env_ids (tensor): The index of the environment that needs to reset goal_env_ids (tensor): The index of the environment that only goals need reset """ # Randomization can happen only at reset time, since it can reset actor positions on GPU if self.randomize: self.apply_randomizations(self.randomization_params) # Generate random values rand_floats = torch_rand_float(-1.0, 1.0, (len(env_ids), self.num_shadow_hand_dofs * 2 + 5), device=self.device) # Reset and randomize start object poses self.reset_target_pose(env_ids) # Reset object and goal self.root_state_tensor[self.object_indices[env_ids]] = self.object_init_state[env_ids].clone() self.root_state_tensor[self.object_indices[env_ids], 0:2] = self.object_init_state[env_ids, 0:2] + \ self.reset_position_noise * rand_floats[:, 0:2] self.root_state_tensor[self.object_indices[env_ids], self.up_axis_idx] = self.object_init_state[ env_ids, self.up_axis_idx] + \ self.reset_position_noise * rand_floats[ :, self.up_axis_idx] new_object_rot = randomize_rotation(rand_floats[:, 3], rand_floats[:, 4], self.x_unit_tensor[env_ids], self.y_unit_tensor[env_ids]) if self.object_type == "pen": rand_angle_y = torch.tensor(0.3) new_object_rot = randomize_rotation_pen(rand_floats[:, 3], rand_floats[:, 4], rand_angle_y, self.x_unit_tensor[env_ids], self.y_unit_tensor[env_ids], self.z_unit_tensor[env_ids]) self.root_state_tensor[self.object_indices[env_ids], 3:7] = new_object_rot self.root_state_tensor[self.object_indices[env_ids], 7:13] = torch.zeros_like( self.root_state_tensor[self.object_indices[env_ids], 7:13]) object_indices = torch.unique(torch.cat([self.object_indices[env_ids], self.goal_object_indices[env_ids], self.goal_object_indices[goal_env_ids]]).to(torch.int32)) # Reset shadow hand delta_max = self.shadow_hand_dof_upper_limits - self.shadow_hand_dof_default_pos delta_min = self.shadow_hand_dof_lower_limits - self.shadow_hand_dof_default_pos rand_delta = delta_min + (delta_max - delta_min) * rand_floats[:, 5:5 + self.num_shadow_hand_dofs] pos = self.shadow_hand_default_dof_pos + self.reset_dof_pos_noise * rand_delta self.shadow_hand_dof_pos[env_ids, :] = pos self.shadow_hand_another_dof_pos[env_ids, :] = pos self.shadow_hand_dof_vel[env_ids, :] = self.shadow_hand_dof_default_vel + \ self.reset_dof_vel_noise * rand_floats[:, 5 + self.num_shadow_hand_dofs:5 + self.num_shadow_hand_dofs * 2] self.shadow_hand_another_dof_vel[env_ids, :] = self.shadow_hand_dof_default_vel + \ self.reset_dof_vel_noise * rand_floats[:, 5 + self.num_shadow_hand_dofs:5 + self.num_shadow_hand_dofs * 2] self.prev_targets[env_ids, :self.num_shadow_hand_dofs] = pos self.cur_targets[env_ids, :self.num_shadow_hand_dofs] = pos self.prev_targets[env_ids, self.num_shadow_hand_dofs:self.num_shadow_hand_dofs * 2] = pos self.cur_targets[env_ids, self.num_shadow_hand_dofs:self.num_shadow_hand_dofs * 2] = pos hand_indices = self.hand_indices[env_ids].to(torch.int32) another_hand_indices = self.another_hand_indices[env_ids].to(torch.int32) all_hand_indices = torch.unique(torch.cat([hand_indices, another_hand_indices]).to(torch.int32)) # Sets DOF position targets to values provided for given actor indices. Full DOF position targets buffer should # be provided for all actors. For presimatic DOF, target is in meters. For revolute DOF, target is in radians. self.gym.set_dof_position_target_tensor_indexed(self.sim, gymtorch.unwrap_tensor(self.prev_targets), gymtorch.unwrap_tensor(all_hand_indices), len(all_hand_indices)) # Reset all position and velocity value in the environment self.hand_positions[all_hand_indices.to(torch.long), :] = self.saved_root_tensor[ all_hand_indices.to(torch.long), 0:3] self.hand_orientations[all_hand_indices.to(torch.long), :] = self.saved_root_tensor[ all_hand_indices.to(torch.long), 3:7] all_indices = torch.unique(torch.cat([all_hand_indices, object_indices]).to(torch.int32)) # Sets DOF state buffer to values provided for given actor indices. Full DOF state buffer should be provided for # all actors. DOF state includes position in meters for prismatic DOF, or radians for revolute DOF, and velocity # in m/s for prismatic DOF and rad/s for revolute DOF. self.gym.set_dof_state_tensor_indexed(self.sim, gymtorch.unwrap_tensor(self.dof_state), gymtorch.unwrap_tensor(all_hand_indices), len(all_hand_indices)) # Sets actor root state buffer to values provided for given actor indices. Full actor root states buffer should # be provided for all actors. self.gym.set_actor_root_state_tensor_indexed(self.sim, gymtorch.unwrap_tensor(self.root_state_tensor), gymtorch.unwrap_tensor(all_indices), len(all_indices)) self.progress_buf[env_ids] = 0 self.reset_buf[env_ids] = 0 self.successes[env_ids] = 0
[docs] def pre_physics_step(self, actions): """ The pre-processing of the physics step. Determine whether the reset environment is needed, and calculate the next movement of Shadowhand through the given action. The 52-dimensional action space as shown in below: Index Description 0 - 19 right shadow hand actuated joint 20 - 22 right shadow hand base translation 23 - 25 right shadow hand base rotation 26 - 45 left shadow hand actuated joint 46 - 48 left shadow hand base translation 49 - 51 left shadow hand base rotatio Args: actions (tensor): Actions of agents in the all environment """ env_ids = self.reset_buf.nonzero(as_tuple=False).squeeze(-1) goal_env_ids = self.reset_goal_buf.nonzero(as_tuple=False).squeeze(-1) # If only goals need reset, then call set API if len(goal_env_ids) > 0 and len(env_ids) == 0: self.reset_target_pose(goal_env_ids, apply_reset=True) # If goals need reset in addition to other envs, call set API in reset() elif len(goal_env_ids) > 0: self.reset_target_pose(goal_env_ids) if len(env_ids) > 0: self.reset_idx(env_ids, goal_env_ids) # Calculate the next movement of Shadowhand self.actions = actions.clone().to(self.device) if self.use_relative_control: targets = self.prev_targets[:, self.actuated_dof_indices] + self.shadow_hand_dof_speed_scale * self.dt * self.actions self.cur_targets[:, self.actuated_dof_indices] = tensor_clamp(targets, self.shadow_hand_dof_lower_limits[ self.actuated_dof_indices], self.shadow_hand_dof_upper_limits[ self.actuated_dof_indices]) else: self.cur_targets[:, self.actuated_dof_indices] = scale(self.actions[:, 6:26], self.shadow_hand_dof_lower_limits[ self.actuated_dof_indices], self.shadow_hand_dof_upper_limits[ self.actuated_dof_indices]) self.cur_targets[:, self.actuated_dof_indices] = self.act_moving_average * self.cur_targets[:, self.actuated_dof_indices] + ( 1.0 - self.act_moving_average) * self.prev_targets[ :, self.actuated_dof_indices] self.cur_targets[:, self.actuated_dof_indices] = tensor_clamp( self.cur_targets[:, self.actuated_dof_indices], self.shadow_hand_dof_lower_limits[self.actuated_dof_indices], self.shadow_hand_dof_upper_limits[self.actuated_dof_indices]) self.cur_targets[:, self.actuated_dof_indices + 24] = scale(self.actions[:, 32:52], self.shadow_hand_dof_lower_limits[ self.actuated_dof_indices], self.shadow_hand_dof_upper_limits[ self.actuated_dof_indices]) self.cur_targets[:, self.actuated_dof_indices + 24] = self.act_moving_average * self.cur_targets[:, self.actuated_dof_indices + 24] + ( 1.0 - self.act_moving_average) * self.prev_targets[ :, self.actuated_dof_indices] self.cur_targets[:, self.actuated_dof_indices + 24] = tensor_clamp( self.cur_targets[:, self.actuated_dof_indices + 24], self.shadow_hand_dof_lower_limits[self.actuated_dof_indices], self.shadow_hand_dof_upper_limits[self.actuated_dof_indices]) self.apply_forces[:, 1, :] = self.actions[:, 0:3] * self.dt * self.transition_scale * 100000 self.apply_forces[:, 1 + 26, :] = self.actions[:, 26:29] * self.dt * self.transition_scale * 100000 self.apply_torque[:, 1, :] = self.actions[:, 3:6] * self.dt * self.orientation_scale * 1000 self.apply_torque[:, 1 + 26, :] = self.actions[:, 29:32] * self.dt * self.orientation_scale * 1000 # Applies forces and torques to Shadowhand's base for the immediate timestep, in Newtons self.gym.apply_rigid_body_force_tensors(self.sim, gymtorch.unwrap_tensor(self.apply_forces), gymtorch.unwrap_tensor(self.apply_torque), gymapi.ENV_SPACE) self.prev_targets[:, self.actuated_dof_indices] = self.cur_targets[:, self.actuated_dof_indices] self.prev_targets[:, self.actuated_dof_indices + 24] = self.cur_targets[:, self.actuated_dof_indices + 24] # Sets DOF position targets to values provided for all DOFs in simulation. For presimatic DOF, # target is in meters. For revolute DOF, target is in radians. self.gym.set_dof_position_target_tensor(self.sim, gymtorch.unwrap_tensor(self.cur_targets))
[docs] def post_physics_step(self): """ The post-processing of the physics step. Compute the observation and reward, and visualize auxiliary lines for debug when needed """ self.progress_buf += 1 self.randomize_buf += 1 self.compute_observations() self.compute_reward(self.actions) # Draw axes on target object if self.viewer and self.debug_viz: self.gym.clear_lines(self.viewer) self.gym.refresh_rigid_body_state_tensor(self.sim) for i in range(self.num_envs): targetx = (self.goal_pos[i] + quat_apply(self.goal_rot[i], to_torch([1, 0, 0], device=self.device) * 0.2)).cpu().numpy() targety = (self.goal_pos[i] + quat_apply(self.goal_rot[i], to_torch([0, 1, 0], device=self.device) * 0.2)).cpu().numpy() targetz = (self.goal_pos[i] + quat_apply(self.goal_rot[i], to_torch([0, 0, 1], device=self.device) * 0.2)).cpu().numpy() p0 = self.goal_pos[i].cpu().numpy() + self.goal_displacement_tensor.cpu().numpy() self.gym.add_lines(self.viewer, self.envs[i], 1, [p0[0], p0[1], p0[2], targetx[0], targetx[1], targetx[2]], [0.85, 0.1, 0.1]) self.gym.add_lines(self.viewer, self.envs[i], 1, [p0[0], p0[1], p0[2], targety[0], targety[1], targety[2]], [0.1, 0.85, 0.1]) self.gym.add_lines(self.viewer, self.envs[i], 1, [p0[0], p0[1], p0[2], targetz[0], targetz[1], targetz[2]], [0.1, 0.1, 0.85]) objectx = (self.object_pos[i] + quat_apply(self.object_rot[i], to_torch([1, 0, 0], device=self.device) * 0.2)).cpu().numpy() objecty = (self.object_pos[i] + quat_apply(self.object_rot[i], to_torch([0, 1, 0], device=self.device) * 0.2)).cpu().numpy() objectz = (self.object_pos[i] + quat_apply(self.object_rot[i], to_torch([0, 0, 1], device=self.device) * 0.2)).cpu().numpy() p0 = self.object_pos[i].cpu().numpy() self.gym.add_lines(self.viewer, self.envs[i], 1, [p0[0], p0[1], p0[2], objectx[0], objectx[1], objectx[2]], [0.85, 0.1, 0.1]) self.gym.add_lines(self.viewer, self.envs[i], 1, [p0[0], p0[1], p0[2], objecty[0], objecty[1], objecty[2]], [0.1, 0.85, 0.1]) self.gym.add_lines(self.viewer, self.envs[i], 1, [p0[0], p0[1], p0[2], objectz[0], objectz[1], objectz[2]], [0.1, 0.1, 0.85])
[docs] def rand_row(self, tensor, dim_needed): row_total = tensor.shape[0] return tensor[torch.randint(low=0, high=row_total, size=(dim_needed,)), :]
[docs] def sample_points(self, points, sample_num=1000, sample_mathed='furthest'): eff_points = points[points[:, 2] > 0.04] if eff_points.shape[0] < sample_num: eff_points = points if sample_mathed == 'random': sampled_points = self.rand_row(eff_points, sample_num) elif sample_mathed == 'furthest': sampled_points_id = pointnet2_utils.furthest_point_sample(eff_points.reshape(1, *eff_points.shape), sample_num) sampled_points = eff_points.index_select(0, sampled_points_id[0].long()) return sampled_points
[docs] def camera_visulization(self, is_depth_image=False): if is_depth_image: camera_depth_tensor = self.gym.get_camera_image_gpu_tensor(self.sim, self.envs[0], self.cameras[0], gymapi.IMAGE_DEPTH) torch_depth_tensor = gymtorch.wrap_tensor(camera_depth_tensor) torch_depth_tensor = torch.clamp(torch_depth_tensor, -1, 1) torch_depth_tensor = scale(torch_depth_tensor, to_torch([0], dtype=torch.float, device=self.device), to_torch([256], dtype=torch.float, device=self.device)) camera_image = torch_depth_tensor.cpu().numpy() camera_image = Im.fromarray(camera_image) else: camera_rgba_tensor = self.gym.get_camera_image_gpu_tensor(self.sim, self.envs[0], self.cameras[0], gymapi.IMAGE_COLOR) torch_rgba_tensor = gymtorch.wrap_tensor(camera_rgba_tensor) camera_image = torch_rgba_tensor.cpu().numpy() camera_image = Im.fromarray(camera_image) return camera_image
##################################################################### ###=========================jit functions=========================### ##################################################################### @torch.jit.script def depth_image_to_point_cloud_GPU(camera_tensor, camera_view_matrix_inv, camera_proj_matrix, u, v, width: float, height: float, depth_bar: float, device: torch.device): # time1 = time.time() depth_buffer = camera_tensor.to(device) # Get the camera view matrix and invert it to transform points from camera to world space vinv = camera_view_matrix_inv # Get the camera projection matrix and get the necessary scaling # coefficients for deprojection proj = camera_proj_matrix fu = 2 / proj[0, 0] fv = 2 / proj[1, 1] centerU = width / 2 centerV = height / 2 Z = depth_buffer X = -(u - centerU) / width * Z * fu Y = (v - centerV) / height * Z * fv Z = Z.view(-1) valid = Z > -depth_bar X = X.view(-1) Y = Y.view(-1) position = torch.vstack((X, Y, Z, torch.ones(len(X), device=device)))[:, valid] position = position.permute(1, 0) position = position @ vinv points = position[:, 0:3] return points @torch.jit.script def compute_hand_reward( rew_buf, reset_buf, reset_goal_buf, progress_buf, successes, consecutive_successes, max_episode_length: float, object_pos, object_rot, target_pos, target_rot, dist_reward_scale: float, rot_reward_scale: float, rot_eps: float, actions, action_penalty_scale: float, success_tolerance: float, reach_goal_bonus: float, fall_dist: float, fall_penalty: float, max_consecutive_successes: int, av_factor: float, ignore_z_rot: bool ): """ Compute the reward of all environment. Args: rew_buf (tensor): The reward buffer of all environments at this time reset_buf (tensor): The reset buffer of all environments at this time reset_goal_buf (tensor): The only-goal reset buffer of all environments at this time progress_buf (tensor): The porgress buffer of all environments at this time successes (tensor): The successes buffer of all environments at this time consecutive_successes (tensor): The consecutive successes buffer of all environments at this time max_episode_length (float): The max episode length in this environment object_pos (tensor): The position of the object object_rot (tensor): The rotation of the object target_pos (tensor): The position of the target target_rot (tensor): The rotate of the target dist_reward_scale (float): The scale of the distance reward rot_reward_scale (float): The scale of the rotation reward rot_eps (float): The epsilon of the rotation calculate actions (tensor): The action buffer of all environments at this time action_penalty_scale (float): The scale of the action penalty reward success_tolerance (float): The tolerance of the success determined reach_goal_bonus (float): The reward given when the object reaches the goal fall_dist (float): When the object is far from the Shadowhand, it is judged as falling fall_penalty (float): The reward given when the object is fell max_consecutive_successes (float): The maximum of the consecutive successes av_factor (float): The average factor for calculate the consecutive successes ignore_z_rot (bool): Is it necessary to ignore the rot of the z-axis, which is usually used for some specific objects (e.g. pen) """ # Distance from the hand to the object goal_dist = torch.norm(target_pos - object_pos, p=2, dim=-1) if ignore_z_rot: success_tolerance = 2.0 * success_tolerance # Orientation alignment for the cube in hand and goal cube quat_diff = quat_mul(object_rot, quat_conjugate(target_rot)) rot_dist = 2.0 * torch.asin(torch.clamp(torch.norm(quat_diff[:, 0:3], p=2, dim=-1), max=1.0)) dist_rew = goal_dist action_penalty = torch.sum(actions ** 2, dim=-1) # Total reward is: position distance + orientation alignment + action regularization + success bonus + fall penalty reward = torch.exp(-0.2 * (dist_rew * dist_reward_scale + rot_dist)) # Find out which envs hit the goal and update successes count goal_resets = torch.where(torch.abs(goal_dist) <= 0, torch.ones_like(reset_goal_buf), reset_goal_buf) successes = torch.where(successes == 0, torch.where(goal_dist < 0.03, torch.ones_like(successes), successes), successes) # Fall penalty: distance to the goal is larger than a threashold reward = torch.where(object_pos[:, 2] <= 0.1, reward + fall_penalty, reward) # Check env termination conditions, including maximum success number resets = torch.where(object_pos[:, 2] <= 0.1, torch.ones_like(reset_buf), reset_buf) if max_consecutive_successes > 0: # Reset progress buffer on goal envs if max_consecutive_successes > 0 progress_buf = torch.where(torch.abs(rot_dist) <= success_tolerance, torch.zeros_like(progress_buf), progress_buf) resets = torch.where(successes >= max_consecutive_successes, torch.ones_like(resets), resets) resets = torch.where(progress_buf >= max_episode_length, torch.ones_like(resets), resets) # Apply penalty for not reaching the goal if max_consecutive_successes > 0: reward = torch.where(progress_buf >= max_episode_length, reward + 0.5 * fall_penalty, reward) num_resets = torch.sum(resets) finished_cons_successes = torch.sum(successes * resets.float()) cons_successes = torch.where(resets > 0, successes * resets, consecutive_successes) return reward, resets, goal_resets, progress_buf, successes, cons_successes @torch.jit.script def randomize_rotation(rand0, rand1, x_unit_tensor, y_unit_tensor): return quat_mul(quat_from_angle_axis(rand0 * np.pi, x_unit_tensor), quat_from_angle_axis(rand1 * np.pi, y_unit_tensor)) @torch.jit.script def randomize_rotation_pen(rand0, rand1, max_angle, x_unit_tensor, y_unit_tensor, z_unit_tensor): rot = quat_mul(quat_from_angle_axis(0.5 * np.pi + rand0 * max_angle, x_unit_tensor), quat_from_angle_axis(rand0 * np.pi, z_unit_tensor)) return rot