# Copyright 2023, Junjia LIU, jjliu@mae.cuhk.edu.hk
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import Union, Tuple, Any, Optional
import gym
import gymnasium
import collections
import numpy as np
from packaging import version
import torch
from rofunc.utils.logger.beauty_logger import beauty_print
__all__ = ["wrap_env"]
class Wrapper(object):
    def __init__(self, env: Any, device=None) -> None:
        """Base wrapper class for RL environments

        :param env: The environment to wrap
        :type env: Any supported RL environment
        :param device: Device to use. If ``None`` (default), the ``device`` attribute of the
                       wrapped environment is used when it exists; otherwise ``"cuda:0"`` is
                       selected if CUDA is available, else ``"cpu"``
        :type device: str or torch.device, optional
        """
        self._env = env

        # device (stored as a plain attribute: faster than @property)
        if hasattr(self._env, "device"):
            self.device = torch.device(self._env.device)
        else:
            self.device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
        # an explicitly requested device always wins
        if device is not None:
            self.device = torch.device(device)

    def __getattr__(self, key: str) -> Any:
        """Get an attribute from the wrapped environment

        Only invoked when the regular attribute lookup on the wrapper fails.

        :param key: The attribute name
        :type key: str

        :raises AttributeError: If the attribute does not exist

        :return: The attribute value
        :rtype: Any
        """
        # If ``_env`` itself is missing (instance built with ``__new__``, e.g. while being
        # copied or unpickled), reading ``self._env`` below would re-enter this method
        # forever and end in a RecursionError instead of an AttributeError.
        if key == "_env":
            raise AttributeError("'{}' object has no attribute '_env'".format(type(self).__name__))
        try:
            return getattr(self._env, key)
        except AttributeError:
            raise AttributeError("Wrapped environment ({}) does not have attribute '{}'" \
                                 .format(self._env.__class__.__name__, key)) from None

    def reset(self) -> Tuple[torch.Tensor, Any]:
        """Reset the environment

        :raises NotImplementedError: Not implemented

        :return: Observation, info
        :rtype: torch.Tensor and any other info
        """
        raise NotImplementedError

    def step(self, actions: torch.Tensor) -> Tuple[torch.Tensor, torch.Tensor, torch.Tensor, torch.Tensor, Any]:
        """Perform a step in the environment

        :param actions: The actions to perform
        :type actions: torch.Tensor

        :raises NotImplementedError: Not implemented

        :return: Observation, reward, terminated, truncated, info
        :rtype: tuple of torch.Tensor and any other info
        """
        raise NotImplementedError

    def render(self, *args, **kwargs) -> None:
        """Render the environment

        :raises NotImplementedError: Not implemented
        """
        raise NotImplementedError

    def close(self) -> None:
        """Close the environment

        :raises NotImplementedError: Not implemented
        """
        raise NotImplementedError

    @property
    def num_envs(self) -> int:
        """Number of environments

        If the wrapped environment does not have the ``num_envs`` property, it will be set to 1
        """
        return self._env.num_envs if hasattr(self._env, "num_envs") else 1

    @property
    def state_space(self) -> "gym.Space":
        """State space

        If the wrapped environment does not have the ``state_space`` property,
        the value of the ``observation_space`` property will be used
        """
        return self._env.state_space if hasattr(self._env, "state_space") else self._env.observation_space

    @property
    def observation_space(self) -> "gym.Space":
        """Observation space
        """
        return self._env.observation_space

    @property
    def action_space(self) -> "gym.Space":
        """Action space
        """
        return self._env.action_space
class IsaacGymPreview2Wrapper(Wrapper):
    def __init__(self, env: Any) -> None:
        """Wrapper for Isaac Gym (preview 2) environments

        :param env: The environment to wrap
        :type env: Any supported Isaac Gym environment (preview 2) environment
        """
        super().__init__(env)

        # the wrapped environment is only reset on the first ``reset`` call
        self._reset_once = True
        # latest observation returned by the wrapped environment
        self._obs_buf = None

    def step(self, actions: torch.Tensor) -> Tuple[torch.Tensor, torch.Tensor, torch.Tensor, torch.Tensor, Any]:
        """Advance the wrapped environment by one step

        The wrapped environment returns four values; ``truncated`` is filled with zeros
        shaped like ``terminated``.

        :param actions: The actions to perform
        :type actions: torch.Tensor

        :return: Observation, reward, terminated, truncated, info
        :rtype: tuple of torch.Tensor and any other info
        """
        observation, reward, terminated, info = self._env.step(actions)
        self._obs_buf = observation
        truncated = torch.zeros_like(terminated)
        return (
            observation,
            reward.view(-1, 1),
            terminated.view(-1, 1),
            truncated.view(-1, 1),
            info,
        )

    def reset(self) -> Tuple[torch.Tensor, Any]:
        """Reset the environment

        Only the first call resets the wrapped environment; later calls return the
        most recently stored observation.

        :return: Observation, info
        :rtype: torch.Tensor and any other info
        """
        if not self._reset_once:
            return self._obs_buf, {}
        self._obs_buf = self._env.reset()
        self._reset_once = False
        return self._obs_buf, {}

    def render(self, *args, **kwargs) -> None:
        """Render the environment (no-op in this wrapper)
        """
        return None

    def close(self) -> None:
        """Close the environment (no-op in this wrapper)
        """
        return None
class IsaacGymPreview3Wrapper(Wrapper):
    def __init__(self, env: Any) -> None:
        """Wrapper for Isaac Gym (preview 3) environments

        :param env: The environment to wrap
        :type env: Any supported Isaac Gym environment (preview 3) environment
        """
        super().__init__(env)

        # the wrapped environment is only reset on the first ``reset`` call
        self._reset_once = True
        # latest observation container (a dict with an "obs" entry, or the observation itself)
        self._obs_dict = None

    def _current_observation(self) -> Any:
        """Return the ``"obs"`` entry when the stored observation is a dict, else the stored value"""
        stored = self._obs_dict
        if isinstance(stored, dict):
            return stored["obs"]
        return stored

    def step(self, actions: torch.Tensor) -> Tuple[torch.Tensor, torch.Tensor, torch.Tensor, torch.Tensor, Any]:
        """Advance the wrapped environment by one step

        The wrapped environment returns four values; ``truncated`` is filled with zeros
        shaped like ``terminated``.

        :param actions: The actions to perform
        :type actions: torch.Tensor

        :return: Observation, reward, terminated, truncated, info
        :rtype: tuple of torch.Tensor and any other info
        """
        self._obs_dict, reward, terminated, info = self._env.step(actions)
        truncated = torch.zeros_like(terminated)
        observation = self._current_observation()
        return observation, reward.view(-1, 1), terminated.view(-1, 1), truncated.view(-1, 1), info

    def reset(self) -> Tuple[torch.Tensor, Any]:
        """Reset the environment

        Only the first call resets the wrapped environment; later calls return the
        most recently stored observation.

        :return: Observation, info
        :rtype: torch.Tensor and any other info
        """
        if self._reset_once:
            self._obs_dict = self._env.reset()
            self._reset_once = False
        return self._current_observation(), {}

    def render(self, *args, **kwargs) -> None:
        """Render the environment (no-op in this wrapper)
        """
        return None

    def close(self) -> None:
        """Close the environment (no-op in this wrapper)
        """
        return None
class OmniverseIsaacGymWrapper(Wrapper):
    def __init__(self, env: Any) -> None:
        """Wrapper for Omniverse Isaac Gym environments

        :param env: The environment to wrap
        :type env: Any supported Omniverse Isaac Gym environment
        """
        super().__init__(env)

        # the task is only reset on the first ``reset`` call
        self._reset_once = True
        # {"obs": ..., "states": ...} built at the end of every step
        self._obs_dict = None

    def run(self, trainer: Optional["omni.isaac.gym.vec_env.vec_env_mt.TrainerMT"] = None) -> None:
        """Run the simulation in the main thread

        This method is valid only for the Omniverse Isaac Gym multi-threaded environments

        :param trainer: Trainer which should implement a ``run`` method that initiates the RL loop on a new thread
        :type trainer: omni.isaac.gym.vec_env.vec_env_mt.TrainerMT, optional
        """
        self._env.run(trainer)

    def _process_data(self):
        """Clip observations/states and move copies of the step buffers to the task's RL device"""
        task = self._env._task

        clipped_obs = torch.clamp(self._obs, -task.clip_obs, task.clip_obs)
        self._obs = clipped_obs.to(task.rl_device).clone()

        self._rew = self._rew.to(task.rl_device).clone()

        clipped_states = torch.clamp(self._states, -task.clip_obs, task.clip_obs)
        self._states = clipped_states.to(task.rl_device).clone()

        self._resets = self._resets.to(task.rl_device).clone()
        self._extras = self._extras.copy()

    def step(self, actions: torch.Tensor) -> Tuple[torch.Tensor, torch.Tensor, torch.Tensor, torch.Tensor, Any]:
        """Advance the simulation by one control step

        The task and world are driven directly (pre-physics step, world steps,
        post-physics step) rather than through the wrapped environment's ``step``.

        :param actions: The actions to perform
        :type actions: torch.Tensor

        :return: Observation, reward, terminated, truncated, info
        :rtype: tuple of torch.Tensor and any other info
        """
        task = self._env._task

        # optional domain randomization, then clipping, of the incoming actions
        if task.randomize_actions:
            actions = task._dr_randomizer.apply_actions_randomization(actions=actions,
                                                                      reset_buf=task.reset_buf)
        actions = torch.clamp(actions, -task.clip_actions, task.clip_actions).to(task.device).clone()

        task.pre_physics_step(actions)

        # one world step per control-frequency tick
        for _ in range(task.control_frequency_inv):
            self._env._world.step(render=self._env._render)
            self._env.sim_frame_count += 1

        self._obs, self._rew, self._resets, self._extras = task.post_physics_step()

        # optional domain randomization of the observations
        if task.randomize_observations:
            self._obs = task._dr_randomizer.apply_observations_randomization(
                observations=self._obs.to(device=task.rl_device), reset_buf=task.reset_buf)

        self._states = task.get_states()
        self._process_data()

        self._obs_dict = {"obs": self._obs, "states": self._states}

        # NOTE(review): the same reset flags are returned as both ``terminated`` and
        # ``truncated`` - confirm this is intended
        terminated = self._resets.view(-1, 1)
        truncated = self._resets.view(-1, 1)
        return self._obs_dict["obs"], self._rew.view(-1, 1), terminated, truncated, self._extras

    def reset(self) -> Tuple[torch.Tensor, Any]:
        """Reset the environment

        On the first call the task is reset and one step with all-zero actions is
        performed to populate the observation; later calls return the stored observation.

        :return: Observation, info
        :rtype: torch.Tensor and any other info
        """
        if self._reset_once:
            task = self._env._task
            task.reset()
            zero_actions = torch.zeros((self.num_envs, task.num_actions), device=task.rl_device)
            self.step(zero_actions)
            self._reset_once = False
        return self._obs_dict["obs"], {}

    def render(self, *args, **kwargs) -> None:
        """Render the environment (no-op in this wrapper)
        """
        return None

    def close(self) -> None:
        """Close the wrapped environment
        """
        self._env.close()
class IsaacOrbitWrapper(Wrapper):
    def __init__(self, env: Any) -> None:
        """Wrapper for Isaac Orbit environments

        :param env: The environment to wrap
        :type env: Any supported Isaac Orbit environment
        """
        super().__init__(env)

        # the wrapped environment is only reset on the first ``reset`` call
        self._reset_once = True
        # latest observation dict; the ``"policy"`` entry is what gets returned
        self._obs_dict = None

    def step(self, actions: torch.Tensor) -> Tuple[torch.Tensor, torch.Tensor, torch.Tensor, torch.Tensor, Any]:
        """Advance the wrapped environment by one step

        The wrapped environment returns four values; ``truncated`` is filled with zeros
        shaped like ``terminated``.

        :param actions: The actions to perform
        :type actions: torch.Tensor

        :return: Observation, reward, terminated, truncated, info
        :rtype: tuple of torch.Tensor and any other info
        """
        self._obs_dict, reward, terminated, info = self._env.step(actions)
        truncated = torch.zeros_like(terminated)
        policy_obs = self._obs_dict["policy"]
        return policy_obs, reward.view(-1, 1), terminated.view(-1, 1), truncated.view(-1, 1), info

    def reset(self) -> Tuple[torch.Tensor, Any]:
        """Reset the environment

        Only the first call resets the wrapped environment; later calls return the
        ``"policy"`` entry of the most recently stored observation dict.

        :return: Observation, info
        :rtype: torch.Tensor and any other info
        """
        if self._reset_once:
            self._obs_dict = self._env.reset()
            self._reset_once = False
        return self._obs_dict["policy"], {}

    def render(self, *args, **kwargs) -> None:
        """Render the environment (no-op in this wrapper)
        """
        return None

    def close(self) -> None:
        """Close the wrapped environment
        """
        self._env.close()
class GymWrapper(Wrapper):
    def __init__(self, env: Any) -> None:
        """OpenAI Gym environment wrapper

        :param env: The environment to wrap
        :type env: Any supported OpenAI Gym environment
        """
        super().__init__(env)

        self._vectorized = False
        # cache used by vectorized environments (which are reset only once); defined
        # unconditionally so the attributes always exist on the wrapper
        self._reset_once = True
        self._obs_tensor = None
        self._info_dict = None
        try:
            if isinstance(env, (gym.vector.SyncVectorEnv, gym.vector.AsyncVectorEnv)):
                self._vectorized = True
        except Exception as e:
            print("[WARNING] Failed to check for a vectorized environment: {}".format(e))

        # Gym < 0.26 uses the 4-tuple ``step`` / observation-only ``reset`` API
        self._drepecated_api = version.parse(gym.__version__) < version.parse("0.26.0")
        if self._drepecated_api:
            beauty_print("Using a deprecated version of OpenAI Gym's API: {}".format(gym.__version__), type="warning")

    @property
    def state_space(self) -> gym.Space:
        """State space

        An alias for the ``observation_space`` property
        """
        if self._vectorized:
            return self._env.single_observation_space
        return self._env.observation_space

    @property
    def observation_space(self) -> gym.Space:
        """Observation space (the single-environment space for vectorized environments)
        """
        if self._vectorized:
            return self._env.single_observation_space
        return self._env.observation_space

    @property
    def action_space(self) -> gym.Space:
        """Action space (the single-environment space for vectorized environments)
        """
        if self._vectorized:
            return self._env.single_action_space
        return self._env.action_space

    def _observation_to_tensor(self, observation: Any, space: Optional[gym.Space] = None) -> torch.Tensor:
        """Convert the OpenAI Gym observation to a tensor

        Top-level observations are shaped ``(num_envs, *single_observation_shape)``.
        Entries of a ``Dict`` space are flattened to ``(num_envs, -1)`` and concatenated
        along the last dimension (keys in sorted order).

        :param observation: The OpenAI Gym observation to convert to a tensor
        :type observation: Any supported OpenAI Gym observation space
        :param space: Space describing ``observation``. Only passed on recursive calls for
                      ``Dict`` entries; defaults to the environment's observation space
        :type space: gym.Space, optional

        :raises: ValueError if the observation space type is not supported

        :return: The observation as a tensor
        :rtype: torch.Tensor
        """
        nested = space is not None
        if not nested:
            space = self._env.observation_space if self._vectorized else self.observation_space

        # Target shape. The per-environment shape must come from the *single* space:
        # the vectorized environment's own space is already batched, so prefixing it
        # with ``num_envs`` again cannot match the data. Composite spaces (``Dict``)
        # report no shape, and their entries must be flattened to be concatenated.
        single_shape = self.observation_space.shape
        if nested or single_shape is None:
            target_shape = (self.num_envs, -1)
        else:
            target_shape = (self.num_envs, *single_shape)

        if self._vectorized and isinstance(space, gym.spaces.MultiDiscrete):
            return torch.tensor(observation, device=self.device, dtype=torch.int64).view(target_shape)
        elif isinstance(observation, int):
            return torch.tensor(observation, device=self.device, dtype=torch.int64).view(target_shape)
        elif isinstance(observation, np.ndarray):
            return torch.tensor(observation, device=self.device, dtype=torch.float32).view(target_shape)
        elif isinstance(space, gym.spaces.Discrete):
            return torch.tensor(observation, device=self.device, dtype=torch.float32).view(target_shape)
        elif isinstance(space, gym.spaces.Box):
            return torch.tensor(observation, device=self.device, dtype=torch.float32).view(target_shape)
        elif isinstance(space, gym.spaces.Dict):
            tmp = torch.cat([self._observation_to_tensor(observation[k], space[k]) \
                             for k in sorted(space.keys())], dim=-1).view(self.num_envs, -1)
            return tmp
        else:
            raise ValueError("Observation space type {} not supported. Please report this issue".format(type(space)))

    def _tensor_to_action(self, actions: torch.Tensor) -> Any:
        """Convert the action to the OpenAI Gym expected format

        :param actions: The actions to perform
        :type actions: torch.Tensor

        :raise ValueError: If the action space type is not supported

        :return: The action in the OpenAI Gym format
        :rtype: Any supported OpenAI Gym action space
        """
        space = self._env.action_space if self._vectorized else self.action_space

        if self._vectorized:
            if isinstance(space, gym.spaces.MultiDiscrete):
                return np.array(actions.cpu().numpy(), dtype=space.dtype).reshape(space.shape)
            elif isinstance(space, gym.spaces.Tuple):
                if isinstance(space[0], gym.spaces.Box):
                    return np.array(actions.cpu().numpy(), dtype=space[0].dtype).reshape(space.shape)
                elif isinstance(space[0], gym.spaces.Discrete):
                    return np.array(actions.cpu().numpy(), dtype=space[0].dtype).reshape(-1)
        # Plain ``if`` (not ``elif``): a vectorized environment whose batched action
        # space is a Box must still reach the branches below instead of falling
        # through to the ValueError. Same structure as GymnasiumWrapper.
        if isinstance(space, gym.spaces.Discrete):
            return actions.item()
        elif isinstance(space, gym.spaces.Box):
            return np.array(actions.cpu().numpy(), dtype=space.dtype).reshape(space.shape)
        raise ValueError("Action space type {} not supported. Please report this issue".format(type(space)))

    def step(self, actions: torch.Tensor) -> Tuple[torch.Tensor, torch.Tensor, torch.Tensor, torch.Tensor, Any]:
        """Perform a step in the environment

        :param actions: The actions to perform
        :type actions: torch.Tensor

        :return: Observation, reward, terminated, truncated, info
        :rtype: tuple of torch.Tensor and any other info
        """
        if self._drepecated_api:
            observation, reward, terminated, info = self._env.step(self._tensor_to_action(actions))
            # truncated: https://gymnasium.farama.org/tutorials/handling_time_limits
            # vectorized environments may hand back the per-env infos as a list or a tuple
            if isinstance(info, (list, tuple)):
                truncated = np.array([d.get("TimeLimit.truncated", False) for d in info], dtype=terminated.dtype)
                terminated *= np.logical_not(truncated)
            else:
                truncated = info.get("TimeLimit.truncated", False)
                if truncated:
                    terminated = False
        else:
            observation, reward, terminated, truncated, info = self._env.step(self._tensor_to_action(actions))

        # convert response to torch
        observation = self._observation_to_tensor(observation)
        reward = torch.tensor(reward, device=self.device, dtype=torch.float32).view(self.num_envs, -1)
        terminated = torch.tensor(terminated, device=self.device, dtype=torch.bool).view(self.num_envs, -1)
        truncated = torch.tensor(truncated, device=self.device, dtype=torch.bool).view(self.num_envs, -1)

        # save observation and info for vectorized envs
        if self._vectorized:
            self._obs_tensor = observation
            self._info_dict = info

        return observation, reward, terminated, truncated, info

    def reset(self, seed=None) -> Tuple[torch.Tensor, Any]:
        """Reset the environment

        Vectorized environments are reset only on the first call; later calls return
        the observation and info saved by the last ``step``.

        :param seed: Seed forwarded to the wrapped environment's ``reset``
                     (ignored when the deprecated Gym API is in use)
        :type seed: int, optional

        :return: Observation, info
        :rtype: torch.Tensor and any other info
        """
        # handle vectorized envs
        if self._vectorized:
            if not self._reset_once:
                return self._obs_tensor, self._info_dict
            self._reset_once = False

        # reset the env/envs
        if self._drepecated_api:
            observation = self._env.reset()
            info = {}
        else:
            observation, info = self._env.reset(seed=seed) if seed is not None else self._env.reset()
        return self._observation_to_tensor(observation), info

    def render(self, *args, **kwargs) -> None:
        """Render the environment
        """
        self._env.render(*args, **kwargs)

    def close(self) -> None:
        """Close the environment
        """
        self._env.close()
class GymnasiumWrapper(Wrapper):
    def __init__(self, env: Any, seed: int = None) -> None:
        """Gymnasium environment wrapper

        :param env: The environment to wrap
        :type env: Any supported Gymnasium environment
        :param seed: Seed passed to the wrapped environment's ``reset`` (default: ``None``)
        :type seed: int, optional
        """
        super().__init__(env)

        self._vectorized = False
        self.seed = seed
        # last seed actually handed to the environment (see ``reset``)
        self._applied_seed = None
        # cache used by vectorized environments (which are reset only once); defined
        # unconditionally so the attributes always exist on the wrapper
        self._reset_once = True
        self._obs_tensor = None
        self._info_dict = None
        try:
            if isinstance(env, (gymnasium.vector.SyncVectorEnv, gymnasium.vector.AsyncVectorEnv)):
                self._vectorized = True
        except Exception as e:
            print("[WARNING] Failed to check for a vectorized environment: {}".format(e))

    @property
    def state_space(self) -> gymnasium.Space:
        """State space

        An alias for the ``observation_space`` property
        """
        if self._vectorized:
            return self._env.single_observation_space
        return self._env.observation_space

    @property
    def observation_space(self) -> gymnasium.Space:
        """Observation space (the single-environment space for vectorized environments)
        """
        if self._vectorized:
            return self._env.single_observation_space
        return self._env.observation_space

    @property
    def action_space(self) -> gymnasium.Space:
        """Action space (the single-environment space for vectorized environments)
        """
        if self._vectorized:
            return self._env.single_action_space
        return self._env.action_space

    def _observation_to_tensor(self, observation: Any, space: Optional[gymnasium.Space] = None) -> torch.Tensor:
        """Convert the Gymnasium observation to a flat tensor

        :param observation: The Gymnasium observation to convert to a tensor
        :type observation: Any supported Gymnasium observation space
        :param space: Space describing ``observation``. Only passed on recursive calls for
                      ``Dict`` entries; defaults to the environment's observation space
        :type space: gymnasium.Space, optional

        :raises: ValueError if the observation space type is not supported

        :return: The observation as a flat tensor of shape ``(num_envs, -1)``
        :rtype: torch.Tensor
        """
        observation_space = self._env.observation_space if self._vectorized else self.observation_space
        space = space if space is not None else observation_space

        if self._vectorized and isinstance(space, gymnasium.spaces.MultiDiscrete):
            return torch.tensor(observation, device=self.device, dtype=torch.int64).view(self.num_envs, -1)
        elif isinstance(observation, int):
            return torch.tensor(observation, device=self.device, dtype=torch.int64).view(self.num_envs, -1)
        elif isinstance(observation, np.ndarray):
            return torch.tensor(observation, device=self.device, dtype=torch.float32).view(self.num_envs, -1)
        elif isinstance(space, gymnasium.spaces.Discrete):
            return torch.tensor(observation, device=self.device, dtype=torch.float32).view(self.num_envs, -1)
        elif isinstance(space, gymnasium.spaces.Box):
            return torch.tensor(observation, device=self.device, dtype=torch.float32).view(self.num_envs, -1)
        elif isinstance(space, gymnasium.spaces.Dict):
            # entries are concatenated along the last dimension, keys in sorted order
            tmp = torch.cat([self._observation_to_tensor(observation[k], space[k]) \
                             for k in sorted(space.keys())], dim=-1).view(self.num_envs, -1)
            return tmp
        else:
            raise ValueError("Observation space type {} not supported. Please report this issue".format(type(space)))

    def _tensor_to_action(self, actions: torch.Tensor) -> Any:
        """Convert the action to the Gymnasium expected format

        :param actions: The actions to perform
        :type actions: torch.Tensor

        :raise ValueError: If the action space type is not supported

        :return: The action in the Gymnasium format
        :rtype: Any supported Gymnasium action space
        """
        space = self._env.action_space if self._vectorized else self.action_space

        if self._vectorized:
            if isinstance(space, gymnasium.spaces.MultiDiscrete):
                return np.array(actions.cpu().numpy(), dtype=space.dtype).reshape(space.shape)
            elif isinstance(space, gymnasium.spaces.Tuple):
                if isinstance(space[0], gymnasium.spaces.Box):
                    return np.array(actions.cpu().numpy(), dtype=space[0].dtype).reshape(space.shape)
                elif isinstance(space[0], gymnasium.spaces.Discrete):
                    return np.array(actions.cpu().numpy(), dtype=space[0].dtype).reshape(-1)
        # reached by non-vectorized envs and by vectorized envs not handled above
        if isinstance(space, gymnasium.spaces.Discrete):
            return actions.item()
        elif isinstance(space, gymnasium.spaces.Box):
            return np.array(actions.cpu().numpy(), dtype=space.dtype).reshape(space.shape)
        raise ValueError("Action space type {} not supported. Please report this issue".format(type(space)))

    def step(self, actions: torch.Tensor) -> Tuple[torch.Tensor, torch.Tensor, torch.Tensor, torch.Tensor, Any]:
        """Perform a step in the environment

        :param actions: The actions to perform
        :type actions: torch.Tensor

        :return: Observation, reward, terminated, truncated, info
        :rtype: tuple of torch.Tensor and any other info
        """
        observation, reward, terminated, truncated, info = self._env.step(self._tensor_to_action(actions))

        # convert response to torch
        observation = self._observation_to_tensor(observation)
        reward = torch.tensor(reward, device=self.device, dtype=torch.float32).view(self.num_envs, -1)
        terminated = torch.tensor(terminated, device=self.device, dtype=torch.bool).view(self.num_envs, -1)
        truncated = torch.tensor(truncated, device=self.device, dtype=torch.bool).view(self.num_envs, -1)

        # save observation and info for vectorized envs
        if self._vectorized:
            self._obs_tensor = observation
            self._info_dict = info

        return observation, reward, terminated, truncated, info

    def reset(self) -> Tuple[torch.Tensor, Any]:
        """Reset the environment

        Vectorized environments are reset only on the first call; later calls return
        the observation and info saved by the last ``step``.

        The configured ``seed`` is handed to the wrapped environment once (and again
        only if ``self.seed`` is changed afterwards). Passing the same seed on every
        reset would restart the environment's random number generator each episode,
        making all episodes start identically.

        :return: Observation, info
        :rtype: torch.Tensor and any other info
        """
        # handle vectorized envs
        if self._vectorized:
            if not self._reset_once:
                return self._obs_tensor, self._info_dict
            self._reset_once = False

        # reset the env/envs
        seed = self.seed
        if seed is not None and seed != self._applied_seed:
            observation, info = self._env.reset(seed=seed)
            self._applied_seed = seed
        else:
            observation, info = self._env.reset()
        return self._observation_to_tensor(observation), info

    def render(self, *args, **kwargs) -> None:
        """Render the environment
        """
        self._env.render(*args, **kwargs)

    def close(self) -> None:
        """Close the environment
        """
        self._env.close()
class DeepMindWrapper(Wrapper):
    def __init__(self, env: Any) -> None:
        """Wrapper for DeepMind environments

        :param env: The environment to wrap
        :type env: Any supported DeepMind environment
        """
        super().__init__(env)

        # imported lazily and kept on the instance for the spec type checks below
        from dm_env import specs
        self._specs = specs

        # Gym spaces derived once from the environment specs
        self._observation_space = self._spec_to_space(self._env.observation_spec())
        self._action_space = self._spec_to_space(self._env.action_spec())

    @property
    def state_space(self) -> gym.Space:
        """State space

        An alias for the ``observation_space`` property
        """
        return self._observation_space

    @property
    def observation_space(self) -> gym.Space:
        """Observation space built from the environment's observation spec
        """
        return self._observation_space

    @property
    def action_space(self) -> gym.Space:
        """Action space built from the environment's action spec
        """
        return self._action_space

    def _spec_to_space(self, spec: Any) -> gym.Space:
        """Translate a DeepMind spec into the matching Gym space

        :param spec: The DeepMind spec to convert
        :type spec: Any supported DeepMind spec

        :raises: ValueError if the spec type is not supported

        :return: The Gym space
        :rtype: gym.Space
        """
        specs = self._specs

        if isinstance(spec, specs.DiscreteArray):
            return gym.spaces.Discrete(spec.num_values)

        if isinstance(spec, specs.BoundedArray):
            # 0-dimensional bounds are expanded to the full spec shape
            lower = spec.minimum if spec.minimum.ndim else np.full(spec.shape, spec.minimum)
            upper = spec.maximum if spec.maximum.ndim else np.full(spec.shape, spec.maximum)
            return gym.spaces.Box(shape=spec.shape, dtype=spec.dtype, low=lower, high=upper)

        if isinstance(spec, specs.Array):
            # plain arrays carry no bounds
            lower = np.full(spec.shape, float("-inf"))
            upper = np.full(spec.shape, float("inf"))
            return gym.spaces.Box(shape=spec.shape, dtype=spec.dtype, low=lower, high=upper)

        if isinstance(spec, collections.OrderedDict):
            converted = {}
            for name, sub_spec in spec.items():
                converted[name] = self._spec_to_space(sub_spec)
            return gym.spaces.Dict(converted)

        raise ValueError("Spec type {} not supported. Please report this issue".format(type(spec)))

    def _observation_to_tensor(self, observation: Any, spec: Optional[Any] = None) -> torch.Tensor:
        """Convert a DeepMind observation into a flat ``(num_envs, -1)`` float tensor

        :param observation: The DeepMind observation to convert to a tensor
        :type observation: Any supported DeepMind observation
        :param spec: Spec describing ``observation``; defaults to the environment's observation spec
        :type spec: Any supported DeepMind spec, optional

        :raises: ValueError if the observation spec type is not supported

        :return: The observation as a flat tensor
        :rtype: torch.Tensor
        """
        if spec is None:
            spec = self._env.observation_spec()

        # array-like specs (discrete, bounded or plain) are all converted the same way
        if isinstance(spec, (self._specs.DiscreteArray, self._specs.Array)):
            tensor = torch.tensor(observation, device=self.device, dtype=torch.float32)
            return tensor.reshape(self.num_envs, -1)

        if isinstance(spec, collections.OrderedDict):
            # entries are concatenated along the last dimension, keys in sorted order
            parts = [self._observation_to_tensor(observation[k], spec[k]) for k in sorted(spec.keys())]
            return torch.cat(parts, dim=-1).reshape(self.num_envs, -1)

        raise ValueError("Observation spec type {} not supported. Please report this issue".format(type(spec)))

    def _tensor_to_action(self, actions: torch.Tensor) -> Any:
        """Convert an action tensor into the format described by the action spec

        :param actions: The actions to perform
        :type actions: torch.Tensor

        :raise ValueError: If the action space type is not supported

        :return: The action in the DeepMind expected format
        :rtype: Any supported DeepMind action
        """
        spec = self._env.action_spec()

        if isinstance(spec, self._specs.DiscreteArray):
            return np.array(actions.item(), dtype=spec.dtype)
        if isinstance(spec, self._specs.Array):  # includes BoundedArray
            as_numpy = actions.cpu().numpy()
            return np.array(as_numpy, dtype=spec.dtype).reshape(spec.shape)
        raise ValueError("Action spec type {} not supported. Please report this issue".format(type(spec)))

    def step(self, actions: torch.Tensor) -> Tuple[torch.Tensor, torch.Tensor, torch.Tensor, torch.Tensor, Any]:
        """Advance the wrapped environment by one step

        ``terminated`` is taken from ``timestep.last()``; ``truncated`` is always ``False``
        and ``info`` is always an empty dict. A ``None`` reward is replaced by 0.

        :param actions: The actions to perform
        :type actions: torch.Tensor

        :return: Observation, reward, terminated, truncated, info
        :rtype: tuple of torch.Tensor and any other info
        """
        timestep = self._env.step(self._tensor_to_action(actions))

        raw_observation = timestep.observation
        reward = timestep.reward
        if reward is None:
            reward = 0
        terminated = timestep.last()
        truncated = False
        info = {}

        # convert response to torch
        observation_t = self._observation_to_tensor(raw_observation)
        reward_t = torch.tensor(reward, device=self.device, dtype=torch.float32).view(self.num_envs, -1)
        terminated_t = torch.tensor(terminated, device=self.device, dtype=torch.bool).view(self.num_envs, -1)
        truncated_t = torch.tensor(truncated, device=self.device, dtype=torch.bool).view(self.num_envs, -1)
        return observation_t, reward_t, terminated_t, truncated_t, info

    def reset(self) -> Tuple[torch.Tensor, Any]:
        """Reset the wrapped environment

        :return: Observation and an empty info dict
        :rtype: torch.Tensor and dict
        """
        first_timestep = self._env.reset()
        return self._observation_to_tensor(first_timestep.observation), {}

    def render(self, *args, **kwargs) -> None:
        """Render the environment

        OpenCV is used to render the environment.
        Install OpenCV with ``pip install opencv-python``
        """
        frame = self._env.physics.render(480, 640, camera_id=0)

        # show the frame in an OpenCV window (imported here so OpenCV stays optional)
        import cv2
        converted = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        cv2.imshow("env", converted)
        cv2.waitKey(1)

    def close(self) -> None:
        """Close the wrapped environment
        """
        self._env.close()
[docs]class RobosuiteWrapper(Wrapper):
def __init__(self, env: Any) -> None:
"""Robosuite environment wrapper
:param env: The environment to wrap
:type env: Any supported robosuite environment
"""
super().__init__(env)
# observation and action spaces
self._observation_space = self._spec_to_space(self._env.observation_spec())
self._action_space = self._spec_to_space(self._env.action_spec)
@property
def state_space(self) -> gym.Space:
"""State space
An alias for the ``observation_space`` property
"""
return self._observation_space
@property
def observation_space(self) -> gym.Space:
"""Observation space
"""
return self._observation_space
@property
def action_space(self) -> gym.Space:
"""Action space
"""
return self._action_space
def _spec_to_space(self, spec: Any) -> gym.Space:
"""Convert the robosuite spec to a Gym space
:param spec: The robosuite spec to convert
:type spec: Any supported robosuite spec
:raises: ValueError if the spec type is not supported
:return: The Gym space
:rtype: gym.Space
"""
if type(spec) is tuple:
return gym.spaces.Box(shape=spec[0].shape,
dtype=np.float32,
low=spec[0],
high=spec[1])
elif isinstance(spec, np.ndarray):
return gym.spaces.Box(shape=spec.shape,
dtype=np.float32,
low=np.full(spec.shape, float("-inf")),
high=np.full(spec.shape, float("inf")))
elif isinstance(spec, collections.OrderedDict):
return gym.spaces.Dict({k: self._spec_to_space(v) for k, v in spec.items()})
else:
raise ValueError("Spec type {} not supported. Please report this issue".format(type(spec)))
def _observation_to_tensor(self, observation: Any, spec: Optional[Any] = None) -> torch.Tensor:
"""Convert the observation to a flat tensor
:param observation: The observation to convert to a tensor
:type observation: Any supported observation
:raises: ValueError if the observation spec type is not supported
:return: The observation as a flat tensor
:rtype: torch.Tensor
"""
spec = spec if spec is not None else self._env.observation_spec()
if isinstance(spec, np.ndarray):
return torch.tensor(observation, device=self.device, dtype=torch.float32).reshape(self.num_envs, -1)
elif isinstance(spec, collections.OrderedDict):
return torch.cat([self._observation_to_tensor(observation[k], spec[k]) \
for k in sorted(spec.keys())], dim=-1).reshape(self.num_envs, -1)
else:
raise ValueError("Observation spec type {} not supported. Please report this issue".format(type(spec)))
def _tensor_to_action(self, actions: torch.Tensor) -> Any:
    """Convert the action to the robosuite expected format

    The tensor is detached from the autograd graph, moved to the CPU and copied into a
    ``float32`` NumPy array shaped like the first item of ``self._env.action_spec``.

    :param actions: The actions to perform
    :type actions: torch.Tensor

    :raises ValueError: If the action spec is not a tuple

    :return: The action in the robosuite expected format
    :rtype: Any supported robosuite action
    """
    spec = self._env.action_spec
    if type(spec) is not tuple:
        raise ValueError("Action spec type {} not supported. Please report this issue".format(type(spec)))

    # detach first: ``Tensor.numpy()`` raises a RuntimeError for tensors that require grad,
    # which is the case when actions come straight out of a policy evaluated with autograd on
    host_actions = actions.detach().cpu().numpy()
    return np.array(host_actions, dtype=np.float32).reshape(spec[0].shape)
def step(self, actions: torch.Tensor) -> Tuple[torch.Tensor, torch.Tensor, torch.Tensor, torch.Tensor, Any]:
    """Advance the wrapped environment by one step

    The wrapped environment's ``step`` returns four values; the fourth (info) is
    discarded and an empty dictionary is returned in its place. ``truncated`` is
    always reported as ``False``.

    :param actions: The actions to perform
    :type actions: torch.Tensor

    :return: Observation, reward, terminated, truncated, info
    :rtype: tuple of torch.Tensor and any other info
    """
    env_action = self._tensor_to_action(actions)
    observation, reward, terminated, _ = self._env.step(env_action)

    def as_column(value: Any, dtype: torch.dtype) -> torch.Tensor:
        # wrap a raw value into a (num_envs, -1) tensor on the wrapper's device
        return torch.tensor(value, device=self.device, dtype=dtype).view(self.num_envs, -1)

    return (
        self._observation_to_tensor(observation),
        as_column(reward, torch.float32),
        as_column(terminated, torch.bool),
        as_column(False, torch.bool),
        {},
    )
def reset(self) -> Tuple[torch.Tensor, Any]:
    """Reset the wrapped environment

    :return: The observation after the reset, converted to a tensor, and an empty info dictionary
    :rtype: tuple of torch.Tensor and dict
    """
    return self._observation_to_tensor(self._env.reset()), {}
def render(self, *args, **kwargs) -> None:
    """Render the wrapped environment

    All positional and keyword arguments are forwarded unchanged to the wrapped
    environment's ``render``; whatever it returns is not propagated.
    """
    wrapped = self._env
    wrapped.render(*args, **kwargs)
def close(self) -> None:
    """Close the wrapped environment

    Delegates to the wrapped environment's ``close``; its return value is not propagated.
    """
    wrapped = self._env
    wrapped.close()
def wrap_env(env: Any, wrapper: str = "auto", verbose: bool = True, logger=None, seed=None) -> Wrapper:
    """
    Wrap an environment to use a common interface

    :param env: The environment to be wrapped
    :param wrapper: The type of wrapper to use (default: "auto").
                    If ``"auto"``, the wrapper will be automatically selected based on the environment class.
                    The supported wrappers are described in the following table:

                    .. raw:: html

                        <br>

                    +--------------------+-------------------------+
                    |Environment         |Wrapper tag              |
                    +====================+=========================+
                    |OpenAI Gym          |``"gym"``                |
                    +--------------------+-------------------------+
                    |Gymnasium           |``"gymnasium"``          |
                    +--------------------+-------------------------+
                    |DeepMind            |``"dm"``                 |
                    +--------------------+-------------------------+
                    |Robosuite           |``"robosuite"``          |
                    +--------------------+-------------------------+
                    |Isaac Gym preview 2 |``"isaacgym-preview2"``  |
                    +--------------------+-------------------------+
                    |Isaac Gym preview 3 |``"isaacgym-preview3"``  |
                    +--------------------+-------------------------+
                    |Isaac Gym preview 4 |``"isaacgym-preview4"``  |
                    +--------------------+-------------------------+
                    |Omniverse Isaac Gym |``"omniverse-isaacgym"`` |
                    +--------------------+-------------------------+
                    |Isaac Sim (orbit)   |``"isaac-orbit"``        |
                    +--------------------+-------------------------+
    :param verbose: Whether to log the environment class and the selected wrapper (default: True)
    :param logger: Object exposing ``info(message)``, e.g. the rofunc logger (default: None).
                   When ``None`` and ``verbose`` is set, a standard-library logger named after
                   this module is used instead
    :param seed: random seed for env (default: None). Forwarded to the Gymnasium wrapper only
    :raises ValueError: If ``wrapper`` is not ``"auto"`` or one of the tags in the table
    :return: The wrapped environment
    """
    if verbose and logger is None:
        # ``logger`` defaults to None while ``verbose`` defaults to True: without a fallback
        # the very first ``logger.info`` call fails with AttributeError on a default call
        import logging
        logger = logging.getLogger(__name__)

    def report(message: str) -> None:
        if verbose:
            logger.info(message)

    env_class = env.__class__
    base_classes = [str(base) for base in env_class.__bases__]
    report("Environment class: {}".format(
        ", ".join(name.replace("<class '", "").replace("'>", "") for name in base_classes)))

    if wrapper == "auto":
        # classes of optional simulators are matched by name so that those packages
        # do not have to be importable here
        omniverse_classes = ("<class 'omni.isaac.gym.vec_env.vec_env_base.VecEnvBase'>",
                             "<class 'omni.isaac.gym.vec_env.vec_env_mt.VecEnvMT'>")
        own_class = str(env_class)
        if any(tag in base_classes or tag == own_class for tag in omniverse_classes):
            report("Environment wrapper: Omniverse Isaac Gym")
            return OmniverseIsaacGymWrapper(env)
        if isinstance(env, (gym.core.Env, gym.core.Wrapper)):
            # isaac-orbit environments are gym environments exposing ``sim`` and ``env_ns``
            if hasattr(env, "sim") and hasattr(env, "env_ns"):
                report("Environment wrapper: Isaac Orbit")
                return IsaacOrbitWrapper(env)
            report("Environment wrapper: Gym")
            return GymWrapper(env)
        if isinstance(env, (gymnasium.core.Env, gymnasium.core.Wrapper)):
            report("Environment wrapper: Gymnasium")
            return GymnasiumWrapper(env, seed)
        if "<class 'dm_env._environment.Environment'>" in base_classes:
            report("Environment wrapper: DeepMind")
            return DeepMindWrapper(env)
        # look at every base class: indexing only the first one fails on an empty list
        # and misses environments that list a robosuite class after another base
        if any("<class 'robosuite.environments." in name for name in base_classes):
            report("Environment wrapper: Robosuite")
            return RobosuiteWrapper(env)
        if "<class 'rlgpu.tasks.base.vec_task.VecTask'>" in base_classes:
            report("Environment wrapper: Isaac Gym (preview 2)")
            return IsaacGymPreview2Wrapper(env)
        # nothing else matched: assume Isaac Gym preview 3/4
        report("Environment wrapper: Isaac Gym (preview 3/4)")
        return IsaacGymPreview3Wrapper(env)  # preview 4 is the same as 3

    # explicit tags: (name used in the log message, lazy constructor)
    explicit = {
        "gym": ("Gym", lambda: GymWrapper(env)),
        # forward the seed exactly like the auto-detected Gymnasium branch does
        "gymnasium": ("gymnasium", lambda: GymnasiumWrapper(env, seed)),
        "dm": ("DeepMind", lambda: DeepMindWrapper(env)),
        "robosuite": ("Robosuite", lambda: RobosuiteWrapper(env)),
        "isaacgym-preview2": ("Isaac Gym (preview 2)", lambda: IsaacGymPreview2Wrapper(env)),
        "isaacgym-preview3": ("Isaac Gym (preview 3)", lambda: IsaacGymPreview3Wrapper(env)),
        # preview 4 is the same as 3
        "isaacgym-preview4": ("Isaac Gym (preview 4)", lambda: IsaacGymPreview3Wrapper(env)),
        "omniverse-isaacgym": ("Omniverse Isaac Gym", lambda: OmniverseIsaacGymWrapper(env)),
        "isaac-orbit": ("Isaac Orbit", lambda: IsaacOrbitWrapper(env)),
    }
    try:
        label, build = explicit[wrapper]
    except (KeyError, TypeError):
        # TypeError covers unhashable values, which are unknown tags as well
        raise ValueError("Unknown {} wrapper type".format(wrapper)) from None
    report("Environment wrapper: {}".format(label))
    return build()