# Copyright 2023, Junjia LIU, jjliu@mae.cuhk.edu.hk
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import random
from enum import Enum
from isaacgym import gymapi
from isaacgym import gymtorch
from isaacgym.torch_utils import *
import rofunc as rf
from rofunc.learning.RofuncRL.tasks.isaacgymenv.base.vec_task import VecTask
from rofunc.learning.RofuncRL.tasks.utils import torch_jit_utils as torch_utils
from rofunc.utils.oslab import get_rofunc_path
PERTURB_OBJS = [
["small", 60],
# ["large", 60],
]
[docs]class Humanoid_SMPLX(VecTask):
def __init__(self, cfg, rl_device, sim_device, graphics_device_id, headless, virtual_screen_capture, force_render):
self.cfg = cfg
self._pd_control = self.cfg["env"]["pdControl"]
self.power_scale = self.cfg["env"]["powerScale"]
self.debug_viz = self.cfg["env"]["enableDebugVis"]
self.plane_static_friction = self.cfg["env"]["plane"]["staticFriction"]
self.plane_dynamic_friction = self.cfg["env"]["plane"]["dynamicFriction"]
self.plane_restitution = self.cfg["env"]["plane"]["restitution"]
self.ref_hoi_obs_size = 324 + len(self.cfg["env"]["keyBodies"]) * 3
self._local_root_obs = self.cfg["env"]["localRootObs"]
self._root_height_obs = self.cfg["env"].get("rootHeightObs", True)
self._enable_early_termination = self.cfg["env"]["enableEarlyTermination"]
self.camera_follow = self.cfg["env"].get("cameraFollow", False)
# self.max_episode_length = self.cfg["env"]["episodeLength"]
key_bodies = self.cfg["env"]["keyBodies"]
self._setup_character_props(key_bodies)
self.cfg["env"]["numObservations"] = self.get_obs_size()
self.cfg["env"]["numActions"] = self.get_action_size()
# self.cfg["device_type"] = device_type
# self.cfg["device_id"] = device_id
# self.cfg["headless"] = headless
# super().__init__(cfg=self.cfg)
super().__init__(config=self.cfg, rl_device=rl_device, sim_device=sim_device,
graphics_device_id=graphics_device_id, headless=headless,
virtual_screen_capture=virtual_screen_capture, force_render=force_render)
dt = self.cfg["sim"]["dt"]
self.dt = self.control_freq_inv * dt
# get gym GPU state tensors
actor_root_state = self.gym.acquire_actor_root_state_tensor(self.sim)
dof_state_tensor = self.gym.acquire_dof_state_tensor(self.sim)
sensor_tensor = self.gym.acquire_force_sensor_tensor(self.sim)
rigid_body_state = self.gym.acquire_rigid_body_state_tensor(self.sim)
contact_force_tensor = self.gym.acquire_net_contact_force_tensor(self.sim)
# sensors_per_env = 2
# self.vec_sensor_tensor = gymtorch.wrap_tensor(sensor_tensor).view(self.num_envs, sensors_per_env * 6)
dof_force_tensor = self.gym.acquire_dof_force_tensor(self.sim)
self.dof_force_tensor = gymtorch.wrap_tensor(dof_force_tensor).view(self.num_envs, self.num_dof)
self.gym.refresh_dof_state_tensor(self.sim)
self.gym.refresh_actor_root_state_tensor(self.sim)
self.gym.refresh_rigid_body_state_tensor(self.sim)
self.gym.refresh_net_contact_force_tensor(self.sim)
self._root_states = gymtorch.wrap_tensor(actor_root_state)
num_actors = self.get_num_actors_per_env()
self._humanoid_root_states = self._root_states.view(
self.num_envs, num_actors, actor_root_state.shape[-1])[..., 0, :]
self._initial_humanoid_root_states = self._humanoid_root_states.clone()
self._initial_humanoid_root_states[:, 7:13] = 0
self._humanoid_actor_ids = num_actors * torch.arange(self.num_envs, device=self.device, dtype=torch.int32)
# create some wrapper tensors for different slices
self._dof_state = gymtorch.wrap_tensor(dof_state_tensor)
dofs_per_env = self._dof_state.shape[0] // self.num_envs
self._dof_pos = self._dof_state.view(self.num_envs, dofs_per_env, 2)[..., :self.num_dof, 0]
self._dof_vel = self._dof_state.view(self.num_envs, dofs_per_env, 2)[..., :self.num_dof, 1]
self._initial_dof_pos = torch.zeros_like(self._dof_pos, device=self.device, dtype=torch.float)
self._initial_dof_vel = torch.zeros_like(self._dof_vel, device=self.device, dtype=torch.float)
self._rigid_body_state = gymtorch.wrap_tensor(rigid_body_state)
bodies_per_env = self._rigid_body_state.shape[0] // self.num_envs
rigid_body_state_reshaped = self._rigid_body_state.view(self.num_envs, bodies_per_env, 13)
self._rigid_body_pos = rigid_body_state_reshaped[..., :self.num_bodies, 0:3]
self._rigid_body_rot = rigid_body_state_reshaped[..., :self.num_bodies, 3:7]
self._rigid_body_vel = rigid_body_state_reshaped[..., :self.num_bodies, 7:10]
self._rigid_body_ang_vel = rigid_body_state_reshaped[..., :self.num_bodies, 10:13]
contact_force_tensor = gymtorch.wrap_tensor(contact_force_tensor)
self._contact_forces = contact_force_tensor.view(self.num_envs, bodies_per_env, 3)[..., :self.num_bodies, :]
self._terminate_buf = torch.ones(self.num_envs, device=self.device, dtype=torch.long)
self._build_termination_heights()
contact_bodies = self.cfg["env"]["contactBodies"]
self._key_body_ids = self._build_key_body_ids_tensor(key_bodies)
self._contact_body_ids = self._build_contact_body_ids_tensor(contact_bodies)
if self.viewer != None:
self._init_camera()
[docs] def get_obs_size(self):
return self._num_obs
[docs] def get_action_size(self):
return self._num_actions
[docs] def get_num_actors_per_env(self):
num_actors = self._root_states.shape[0] // self.num_envs
return num_actors
[docs] def create_sim(self):
self.up_axis_idx = 2 # index of up axis: Y=1, Z=2
self.sim = super().create_sim(self.device_id, self.graphics_device_id, self.physics_engine, self.sim_params)
self._create_ground_plane()
self._create_envs(self.num_envs, self.cfg["env"]['envSpacing'], int(np.sqrt(self.num_envs)))
# def reset(self, env_ids=None):
# if (env_ids is None):
# env_ids = to_torch(np.arange(self.num_envs), device=self.device, dtype=torch.long)
# self.reset_idx(env_ids)
[docs] def reset(self):
actions = 0.01 * (1 - 2 * np.random.rand(self.num_envs, self.num_actions)).astype('f')
actions = to_torch(actions, device=self.rl_device, dtype=torch.float)
# step the simulator
obs, rewards, resets, extras = self.step(actions)
return torch.clamp(obs["obs"], -self.clip_obs, self.clip_obs)
[docs] def set_char_color(self, col, env_ids):
for env_id in env_ids:
env_ptr = self.envs[env_id]
handle = self.humanoid_handles[env_id]
for j in range(self.num_bodies):
self.gym.set_rigid_body_color(env_ptr, handle, j, gymapi.MESH_VISUAL,
gymapi.Vec3(col[0], col[1], col[2]))
[docs] def reset_idx(self, env_ids):
if len(env_ids) > 0:
self._reset_actors(env_ids)
self._reset_env_tensors(env_ids)
self._refresh_sim_tensors()
self._compute_observations(env_ids)
def _reset_env_tensors(self, env_ids):
env_ids_int32 = self._humanoid_actor_ids[env_ids]
self.gym.set_actor_root_state_tensor_indexed(self.sim,
gymtorch.unwrap_tensor(self._root_states),
gymtorch.unwrap_tensor(env_ids_int32), len(env_ids_int32))
self.gym.set_dof_state_tensor_indexed(self.sim,
gymtorch.unwrap_tensor(self._dof_state),
gymtorch.unwrap_tensor(env_ids_int32), len(env_ids_int32))
self.progress_buf[env_ids] = self.motion_times.clone()
self.reset_buf[env_ids] = 0
self._terminate_buf[env_ids] = 0
def _create_ground_plane(self):
plane_params = gymapi.PlaneParams()
plane_params.normal = gymapi.Vec3(0.0, 0.0, 1.0)
plane_params.static_friction = self.plane_static_friction
plane_params.dynamic_friction = self.plane_dynamic_friction
plane_params.restitution = self.plane_restitution
self.gym.add_ground(self.sim, plane_params)
def _setup_character_props(self, key_bodies):
asset_file = self.cfg["env"]["asset"]["assetFileName"]
num_key_bodies = len(key_bodies)
if (asset_file == "smplx/smplx_capsule.xml"):
self._dof_obs_size = (51) * 3
self._num_actions = (51) * 3
obj_obs_size = 15
self._num_obs = 1 + (52) * (3 + 6 + 3 + 3) - 3 + 10 * 3 + obj_obs_size + self.ref_hoi_obs_size
else:
raise rf.logger.beauty_print(f"Unsupported character config file: {asset_file}")
def _build_termination_heights(self):
self._termination_heights = 0.3
self._termination_heights = to_torch(self._termination_heights, device=self.device)
[docs] def get_num_amp_obs(self):
return self.ref_hoi_obs_size
def _create_envs(self, num_envs, spacing, num_per_row):
lower = gymapi.Vec3(-spacing, -spacing, 0.0)
upper = gymapi.Vec3(spacing, spacing, spacing)
# get rofunc path from rofunc package metadata
rofunc_path = get_rofunc_path()
asset_root = os.path.join(rofunc_path, "simulator/assets")
# Load humanoid asset
asset_file = self.cfg["env"]["asset"]["assetFileName"]
asset_path = os.path.join(asset_root, asset_file)
asset_root = os.path.dirname(asset_path)
asset_file = os.path.basename(asset_path)
asset_options = gymapi.AssetOptions()
asset_options.angular_damping = 0.01
asset_options.max_angular_velocity = 100.0
asset_options.default_dof_drive_mode = gymapi.DOF_MODE_NONE
# asset_options.fix_base_link = True
# asset_options.disable_gravity = True
humanoid_asset = self.gym.load_asset(self.sim, asset_root, asset_file, asset_options)
self.num_humanoid_bodies = self.gym.get_asset_rigid_body_count(humanoid_asset)
self.num_humanoid_shapes = self.gym.get_asset_rigid_shape_count(humanoid_asset)
actuator_props = self.gym.get_asset_actuator_properties(humanoid_asset)
motor_efforts = [prop.motor_effort for prop in actuator_props]
# create force sensors at the feet
right_foot_idx = self.gym.find_asset_rigid_body_index(humanoid_asset, "right_foot")
left_foot_idx = self.gym.find_asset_rigid_body_index(humanoid_asset, "left_foot")
sensor_pose = gymapi.Transform()
self.gym.create_asset_force_sensor(humanoid_asset, right_foot_idx, sensor_pose)
self.gym.create_asset_force_sensor(humanoid_asset, left_foot_idx, sensor_pose)
self.max_motor_effort = max(motor_efforts)
self.motor_efforts = to_torch(motor_efforts, device=self.device)
self.torso_index = 0
self.num_bodies = self.gym.get_asset_rigid_body_count(humanoid_asset)
self.num_dof = self.gym.get_asset_dof_count(humanoid_asset)
self.num_joints = self.gym.get_asset_joint_count(humanoid_asset)
self.humanoid_handles = []
self.envs = []
self.dof_limits_lower = []
self.dof_limits_upper = []
max_agg_bodies = self.num_humanoid_bodies + 2
max_agg_shapes = self.num_humanoid_shapes + 2
for i in range(self.num_envs):
# create env instance
env_ptr = self.gym.create_env(self.sim, lower, upper, num_per_row)
self.gym.begin_aggregate(env_ptr, max_agg_bodies, max_agg_shapes, True)
self._build_env(i, env_ptr, humanoid_asset)
self.gym.end_aggregate(env_ptr)
self.envs.append(env_ptr)
dof_prop = self.gym.get_actor_dof_properties(self.envs[0], self.humanoid_handles[0])
for j in range(self.num_dof):
if dof_prop["lower"][j] > dof_prop["upper"][j]:
self.dof_limits_lower.append(dof_prop["upper"][j])
self.dof_limits_upper.append(dof_prop["lower"][j])
else:
self.dof_limits_lower.append(dof_prop["lower"][j])
self.dof_limits_upper.append(dof_prop["upper"][j])
self.dof_limits_lower = to_torch(self.dof_limits_lower, device=self.device)
self.dof_limits_upper = to_torch(self.dof_limits_upper, device=self.device)
if self._pd_control:
self._build_pd_action_offset_scale()
def _build_env(self, env_id, env_ptr, humanoid_asset):
col_group = env_id
col_filter = self._get_humanoid_collision_filter()
segmentation_id = 0
start_pose = gymapi.Transform()
char_h = 0.89
start_pose.p = gymapi.Vec3(*get_axis_params(char_h, self.up_axis_idx))
start_pose.r = gymapi.Quat(0.0, 0.0, 0.0, 1.0)
humanoid_handle = self.gym.create_actor(
env_ptr,
humanoid_asset,
start_pose,
"humanoid",
col_group,
col_filter,
segmentation_id,
)
self.gym.enable_actor_dof_force_sensors(env_ptr, humanoid_handle)
for j in range(self.num_bodies):
self.gym.set_rigid_body_color(
env_ptr,
humanoid_handle,
j,
gymapi.MESH_VISUAL,
gymapi.Vec3(0.54, 0.85, 0.2),
)
if self._pd_control:
dof_prop = self.gym.get_asset_dof_properties(humanoid_asset)
dof_prop["driveMode"] = gymapi.DOF_MODE_POS
self.gym.set_actor_dof_properties(env_ptr, humanoid_handle, dof_prop)
self.humanoid_handles.append(humanoid_handle)
def _build_pd_action_offset_scale(self):
lim_low = self.dof_limits_lower.cpu().numpy()
lim_high = self.dof_limits_upper.cpu().numpy()
self._pd_action_offset = 0.5 * (lim_high + lim_low)
self._pd_action_scale = 0.5 * (lim_high - lim_low)
self._pd_action_offset = to_torch(self._pd_action_offset, device=self.device)
self._pd_action_scale = to_torch(self._pd_action_scale, device=self.device)
def _get_humanoid_collision_filter(self):
return 0
def _compute_reward(self, actions):
self.rew_buf[:] = compute_humanoid_reward(
self._curr_ref_obs,
self._curr_obs,
self._contact_forces,
self._tar_contact_forces,
len(self._key_body_ids),
self.reward_weights_p,
self.reward_weights_r,
self.reward_weights_pv,
self.reward_weights_rv,
self.reward_weights_op,
self.reward_weights_or,
self.reward_weights_opv,
self.reward_weights_orv,
self.reward_weights_ig,
self.reward_weights_cg1,
self.reward_weights_cg2,
)
def _compute_reset(self):
self.reset_buf[:], self._terminate_buf[:] = compute_humanoid_reset(self.reset_buf, self.progress_buf,
self._contact_forces,
self._rigid_body_pos,
self.max_episode_length,
self._enable_early_termination,
self._termination_heights,
self._curr_ref_obs, self._curr_obs,
)
def _refresh_sim_tensors(self):
self.gym.refresh_dof_state_tensor(self.sim)
self.gym.refresh_actor_root_state_tensor(self.sim)
self.gym.refresh_rigid_body_state_tensor(self.sim)
self.gym.refresh_force_sensor_tensor(self.sim)
self.gym.refresh_dof_force_tensor(self.sim)
self.gym.refresh_net_contact_force_tensor(self.sim)
def _compute_task_obs(self, env_ids=None):
if (env_ids is None):
root_states = self._humanoid_root_states
tar_states = self._target_states
else:
root_states = self._humanoid_root_states[env_ids]
tar_states = self._target_states[env_ids]
obs = compute_obj_observations(root_states, tar_states)
return obs
def _compute_observations(self, env_ids=None):
obs = self._compute_humanoid_obs(env_ids)
task_obs = self._compute_task_obs(env_ids)
obs = torch.cat([obs, task_obs], dim=-1)
if env_ids is None:
ts = self.progress_buf.clone()
self._curr_ref_obs = self.hoi_data_dict[0]['hoi_data'][ts].clone()
next_ts = torch.clamp(ts + 1, max=self.max_episode_length - 1)
ref_obs = self.hoi_data_dict[0]['hoi_data'][next_ts].clone()
self.obs_buf[:] = torch.cat((obs, ref_obs), dim=-1)
else:
ts = self.progress_buf[env_ids].clone()
self._curr_ref_obs[env_ids] = self.hoi_data_dict[0]['hoi_data'][ts].clone()
next_ts = torch.clamp(ts + 1, max=self.max_episode_length - 1)
ref_obs = self.hoi_data_dict[0]['hoi_data'][next_ts].clone()
self.obs_buf[env_ids] = torch.cat((obs, ref_obs), dim=-1)
def _compute_humanoid_obs(self, env_ids=None):
if env_ids is None:
body_pos = self._rigid_body_pos
body_rot = self._rigid_body_rot
body_vel = self._rigid_body_vel
body_ang_vel = self._rigid_body_ang_vel
contact_forces = self._contact_forces
else:
body_pos = self._rigid_body_pos[env_ids]
body_rot = self._rigid_body_rot[env_ids]
body_vel = self._rigid_body_vel[env_ids]
body_ang_vel = self._rigid_body_ang_vel[env_ids]
contact_forces = self._contact_forces[env_ids]
obs = compute_humanoid_observations_max(body_pos, body_rot, body_vel, body_ang_vel, self._local_root_obs,
self._root_height_obs,
contact_forces, self._contact_body_ids)
return obs
def _reset_actors(self, env_ids):
self._humanoid_root_states[env_ids] = self._initial_humanoid_root_states[env_ids]
self._dof_pos[env_ids] = self._initial_dof_pos[env_ids]
self._dof_vel[env_ids] = self._initial_dof_vel[env_ids]
[docs] def pre_physics_step(self, actions):
self.actions = actions.to(self.device).clone()
if self._pd_control:
pd_tar = self._action_to_pd_targets(self.actions)
pd_tar_tensor = gymtorch.unwrap_tensor(pd_tar)
self.gym.set_dof_position_target_tensor(self.sim, pd_tar_tensor)
else:
forces = self.actions * self.motor_efforts.unsqueeze(0) * self.power_scale
force_tensor = gymtorch.unwrap_tensor(forces)
self.gym.set_dof_actuation_force_tensor(self.sim, force_tensor)
[docs] def post_physics_step(self):
self.progress_buf += 1
self._refresh_sim_tensors()
# env_ids = self.reset_buf.nonzero(as_tuple=False).flatten()
# if len(env_ids) > 0:
# self.reset_idx(env_ids)
# extra calc of self._curr_hoi_obs_buf, for correct calculate of imitation reward
self._compute_hoi_observations()
self._compute_observations()
self._compute_reward(self.actions)
self._compute_reset()
self.extras["terminate"] = self._terminate_buf
# debug viz
if self.viewer and self.debug_viz:
self._update_debug_viz()
[docs] def render(self, sync_frame_time=False):
if self.viewer and self.camera_follow:
self._update_camera()
super().render(sync_frame_time)
def _build_key_body_ids_tensor(self, key_body_names):
env_ptr = self.envs[0]
actor_handle = self.humanoid_handles[0]
body_ids = []
for body_name in key_body_names:
body_id = self.gym.find_actor_rigid_body_handle(
env_ptr, actor_handle, body_name
)
assert body_id != -1
body_ids.append(body_id)
body_ids = to_torch(body_ids, device=self.device, dtype=torch.long)
return body_ids
def _build_contact_body_ids_tensor(self, contact_body_names):
env_ptr = self.envs[0]
actor_handle = self.humanoid_handles[0]
body_ids = []
for body_name in contact_body_names:
body_id = self.gym.find_actor_rigid_body_handle(
env_ptr, actor_handle, body_name
)
assert body_id != -1
body_ids.append(body_id)
body_ids = to_torch(body_ids, device=self.device, dtype=torch.long)
return body_ids
def _action_to_pd_targets(self, action):
pd_tar = self._pd_action_offset + self._pd_action_scale * action
return pd_tar
def _init_camera(self):
self.gym.refresh_actor_root_state_tensor(self.sim)
self._cam_prev_char_pos = self._humanoid_root_states[0, 0:3].cpu().numpy()
cam_pos = gymapi.Vec3(self._cam_prev_char_pos[0],
self._cam_prev_char_pos[1] - 3.0,
1.0)
cam_target = gymapi.Vec3(self._cam_prev_char_pos[0],
self._cam_prev_char_pos[1],
1.0)
self.gym.viewer_camera_look_at(self.viewer, None, cam_pos, cam_target)
def _update_camera(self):
self.gym.refresh_actor_root_state_tensor(self.sim)
char_root_pos = self._humanoid_root_states[0, 0:3].cpu().numpy()
cam_trans = self.gym.get_viewer_camera_transform(self.viewer, None)
cam_pos = np.array([cam_trans.p.x, cam_trans.p.y, cam_trans.p.z])
cam_delta = cam_pos - self._cam_prev_char_pos
new_cam_target = gymapi.Vec3(char_root_pos[0], char_root_pos[1], 1.0)
new_cam_pos = gymapi.Vec3(char_root_pos[0] + cam_delta[0],
char_root_pos[1] + cam_delta[1],
cam_pos[2])
self.gym.viewer_camera_look_at(self.viewer, None, new_cam_pos, new_cam_target)
self._cam_prev_char_pos[:] = char_root_pos
# # # fixed camera
# new_cam_target = gymapi.Vec3(0, 0.5, 1.0)
# new_cam_pos = gymapi.Vec3(1, -1, 1.6)
# self.gym.viewer_camera_look_at(self.viewer, None, new_cam_pos, new_cam_target)
def _update_debug_viz(self):
self.gym.clear_lines(self.viewer)
[docs]class HumanoidPhysHOITask(Humanoid_SMPLX):
[docs] class StateInit(Enum):
Default = 0
Start = 1
Random = 2
Hybrid = 3
def __init__(self, cfg, rl_device, sim_device, graphics_device_id, headless, virtual_screen_capture, force_render):
state_init = cfg["env"]["stateInit"]
self._state_init = HumanoidPhysHOITask.StateInit[state_init]
self._hybrid_init_prob = cfg["env"]["hybridInitProb"]
self._reset_default_env_ids = []
self._reset_ref_env_ids = []
self.motion_file = cfg['env']['motion_file']
self.play_dataset = cfg['env']['playdataset']
self.projtype = cfg['env']['projtype']
self.robot_type = cfg["env"]["asset"]["assetFileName"]
self.reward_weights = cfg["env"]["rewardWeights"]
self.reward_weights_p = cfg["env"]["rewardWeights"]["p"]
self.reward_weights_r = cfg["env"]["rewardWeights"]["r"]
self.reward_weights_pv = cfg["env"]["rewardWeights"]["pv"]
self.reward_weights_rv = cfg["env"]["rewardWeights"]["rv"]
self.reward_weights_op = cfg["env"]["rewardWeights"]["op"]
self.reward_weights_or = cfg["env"]["rewardWeights"]["or"]
self.reward_weights_opv = cfg["env"]["rewardWeights"]["opv"]
self.reward_weights_orv = cfg["env"]["rewardWeights"]["orv"]
self.reward_weights_ig = cfg["env"]["rewardWeights"]["ig"]
self.reward_weights_cg1 = cfg["env"]["rewardWeights"]["cg1"]
self.reward_weights_cg2 = cfg["env"]["rewardWeights"]["cg2"]
self.save_images = cfg['env']['saveImages']
self.init_vel = cfg['env']['initVel']
self.ball_size = cfg['env']['ballSize']
super().__init__(cfg=cfg,
rl_device=rl_device,
sim_device=sim_device,
graphics_device_id=graphics_device_id,
headless=headless,
virtual_screen_capture=virtual_screen_capture,
force_render=force_render)
if rf.oslab.is_absl_path(self.motion_file):
motion_file_path = self.motion_file
elif self.motion_file.split("/")[0] == "examples":
motion_file_path = os.path.join(
os.path.dirname(os.path.abspath(__file__)),
"../../../../../../" + self.motion_file,
)
else:
raise ValueError("Unsupported motion file path")
self.motion_file = motion_file_path
self._load_motion(self.motion_file)
self._curr_ref_obs = torch.zeros((self.num_envs, self.ref_hoi_obs_size), device=self.device, dtype=torch.float)
self._hist_ref_obs = torch.zeros((self.num_envs, self.ref_hoi_obs_size), device=self.device, dtype=torch.float)
self._curr_obs = torch.zeros((self.num_envs, self.ref_hoi_obs_size), device=self.device, dtype=torch.float)
self._hist_obs = torch.zeros((self.num_envs, self.ref_hoi_obs_size), device=self.device, dtype=torch.float)
self._tar_pos = torch.zeros([self.num_envs, 3], device=self.device, dtype=torch.float)
self._build_target_tensors()
# self._build_marker_state_tensors()
if self.projtype == "Mouse" or self.projtype == "Auto":
self._build_proj_tensors()
[docs] def post_physics_step(self):
if self.projtype == "Mouse" or self.projtype == "Auto":
self._update_proj()
super().post_physics_step()
self._update_hist_hoi_obs()
self._compute_hoi_observations()
def _update_hist_hoi_obs(self, env_ids=None):
self._hist_obs = self._curr_obs.clone()
def _setup_character_props(self, key_bodies):
super()._setup_character_props(key_bodies)
def _load_motion(self, motion_file):
# '''load HOI dataset'''
self.num_motions = 1
data_path = motion_file
self.hoi_data_dict = {}
loaded_dict = {}
hoi_data = torch.load(data_path)
loaded_dict['hoi_data'] = hoi_data.detach().to('cuda')
# '''change the data framerate'''
# NOTE: this is used for temporary testing, and is not rigorous that may yield incorrect rotations.
dataFramesScale = self.cfg["env"]["dataFramesScale"]
scale_hoi_data = torch.nn.functional.interpolate(loaded_dict['hoi_data'].unsqueeze(
1).transpose(0, 2), scale_factor=dataFramesScale, mode='linear', align_corners=True)
loaded_dict['hoi_data'] = scale_hoi_data.transpose(0, 2).squeeze(1).clone().contiguous()
self.max_episode_length = loaded_dict['hoi_data'].shape[0]
self.fps_data = 30.
# self.fps_data = self.cfg["env"]["dataFPS"]*dataFramesScale
loaded_dict['root_pos'] = loaded_dict['hoi_data'][:, 0:3].clone()
loaded_dict['root_pos_vel'] = (loaded_dict['root_pos'][1:, :].clone() -
loaded_dict['root_pos'][:-1, :].clone()) * self.fps_data
loaded_dict['root_pos_vel'] = torch.cat(
(torch.zeros((1, loaded_dict['root_pos_vel'].shape[-1])).to('cuda'), loaded_dict['root_pos_vel']), dim=0)
loaded_dict['root_rot'] = loaded_dict['hoi_data'][:, 3:6].clone()
loaded_dict['root_rot_data'] = loaded_dict['root_rot'].clone()
loaded_dict['root_rot_vel'] = (loaded_dict['root_rot'][1:, :].clone() -
loaded_dict['root_rot'][:-1, :].clone()) * self.fps_data
loaded_dict['root_rot_vel'] = torch.cat(
(torch.zeros((1, loaded_dict['root_rot_vel'].shape[-1])).to('cuda'), loaded_dict['root_rot_vel']), dim=0)
loaded_dict['root_rot'] = torch_utils.exp_map_to_quat(loaded_dict['root_rot']).clone()
loaded_dict['dof_pos'] = loaded_dict['hoi_data'][:, 9:9 + 153].clone()
loaded_dict['dof_pos_vel'] = (loaded_dict['dof_pos'][1:, :].clone() -
loaded_dict['dof_pos'][:-1, :].clone()) * self.fps_data
loaded_dict['dof_pos_vel'] = torch.cat(
(torch.zeros((1, loaded_dict['dof_pos_vel'].shape[-1])).to('cuda'), loaded_dict['dof_pos_vel']), dim=0)
loaded_dict['body_pos'] = loaded_dict['hoi_data'][:, 162: 162 + 52 * 3].clone().view(self.max_episode_length,
52, 3)
loaded_dict['key_body_pos'] = loaded_dict['body_pos'][:,
self._key_body_ids, :].view(self.max_episode_length, -1).clone()
loaded_dict['key_body_pos_vel'] = (loaded_dict['key_body_pos'][1:, :].clone() -
loaded_dict['key_body_pos'][:-1, :].clone()) * self.fps_data
loaded_dict['key_body_pos_vel'] = torch.cat(
(torch.zeros((1, loaded_dict['key_body_pos_vel'].shape[-1])).to('cuda'), loaded_dict['key_body_pos_vel']),
dim=0)
loaded_dict['obj_pos'] = loaded_dict['hoi_data'][:, 318:321].clone()
loaded_dict['obj_pos_vel'] = (loaded_dict['obj_pos'][1:, :].clone() -
loaded_dict['obj_pos'][:-1, :].clone()) * self.fps_data
if self.init_vel:
loaded_dict['obj_pos_vel'] = torch.cat((loaded_dict['obj_pos_vel'][:1], loaded_dict['obj_pos_vel']), dim=0)
else:
loaded_dict['obj_pos_vel'] = torch.cat(
(torch.zeros((1, loaded_dict['obj_pos_vel'].shape[-1])).to('cuda'), loaded_dict['obj_pos_vel']), dim=0)
loaded_dict['obj_rot'] = -loaded_dict['hoi_data'][:, 321:324].clone()
loaded_dict['obj_rot_vel'] = (loaded_dict['obj_rot'][1:, :].clone() -
loaded_dict['obj_rot'][:-1, :].clone()) * self.fps_data
loaded_dict['obj_rot_vel'] = torch.cat(
(torch.zeros((1, loaded_dict['obj_rot_vel'].shape[-1])).to('cuda'), loaded_dict['obj_rot_vel']), dim=0)
loaded_dict['obj_rot'] = torch_utils.exp_map_to_quat(-loaded_dict['hoi_data'][:, 321:324]).clone()
loaded_dict['contact'] = torch.round(loaded_dict['hoi_data'][:, 330:331].clone())
loaded_dict['hoi_data'] = torch.cat((
loaded_dict['root_pos'].clone(),
loaded_dict['root_rot'].clone(),
loaded_dict['dof_pos'].clone(),
loaded_dict['dof_pos_vel'].clone(),
loaded_dict['obj_pos'].clone(),
loaded_dict['obj_rot'].clone(),
loaded_dict['obj_pos_vel'].clone(),
loaded_dict['key_body_pos'][:, :].clone(),
loaded_dict['contact'].clone()
), dim=-1)
assert (self.ref_hoi_obs_size == loaded_dict['hoi_data'].shape[-1])
self.hoi_data_dict[0] = loaded_dict
def _update_marker(self):
self._marker_states[:, :3] = self.hoi_data_dict[0]['obj_pos'][self.progress_buf, :] # .clone()
self._marker_states[:, 3:7] = self.hoi_data_dict[0]['obj_rot'][self.progress_buf, :] # .clone() #rand_rot
self._marker_states[:, 7:10] = self.hoi_data_dict[0]['obj_pos_vel'][self.progress_buf, :] # .clone()
self._marker_states[:, 10:13] = self.hoi_data_dict[0]['obj_rot_vel'][self.progress_buf, :] # .clone()
self.gym.set_actor_root_state_tensor_indexed(self.sim, gymtorch.unwrap_tensor(self._root_states),
gymtorch.unwrap_tensor(self._marker_actor_ids),
len(self._marker_actor_ids))
def _create_envs(self, num_envs, spacing, num_per_row):
self._target_handles = []
self._load_target_asset()
# self._marker_handles = []
# self._load_marker_asset()
if self.projtype == "Mouse" or self.projtype == "Auto":
self._proj_handles = []
self._load_proj_asset()
super()._create_envs(num_envs, spacing, num_per_row)
def _build_env(self, env_id, env_ptr, humanoid_asset):
super()._build_env(env_id, env_ptr, humanoid_asset)
self._build_target(env_id, env_ptr)
# self._build_marker(env_id, env_ptr)
if self.projtype == "Mouse" or self.projtype == "Auto":
self._build_proj(env_id, env_ptr)
def _build_proj(self, env_id, env_ptr):
col_group = env_id
col_filter = 0
segmentation_id = 0
for i, obj in enumerate(PERTURB_OBJS):
default_pose = gymapi.Transform()
default_pose.p.x = 200 + i
default_pose.p.z = 1
obj_type = obj[0]
if (obj_type == "small"):
proj_asset = self._small_proj_asset
elif (obj_type == "large"):
proj_asset = self._large_proj_asset
proj_handle = self.gym.create_actor(env_ptr, proj_asset, default_pose,
"proj{:d}".format(i), col_group, col_filter, segmentation_id)
self._proj_handles.append(proj_handle)
self.gym.set_actor_scale(env_ptr, proj_handle, 1)
def _build_proj_tensors(self):
self._proj_dist_min = 4
self._proj_dist_max = 5
self._proj_h_min = 0.25
self._proj_h_max = 2
self._proj_steps = 150
self._proj_warmup_steps = 1
self._proj_speed_min = 30
self._proj_speed_max = 40
num_actors = self.get_num_actors_per_env()
num_objs = len(PERTURB_OBJS)
self._proj_states = self._root_states.view(
self.num_envs, num_actors, self._root_states.shape[-1])[..., (num_actors - num_objs):, :]
self._proj_actor_ids = num_actors * np.arange(self.num_envs)
self._proj_actor_ids = np.expand_dims(self._proj_actor_ids, axis=-1)
self._proj_actor_ids = self._proj_actor_ids + \
np.reshape(np.array(self._proj_handles), [self.num_envs, num_objs])
self._proj_actor_ids = self._proj_actor_ids.flatten()
self._proj_actor_ids = to_torch(self._proj_actor_ids, device=self.device, dtype=torch.int32)
bodies_per_env = self._rigid_body_state.shape[0] // self.num_envs
contact_force_tensor = self.gym.acquire_net_contact_force_tensor(self.sim)
contact_force_tensor = gymtorch.wrap_tensor(contact_force_tensor)
self._proj_contact_forces = contact_force_tensor.view(
self.num_envs, bodies_per_env, 3)[..., (num_actors - num_objs):, :]
self._calc_perturb_times()
self.gym.subscribe_viewer_keyboard_event(self.viewer, gymapi.KEY_SPACE, "space_shoot")
self.gym.subscribe_viewer_keyboard_event(self.viewer, gymapi.KEY_R, "reset")
self.gym.subscribe_viewer_mouse_event(self.viewer, gymapi.MOUSE_LEFT_BUTTON, "mouse_shoot")
def _load_proj_asset(self):
asset_root = "physhoi/data/assets/mjcf/"
small_asset_file = "block_projectile.urdf"
# small_asset_file = "ball_medium.urdf"
small_asset_options = gymapi.AssetOptions()
small_asset_options.angular_damping = 0.01
small_asset_options.linear_damping = 0.01
small_asset_options.max_angular_velocity = 100.0
small_asset_options.density = 200.0
# small_asset_options.fix_base_link = True
small_asset_options.default_dof_drive_mode = gymapi.DOF_MODE_NONE
self._small_proj_asset = self.gym.load_asset(self.sim, asset_root, small_asset_file, small_asset_options)
def _load_marker_asset(self):
asset_root = "physhoi/data/assets/mjcf/"
asset_file = "location_marker.urdf"
asset_options = gymapi.AssetOptions()
asset_options.angular_damping = 0.0
asset_options.linear_damping = 0.0
asset_options.max_angular_velocity = 0.0
asset_options.density = 0
asset_options.fix_base_link = True
asset_options.default_dof_drive_mode = gymapi.DOF_MODE_NONE
self._marker_asset = self.gym.load_asset(self.sim, asset_root, asset_file, asset_options)
def _load_target_asset(self): # smplx
rofunc_path = get_rofunc_path()
asset_root = os.path.join(rofunc_path, "simulator/assets")
asset_file = "mjcf/basketball/ball.urdf"
asset_options = gymapi.AssetOptions()
asset_options.angular_damping = 0.01
asset_options.linear_damping = 0.01
asset_options.max_angular_velocity = 100.0
asset_options.density = 1000.0
asset_options.default_dof_drive_mode = gymapi.DOF_MODE_NONE
asset_options.vhacd_enabled = True
asset_options.vhacd_params.max_convex_hulls = 1
asset_options.vhacd_params.max_num_vertices_per_ch = 64
asset_options.vhacd_params.resolution = 300000
# asset_options.vhacd_params.max_convex_hulls = 10
# asset_options.disable_gravity = True
# asset_options.fix_base_link = True
self._target_asset = self.gym.load_asset(self.sim, asset_root, asset_file, asset_options)
def _build_target(self, env_id, env_ptr):
col_group = env_id
col_filter = 0
segmentation_id = 0
default_pose = gymapi.Transform()
target_handle = self.gym.create_actor(env_ptr, self._target_asset, default_pose,
"target", col_group, col_filter, segmentation_id)
# set ball color
# if self.cfg["headless"] == False:
# self.gym.set_rigid_body_color(env_ptr, target_handle, 0, gymapi.MESH_VISUAL, gymapi.Vec3(1.5, 1.5, 1.5))
# # gymapi.Vec3(0., 1.0, 1.5))
#
# rofunc_path = get_rofunc_path()
# asset_root = os.path.join(rofunc_path, "simulator/assets")
# texture_file = "mjcf/basketball/basketball.png"
# h = self.gym.create_texture_from_file(self.sim, os.path.join(asset_root, texture_file))
# self.gym.set_rigid_body_texture(env_ptr, target_handle, 0, gymapi.MESH_VISUAL, h)
self._target_handles.append(target_handle)
self.gym.set_actor_scale(env_ptr, target_handle, self.ball_size)
def _build_marker(self, env_id, env_ptr):
col_group = env_id
col_filter = 2
segmentation_id = 0
default_pose = gymapi.Transform()
marker_handle = self.gym.create_actor(env_ptr, self._marker_asset, default_pose,
"marker", col_group, col_filter, segmentation_id)
self.gym.set_rigid_body_color(env_ptr, marker_handle, 0, gymapi.MESH_VISUAL, gymapi.Vec3(0.8, 0.0, 0.0))
self._marker_handles.append(marker_handle)
def _build_target_tensors(self):
num_actors = self.get_num_actors_per_env()
self._target_states = self._root_states.view(self.num_envs, num_actors, self._root_states.shape[-1])[..., 1, :]
self._tar_actor_ids = to_torch(num_actors * np.arange(self.num_envs), device=self.device, dtype=torch.int32) + 1
bodies_per_env = self._rigid_body_state.shape[0] // self.num_envs
contact_force_tensor = self.gym.acquire_net_contact_force_tensor(self.sim)
contact_force_tensor = gymtorch.wrap_tensor(contact_force_tensor)
self._tar_contact_forces = contact_force_tensor.view(self.num_envs, bodies_per_env, 3)[..., self.num_bodies, :]
def _build_marker_state_tensors(self):
num_actors = self._root_states.shape[0] // self.num_envs
self._marker_states = self._root_states.view(self.num_envs, num_actors, self._root_states.shape[-1])[..., 2, :]
self._marker_pos = self._marker_states[..., :3]
self._marker_actor_ids = to_torch(num_actors * np.arange(self.num_envs),
device=self.device, dtype=torch.int32) + 2
def _reset_target(self, env_ids):
self._target_states[env_ids, :3] = self.hoi_data_dict[0]['obj_pos'][self.motion_times, :] # .clone()+0.5
self._target_states[env_ids, 3:7] = self.hoi_data_dict[0]['obj_rot'][self.motion_times, :] # .clone() #rand_rot
self._target_states[env_ids, 7:10] = self.hoi_data_dict[0]['obj_pos_vel'][self.motion_times, :] # .clone()
self._target_states[env_ids, 10:13] = self.hoi_data_dict[0]['obj_rot_vel'][self.motion_times, :] # .clone()
def _reset_env_tensors(self, env_ids):
super()._reset_env_tensors(env_ids)
env_ids_int32 = self._tar_actor_ids[env_ids]
self.gym.set_actor_root_state_tensor_indexed(self.sim, gymtorch.unwrap_tensor(self._root_states),
gymtorch.unwrap_tensor(env_ids_int32), len(env_ids_int32))
[docs] def reset_idx(self, env_ids):
self._reset_default_env_ids = []
self._reset_ref_env_ids = []
super().reset_idx(env_ids)
def _reset_actors(self, env_ids):
if self._state_init == HumanoidPhysHOITask.StateInit.Default:
self._reset_default(env_ids)
elif (self._state_init == HumanoidPhysHOITask.StateInit.Start
or self._state_init == HumanoidPhysHOITask.StateInit.Random):
self._reset_ref_state_init(env_ids)
elif self._state_init == HumanoidPhysHOITask.StateInit.Hybrid:
self._reset_hybrid_state_init(env_ids)
else:
assert (False), "Unsupported state initialization strategy: {:s}".format(str(self._state_init))
self._reset_target(env_ids)
def _reset_default(self, env_ids):
self._humanoid_root_states[env_ids] = self._initial_humanoid_root_states[env_ids]
self._dof_pos[env_ids] = self._initial_dof_pos[env_ids]
self._dof_vel[env_ids] = self._initial_dof_vel[env_ids]
self._reset_default_env_ids = env_ids
def _reset_ref_state_init(self, env_ids):
num_envs = env_ids.shape[0]
if (self._state_init == HumanoidPhysHOITask.StateInit.Random
or self._state_init == HumanoidPhysHOITask.StateInit.Hybrid):
motion_times = torch.randint(
0, self.hoi_data_dict[0]['hoi_data'].shape[0] - 2, (num_envs,), device=self.device, dtype=torch.long)
elif (self._state_init == HumanoidPhysHOITask.StateInit.Start):
motion_times = torch.zeros(num_envs, device=self.device, dtype=torch.long) # .int()
self.motion_times = motion_times.clone()
# TODO: i should has shape of env_ids
i = random.randint(0, self.num_motions - 1)
self._set_env_state(env_ids=env_ids,
root_pos=self.hoi_data_dict[i]['root_pos'][motion_times, :].clone(),
root_rot=self.hoi_data_dict[i]['root_rot'][motion_times, :].clone(),
dof_pos=self.hoi_data_dict[i]['dof_pos'][motion_times, :].clone(),
root_vel=self.hoi_data_dict[i]['root_pos_vel'][motion_times, :].clone(),
root_ang_vel=self.hoi_data_dict[i]['root_rot_vel'][motion_times, :].clone(),
dof_vel=self.hoi_data_dict[i]['dof_pos_vel'][motion_times, :].clone(),
)
def _reset_hybrid_state_init(self, env_ids):
num_envs = env_ids.shape[0]
ref_probs = to_torch(np.array([self._hybrid_init_prob] * num_envs), device=self.device)
ref_init_mask = torch.bernoulli(ref_probs) == 1.0
ref_reset_ids = env_ids[ref_init_mask]
if (len(ref_reset_ids) > 0):
self._reset_ref_state_init(ref_reset_ids)
default_reset_ids = env_ids[torch.logical_not(ref_init_mask)]
if (len(default_reset_ids) > 0):
self._reset_default(default_reset_ids)
def _set_env_state(self, env_ids, root_pos, root_rot, dof_pos, root_vel, root_ang_vel, dof_vel):
self._humanoid_root_states[env_ids, 0:3] = root_pos
self._humanoid_root_states[env_ids, 3:7] = root_rot
self._humanoid_root_states[env_ids, 7:10] = root_vel
self._humanoid_root_states[env_ids, 10:13] = root_ang_vel
self._dof_pos[env_ids] = dof_pos
self._dof_vel[env_ids] = dof_vel
def _compute_hoi_observations(self, env_ids=None):
key_body_pos = self._rigid_body_pos[:, self._key_body_ids, :]
# diffvel, set 0 for the first frame
hist_dof_pos = self._hist_obs[:, 7:7 + 153]
dof_diffvel = (self._dof_pos - hist_dof_pos) * self.fps_data
dof_diffvel = dof_diffvel * (self.progress_buf != 1).to(float).unsqueeze(dim=-1)
if (env_ids is None):
self._curr_obs[:] = build_hoi_observations(self._rigid_body_pos[:, 0, :],
self._rigid_body_rot[:, 0, :],
self._rigid_body_vel[:, 0, :],
self._rigid_body_ang_vel[:, 0, :],
self._dof_pos, self._dof_vel, key_body_pos,
self._local_root_obs, self._root_height_obs,
self._dof_obs_size, self._target_states,
dof_diffvel)
else:
self._curr_obs[env_ids] = build_hoi_observations(self._rigid_body_pos[env_ids][:, 0, :],
self._rigid_body_rot[env_ids][:, 0, :],
self._rigid_body_vel[env_ids][:, 0, :],
self._rigid_body_ang_vel[env_ids][:, 0, :],
self._dof_pos[env_ids], self._dof_vel[env_ids],
key_body_pos[env_ids],
self._local_root_obs, self._root_height_obs,
self._dof_obs_size, self._target_states[env_ids],
dof_diffvel[env_ids])
def _calc_perturb_times(self):
self._perturb_timesteps = []
total_steps = 0
for i, obj in enumerate(PERTURB_OBJS):
curr_time = obj[1]
total_steps += curr_time
self._perturb_timesteps.append(total_steps)
self._perturb_timesteps = np.array(self._perturb_timesteps)
def _update_proj(self):
if self.projtype == 'Auto':
curr_timestep = self.progress_buf.cpu().numpy()[0]
curr_timestep = curr_timestep % (self._perturb_timesteps[-1] + 1)
perturb_step = np.where(self._perturb_timesteps == curr_timestep)[0]
if (len(perturb_step) > 0):
perturb_id = perturb_step[0]
n = self.num_envs
humanoid_root_pos = self._humanoid_root_states[..., 0:3]
rand_theta = torch.rand([n], dtype=self._proj_states.dtype, device=self._proj_states.device)
rand_theta *= 2 * np.pi
rand_dist = (self._proj_dist_max - self._proj_dist_min) * \
torch.rand([n], dtype=self._proj_states.dtype,
device=self._proj_states.device) + self._proj_dist_min
pos_x = rand_dist * torch.cos(rand_theta)
pos_y = -rand_dist * torch.sin(rand_theta)
pos_z = (self._proj_h_max - self._proj_h_min) * \
torch.rand([n], dtype=self._proj_states.dtype,
device=self._proj_states.device) + self._proj_h_min
self._proj_states[..., perturb_id, 0] = humanoid_root_pos[..., 0] + pos_x
self._proj_states[..., perturb_id, 1] = humanoid_root_pos[..., 1] + pos_y
self._proj_states[..., perturb_id, 2] = pos_z
self._proj_states[..., perturb_id, 3:6] = 0.0
self._proj_states[..., perturb_id, 6] = 1.0
tar_body_idx = np.random.randint(self.num_bodies)
tar_body_idx = 1
launch_tar_pos = self._rigid_body_pos[..., tar_body_idx, :]
launch_dir = launch_tar_pos - self._proj_states[..., perturb_id, 0:3]
launch_dir += 0.1 * torch.randn_like(launch_dir)
launch_dir = torch.nn.functional.normalize(launch_dir, dim=-1)
launch_speed = (self._proj_speed_max - self._proj_speed_min) * \
torch.rand_like(launch_dir[:, 0:1]) + self._proj_speed_min
launch_vel = launch_speed * launch_dir
launch_vel[..., 0:2] += self._rigid_body_vel[..., tar_body_idx, 0:2]
self._proj_states[..., perturb_id, 7:10] = launch_vel
self._proj_states[..., perturb_id, 10:13] = 0.0
self.gym.set_actor_root_state_tensor_indexed(self.sim, gymtorch.unwrap_tensor(self._root_states),
gymtorch.unwrap_tensor(self._proj_actor_ids),
len(self._proj_actor_ids))
elif self.projtype == 'Mouse':
# mouse control
for evt in self.gym.query_viewer_action_events(self.viewer):
if evt.action == "reset" and evt.value > 0:
self.gym.set_sim_rigid_body_states(self.sim, self._proj_states, gymapi.STATE_ALL)
elif (evt.action == "space_shoot" or evt.action == "mouse_shoot") and evt.value > 0:
if evt.action == "mouse_shoot":
pos = self.gym.get_viewer_mouse_position(self.viewer)
window_size = self.gym.get_viewer_size(self.viewer)
xcoord = round(pos.x * window_size.x)
ycoord = round(pos.y * window_size.y)
print(f"Fired projectile with mouse at coords: {xcoord} {ycoord}")
cam_pose = self.gym.get_viewer_camera_transform(self.viewer, None)
cam_fwd = cam_pose.r.rotate(gymapi.Vec3(0, 0, 1))
spawn = cam_pose.p
speed = 25
vel = cam_fwd * speed
angvel = 1.57 - 3.14 * np.random.random(3)
self._proj_states[..., 0] = spawn.x
self._proj_states[..., 1] = spawn.y
self._proj_states[..., 2] = spawn.z
self._proj_states[..., 7] = vel.x
self._proj_states[..., 8] = vel.y
self._proj_states[..., 9] = vel.z
self.gym.set_actor_root_state_tensor_indexed(self.sim, gymtorch.unwrap_tensor(self._root_states),
gymtorch.unwrap_tensor(self._proj_actor_ids),
len(self._proj_actor_ids))
[docs] def play_dataset_step(self, time):
t = time
### update object ###
self._target_states[:, :3] = self.hoi_data_dict[0]['obj_pos'][t, :]
self._target_states[:, 3:7] = self.hoi_data_dict[0]['obj_rot'][t, :]
self._target_states[:, 7:10] = torch.zeros_like(self._target_states[:, 7:10])
self._target_states[:, 10:13] = torch.zeros_like(self._target_states[:, 10:13])
### update subject ###
_humanoid_root_pos = self.hoi_data_dict[0]['root_pos'][t, :].clone()
_humanoid_root_rot = self.hoi_data_dict[0]['root_rot'][t, :].clone()
self._humanoid_root_states[:, 0:3] = _humanoid_root_pos
self._humanoid_root_states[:, 3:7] = _humanoid_root_rot
self._humanoid_root_states[:, 7:10] = torch.zeros_like(self._humanoid_root_states[:, 7:10])
self._humanoid_root_states[:, 10:13] = torch.zeros_like(self._humanoid_root_states[:, 10:13])
self._dof_pos[:] = self.hoi_data_dict[0]['dof_pos'][t, :].clone()
self._dof_vel[:] = torch.zeros_like(self._dof_vel[:])
contact = self.hoi_data_dict[0]['contact'][t, :]
self.gym.set_actor_root_state_tensor(self.sim, gymtorch.unwrap_tensor(self._root_states))
self.gym.set_dof_state_tensor(self.sim, gymtorch.unwrap_tensor(self._dof_state))
self._refresh_sim_tensors()
# ### draw contact label ###
obj_contact = torch.any(contact > 0.1, dim=-1)
for env_id, env_ptr in enumerate(self.envs):
env_ptr = self.envs[env_id]
handle = self._target_handles[env_id]
if obj_contact == True:
self.gym.set_rigid_body_color(env_ptr, handle, 0, gymapi.MESH_VISUAL,
gymapi.Vec3(1., 0., 0.))
else:
self.gym.set_rigid_body_color(env_ptr, handle, 0, gymapi.MESH_VISUAL,
gymapi.Vec3(0., 1., 0.))
self.render(t=t)
self.gym.simulate(self.sim)
[docs] def get_dataset_step(self, time):
t = time
### update object ###
self._target_states[:, :3] = self.hoi_data_dict[0]['obj_pos'][t, :]
self._target_states[:, 3:7] = self.hoi_data_dict[0]['obj_rot'][t, :]
self._target_states[:, 7:10] = torch.zeros_like(self._target_states[:, 7:10])
self._target_states[:, 10:13] = torch.zeros_like(self._target_states[:, 10:13])
### update subject ###
_humanoid_root_pos = self.hoi_data_dict[0]['root_pos'][t, :].clone()
_humanoid_root_rot = self.hoi_data_dict[0]['root_rot'][t, :].clone()
self._humanoid_root_states[:, 0:3] = _humanoid_root_pos
self._humanoid_root_states[:, 3:7] = _humanoid_root_rot
self._humanoid_root_states[:, 7:10] = torch.zeros_like(self._humanoid_root_states[:, 7:10])
self._humanoid_root_states[:, 10:13] = torch.zeros_like(self._humanoid_root_states[:, 10:13])
self._dof_pos[:] = self.hoi_data_dict[0]['dof_pos'][t, :].clone()
self._dof_vel[:] = torch.zeros_like(self._dof_vel[:])
contact = self.hoi_data_dict[0]['contact'][t, :]
return self._dof_state
def _draw_task_play(self, t):
# self._update_marker()
cols = np.array([[1.0, 0.0, 0.0]], dtype=np.float32) # color
self.gym.clear_lines(self.viewer)
starts = self.hoi_data_dict[0]['hoi_data'][t, :3]
for i, env_ptr in enumerate(self.envs):
for j in range(len(self._key_body_ids)):
vec = self.hoi_data_dict[0]['key_body_pos'][t, j * 3:j * 3 + 3]
vec = torch.cat([starts, vec], dim=-1).cpu().numpy().reshape([1, 6])
self.gym.add_lines(self.viewer, env_ptr, 1, vec, cols)
[docs] def render(self, sync_frame_time=False, t=0):
super().render(sync_frame_time)
if self.viewer:
# self._draw_task()
if self.save_images:
env_ids = 0
if self.play_dataset:
frame_id = t
else:
frame_id = self.progress_buf[env_ids]
dataname = self.motion_file[len('physhoi/data/motions/BallPlay/'):-3]
rgb_filename = "physhoi/data/images/" + dataname + "/rgb_env%d_frame%05d.png" % (env_ids, frame_id)
os.makedirs("physhoi/data/images/" + dataname, exist_ok=True)
self.gym.write_viewer_image_to_file(self.viewer, rgb_filename)
# def _draw_task(self):
# # self._update_marker()
#
# # # draw obj contact
# # obj_contact = torch.any(torch.abs(self._tar_contact_forces[..., 0:2]) > 0.1, dim=-1)
# # for env_id, env_ptr in enumerate(self.envs):
# # env_ptr = self.envs[env_id]
# # handle = self._target_handles[env_id]
#
# # if obj_contact[env_id] == True:
# # self.gym.set_rigid_body_color(env_ptr, handle, 0, gymapi.MESH_VISUAL,
# # gymapi.Vec3(1., 0., 0.))
# # else:
# # self.gym.set_rigid_body_color(env_ptr, handle, 0, gymapi.MESH_VISUAL,
# # gymapi.Vec3(0., 1., 0.))
#####################################################################
### =========================jit functions=========================###
#####################################################################
#@torch.jit.script
[docs]def build_hoi_observations(root_pos, root_rot, root_vel, root_ang_vel, dof_pos, dof_vel, key_body_pos,
local_root_obs, root_height_obs, dof_obs_size, target_states, dof_diffvel):
contact = torch.zeros(key_body_pos.shape[0], 1).cuda()
obs = torch.cat((root_pos, root_rot, dof_pos, dof_diffvel, target_states[:, :10], key_body_pos.contiguous(
).view(-1, key_body_pos.shape[1] * key_body_pos.shape[2]), contact), dim=-1)
return obs
@torch.jit.script
def compute_obj_observations(root_states, tar_states):
# type: (Tensor, Tensor) -> Tensor
root_pos = root_states[:, 0:3]
root_rot = root_states[:, 3:7]
tar_pos = tar_states[:, 0:3]
tar_rot = tar_states[:, 3:7]
tar_vel = tar_states[:, 7:10]
tar_ang_vel = tar_states[:, 10:13]
heading_rot = torch_utils.calc_heading_quat_inv(root_rot)
local_tar_pos = tar_pos - root_pos
local_tar_pos[..., -1] = tar_pos[..., -1]
local_tar_pos = quat_rotate(heading_rot, local_tar_pos)
local_tar_vel = quat_rotate(heading_rot, tar_vel)
local_tar_ang_vel = quat_rotate(heading_rot, tar_ang_vel)
local_tar_rot = quat_mul(heading_rot, tar_rot)
local_tar_rot_obs = torch_utils.quat_to_tan_norm(local_tar_rot)
obs = torch.cat([local_tar_pos, local_tar_rot_obs, local_tar_vel, local_tar_ang_vel], dim=-1)
return obs
@torch.jit.script
def compute_humanoid_observations_max(body_pos, body_rot, body_vel, body_ang_vel, local_root_obs, root_height_obs,
contact_forces, contact_body_ids):
# type: (Tensor, Tensor, Tensor, Tensor, bool, bool, Tensor, Tensor) -> Tensor
root_pos = body_pos[:, 0, :]
root_rot = body_rot[:, 0, :]
root_h = root_pos[:, 2:3]
heading_rot = torch_utils.calc_heading_quat_inv(root_rot)
if (not root_height_obs):
root_h_obs = torch.zeros_like(root_h)
else:
root_h_obs = root_h
heading_rot_expand = heading_rot.unsqueeze(-2)
heading_rot_expand = heading_rot_expand.repeat((1, body_pos.shape[1], 1))
flat_heading_rot = heading_rot_expand.reshape(heading_rot_expand.shape[0] * heading_rot_expand.shape[1],
heading_rot_expand.shape[2])
root_pos_expand = root_pos.unsqueeze(-2)
local_body_pos = body_pos - root_pos_expand
flat_local_body_pos = local_body_pos.reshape(
local_body_pos.shape[0] * local_body_pos.shape[1], local_body_pos.shape[2])
flat_local_body_pos = quat_rotate(flat_heading_rot, flat_local_body_pos)
local_body_pos = flat_local_body_pos.reshape(
local_body_pos.shape[0], local_body_pos.shape[1] * local_body_pos.shape[2])
local_body_pos = local_body_pos[..., 3:] # remove root pos
flat_body_rot = body_rot.reshape(body_rot.shape[0] * body_rot.shape[1], body_rot.shape[2])
flat_local_body_rot = quat_mul(flat_heading_rot, flat_body_rot)
flat_local_body_rot_obs = torch_utils.quat_to_tan_norm(flat_local_body_rot)
local_body_rot_obs = flat_local_body_rot_obs.reshape(
body_rot.shape[0], body_rot.shape[1] * flat_local_body_rot_obs.shape[1])
if (local_root_obs):
root_rot_obs = torch_utils.quat_to_tan_norm(root_rot)
local_body_rot_obs[..., 0:6] = root_rot_obs
flat_body_vel = body_vel.reshape(body_vel.shape[0] * body_vel.shape[1], body_vel.shape[2])
flat_local_body_vel = quat_rotate(flat_heading_rot, flat_body_vel)
local_body_vel = flat_local_body_vel.reshape(body_vel.shape[0], body_vel.shape[1] * body_vel.shape[2])
flat_body_ang_vel = body_ang_vel.reshape(body_ang_vel.shape[0] * body_ang_vel.shape[1], body_ang_vel.shape[2])
flat_local_body_ang_vel = quat_rotate(flat_heading_rot, flat_body_ang_vel)
local_body_ang_vel = flat_local_body_ang_vel.reshape(
body_ang_vel.shape[0], body_ang_vel.shape[1] * body_ang_vel.shape[2])
body_contact_buf = contact_forces[:, contact_body_ids, :].clone().view(contact_forces.shape[0], -1)
obs = torch.cat((root_h_obs, local_body_pos, local_body_rot_obs,
local_body_vel, local_body_ang_vel, body_contact_buf), dim=-1)
return obs
@torch.jit.script
def compute_humanoid_reward(hoi_ref, hoi_obs, contact_buf, tar_contact_forces, len_keypos, w_p, w_r, w_pv, w_rv, w_op,
w_or, w_opv, w_orv, w_ig, w_cg1, w_cg2):
# type: (Tensor, Tensor, Tensor, Tensor, int, float, float, float, float, float, float, float, float, float, float, float) -> Tensor
### data preprocess ###
# simulated states
root_pos = hoi_obs[:, :3]
root_rot = hoi_obs[:, 3:3 + 4]
dof_pos = hoi_obs[:, 7:7 + 51 * 3]
dof_pos_vel = hoi_obs[:, 160:160 + 51 * 3]
obj_pos = hoi_obs[:, 313:313 + 3]
obj_rot = hoi_obs[:, 316:316 + 4]
obj_pos_vel = hoi_obs[:, 320:320 + 3]
key_pos = hoi_obs[:, 323:323 + len_keypos * 3]
contact = hoi_obs[:, -1:] # fake one
key_pos = torch.cat((root_pos, key_pos), dim=-1)
body_rot = torch.cat((root_rot, dof_pos), dim=-1)
ig = key_pos.view(-1, len_keypos + 1, 3).transpose(0, 1) - obj_pos[:, :3]
ig = ig.transpose(0, 1).view(-1, (len_keypos + 1) * 3)
# reference states
ref_root_pos = hoi_ref[:, :3]
ref_root_rot = hoi_ref[:, 3:3 + 4]
ref_dof_pos = hoi_ref[:, 7:7 + 51 * 3]
ref_dof_pos_vel = hoi_ref[:, 160:160 + 51 * 3]
ref_obj_pos = hoi_ref[:, 313:313 + 3]
ref_obj_rot = hoi_ref[:, 316:316 + 4]
ref_obj_pos_vel = hoi_ref[:, 320:320 + 3]
ref_key_pos = hoi_ref[:, 323:323 + len_keypos * 3]
ref_obj_contact = hoi_ref[:, -1:]
ref_key_pos = torch.cat((ref_root_pos, ref_key_pos), dim=-1)
ref_body_rot = torch.cat((ref_root_rot, ref_dof_pos), dim=-1)
ref_ig = ref_key_pos.view(-1, len_keypos + 1, 3).transpose(0, 1) - ref_obj_pos[:, :3]
ref_ig = ref_ig.transpose(0, 1).view(-1, (len_keypos + 1) * 3)
### body reward ###
# body pos reward
ep = torch.mean((ref_key_pos - key_pos) ** 2, dim=-1)
rp = torch.exp(-ep * w_p)
# body rot reward
er = torch.mean((ref_body_rot - body_rot) ** 2, dim=-1)
rr = torch.exp(-er * w_r)
# body pos vel reward
epv = torch.zeros_like(ep)
rpv = torch.exp(-epv * w_pv)
# body rot vel reward
erv = torch.mean((ref_dof_pos_vel - dof_pos_vel) ** 2, dim=-1)
rrv = torch.exp(-erv * w_rv)
rb = rp * rr * rpv * rrv
### object reward ###
# object pos reward
eop = torch.mean((ref_obj_pos - obj_pos) ** 2, dim=-1)
rop = torch.exp(-eop * w_op)
# object rot reward
eor = torch.zeros_like(ep) # torch.mean((ref_obj_rot - obj_rot)**2,dim=-1)
ror = torch.exp(-eor * w_or)
# object pos vel reward
eopv = torch.mean((ref_obj_pos_vel - obj_pos_vel) ** 2, dim=-1)
ropv = torch.exp(-eopv * w_opv)
# object rot vel reward
eorv = torch.zeros_like(ep) # torch.mean((ref_obj_rot_vel - obj_rot_vel)**2,dim=-1)
rorv = torch.exp(-eorv * w_orv)
ro = rop * ror * ropv * rorv
### interaction graph reward ###
eig = torch.mean((ref_ig - ig) ** 2, dim=-1)
rig = torch.exp(-eig * w_ig)
### simplified contact graph reward ###
# Since Isaac Gym does not yet provide API for detailed collision detection in GPU pipeline,
# we use force detection to approximate the contact status.
# In this case we use the CG node istead of the CG edge for imitation.
# TODO: update the code once collision detection API is available.
# body ids
# Pelvis, 0
# L_Hip, 1
# L_Knee, 2
# L_Ankle, 3
# L_Toe, 4
# R_Hip, 5
# R_Knee, 6
# R_Ankle, 7
# R_Toe, 8
# Torso, 9
# Spine, 10
# Chest, 11
# Neck, 12
# Head, 13
# L_Thorax, 14
# L_Shoulder, 15
# L_Elbow, 16
# L_Wrist, 17
# L_Hand, 18-32
# R_Thorax, 33
# R_Shoulder, 34
# R_Elbow, 35
# R_Wrist, 36
# R_Hand, 37-51
# body contact
contact_body_ids = [0, 1, 2, 5, 6, 9, 10, 11, 12, 13, 14, 15, 16, 33, 34, 35]
body_contact_buf = contact_buf[:, contact_body_ids, :].clone()
body_contact = torch.all(torch.abs(body_contact_buf) < 0.1, dim=-1)
body_contact = torch.all(body_contact, dim=-1) # =1 when no contact happens to the body
# object contact
# =1 when contact happens to the object
obj_contact = torch.any(torch.abs(tar_contact_forces[..., 0:2]) > 0.1, dim=-1)
ref_body_contact = torch.ones_like(ref_obj_contact) # no body contact for all time
ecg1 = torch.abs(body_contact - ref_body_contact[:, 0])
rcg1 = torch.exp(-ecg1 * w_cg1)
ecg2 = torch.abs(obj_contact - ref_obj_contact[:, 0])
rcg2 = torch.exp(-ecg2 * w_cg2)
rcg = rcg1 * rcg2
### task-agnostic HOI imitation reward ###
reward = rb * ro * rig * rcg
return reward
@torch.jit.script
def compute_humanoid_reset(reset_buf, progress_buf, contact_buf, rigid_body_pos,
max_episode_length, enable_early_termination, termination_heights, hoi_ref, hoi_obs):
# type: (Tensor, Tensor, Tensor, Tensor, float, bool, Tensor, Tensor, Tensor) -> Tuple[Tensor, Tensor]
terminated = torch.zeros_like(reset_buf)
if (enable_early_termination):
body_height = rigid_body_pos[:, 0, 2] # root height
body_fall = body_height < termination_heights # [4096]
has_failed = body_fall.clone()
has_failed *= (progress_buf > 1)
terminated = torch.where(has_failed, torch.ones_like(reset_buf), terminated)
reset = torch.where(progress_buf >= max_episode_length - 1, torch.ones_like(reset_buf), terminated)
return reset, terminated