# Copyright (c) 2020, NVIDIA CORPORATION. All rights reserved.
# NVIDIA CORPORATION and its licensors retain all intellectual property
# and proprietary rights in and to this software, related documentation
# and any modifications thereto. Any use, reproduction, disclosure or
# distribution of this software and related documentation without an express
# license agreement from NVIDIA CORPORATION is strictly prohibited.
import sys
import os
import operator
from copy import deepcopy
import random
from gym import spaces
from isaacgym import gymapi
from isaacgym.gymutil import get_property_setter_map, get_property_getter_map, get_default_setter_args, apply_random_samples, check_buckets, generate_random_samples
import numpy as np
import torch
# Base class for RL tasks
[docs]class BaseTask():
def __init__(self, cfg, enable_camera_sensors=False):
self.gym = gymapi.acquire_gym()
self.obs_dict = {}
self.device_type = cfg.get("device_type", "cuda")
self.device_id = cfg.get("device_id", 0)
self.device = "cpu"
if self.device_type == "cuda" or self.device_type == "GPU":
self.device = "cuda" + ":" + str(self.device_id)
self.headless = cfg["headless"]
# double check!
self.graphics_device_id = self.device_id
if enable_camera_sensors == False and self.headless == True:
self.graphics_device_id = -1
self.num_envs = cfg["env"]["numEnvs"]
self.num_obs = cfg["env"]["numObservations"]
self.num_states = cfg["env"].get("numStates", 0)
self.num_actions = cfg["env"]["numActions"]
self.control_freq_inv = cfg["env"].get("controlFrequencyInv", 1)
# optimization flags for pytorch JIT
torch._C._jit_set_profiling_mode(False)
torch._C._jit_set_profiling_executor(False)
# allocate buffers
self.obs_buf = torch.zeros(
(self.num_envs, self.num_obs), device=self.device, dtype=torch.float)
self.states_buf = torch.zeros(
(self.num_envs, self.num_states), device=self.device, dtype=torch.float)
self.rew_buf = torch.zeros(
self.num_envs, device=self.device, dtype=torch.float)
self.reset_buf = torch.ones(
self.num_envs, device=self.device, dtype=torch.long)
self.progress_buf = torch.zeros(
self.num_envs, device=self.device, dtype=torch.long)
self.randomize_buf = torch.zeros(
self.num_envs, device=self.device, dtype=torch.long)
self.extras = {}
self.obs_space = spaces.Box(
np.ones(self.num_obs, dtype=np.float32) * -np.Inf,
np.ones(self.num_obs, dtype=np.float32) * np.Inf,
)
self.state_space = spaces.Box(
np.ones(self.num_states, dtype=np.float32) * -np.Inf,
np.ones(self.num_states, dtype=np.float32) * np.Inf,
)
self.act_space = spaces.Box(
np.ones(self.num_actions, dtype=np.float32) * -1.0,
np.ones(self.num_actions, dtype=np.float32) * 1.0,
)
self.original_props = {}
self.dr_randomizations = {}
self.first_randomization = True
self.actor_params_generator = None
self.extern_actor_params = {}
for env_id in range(self.num_envs):
self.extern_actor_params[env_id] = None
self.last_step = -1
self.last_rand_step = -1
# create envs, sim and viewer
self.create_sim()
self.gym.prepare_sim(self.sim)
# todo: read from config
self.enable_viewer_sync = True
self.viewer = None
# if running with a viewer, set up keyboard shortcuts and camera
if self.headless == False:
# subscribe to keyboard shortcuts
self.viewer = self.gym.create_viewer(
self.sim, gymapi.CameraProperties())
self.gym.subscribe_viewer_keyboard_event(
self.viewer, gymapi.KEY_ESCAPE, "QUIT")
self.gym.subscribe_viewer_keyboard_event(
self.viewer, gymapi.KEY_V, "toggle_viewer_sync")
# set the camera position based on up axis
sim_params = self.gym.get_sim_params(self.sim)
if sim_params.up_axis == gymapi.UP_AXIS_Z:
cam_pos = gymapi.Vec3(20.0, 25.0, 3.0)
cam_target = gymapi.Vec3(10.0, 15.0, 0.0)
else:
cam_pos = gymapi.Vec3(20.0, 3.0, 25.0)
cam_target = gymapi.Vec3(10.0, 0.0, 15.0)
self.gym.viewer_camera_look_at(
self.viewer, None, cam_pos, cam_target)
[docs] def reset_done(self):
"""Reset the environment.
Returns:
Observation dictionary, indices of environments being reset
"""
done_env_ids = self.reset_buf.nonzero(as_tuple=False).flatten()
if len(done_env_ids) > 0:
self.reset_idx(done_env_ids)
self.obs_dict["obs"] = torch.clamp(
self.obs_buf, -np.Inf, np.Inf
).to(self.rl_device)
# asymmetric actor-critic
if self.num_states > 0:
self.obs_dict["states"] = self.get_state()
return self.obs_dict, done_env_ids
@property
def observation_space(self):
"""Get the environment's observation space."""
return self.obs_space
@property
def action_space(self):
"""Get the environment's action space."""
return self.act_space
# set gravity based on up axis and return axis index
[docs] def set_sim_params_up_axis(self, sim_params, axis):
if axis == 'z':
sim_params.up_axis = gymapi.UP_AXIS_Z
sim_params.gravity.x = 0
sim_params.gravity.y = 0
sim_params.gravity.z = -9.81
return 2
return 1
[docs] def create_sim(self, compute_device, graphics_device, physics_engine, sim_params):
sim = self.gym.create_sim(compute_device, graphics_device, physics_engine, sim_params)
if sim is None:
print("*** Failed to create sim")
quit()
return sim
[docs] def step(self, actions):
if self.dr_randomizations.get('actions', None):
actions = self.dr_randomizations['actions']['noise_lambda'](actions)
# apply actions
self.pre_physics_step(actions)
# step physics and render each frame
self._physics_step()
# to fix!
if self.device == 'cpu':
self.gym.fetch_results(self.sim, True)
# compute observations, rewards, resets, ...
self.post_physics_step()
if self.dr_randomizations.get('observations', None):
self.obs_buf = self.dr_randomizations['observations']['noise_lambda'](self.obs_buf)
self.obs_dict["obs"] = torch.clamp(
self.obs_buf, -np.Inf, np.Inf
).to(self.rl_device)
return (
self.obs_dict,
self.rew_buf.to(self.rl_device),
self.reset_buf.to(self.rl_device),
self.extras,
)
[docs] def get_states(self):
return self.states_buf
[docs] def render(self, sync_frame_time=False):
if self.viewer:
# check for window closed
if self.gym.query_viewer_has_closed(self.viewer):
sys.exit()
# check for keyboard events
for evt in self.gym.query_viewer_action_events(self.viewer):
if evt.action == "QUIT" and evt.value > 0:
sys.exit()
elif evt.action == "toggle_viewer_sync" and evt.value > 0:
self.enable_viewer_sync = not self.enable_viewer_sync
# fetch results
if self.device != 'cpu':
self.gym.fetch_results(self.sim, True)
# step graphics
if self.enable_viewer_sync:
self.gym.step_graphics(self.sim)
self.gym.draw_viewer(self.viewer, self.sim, True)
else:
self.gym.poll_viewer_events(self.viewer)
[docs] def get_actor_params_info(self, dr_params, env):
"""Returns a flat array of actor params, their names and ranges."""
if "actor_params" not in dr_params:
return None
params = []
names = []
lows = []
highs = []
param_getters_map = get_property_getter_map(self.gym)
for actor, actor_properties in dr_params["actor_params"].items():
handle = self.gym.find_actor_handle(env, actor)
for prop_name, prop_attrs in actor_properties.items():
if prop_name == 'color':
continue # this is set randomly
props = param_getters_map[prop_name](env, handle)
if not isinstance(props, list):
props = [props]
for prop_idx, prop in enumerate(props):
for attr, attr_randomization_params in prop_attrs.items():
name = prop_name+'_'+str(prop_idx)+'_'+attr
lo_hi = attr_randomization_params['range']
distr = attr_randomization_params['distribution']
if 'uniform' not in distr:
lo_hi = (-1.0*float('Inf'), float('Inf'))
if isinstance(prop, np.ndarray):
for attr_idx in range(prop[attr].shape[0]):
params.append(prop[attr][attr_idx])
names.append(name+'_'+str(attr_idx))
lows.append(lo_hi[0])
highs.append(lo_hi[1])
else:
params.append(getattr(prop, attr))
names.append(name)
lows.append(lo_hi[0])
highs.append(lo_hi[1])
return params, names, lows, highs
# Apply randomizations only on resets, due to current PhysX limitations
[docs] def apply_randomizations(self, dr_params):
# If we don't have a randomization frequency, randomize every step
rand_freq = dr_params.get("frequency", 1)
# First, determine what to randomize:
# - non-environment parameters when > frequency steps have passed since the last non-environment
# - physical environments in the reset buffer, which have exceeded the randomization frequency threshold
# - on the first call, randomize everything
self.last_step = self.gym.get_frame_count(self.sim)
if self.first_randomization:
do_nonenv_randomize = True
env_ids = list(range(self.num_envs))
else:
do_nonenv_randomize = (self.last_step - self.last_rand_step) >= rand_freq
rand_envs = torch.where(self.randomize_buf >= rand_freq, torch.ones_like(self.randomize_buf), torch.zeros_like(self.randomize_buf))
rand_envs = torch.logical_and(rand_envs, self.reset_buf)
env_ids = torch.nonzero(rand_envs, as_tuple=False).squeeze(-1).tolist()
self.randomize_buf[rand_envs] = 0
if do_nonenv_randomize:
self.last_rand_step = self.last_step
param_setters_map = get_property_setter_map(self.gym)
param_setter_defaults_map = get_default_setter_args(self.gym)
param_getters_map = get_property_getter_map(self.gym)
# On first iteration, check the number of buckets
if self.first_randomization:
check_buckets(self.gym, self.envs, dr_params)
for nonphysical_param in ["observations", "actions"]:
if nonphysical_param in dr_params and do_nonenv_randomize:
dist = dr_params[nonphysical_param]["distribution"]
op_type = dr_params[nonphysical_param]["operation"]
sched_type = dr_params[nonphysical_param]["schedule"] if "schedule" in dr_params[nonphysical_param] else None
sched_step = dr_params[nonphysical_param]["schedule_steps"] if "schedule" in dr_params[nonphysical_param] else None
op = operator.add if op_type == 'additive' else operator.mul
if sched_type == 'linear':
sched_scaling = 1.0 / sched_step * \
min(self.last_step, sched_step)
elif sched_type == 'constant':
sched_scaling = 0 if self.last_step < sched_step else 1
else:
sched_scaling = 1
if dist == 'gaussian':
mu, var = dr_params[nonphysical_param]["range"]
mu_corr, var_corr = dr_params[nonphysical_param].get("range_correlated", [0., 0.])
if op_type == 'additive':
mu *= sched_scaling
var *= sched_scaling
mu_corr *= sched_scaling
var_corr *= sched_scaling
elif op_type == 'scaling':
var = var * sched_scaling # scale up var over time
mu = mu * sched_scaling + 1.0 * \
(1.0 - sched_scaling) # linearly interpolate
var_corr = var_corr * sched_scaling # scale up var over time
mu_corr = mu_corr * sched_scaling + 1.0 * \
(1.0 - sched_scaling) # linearly interpolate
def noise_lambda(tensor, param_name=nonphysical_param):
params = self.dr_randomizations[param_name]
corr = params.get('corr', None)
if corr is None:
corr = torch.randn_like(tensor)
params['corr'] = corr
corr = corr * params['var_corr'] + params['mu_corr']
return op(
tensor, corr + torch.randn_like(tensor) * params['var'] + params['mu'])
self.dr_randomizations[nonphysical_param] = {'mu': mu, 'var': var, 'mu_corr': mu_corr, 'var_corr': var_corr, 'noise_lambda': noise_lambda}
elif dist == 'uniform':
lo, hi = dr_params[nonphysical_param]["range"]
lo_corr, hi_corr = dr_params[nonphysical_param].get("range_correlated", [0., 0.])
if op_type == 'additive':
lo *= sched_scaling
hi *= sched_scaling
lo_corr *= sched_scaling
hi_corr *= sched_scaling
elif op_type == 'scaling':
lo = lo * sched_scaling + 1.0 * (1.0 - sched_scaling)
hi = hi * sched_scaling + 1.0 * (1.0 - sched_scaling)
lo_corr = lo_corr * sched_scaling + 1.0 * (1.0 - sched_scaling)
hi_corr = hi_corr * sched_scaling + 1.0 * (1.0 - sched_scaling)
def noise_lambda(tensor, param_name=nonphysical_param):
params = self.dr_randomizations[param_name]
corr = params.get('corr', None)
if corr is None:
corr = torch.randn_like(tensor)
params['corr'] = corr
corr = corr * (params['hi_corr'] - params['lo_corr']) + params['lo_corr']
return op(tensor, corr + torch.rand_like(tensor) * (params['hi'] - params['lo']) + params['lo'])
self.dr_randomizations[nonphysical_param] = {'lo': lo, 'hi': hi, 'lo_corr': lo_corr, 'hi_corr': hi_corr, 'noise_lambda': noise_lambda}
if "sim_params" in dr_params and do_nonenv_randomize:
prop_attrs = dr_params["sim_params"]
prop = self.gym.get_sim_params(self.sim)
if self.first_randomization:
self.original_props["sim_params"] = {
attr: getattr(prop, attr) for attr in dir(prop)}
for attr, attr_randomization_params in prop_attrs.items():
apply_random_samples(
prop, self.original_props["sim_params"], attr, attr_randomization_params, self.last_step)
self.gym.set_sim_params(self.sim, prop)
# If self.actor_params_generator is initialized: use it to
# sample actor simulation params. This gives users the
# freedom to generate samples from arbitrary distributions,
# e.g. use full-covariance distributions instead of the DR's
# default of treating each simulation parameter independently.
extern_offsets = {}
if self.actor_params_generator is not None:
for env_id in env_ids:
self.extern_actor_params[env_id] = \
self.actor_params_generator.sample()
extern_offsets[env_id] = 0
for actor, actor_properties in dr_params["actor_params"].items():
for env_id in env_ids:
env = self.envs[env_id]
handle = self.gym.find_actor_handle(env, actor)
extern_sample = self.extern_actor_params[env_id]
for prop_name, prop_attrs in actor_properties.items():
if prop_name == 'color':
num_bodies = self.gym.get_actor_rigid_body_count(
env, handle)
for n in range(num_bodies):
self.gym.set_rigid_body_color(env, handle, n, gymapi.MESH_VISUAL,
gymapi.Vec3(random.uniform(0, 1), random.uniform(0, 1), random.uniform(0, 1)))
continue
if prop_name == 'scale':
attr_randomization_params = prop_attrs
sample = generate_random_samples(attr_randomization_params, 1,
self.last_step, None)
og_scale = 1
if attr_randomization_params['operation'] == 'scaling':
new_scale = og_scale * sample
elif attr_randomization_params['operation'] == 'additive':
new_scale = og_scale + sample
self.gym.set_actor_scale(env, handle, new_scale)
continue
prop = param_getters_map[prop_name](env, handle)
if isinstance(prop, list):
if self.first_randomization:
self.original_props[prop_name] = [
{attr: getattr(p, attr) for attr in dir(p)} for p in prop]
for p, og_p in zip(prop, self.original_props[prop_name]):
for attr, attr_randomization_params in prop_attrs.items():
smpl = None
if self.actor_params_generator is not None:
smpl, extern_offsets[env_id] = get_attr_val_from_sample(
extern_sample, extern_offsets[env_id], p, attr)
apply_random_samples(
p, og_p, attr, attr_randomization_params,
self.last_step, smpl)
else:
if self.first_randomization:
self.original_props[prop_name] = deepcopy(prop)
for attr, attr_randomization_params in prop_attrs.items():
smpl = None
if self.actor_params_generator is not None:
smpl, extern_offsets[env_id] = get_attr_val_from_sample(
extern_sample, extern_offsets[env_id], prop, attr)
apply_random_samples(
prop, self.original_props[prop_name], attr,
attr_randomization_params, self.last_step, smpl)
setter = param_setters_map[prop_name]
default_args = param_setter_defaults_map[prop_name]
setter(env, handle, prop, *default_args)
if self.actor_params_generator is not None:
for env_id in env_ids: # check that we used all dims in sample
if extern_offsets[env_id] > 0:
extern_sample = self.extern_actor_params[env_id]
if extern_offsets[env_id] != extern_sample.shape[0]:
print('env_id', env_id,
'extern_offset', extern_offsets[env_id],
'vs extern_sample.shape', extern_sample.shape)
raise Exception("Invalid extern_sample size")
self.first_randomization = False
[docs] def pre_physics_step(self, actions):
raise NotImplementedError
def _physics_step(self):
for i in range(self.control_freq_inv):
self.render()
self.gym.simulate(self.sim)
return
[docs] def post_physics_step(self):
raise NotImplementedError
[docs]def get_attr_val_from_sample(sample, offset, prop, attr):
"""Retrieves param value for the given prop and attr from the sample."""
if sample is None:
return None, 0
if isinstance(prop, np.ndarray):
smpl = sample[offset:offset+prop[attr].shape[0]]
return smpl, offset+prop[attr].shape[0]
else:
return sample[offset], offset+1