# Copyright (c) 2018-2023, NVIDIA Corporation
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# 1. Redistributions of source code must retain the above copyright notice, this
# list of conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
#
# 3. Neither the name of the copyright holder nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
import os
from collections import OrderedDict
from isaacgym import gymapi
from isaacgym import gymtorch
project_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), '../..'))
from rofunc.learning.RofuncRL.tasks.utils.torch_jit_utils import *
from rofunc.learning.RofuncRL.tasks.isaacgymenv.base.vec_task import VecTask
from types import SimpleNamespace
from collections import deque
from typing import Deque, Dict, Tuple, Union
# python
import enum
import numpy as np
# ################### #
# Dimensions of robot #
# ################### #
[docs]class TrifingerDimensions(enum.Enum):
"""
Dimensions of the tri-finger robot.
Note: While it may not seem necessary for tri-finger robot since it is fixed base, for floating
base systems having this dimensions class is useful.
"""
# general state
# cartesian position + quaternion orientation
PoseDim = 7,
# linear velocity + angular velcoity
VelocityDim = 6
# state: pose + velocity
StateDim = 13
# force + torque
WrenchDim = 6
# for robot
# number of fingers
NumFingers = 3
# for three fingers
JointPositionDim = 9
JointVelocityDim = 9
JointTorqueDim = 9
# generalized coordinates
GeneralizedCoordinatesDim = JointPositionDim
GeneralizedVelocityDim = JointVelocityDim
# for objects
ObjectPoseDim = 7
ObjectVelocityDim = 6
# ################# #
# Different objects #
# ################# #
# radius of the area
ARENA_RADIUS = 0.195
[docs]class CuboidalObject:
"""
Fields for a cuboidal object.
@note Motivation for this class is that if domain randomization is performed over the
size of the cuboid, then its attributes are automatically updated as well.
"""
# 3D radius of the cuboid
radius_3d: float
# distance from wall to the center
max_com_distance_to_center: float
# minimum and mximum height for spawning the object
min_height: float
max_height = 0.1
NumKeypoints = 8
ObjectPositionDim = 3
KeypointsCoordsDim = NumKeypoints * ObjectPositionDim
def __init__(self, size: Union[float, Tuple[float, float, float]]):
"""Initialize the cuboidal object.
Args:
size: The size of the object along x, y, z in meters. If a single float is provided, then it is assumed that
object is a cube.
"""
# decide the size depedning on input type
if isinstance(size, float):
self._size = (size, size, size)
else:
self._size = size
# compute remaining attributes
self.__compute()
"""
Properties
"""
@property
def size(self) -> Tuple[float, float, float]:
"""
Returns the dimensions of the cuboid object (x, y, z) in meters.
"""
return self._size
"""
Configurations
"""
@size.setter
def size(self, size: Union[float, Tuple[float, float, float]]):
""" Set size of the object.
Args:
size: The size of the object along x, y, z in meters. If a single float is provided, then it is assumed
that object is a cube.
"""
# decide the size depedning on input type
if isinstance(size, float):
self._size = (size, size, size)
else:
self._size = size
# compute attributes
self.__compute()
"""
Private members
"""
def __compute(self):
"""Compute the attributes for the object.
"""
# compute 3D radius of the cuboid
max_len = max(self._size)
self.radius_3d = max_len * np.sqrt(3) / 2
# compute distance from wall to the center
self.max_com_distance_to_center = ARENA_RADIUS - self.radius_3d
# minimum height for spawning the object
self.min_height = self._size[2] / 2
[docs]class Trifinger(VecTask):
# constants
# directory where assets for the simulator are present
_trifinger_assets_dir = os.path.join(project_dir, "../../", "assets", "trifinger")
# robot urdf (path relative to `_trifinger_assets_dir`)
_robot_urdf_file = "robot_properties_fingers/urdf/pro/trifingerpro.urdf"
# stage urdf (path relative to `_trifinger_assets_dir`)
# _stage_urdf_file = "robot_properties_fingers/urdf/trifinger_stage.urdf"
_table_urdf_file = "robot_properties_fingers/urdf/table_without_border.urdf"
_boundary_urdf_file = "robot_properties_fingers/urdf/high_table_boundary.urdf"
# object urdf (path relative to `_trifinger_assets_dir`)
# TODO: Make object URDF configurable.
_object_urdf_file = "objects/urdf/cube_multicolor_rrc.urdf"
# physical dimensions of the object
# TODO: Make object dimensions configurable.
_object_dims = CuboidalObject(0.065)
# dimensions of the system
_dims = TrifingerDimensions
# Constants for limits
# Ref: https://github.com/rr-learning/rrc_simulation/blob/master/python/rrc_simulation/trifinger_platform.py#L68
# maximum joint torque (in N-m) applicable on each actuator
_max_torque_Nm = 0.36
# maximum joint velocity (in rad/s) on each actuator
_max_velocity_radps = 10
# History of state: Number of timesteps to save history for
# Note: Currently used only to manage history of object and frame states.
# This can be extended to other observations (as done in ANYmal).
_state_history_len = 2
# buffers to store the simulation data
# goal poses for the object [num. of instances, 7] where 7: (x, y, z, quat)
_object_goal_poses_buf: torch.Tensor
# DOF state of the system [num. of instances, num. of dof, 2] where last index: pos, vel
_dof_state: torch.Tensor
# Rigid body state of the system [num. of instances, num. of bodies, 13] where 13: (x, y, z, quat, v, omega)
_rigid_body_state: torch.Tensor
# Root prim states [num. of actors, 13] where 13: (x, y, z, quat, v, omega)
_actors_root_state: torch.Tensor
# Force-torque sensor array [num. of instances, num. of bodies * wrench]
_ft_sensors_values: torch.Tensor
# DOF position of the system [num. of instances, num. of dof]
_dof_position: torch.Tensor
# DOF velocity of the system [num. of instances, num. of dof]
_dof_velocity: torch.Tensor
# DOF torque of the system [num. of instances, num. of dof]
_dof_torque: torch.Tensor
# Fingertip links state list([num. of instances, num. of fingers, 13]) where 13: (x, y, z, quat, v, omega)
# The length of list is the history of the state: 0: t, 1: t-1, 2: t-2, ... step.
_fingertips_frames_state_history: Deque[torch.Tensor] = deque(maxlen=_state_history_len)
# Object prim state [num. of instances, 13] where 13: (x, y, z, quat, v, omega)
# The length of list is the history of the state: 0: t, 1: t-1, 2: t-2, ... step.
_object_state_history: Deque[torch.Tensor] = deque(maxlen=_state_history_len)
# stores the last action output
_last_action: torch.Tensor
# keeps track of the number of goal resets
_successes: torch.Tensor
# keeps track of number of consecutive successes
_consecutive_successes: float
_robot_limits: dict = {
"joint_position": SimpleNamespace(
# matches those on the real robot
low=np.array([-0.33, 0.0, -2.7] * _dims.NumFingers.value, dtype=np.float32),
high=np.array([1.0, 1.57, 0.0] * _dims.NumFingers.value, dtype=np.float32),
default=np.array([0.0, 0.9, -2.0] * _dims.NumFingers.value, dtype=np.float32),
),
"joint_velocity": SimpleNamespace(
low=np.full(_dims.JointVelocityDim.value, -_max_velocity_radps, dtype=np.float32),
high=np.full(_dims.JointVelocityDim.value, _max_velocity_radps, dtype=np.float32),
default=np.zeros(_dims.JointVelocityDim.value, dtype=np.float32),
),
"joint_torque": SimpleNamespace(
low=np.full(_dims.JointTorqueDim.value, -_max_torque_Nm, dtype=np.float32),
high=np.full(_dims.JointTorqueDim.value, _max_torque_Nm, dtype=np.float32),
default=np.zeros(_dims.JointTorqueDim.value, dtype=np.float32),
),
"fingertip_position": SimpleNamespace(
low=np.array([-0.4, -0.4, 0], dtype=np.float32),
high=np.array([0.4, 0.4, 0.5], dtype=np.float32),
),
"fingertip_orientation": SimpleNamespace(
low=-np.ones(4, dtype=np.float32),
high=np.ones(4, dtype=np.float32),
),
"fingertip_velocity": SimpleNamespace(
low=np.full(_dims.VelocityDim.value, -0.2, dtype=np.float32),
high=np.full(_dims.VelocityDim.value, 0.2, dtype=np.float32),
),
"fingertip_wrench": SimpleNamespace(
low=np.full(_dims.WrenchDim.value, -1.0, dtype=np.float32),
high=np.full(_dims.WrenchDim.value, 1.0, dtype=np.float32),
),
# used if we want to have joint stiffness/damping as parameters`
"joint_stiffness": SimpleNamespace(
low=np.array([1.0, 1.0, 1.0] * _dims.NumFingers.value, dtype=np.float32),
high=np.array([50.0, 50.0, 50.0] * _dims.NumFingers.value, dtype=np.float32),
),
"joint_damping": SimpleNamespace(
low=np.array([0.01, 0.03, 0.0001] * _dims.NumFingers.value, dtype=np.float32),
high=np.array([1.0, 3.0, 0.01] * _dims.NumFingers.value, dtype=np.float32),
),
}
# limits of the object (mapped later: str -> torch.tensor)
_object_limits: dict = {
"position": SimpleNamespace(
low=np.array([-0.3, -0.3, 0], dtype=np.float32),
high=np.array([0.3, 0.3, 0.3], dtype=np.float32),
default=np.array([0, 0, _object_dims.min_height], dtype=np.float32)
),
# difference between two positions
"position_delta": SimpleNamespace(
low=np.array([-0.6, -0.6, 0], dtype=np.float32),
high=np.array([0.6, 0.6, 0.3], dtype=np.float32),
default=np.array([0, 0, 0], dtype=np.float32)
),
"orientation": SimpleNamespace(
low=-np.ones(4, dtype=np.float32),
high=np.ones(4, dtype=np.float32),
default=np.array([0.0, 0.0, 0.0, 1.0], dtype=np.float32),
),
"velocity": SimpleNamespace(
low=np.full(_dims.VelocityDim.value, -0.5, dtype=np.float32),
high=np.full(_dims.VelocityDim.value, 0.5, dtype=np.float32),
default=np.zeros(_dims.VelocityDim.value, dtype=np.float32)
),
"scale": SimpleNamespace(
low=np.full(1, 0.0, dtype=np.float32),
high=np.full(1, 1.0, dtype=np.float32),
),
}
# PD gains for the robot (mapped later: str -> torch.tensor)
# Ref: https://github.com/rr-learning/rrc_simulation/blob/master/python/rrc_simulation/sim_finger.py#L49-L65
_robot_dof_gains = {
# The kp and kd gains of the PD control of the fingers.
# Note: This depends on simulation step size and is set for a rate of 250 Hz.
"stiffness": [10.0, 10.0, 10.0] * _dims.NumFingers.value,
"damping": [0.1, 0.3, 0.001] * _dims.NumFingers.value,
# The kd gains used for damping the joint motor velocities during the
# safety torque check on the joint motors.
"safety_damping": [0.08, 0.08, 0.04] * _dims.NumFingers.value
}
action_dim = _dims.JointTorqueDim.value
def __init__(self, cfg, rl_device, sim_device, graphics_device_id, headless, virtual_screen_capture, force_render):
self.cfg = cfg
self.obs_spec = {
"robot_q": self._dims.GeneralizedCoordinatesDim.value,
"robot_u": self._dims.GeneralizedVelocityDim.value,
"object_q": self._dims.ObjectPoseDim.value,
"object_q_des": self._dims.ObjectPoseDim.value,
"command": self.action_dim
}
if self.cfg["env"]["asymmetric_obs"]:
self.state_spec = {
# observations spec
**self.obs_spec,
# extra observations (added separately to make computations simpler)
"object_u": self._dims.ObjectVelocityDim.value,
"fingertip_state": self._dims.NumFingers.value * self._dims.StateDim.value,
"robot_a": self._dims.GeneralizedVelocityDim.value,
"fingertip_wrench": self._dims.NumFingers.value * self._dims.WrenchDim.value,
}
else:
self.state_spec = self.obs_spec
self.action_spec = {
"command": self.action_dim
}
self.cfg["env"]["numObservations"] = sum(self.obs_spec.values())
self.cfg["env"]["numStates"] = sum(self.state_spec.values())
self.cfg["env"]["numActions"] = sum(self.action_spec.values())
self.max_episode_length = self.cfg["env"]["episodeLength"]
self.randomize = self.cfg["task"]["randomize"]
self.randomization_params = self.cfg["task"]["randomization_params"]
# define prims present in the scene
prim_names = ["robot", "table", "boundary", "object", "goal_object"]
# mapping from name to asset instance
self.gym_assets = dict.fromkeys(prim_names)
# mapping from name to gym indices
self.gym_indices = dict.fromkeys(prim_names)
# mapping from name to gym rigid body handles
# name of finger tips links i.e. end-effector frames
fingertips_frames = ["finger_tip_link_0", "finger_tip_link_120", "finger_tip_link_240"]
self._fingertips_handles = OrderedDict.fromkeys(fingertips_frames, None)
# mapping from name to gym dof index
robot_dof_names = list()
for finger_pos in ['0', '120', '240']:
robot_dof_names += [f'finger_base_to_upper_joint_{finger_pos}',
f'finger_upper_to_middle_joint_{finger_pos}',
f'finger_middle_to_lower_joint_{finger_pos}']
self._robot_dof_indices = OrderedDict.fromkeys(robot_dof_names, None)
super().__init__(config=self.cfg, rl_device=rl_device, sim_device=sim_device,
graphics_device_id=graphics_device_id, headless=headless,
virtual_screen_capture=virtual_screen_capture, force_render=force_render)
if self.viewer != None:
cam_pos = gymapi.Vec3(0.7, 0.0, 0.7)
cam_target = gymapi.Vec3(0.0, 0.0, 0.0)
self.gym.viewer_camera_look_at(self.viewer, None, cam_pos, cam_target)
# change constant buffers from numpy/lists into torch tensors
# limits for robot
for limit_name in self._robot_limits:
# extract limit simple-namespace
limit_dict = self._robot_limits[limit_name].__dict__
# iterate over namespace attributes
for prop, value in limit_dict.items():
limit_dict[prop] = torch.tensor(value, dtype=torch.float, device=self.device)
# limits for the object
for limit_name in self._object_limits:
# extract limit simple-namespace
limit_dict = self._object_limits[limit_name].__dict__
# iterate over namespace attributes
for prop, value in limit_dict.items():
limit_dict[prop] = torch.tensor(value, dtype=torch.float, device=self.device)
# PD gains for actuation
for gain_name, value in self._robot_dof_gains.items():
self._robot_dof_gains[gain_name] = torch.tensor(value, dtype=torch.float, device=self.device)
# store the sampled goal poses for the object: [num. of instances, 7]
self._object_goal_poses_buf = torch.zeros((self.num_envs, 7), device=self.device, dtype=torch.float)
# get force torque sensor if enabled
if self.cfg["env"]["enable_ft_sensors"] or self.cfg["env"]["asymmetric_obs"]:
# # joint torques
# dof_force_tensor = self.gym.acquire_dof_force_tensor(self.sim)
# self._dof_torque = gymtorch.wrap_tensor(dof_force_tensor).view(self.num_envs,
# self._dims.JointTorqueDim.value)
# # force-torque sensor
num_ft_dims = self._dims.NumFingers.value * self._dims.WrenchDim.value
# sensor_tensor = self.gym.acquire_force_sensor_tensor(self.sim)
# self._ft_sensors_values = gymtorch.wrap_tensor(sensor_tensor).view(self.num_envs, num_ft_dims)
sensor_tensor = self.gym.acquire_force_sensor_tensor(self.sim)
self._ft_sensors_values = gymtorch.wrap_tensor(sensor_tensor).view(self.num_envs, num_ft_dims)
dof_force_tensor = self.gym.acquire_dof_force_tensor(self.sim)
self._dof_torque = gymtorch.wrap_tensor(dof_force_tensor).view(self.num_envs,
self._dims.JointTorqueDim.value)
# get gym GPU state tensors
actor_root_state_tensor = self.gym.acquire_actor_root_state_tensor(self.sim)
dof_state_tensor = self.gym.acquire_dof_state_tensor(self.sim)
rigid_body_tensor = self.gym.acquire_rigid_body_state_tensor(self.sim)
# refresh the buffer (to copy memory?)
self.gym.refresh_actor_root_state_tensor(self.sim)
self.gym.refresh_dof_state_tensor(self.sim)
self.gym.refresh_rigid_body_state_tensor(self.sim)
# create wrapper tensors for reference (consider everything as pointer to actual memory)
# DOF
self._dof_state = gymtorch.wrap_tensor(dof_state_tensor).view(self.num_envs, -1, 2)
self._dof_position = self._dof_state[..., 0]
self._dof_velocity = self._dof_state[..., 1]
# rigid body
self._rigid_body_state = gymtorch.wrap_tensor(rigid_body_tensor).view(self.num_envs, -1, 13)
# root actors
self._actors_root_state = gymtorch.wrap_tensor(actor_root_state_tensor).view(-1, 13)
# frames history
action_dim = sum(self.action_spec.values())
self._last_action = torch.zeros(self.num_envs, action_dim, dtype=torch.float, device=self.device)
fingertip_handles_indices = list(self._fingertips_handles.values())
object_indices = self.gym_indices["object"]
# timestep 0 is current tensor
curr_history_length = 0
while curr_history_length < self._state_history_len:
# add tensors to history list
print(self._rigid_body_state.shape)
self._fingertips_frames_state_history.append(self._rigid_body_state[:, fingertip_handles_indices])
self._object_state_history.append(self._actors_root_state[object_indices])
# update current history length
curr_history_length += 1
self._observations_scale = SimpleNamespace(low=None, high=None)
self._states_scale = SimpleNamespace(low=None, high=None)
self._action_scale = SimpleNamespace(low=None, high=None)
self._successes = torch.zeros(self.num_envs, device=self.device, dtype=torch.long)
self._successes_pos = torch.zeros(self.num_envs, device=self.device, dtype=torch.long)
self._successes_quat = torch.zeros(self.num_envs, device=self.device, dtype=torch.long)
self.__configure_mdp_spaces()
[docs] def create_sim(self):
self.up_axis_idx = 2 # index of up axis: Y=1, Z=2
self.sim = super().create_sim(self.device_id, self.graphics_device_id, self.physics_engine, self.sim_params)
self._create_ground_plane()
self._create_scene_assets()
self._create_envs(self.num_envs, self.cfg["env"]["envSpacing"], int(np.sqrt(self.num_envs)))
# If randomizing, apply once immediately on startup before the fist sim step
if self.randomize:
self.apply_randomizations(self.randomization_params)
def _create_ground_plane(self):
plane_params = gymapi.PlaneParams()
plane_params.normal = gymapi.Vec3(0.0, 0.0, 1.0)
plane_params.distance = 0.013
plane_params.static_friction = 1.0
plane_params.dynamic_friction = 1.0
self.gym.add_ground(self.sim, plane_params)
def _create_scene_assets(self):
""" Define Gym assets for stage, robot and object.
"""
# define assets
self.gym_assets["robot"] = self.__define_robot_asset()
self.gym_assets["table"] = self.__define_table_asset()
self.gym_assets["boundary"] = self.__define_boundary_asset()
self.gym_assets["object"] = self.__define_object_asset()
self.gym_assets["goal_object"] = self.__define_goal_object_asset()
# display the properties (only for debugging)
# robot
print("Trifinger Robot Asset: ")
print(f'\t Number of bodies: {self.gym.get_asset_rigid_body_count(self.gym_assets["robot"])}')
print(f'\t Number of shapes: {self.gym.get_asset_rigid_shape_count(self.gym_assets["robot"])}')
print(f'\t Number of dofs: {self.gym.get_asset_dof_count(self.gym_assets["robot"])}')
print(f'\t Number of actuated dofs: {self._dims.JointTorqueDim.value}')
# stage
print("Trifinger Table Asset: ")
print(f'\t Number of bodies: {self.gym.get_asset_rigid_body_count(self.gym_assets["table"])}')
print(f'\t Number of shapes: {self.gym.get_asset_rigid_shape_count(self.gym_assets["table"])}')
print("Trifinger Boundary Asset: ")
print(f'\t Number of bodies: {self.gym.get_asset_rigid_body_count(self.gym_assets["boundary"])}')
print(f'\t Number of shapes: {self.gym.get_asset_rigid_shape_count(self.gym_assets["boundary"])}')
def _create_envs(self, num_envs, spacing, num_per_row):
# define the dof properties for the robot
robot_dof_props = self.gym.get_asset_dof_properties(self.gym_assets["robot"])
# set dof properites based on the control mode
for k, dof_index in enumerate(self._robot_dof_indices.values()):
# note: since safety checks are employed, the simulator PD controller is not
# used. Instead the torque is computed manually and applied, even if the
# command mode is 'position'.
robot_dof_props['driveMode'][dof_index] = gymapi.DOF_MODE_EFFORT
robot_dof_props['stiffness'][dof_index] = 0.0
robot_dof_props['damping'][dof_index] = 0.0
# set dof limits
robot_dof_props['effort'][dof_index] = self._max_torque_Nm
robot_dof_props['velocity'][dof_index] = self._max_velocity_radps
robot_dof_props['lower'][dof_index] = float(self._robot_limits["joint_position"].low[k])
robot_dof_props['upper'][dof_index] = float(self._robot_limits["joint_position"].high[k])
self.envs = []
# define lower and upper region bound for each environment
env_lower_bound = gymapi.Vec3(-self.cfg["env"]["envSpacing"], -self.cfg["env"]["envSpacing"], 0.0)
env_upper_bound = gymapi.Vec3(self.cfg["env"]["envSpacing"], self.cfg["env"]["envSpacing"],
self.cfg["env"]["envSpacing"])
num_envs_per_row = int(np.sqrt(self.num_envs))
# initialize gym indices buffer as a list
# note: later the list is converted to torch tensor for ease in interfacing with IsaacGym.
for asset_name in self.gym_indices.keys():
self.gym_indices[asset_name] = list()
# count number of shapes and bodies
max_agg_bodies = 0
max_agg_shapes = 0
for asset in self.gym_assets.values():
max_agg_bodies += self.gym.get_asset_rigid_body_count(asset)
max_agg_shapes += self.gym.get_asset_rigid_shape_count(asset)
# iterate and create environment instances
for env_index in range(self.num_envs):
# create environment
env_ptr = self.gym.create_env(self.sim, env_lower_bound, env_upper_bound, num_envs_per_row)
# begin aggregration mode if enabled - this can improve simulation performance
if self.cfg["env"]["aggregate_mode"]:
self.gym.begin_aggregate(env_ptr, max_agg_bodies, max_agg_shapes, True)
# add trifinger robot to environment
trifinger_actor = self.gym.create_actor(env_ptr, self.gym_assets["robot"], gymapi.Transform(),
"robot", env_index, 0, 0)
trifinger_idx = self.gym.get_actor_index(env_ptr, trifinger_actor, gymapi.DOMAIN_SIM)
# add table to environment
table_handle = self.gym.create_actor(env_ptr, self.gym_assets["table"], gymapi.Transform(),
"table", env_index, 1, 0)
table_idx = self.gym.get_actor_index(env_ptr, table_handle, gymapi.DOMAIN_SIM)
# add stage to environment
boundary_handle = self.gym.create_actor(env_ptr, self.gym_assets["boundary"], gymapi.Transform(),
"boundary", env_index, 1, 0)
boundary_idx = self.gym.get_actor_index(env_ptr, boundary_handle, gymapi.DOMAIN_SIM)
# add object to environment
object_handle = self.gym.create_actor(env_ptr, self.gym_assets["object"], gymapi.Transform(),
"object", env_index, 0, 0)
object_idx = self.gym.get_actor_index(env_ptr, object_handle, gymapi.DOMAIN_SIM)
# add goal object to environment
goal_handle = self.gym.create_actor(env_ptr, self.gym_assets["goal_object"], gymapi.Transform(),
"goal_object", env_index + self.num_envs, 0, 0)
goal_object_idx = self.gym.get_actor_index(env_ptr, goal_handle, gymapi.DOMAIN_SIM)
# change settings of DOF
self.gym.set_actor_dof_properties(env_ptr, trifinger_actor, robot_dof_props)
# add color to instances
stage_color = gymapi.Vec3(0.73, 0.68, 0.72)
self.gym.set_rigid_body_color(env_ptr, table_handle, 0, gymapi.MESH_VISUAL_AND_COLLISION, stage_color)
self.gym.set_rigid_body_color(env_ptr, boundary_handle, 0, gymapi.MESH_VISUAL_AND_COLLISION, stage_color)
# end aggregation mode if enabled
if self.cfg["env"]["aggregate_mode"]:
self.gym.end_aggregate(env_ptr)
# add instances to list
self.envs.append(env_ptr)
self.gym_indices["robot"].append(trifinger_idx)
self.gym_indices["table"].append(table_idx)
self.gym_indices["boundary"].append(boundary_idx)
self.gym_indices["object"].append(object_idx)
self.gym_indices["goal_object"].append(goal_object_idx)
# convert gym indices from list to tensor
for asset_name, asset_indices in self.gym_indices.items():
self.gym_indices[asset_name] = torch.tensor(asset_indices, dtype=torch.long, device=self.device)
def __configure_mdp_spaces(self):
"""
Configures the observations, state and action spaces.
"""
# Action scale for the MDP
# Note: This is order sensitive.
if self.cfg["env"]["command_mode"] == "position":
# action space is joint positions
self._action_scale.low = self._robot_limits["joint_position"].low
self._action_scale.high = self._robot_limits["joint_position"].high
elif self.cfg["env"]["command_mode"] == "torque":
# action space is joint torques
self._action_scale.low = self._robot_limits["joint_torque"].low
self._action_scale.high = self._robot_limits["joint_torque"].high
else:
msg = f"Invalid command mode. Input: {self.cfg['env']['command_mode']} not in ['torque', 'position']."
raise ValueError(msg)
# Observations scale for the MDP
# check if policy outputs normalized action [-1, 1] or not.
if self.cfg["env"]["normalize_action"]:
obs_action_scale = SimpleNamespace(
low=torch.full((self.action_dim,), -1, dtype=torch.float, device=self.device),
high=torch.full((self.action_dim,), 1, dtype=torch.float, device=self.device)
)
else:
obs_action_scale = self._action_scale
object_obs_low = torch.cat([
self._object_limits["position"].low,
self._object_limits["orientation"].low,
] * 2)
object_obs_high = torch.cat([
self._object_limits["position"].high,
self._object_limits["orientation"].high,
] * 2)
# Note: This is order sensitive.
self._observations_scale.low = torch.cat([
self._robot_limits["joint_position"].low,
self._robot_limits["joint_velocity"].low,
object_obs_low,
obs_action_scale.low
])
self._observations_scale.high = torch.cat([
self._robot_limits["joint_position"].high,
self._robot_limits["joint_velocity"].high,
object_obs_high,
obs_action_scale.high
])
# State scale for the MDP
if self.cfg["env"]["asymmetric_obs"]:
# finger tip scaling
fingertip_state_scale = SimpleNamespace(
low=torch.cat([
self._robot_limits["fingertip_position"].low,
self._robot_limits["fingertip_orientation"].low,
self._robot_limits["fingertip_velocity"].low,
]),
high=torch.cat([
self._robot_limits["fingertip_position"].high,
self._robot_limits["fingertip_orientation"].high,
self._robot_limits["fingertip_velocity"].high,
])
)
states_low = [
self._observations_scale.low,
self._object_limits["velocity"].low,
fingertip_state_scale.low.repeat(self._dims.NumFingers.value),
self._robot_limits["joint_torque"].low,
self._robot_limits["fingertip_wrench"].low.repeat(self._dims.NumFingers.value),
]
states_high = [
self._observations_scale.high,
self._object_limits["velocity"].high,
fingertip_state_scale.high.repeat(self._dims.NumFingers.value),
self._robot_limits["joint_torque"].high,
self._robot_limits["fingertip_wrench"].high.repeat(self._dims.NumFingers.value),
]
# Note: This is order sensitive.
self._states_scale.low = torch.cat(states_low)
self._states_scale.high = torch.cat(states_high)
# check that dimensions of scalings are correct
# count number of dimensions
state_dim = sum(self.state_spec.values())
obs_dim = sum(self.obs_spec.values())
action_dim = sum(self.action_spec.values())
# check that dimensions match
# observations
if self._observations_scale.low.shape[0] != obs_dim or self._observations_scale.high.shape[0] != obs_dim:
msg = f"Observation scaling dimensions mismatch. " \
f"\tLow: {self._observations_scale.low.shape[0]}, " \
f"\tHigh: {self._observations_scale.high.shape[0]}, " \
f"\tExpected: {obs_dim}."
raise AssertionError(msg)
# state
if self.cfg["env"]["asymmetric_obs"] \
and (self._states_scale.low.shape[0] != state_dim or self._states_scale.high.shape[0] != state_dim):
msg = f"States scaling dimensions mismatch. " \
f"\tLow: {self._states_scale.low.shape[0]}, " \
f"\tHigh: {self._states_scale.high.shape[0]}, " \
f"\tExpected: {state_dim}."
raise AssertionError(msg)
# actions
if self._action_scale.low.shape[0] != action_dim or self._action_scale.high.shape[0] != action_dim:
msg = f"Actions scaling dimensions mismatch. " \
f"\tLow: {self._action_scale.low.shape[0]}, " \
f"\tHigh: {self._action_scale.high.shape[0]}, " \
f"\tExpected: {action_dim}."
raise AssertionError(msg)
# print the scaling
print(f'MDP Raw observation bounds\n'
f'\tLow: {self._observations_scale.low}\n'
f'\tHigh: {self._observations_scale.high}')
print(f'MDP Raw state bounds\n'
f'\tLow: {self._states_scale.low}\n'
f'\tHigh: {self._states_scale.high}')
print(f'MDP Raw action bounds\n'
f'\tLow: {self._action_scale.low}\n'
f'\tHigh: {self._action_scale.high}')
[docs] def compute_reward(self, actions):
self.rew_buf[:] = 0.
self.reset_buf[:] = 0.
self.rew_buf[:], self.reset_buf[:], log_dict = compute_trifinger_reward(
self.obs_buf,
self.reset_buf,
self.progress_buf,
self.max_episode_length,
self.cfg["sim"]["dt"],
self.cfg["env"]["reward_terms"]["finger_move_penalty"]["weight"],
self.cfg["env"]["reward_terms"]["finger_reach_object_rate"]["weight"],
self.cfg["env"]["reward_terms"]["object_dist"]["weight"],
self.cfg["env"]["reward_terms"]["object_rot"]["weight"],
self.env_steps_count,
self._object_goal_poses_buf,
self._object_state_history[0],
self._object_state_history[1],
self._fingertips_frames_state_history[0],
self._fingertips_frames_state_history[1],
self.cfg["env"]["reward_terms"]["keypoints_dist"]["activate"]
)
self.extras.update({"env/rewards/" + k: v.mean() for k, v in log_dict.items()})
[docs] def compute_observations(self):
# refresh memory buffers
self.gym.refresh_dof_state_tensor(self.sim)
self.gym.refresh_actor_root_state_tensor(self.sim)
self.gym.refresh_rigid_body_state_tensor(self.sim)
if self.cfg["env"]["enable_ft_sensors"] or self.cfg["env"]["asymmetric_obs"]:
self.gym.refresh_dof_force_tensor(self.sim)
self.gym.refresh_force_sensor_tensor(self.sim)
joint_torques = self._dof_torque
tip_wrenches = self._ft_sensors_values
else:
joint_torques = torch.zeros(self.num_envs, self._dims.JointTorqueDim.value, dtype=torch.float32,
device=self.device)
tip_wrenches = torch.zeros(self.num_envs, self._dims.NumFingers.value * self._dims.WrenchDim.value,
dtype=torch.float32, device=self.device)
# extract frame handles
fingertip_handles_indices = list(self._fingertips_handles.values())
object_indices = self.gym_indices["object"]
# update state histories
self._fingertips_frames_state_history.appendleft(self._rigid_body_state[:, fingertip_handles_indices])
self._object_state_history.appendleft(self._actors_root_state[object_indices])
# fill the observations and states buffer
self.obs_buf[:], self.states_buf[:] = compute_trifinger_observations_states(
self.cfg["env"]["asymmetric_obs"],
self._dof_position,
self._dof_velocity,
self._object_state_history[0],
self._object_goal_poses_buf,
self.actions,
self._fingertips_frames_state_history[0],
joint_torques,
tip_wrenches,
)
# normalize observations if flag is enabled
if self.cfg["env"]["normalize_obs"]:
# for normal obs
self.obs_buf = scale_transform(
self.obs_buf,
lower=self._observations_scale.low,
upper=self._observations_scale.high
)
[docs] def reset_idx(self, env_ids):
# randomization can happen only at reset time, since it can reset actor positions on GPU
if self.randomize:
self.apply_randomizations(self.randomization_params)
# A) Reset episode stats buffers
self.reset_buf[env_ids] = 0
self.progress_buf[env_ids] = 0
self._successes[env_ids] = 0
self._successes_pos[env_ids] = 0
self._successes_quat[env_ids] = 0
# B) Various randomizations at the start of the episode:
# -- Robot base position.
# -- Stage position.
# -- Coefficient of restituion and friction for robot, object, stage.
# -- Mass and size of the object
# -- Mass of robot links
# -- Robot joint state
robot_initial_state_config = self.cfg["env"]["reset_distribution"]["robot_initial_state"]
self._sample_robot_state(
env_ids,
distribution=robot_initial_state_config["type"],
dof_pos_stddev=robot_initial_state_config["dof_pos_stddev"],
dof_vel_stddev=robot_initial_state_config["dof_vel_stddev"]
)
# -- Sampling of initial pose of the object
object_initial_state_config = self.cfg["env"]["reset_distribution"]["object_initial_state"]
self._sample_object_poses(
env_ids,
distribution=object_initial_state_config["type"],
)
# -- Sampling of goal pose of the object
self._sample_object_goal_poses(
env_ids,
difficulty=self.cfg["env"]["task_difficulty"]
)
# C) Extract trifinger indices to reset
robot_indices = self.gym_indices["robot"][env_ids].to(torch.int32)
object_indices = self.gym_indices["object"][env_ids].to(torch.int32)
goal_object_indices = self.gym_indices["goal_object"][env_ids].to(torch.int32)
all_indices = torch.unique(torch.cat([robot_indices, object_indices, goal_object_indices]))
# D) Set values into simulator
# -- DOF
self.gym.set_dof_state_tensor_indexed(self.sim, gymtorch.unwrap_tensor(self._dof_state),
gymtorch.unwrap_tensor(robot_indices), len(robot_indices))
# -- actor root states
self.gym.set_actor_root_state_tensor_indexed(self.sim, gymtorch.unwrap_tensor(self._actors_root_state),
gymtorch.unwrap_tensor(all_indices), len(all_indices))
def _sample_robot_state(self, instances: torch.Tensor, distribution: str = 'default',
dof_pos_stddev: float = 0.0, dof_vel_stddev: float = 0.0):
"""Samples the robot DOF state based on the settings.
Type of robot initial state distribution: ["default", "random"]
- "default" means that robot is in default configuration.
- "random" means that noise is added to default configuration
- "none" means that robot is configuration is not reset between episodes.
Args:
instances: A tensor constraining indices of environment instances to reset.
distribution: Name of distribution to sample initial state from: ['default', 'random']
dof_pos_stddev: Noise scale to DOF position (used if 'type' is 'random')
dof_vel_stddev: Noise scale to DOF velocity (used if 'type' is 'random')
"""
# number of samples to generate
num_samples = instances.size()[0]
# sample dof state based on distribution type
if distribution == "none":
return
elif distribution == "default":
# set to default configuration
self._dof_position[instances] = self._robot_limits["joint_position"].default
self._dof_velocity[instances] = self._robot_limits["joint_velocity"].default
elif distribution == "random":
# sample uniform random from (-1, 1)
dof_state_dim = self._dims.JointPositionDim.value + self._dims.JointVelocityDim.value
dof_state_noise = 2 * torch.rand((num_samples, dof_state_dim,), dtype=torch.float,
device=self.device) - 1
# set to default configuration
self._dof_position[instances] = self._robot_limits["joint_position"].default
self._dof_velocity[instances] = self._robot_limits["joint_velocity"].default
# add noise
# DOF position
start_offset = 0
end_offset = self._dims.JointPositionDim.value
self._dof_position[instances] += dof_pos_stddev * dof_state_noise[:, start_offset:end_offset]
# DOF velocity
start_offset = end_offset
end_offset += self._dims.JointVelocityDim.value
self._dof_velocity[instances] += dof_vel_stddev * dof_state_noise[:, start_offset:end_offset]
else:
msg = f"Invalid robot initial state distribution. Input: {distribution} not in [`default`, `random`]."
raise ValueError(msg)
# reset robot fingertips state history
for idx in range(1, self._state_history_len):
self._fingertips_frames_state_history[idx][instances] = 0.0
def _sample_object_poses(self, instances: torch.Tensor, distribution: str):
"""Sample poses for the cube.
Type of distribution: ["default", "random", "none"]
- "default" means that pose is default configuration.
- "random" means that pose is randomly sampled on the table.
- "none" means no resetting of object pose between episodes.
Args:
instances: A tensor constraining indices of environment instances to reset.
distribution: Name of distribution to sample initial state from: ['default', 'random']
"""
# number of samples to generate
num_samples = instances.size()[0]
# sample poses based on distribution type
if distribution == "none":
return
elif distribution == "default":
pos_x, pos_y, pos_z = self._object_limits["position"].default
orientation = self._object_limits["orientation"].default
elif distribution == "random":
# For initialization
pos_x, pos_y = random_xy(num_samples, self._object_dims.max_com_distance_to_center, self.device)
# add a small offset to the height to account for scale randomisation (prevent ground intersection)
pos_z = self._object_dims.size[2] / 2 + 0.0015
orientation = random_yaw_orientation(num_samples, self.device)
else:
msg = f"Invalid object initial state distribution. Input: {distribution} " \
"not in [`default`, `random`, `none`]."
raise ValueError(msg)
# set buffers into simulator
# extract indices for goal object
object_indices = self.gym_indices["object"][instances]
# set values into buffer
# object buffer
self._object_state_history[0][instances, 0] = pos_x
self._object_state_history[0][instances, 1] = pos_y
self._object_state_history[0][instances, 2] = pos_z
self._object_state_history[0][instances, 3:7] = orientation
self._object_state_history[0][instances, 7:13] = 0
# reset object state history
for idx in range(1, self._state_history_len):
self._object_state_history[idx][instances] = 0.0
# root actor buffer
self._actors_root_state[object_indices] = self._object_state_history[0][instances]
def _sample_object_goal_poses(self, instances: torch.Tensor, difficulty: int):
"""Sample goal poses for the cube and sets them into the desired goal pose buffer.
Args:
instances: A tensor constraining indices of environment instances to reset.
difficulty: Difficulty level. The higher, the more difficult is the goal.
Possible levels are:
- -1: Random goal position on the table, including yaw orientation.
- 1: Random goal position on the table, no orientation.
- 2: Fixed goal position in the air with x,y = 0. No orientation.
- 3: Random goal position in the air, no orientation.
- 4: Random goal pose in the air, including orientation.
"""
# number of samples to generate
num_samples = instances.size()[0]
# sample poses based on task difficulty
if difficulty == -1:
# For initialization
pos_x, pos_y = random_xy(num_samples, self._object_dims.max_com_distance_to_center, self.device)
pos_z = self._object_dims.size[2] / 2
orientation = random_yaw_orientation(num_samples, self.device)
elif difficulty == 1:
# Random goal position on the table, no orientation.
pos_x, pos_y = random_xy(num_samples, self._object_dims.max_com_distance_to_center, self.device)
pos_z = self._object_dims.size[2] / 2
orientation = default_orientation(num_samples, self.device)
elif difficulty == 2:
# Fixed goal position in the air with x,y = 0. No orientation.
pos_x, pos_y = 0.0, 0.0
pos_z = self._object_dims.min_height + 0.05
orientation = default_orientation(num_samples, self.device)
elif difficulty == 3:
# Random goal position in the air, no orientation.
pos_x, pos_y = random_xy(num_samples, self._object_dims.max_com_distance_to_center, self.device)
pos_z = random_z(num_samples, self._object_dims.min_height, self._object_dims.max_height, self.device)
orientation = default_orientation(num_samples, self.device)
elif difficulty == 4:
# Random goal pose in the air, including orientation.
# Note: Set minimum height such that the cube does not intersect with the
# ground in any orientation
max_goal_radius = self._object_dims.max_com_distance_to_center
max_height = self._object_dims.max_height
orientation = random_orientation(num_samples, self.device)
# pick x, y, z according to the maximum height / radius at the current point
# in the cirriculum
pos_x, pos_y = random_xy(num_samples, max_goal_radius, self.device)
pos_z = random_z(num_samples, self._object_dims.radius_3d, max_height, self.device)
else:
msg = f"Invalid difficulty index for task: {difficulty}."
raise ValueError(msg)
# extract indices for goal object
goal_object_indices = self.gym_indices["goal_object"][instances]
# set values into buffer
# object goal buffer
self._object_goal_poses_buf[instances, 0] = pos_x
self._object_goal_poses_buf[instances, 1] = pos_y
self._object_goal_poses_buf[instances, 2] = pos_z
self._object_goal_poses_buf[instances, 3:7] = orientation
# root actor buffer
self._actors_root_state[goal_object_indices, 0:7] = self._object_goal_poses_buf[instances]
# self._actors_root_state[goal_object_indices, 2] = -10
[docs] def pre_physics_step(self, actions):
env_ids = self.reset_buf.nonzero(as_tuple=False).flatten()
if len(env_ids) > 0:
self.reset_idx(env_ids)
self.gym.simulate(self.sim)
self.actions = actions.clone().to(self.device)
# if normalized_action is true, then denormalize them.
if self.cfg["env"]["normalize_action"]:
# TODO: Default action should correspond to normalized value of 0.
action_transformed = unscale_transform(
self.actions,
lower=self._action_scale.low,
upper=self._action_scale.high
)
else:
action_transformed = self.actions
# compute command on the basis of mode selected
if self.cfg["env"]["command_mode"] == 'torque':
# command is the desired joint torque
computed_torque = action_transformed
elif self.cfg["env"]["command_mode"] == 'position':
# command is the desired joint positions
desired_dof_position = action_transformed
# compute torque to apply
computed_torque = self._robot_dof_gains["stiffness"] * (desired_dof_position - self._dof_position)
computed_torque -= self._robot_dof_gains["damping"] * self._dof_velocity
else:
msg = f"Invalid command mode. Input: {self.cfg['env']['command_mode']} not in ['torque', 'position']."
raise ValueError(msg)
# apply clamping of computed torque to actuator limits
applied_torque = saturate(
computed_torque,
lower=self._robot_limits["joint_torque"].low,
upper=self._robot_limits["joint_torque"].high
)
# apply safety damping and clamping of the action torque if enabled
if self.cfg["env"]["apply_safety_damping"]:
# apply damping by joint velocity
applied_torque -= self._robot_dof_gains["safety_damping"] * self._dof_velocity
# clamp input
applied_torque = saturate(
applied_torque,
lower=self._robot_limits["joint_torque"].low,
upper=self._robot_limits["joint_torque"].high
)
# set computed torques to simulator buffer.
self.gym.set_dof_actuation_force_tensor(self.sim, gymtorch.unwrap_tensor(applied_torque))
[docs] def post_physics_step(self):
self._step_info = {}
self.progress_buf += 1
self.randomize_buf += 1
self.compute_observations()
self.compute_reward(self.actions)
# check termination conditions (success only)
self._check_termination()
if torch.sum(self.reset_buf) > 0:
self._step_info['consecutive_successes'] = np.mean(self._successes.float().cpu().numpy())
self._step_info['consecutive_successes_pos'] = np.mean(self._successes_pos.float().cpu().numpy())
self._step_info['consecutive_successes_quat'] = np.mean(self._successes_quat.float().cpu().numpy())
def _check_termination(self):
"""Check whether the episode is done per environment.
"""
# Extract configuration for termination conditions
termination_config = self.cfg["env"]["termination_conditions"]
# Termination condition - successful completion
# Calculate distance between current object and goal
object_goal_position_dist = torch.norm(
self._object_goal_poses_buf[:, 0:3] - self._object_state_history[0][:, 0:3],
p=2, dim=-1
)
# log theoretical number of r eseats
goal_position_reset = torch.le(object_goal_position_dist,
termination_config["success"]["position_tolerance"])
self._step_info['env/current_position_goal/per_env'] = np.mean(goal_position_reset.float().cpu().numpy())
# For task with difficulty 4, we need to check if orientation matches as well.
# Compute the difference in orientation between object and goal pose
object_goal_orientation_dist = quat_diff_rad(self._object_state_history[0][:, 3:7],
self._object_goal_poses_buf[:, 3:7])
# Check for distance within tolerance
goal_orientation_reset = torch.le(object_goal_orientation_dist,
termination_config["success"]["orientation_tolerance"])
self._step_info['env/current_orientation_goal/per_env'] = np.mean(goal_orientation_reset.float().cpu().numpy())
if self.cfg["env"]['task_difficulty'] < 4:
# Check for task completion if position goal is within a threshold
task_completion_reset = goal_position_reset
elif self.cfg["env"]['task_difficulty'] == 4:
# Check for task completion if both position + orientation goal is within a threshold
task_completion_reset = torch.logical_and(goal_position_reset, goal_orientation_reset)
else:
# Check for task completion if both orientation goal is within a threshold
task_completion_reset = goal_orientation_reset
self._successes = task_completion_reset
self._successes_pos = goal_position_reset
self._successes_quat = goal_orientation_reset
"""
Helper functions - define assets
"""
def __define_robot_asset(self):
""" Define Gym asset for robot.
"""
# define tri-finger asset
robot_asset_options = gymapi.AssetOptions()
robot_asset_options.flip_visual_attachments = False
robot_asset_options.fix_base_link = True
robot_asset_options.collapse_fixed_joints = False
robot_asset_options.disable_gravity = False
robot_asset_options.default_dof_drive_mode = gymapi.DOF_MODE_EFFORT
robot_asset_options.thickness = 0.001
robot_asset_options.angular_damping = 0.01
robot_asset_options.vhacd_enabled = True
robot_asset_options.vhacd_params = gymapi.VhacdParams()
robot_asset_options.vhacd_params.resolution = 100000
robot_asset_options.vhacd_params.concavity = 0.0025
robot_asset_options.vhacd_params.alpha = 0.04
robot_asset_options.vhacd_params.beta = 1.0
robot_asset_options.vhacd_params.convex_hull_downsampling = 4
robot_asset_options.vhacd_params.max_num_vertices_per_ch = 256
if self.physics_engine == gymapi.SIM_PHYSX:
robot_asset_options.use_physx_armature = True
# load tri-finger asset
trifinger_asset = self.gym.load_asset(self.sim, self._trifinger_assets_dir,
self._robot_urdf_file, robot_asset_options)
# set the link properties for the robot
# Ref: https://github.com/rr-learning/rrc_simulation/blob/master/python/rrc_simulation/sim_finger.py#L563
trifinger_props = self.gym.get_asset_rigid_shape_properties(trifinger_asset)
for p in trifinger_props:
p.friction = 1.0
p.torsion_friction = 1.0
p.restitution = 0.8
self.gym.set_asset_rigid_shape_properties(trifinger_asset, trifinger_props)
# extract the frame handles
for frame_name in self._fingertips_handles.keys():
self._fingertips_handles[frame_name] = self.gym.find_asset_rigid_body_index(trifinger_asset,
frame_name)
# check valid handle
if self._fingertips_handles[frame_name] == gymapi.INVALID_HANDLE:
msg = f"Invalid handle received for frame: `{frame_name}`."
print(msg)
if self.cfg["env"]["enable_ft_sensors"] or self.cfg["env"]["asymmetric_obs"]:
sensor_pose = gymapi.Transform()
for fingertip_handle in self._fingertips_handles.values():
self.gym.create_asset_force_sensor(trifinger_asset, fingertip_handle, sensor_pose)
# extract the dof indices
# Note: need to write actuated dofs manually since the system contains fixed joints as well which show up.
for dof_name in self._robot_dof_indices.keys():
self._robot_dof_indices[dof_name] = self.gym.find_asset_dof_index(trifinger_asset, dof_name)
# check valid handle
if self._robot_dof_indices[dof_name] == gymapi.INVALID_HANDLE:
msg = f"Invalid index received for DOF: `{dof_name}`."
print(msg)
# return the asset
return trifinger_asset
def __define_table_asset(self):
""" Define Gym asset for stage.
"""
# define stage asset
table_asset_options = gymapi.AssetOptions()
table_asset_options.disable_gravity = True
table_asset_options.fix_base_link = True
table_asset_options.thickness = 0.001
# load stage asset
table_asset = self.gym.load_asset(self.sim, self._trifinger_assets_dir,
self._table_urdf_file, table_asset_options)
# set stage properties
table_props = self.gym.get_asset_rigid_shape_properties(table_asset)
# iterate over each mesh
for p in table_props:
p.friction = 0.1
p.torsion_friction = 0.1
self.gym.set_asset_rigid_shape_properties(table_asset, table_props)
# return the asset
return table_asset
def __define_boundary_asset(self):
""" Define Gym asset for stage.
"""
# define stage asset
boundary_asset_options = gymapi.AssetOptions()
boundary_asset_options.disable_gravity = True
boundary_asset_options.fix_base_link = True
boundary_asset_options.thickness = 0.001
boundary_asset_options.vhacd_enabled = True
boundary_asset_options.vhacd_params = gymapi.VhacdParams()
boundary_asset_options.vhacd_params.resolution = 100000
boundary_asset_options.vhacd_params.concavity = 0.0
boundary_asset_options.vhacd_params.alpha = 0.04
boundary_asset_options.vhacd_params.beta = 1.0
boundary_asset_options.vhacd_params.max_num_vertices_per_ch = 1024
# load stage asset
boundary_asset = self.gym.load_asset(self.sim, self._trifinger_assets_dir,
self._boundary_urdf_file, boundary_asset_options)
# set stage properties
boundary_props = self.gym.get_asset_rigid_shape_properties(boundary_asset)
self.gym.set_asset_rigid_shape_properties(boundary_asset, boundary_props)
# return the asset
return boundary_asset
def __define_object_asset(self):
""" Define Gym asset for object.
"""
# define object asset
object_asset_options = gymapi.AssetOptions()
object_asset_options.disable_gravity = False
object_asset_options.thickness = 0.001
object_asset_options.flip_visual_attachments = True
# load object asset
object_asset = self.gym.load_asset(self.sim, self._trifinger_assets_dir,
self._object_urdf_file, object_asset_options)
# set object properties
# Ref: https://github.com/rr-learning/rrc_simulation/blob/master/python/rrc_simulation/collision_objects.py#L96
object_props = self.gym.get_asset_rigid_shape_properties(object_asset)
for p in object_props:
p.friction = 1.0
p.torsion_friction = 0.001
p.restitution = 0.0
self.gym.set_asset_rigid_shape_properties(object_asset, object_props)
# return the asset
return object_asset
def __define_goal_object_asset(self):
""" Define Gym asset for goal object.
"""
# define object asset
object_asset_options = gymapi.AssetOptions()
object_asset_options.disable_gravity = True
object_asset_options.fix_base_link = True
object_asset_options.thickness = 0.001
object_asset_options.flip_visual_attachments = True
# load object asset
goal_object_asset = self.gym.load_asset(self.sim, self._trifinger_assets_dir,
self._object_urdf_file, object_asset_options)
# return the asset
return goal_object_asset
@property
def env_steps_count(self) -> int:
"""Returns the total number of environment steps aggregated across parallel environments."""
return self.gym.get_frame_count(self.sim) * self.num_envs
#####################################################################
###=========================jit functions=========================###
#####################################################################
@torch.jit.script
def lgsk_kernel(x: torch.Tensor, scale: float = 50.0, eps: float = 2) -> torch.Tensor:
"""Defines logistic kernel function to bound input to [-0.25, 0)
Ref: https://arxiv.org/abs/1901.08652 (page 15)
Args:
x: Input tensor.
scale: Scaling of the kernel function (controls how wide the 'bell' shape is')
eps: Controls how 'tall' the 'bell' shape is.
Returns:
Output tensor computed using kernel.
"""
scaled = x * scale
return 1.0 / (scaled.exp() + eps + (-scaled).exp())
@torch.jit.script
def gen_keypoints(pose: torch.Tensor, num_keypoints: int = 8, size: Tuple[float, float, float] = (0.065, 0.065, 0.065)):
num_envs = pose.shape[0]
keypoints_buf = torch.ones(num_envs, num_keypoints, 3, dtype=torch.float32, device=pose.device)
for i in range(num_keypoints):
# which dimensions to negate
n = [((i >> k) & 1) == 0 for k in range(3)]
corner_loc = [(1 if n[k] else -1) * s / 2 for k, s in enumerate(size)],
corner = torch.tensor(corner_loc, dtype=torch.float32, device=pose.device) * keypoints_buf[:, i, :]
keypoints_buf[:, i, :] = local_to_world_space(corner, pose)
return keypoints_buf
@torch.jit.script
def compute_trifinger_reward(
obs_buf: torch.Tensor,
reset_buf: torch.Tensor,
progress_buf: torch.Tensor,
episode_length: int,
dt: float,
finger_move_penalty_weight: float,
finger_reach_object_weight: float,
object_dist_weight: float,
object_rot_weight: float,
env_steps_count: int,
object_goal_poses_buf: torch.Tensor,
object_state: torch.Tensor,
last_object_state: torch.Tensor,
fingertip_state: torch.Tensor,
last_fingertip_state: torch.Tensor,
use_keypoints: bool
) -> Tuple[torch.Tensor, torch.Tensor, Dict[str, torch.Tensor]]:
ft_sched_start = 0
ft_sched_end = 5e7
# Reward penalising finger movement
fingertip_vel = (fingertip_state[:, :, 0:3] - last_fingertip_state[:, :, 0:3]) / dt
finger_movement_penalty = finger_move_penalty_weight * fingertip_vel.pow(2).view(-1, 9).sum(dim=-1)
# Reward for finger reaching the object
# distance from each finger to the centroid of the object, shape (N, 3).
curr_norms = torch.stack([
torch.norm(fingertip_state[:, i, 0:3] - object_state[:, 0:3], p=2, dim=-1)
for i in range(3)
], dim=-1)
# distance from each finger to the centroid of the object in the last timestep, shape (N, 3).
prev_norms = torch.stack([
torch.norm(last_fingertip_state[:, i, 0:3] - last_object_state[:, 0:3], p=2, dim=-1)
for i in range(3)
], dim=-1)
ft_sched_val = 1.0 if ft_sched_start <= env_steps_count <= ft_sched_end else 0.0
finger_reach_object_reward = finger_reach_object_weight * ft_sched_val * (curr_norms - prev_norms).sum(dim=-1)
if use_keypoints:
object_keypoints = gen_keypoints(object_state[:, 0:7])
goal_keypoints = gen_keypoints(object_goal_poses_buf[:, 0:7])
delta = object_keypoints - goal_keypoints
dist_l2 = torch.norm(delta, p=2, dim=-1)
keypoints_kernel_sum = lgsk_kernel(dist_l2, scale=30., eps=2.).mean(dim=-1)
pose_reward = object_dist_weight * dt * keypoints_kernel_sum
else:
# Reward for object distance
object_dist = torch.norm(object_state[:, 0:3] - object_goal_poses_buf[:, 0:3], p=2, dim=-1)
object_dist_reward = object_dist_weight * dt * lgsk_kernel(object_dist, scale=50., eps=2.)
# Reward for object rotation
# extract quaternion orientation
quat_a = object_state[:, 3:7]
quat_b = object_goal_poses_buf[:, 3:7]
angles = quat_diff_rad(quat_a, quat_b)
object_rot_reward = object_rot_weight * dt / (3. * torch.abs(angles) + 0.01)
pose_reward = object_dist_reward + object_rot_reward
total_reward = (
finger_movement_penalty
+ finger_reach_object_reward
+ pose_reward
)
# reset agents
reset = torch.zeros_like(reset_buf)
reset = torch.where(progress_buf >= episode_length - 1, torch.ones_like(reset_buf), reset)
info: Dict[str, torch.Tensor] = {
'finger_movement_penalty': finger_movement_penalty,
'finger_reach_object_reward': finger_reach_object_reward,
'pose_reward': finger_reach_object_reward,
'reward': total_reward,
}
return total_reward, reset, info
@torch.jit.script
def compute_trifinger_observations_states(
asymmetric_obs: bool,
dof_position: torch.Tensor,
dof_velocity: torch.Tensor,
object_state: torch.Tensor,
object_goal_poses: torch.Tensor,
actions: torch.Tensor,
fingertip_state: torch.Tensor,
joint_torques: torch.Tensor,
tip_wrenches: torch.Tensor
):
num_envs = dof_position.shape[0]
obs_buf = torch.cat([
dof_position,
dof_velocity,
object_state[:, 0:7], # pose
object_goal_poses,
actions
], dim=-1)
if asymmetric_obs:
states_buf = torch.cat([
obs_buf,
object_state[:, 7:13], # linear / angular velocity
fingertip_state.reshape(num_envs, -1),
joint_torques,
tip_wrenches
], dim=-1)
else:
states_buf = obs_buf
return obs_buf, states_buf
"""
Sampling of cuboidal object
"""
@torch.jit.script
def random_xy(num: int, max_com_distance_to_center: float, device: str) -> Tuple[torch.Tensor, torch.Tensor]:
"""Returns sampled uniform positions in circle (https://stackoverflow.com/a/50746409)"""
# sample radius of circle
radius = torch.sqrt(torch.rand(num, dtype=torch.float, device=device))
radius *= max_com_distance_to_center
# sample theta of point
theta = 2 * np.pi * torch.rand(num, dtype=torch.float, device=device)
# x,y-position of the cube
x = radius * torch.cos(theta)
y = radius * torch.sin(theta)
return x, y
@torch.jit.script
def random_z(num: int, min_height: float, max_height: float, device: str) -> torch.Tensor:
"""Returns sampled height of the goal object."""
z = torch.rand(num, dtype=torch.float, device=device)
z = (max_height - min_height) * z + min_height
return z
@torch.jit.script
def default_orientation(num: int, device: str) -> torch.Tensor:
"""Returns identity rotation transform."""
quat = torch.zeros((num, 4,), dtype=torch.float, device=device)
quat[..., -1] = 1.0
return quat
@torch.jit.script
def random_orientation(num: int, device: str) -> torch.Tensor:
"""Returns sampled rotation in 3D as quaternion.
Ref: https://docs.scipy.org/doc/scipy/reference/generated/scipy.spatial.transform.Rotation.random.html
"""
# sample random orientation from normal distribution
quat = torch.randn((num, 4,), dtype=torch.float, device=device)
# normalize the quaternion
quat = torch.nn.functional.normalize(quat, p=2., dim=-1, eps=1e-12)
return quat
@torch.jit.script
def random_orientation_within_angle(num: int, device: str, base: torch.Tensor, max_angle: float):
""" Generates random quaternions within max_angle of base
Ref: https://math.stackexchange.com/a/3448434
"""
quat = torch.zeros((num, 4,), dtype=torch.float, device=device)
rand = torch.rand((num, 3), dtype=torch.float, device=device)
c = torch.cos(rand[:, 0] * max_angle)
n = torch.sqrt((1. - c) / 2.)
quat[:, 3] = torch.sqrt((1 + c) / 2.)
quat[:, 2] = (rand[:, 1] * 2. - 1.) * n
quat[:, 0] = (torch.sqrt(1 - quat[:, 2] ** 2.) * torch.cos(2 * np.pi * rand[:, 2])) * n
quat[:, 1] = (torch.sqrt(1 - quat[:, 2] ** 2.) * torch.sin(2 * np.pi * rand[:, 2])) * n
# floating point errors can cause it to be slightly off, re-normalise
quat = torch.nn.functional.normalize(quat, p=2., dim=-1, eps=1e-12)
return quat_mul(quat, base)
@torch.jit.script
def random_angular_vel(num: int, device: str, magnitude_stdev: float) -> torch.Tensor:
"""Samples a random angular velocity with standard deviation `magnitude_stdev`"""
axis = torch.randn((num, 3,), dtype=torch.float, device=device)
axis /= torch.norm(axis, p=2, dim=-1).view(-1, 1)
magnitude = torch.randn((num, 1,), dtype=torch.float, device=device)
magnitude *= magnitude_stdev
return magnitude * axis
@torch.jit.script
def random_yaw_orientation(num: int, device: str) -> torch.Tensor:
"""Returns sampled rotation around z-axis."""
roll = torch.zeros(num, dtype=torch.float, device=device)
pitch = torch.zeros(num, dtype=torch.float, device=device)
yaw = 2 * np.pi * torch.rand(num, dtype=torch.float, device=device)
return quat_from_euler_xyz(roll, pitch, yaw)