Source code for rofunc.learning.RofuncRL.tasks.utils.env_loaders

import os
import sys
import queue
from contextlib import contextmanager

__all__ = ["load_isaacgym_env_preview2",
           "load_isaacgym_env_preview3",
           "load_isaacgym_env_preview4",
           "load_omniverse_isaacgym_env"]


[docs]@contextmanager def cwd(new_path: str) -> None: """Context manager to change the current working directory This function restores the current working directory after the context manager exits :param new_path: The new path to change to :type new_path: str """ current_path = os.getcwd() os.chdir(new_path) try: yield finally: os.chdir(current_path)
def _omegaconf_to_dict(config) -> dict: """Convert OmegaConf config to dict :param config: The OmegaConf config :type config: OmegaConf.Config :return: The config as dict :rtype: dict """ # return config.to_container(dict) from omegaconf import DictConfig d = {} for k, v in config.items(): d[k] = _omegaconf_to_dict(v) if isinstance(v, DictConfig) else v return d def _print_cfg(d, indent=0) -> None: """Print the environment configuration :param d: The dictionary to print :type d: dict :param indent: The indentation level (default: 0) :type indent: int, optional """ for key, value in d.items(): if isinstance(value, dict): _print_cfg(value, indent + 1) else: print(' | ' * indent + " |-- {}: {}".format(key, value))
[docs]def load_isaacgym_env_preview2(task_name: str = "", isaacgymenvs_path: str = "", show_cfg: bool = True): """Load an Isaac Gym environment (preview 2) :param task_name: The name of the task (default: ""). If not specified, the task name is taken from the command line argument (``--task TASK_NAME``). Command line argument has priority over function parameter if both are specified :type task_name: str, optional :param isaacgymenvs_path: The path to the ``rlgpu`` directory (default: ""). If empty, the path will obtained from isaacgym package metadata :type isaacgymenvs_path: str, optional :param show_cfg: Whether to print the configuration (default: True) :type show_cfg: bool, optional :raises ValueError: The task name has not been defined, neither by the function parameter nor by the command line arguments :raises RuntimeError: The isaacgym package is not installed or the path is wrong :return: Isaac Gym environment (preview 2) :rtype: tasks.base.vec_task.VecTask """ import isaacgym # check task from command line arguments defined = False for arg in sys.argv: if arg.startswith("--task"): defined = True break # get task name from command line arguments if defined: arg_index = sys.argv.index("--task") + 1 if arg_index >= len(sys.argv): raise ValueError("No task name defined. Set the task_name parameter or use --task <task_name> as command line argument") if task_name and task_name != sys.argv[arg_index]: print("[WARNING] Overriding task ({}) with command line argument ({})".format(task_name, sys.argv[arg_index])) # get task name from function arguments else: if task_name: sys.argv.append("--task") sys.argv.append(task_name) else: raise ValueError("No task name defined. Set the task_name parameter or use --task <task_name> as command line argument") # get isaacgym envs path from isaacgym package metadata if not isaacgymenvs_path: if not hasattr(isaacgym, "__path__"): raise RuntimeError("isaacgym package is not installed or could not be accessed by the current Python environment") path = isaacgym.__path__ path = os.path.join(path[0], "..", "rlgpu") else: path = isaacgymenvs_path # import required packages sys.path.append(path) status = True try: from utils.config import get_args, load_cfg, parse_sim_params from utils.parse_task import parse_task except Exception as e: status = False print("[ERROR] Failed to import required packages: {}".format(e)) if not status: raise RuntimeError("The path ({}) is not valid or the isaacgym package is not installed in editable mode (pip install -e .)" \ .format(path)) args = get_args() # print config if show_cfg: print("\nIsaac Gym environment ({})".format(args.task)) _print_cfg(vars(args)) # update task arguments args.cfg_train = os.path.join(path, args.cfg_train) args.cfg_env = os.path.join(path, args.cfg_env) # load environment with cwd(path): cfg, cfg_train, _ = load_cfg(args) sim_params = parse_sim_params(args, cfg, cfg_train) task, env = parse_task(args, cfg, cfg_train, sim_params) return env
[docs]def load_isaacgym_env_preview3(task_name: str = "", isaacgymenvs_path: str = "", show_cfg: bool = True): """Load an Isaac Gym environment (preview 3) Isaac Gym benchmark environments: https://github.com/NVIDIA-Omniverse/IsaacGymEnvs :param task_name: The name of the task (default: ""). If not specified, the task name is taken from the command line argument (``task=TASK_NAME``). Command line argument has priority over function parameter if both are specified :type task_name: str, optional :param isaacgymenvs_path: The path to the ``isaacgymenvs`` directory (default: ""). If empty, the path will obtained from isaacgymenvs package metadata :type isaacgymenvs_path: str, optional :param show_cfg: Whether to print the configuration (default: True) :type show_cfg: bool, optional :raises ValueError: The task name has not been defined, neither by the function parameter nor by the command line arguments :raises RuntimeError: The isaacgymenvs package is not installed or the path is wrong :return: Isaac Gym environment (preview 3) :rtype: isaacgymenvs.tasks.base.vec_task.VecTask """ from hydra.types import RunMode from hydra._internal.hydra import Hydra from hydra._internal.utils import create_automatic_config_search_path, get_args_parser from omegaconf import OmegaConf import isaacgym import isaacgymenvs # check task from command line arguments defined = False for arg in sys.argv: if arg.startswith("task="): defined = True break # get task name from command line arguments if defined: if task_name and task_name != arg.split("task=")[1].split(" ")[0]: print("[WARNING] Overriding task name ({}) with command line argument ({})" \ .format(task_name, arg.split("task=")[1].split(" ")[0])) # get task name from function arguments else: if task_name: sys.argv.append("task={}".format(task_name)) else: raise ValueError("No task name defined. Set task_name parameter or use task=<task_name> as command line argument") # get isaacgymenvs path from isaacgymenvs package metadata if isaacgymenvs_path == "": if not hasattr(isaacgymenvs, "__path__"): raise RuntimeError("isaacgymenvs package is not installed") isaacgymenvs_path = list(isaacgymenvs.__path__)[0] config_path = os.path.join(isaacgymenvs_path, "cfg") # set omegaconf resolvers try: OmegaConf.register_new_resolver('eq', lambda x, y: x.lower() == y.lower()) except Exception as e: pass try: OmegaConf.register_new_resolver('contains', lambda x, y: x.lower() in y.lower()) except Exception as e: pass try: OmegaConf.register_new_resolver('if', lambda condition, a, b: a if condition else b) except Exception as e: pass try: OmegaConf.register_new_resolver('resolve_default', lambda default, arg: default if arg == '' else arg) except Exception as e: pass # get hydra config without use @hydra.main config_file = "config" args = get_args_parser().parse_args() search_path = create_automatic_config_search_path(config_file, None, config_path) hydra_object = Hydra.create_main_hydra2(task_name='load_isaacgymenv', config_search_path=search_path) config = hydra_object.compose_config(config_file, args.overrides, run_mode=RunMode.RUN) cfg = _omegaconf_to_dict(config.task) # print config if show_cfg: print("\nIsaac Gym environment ({})".format(config.task.name)) _print_cfg(cfg) # load environment sys.path.append(isaacgymenvs_path) from tasks import isaacgym_task_map try: env = isaacgym_task_map[config.task.name](cfg=cfg, sim_device=config.sim_device, graphics_device_id=config.graphics_device_id, headless=config.headless) except TypeError as e: env = isaacgym_task_map[config.task.name](cfg=cfg, rl_device=config.rl_device, sim_device=config.sim_device, graphics_device_id=config.graphics_device_id, headless=config.headless, virtual_screen_capture=config.capture_video, # TODO: check force_render=config.force_render) return env
[docs]def load_isaacgym_env_preview4(task_name: str = "", isaacgymenvs_path: str = "", show_cfg: bool = True): """Load an Isaac Gym environment (preview 4) Isaac Gym benchmark environments: https://github.com/NVIDIA-Omniverse/IsaacGymEnvs :param task_name: The name of the task (default: ""). If not specified, the task name is taken from the command line argument (``task=TASK_NAME``). Command line argument has priority over function parameter if both are specified :type task_name: str, optional :param isaacgymenvs_path: The path to the ``isaacgymenvs`` directory (default: ""). If empty, the path will obtained from isaacgymenvs package metadata :type isaacgymenvs_path: str, optional :param show_cfg: Whether to print the configuration (default: True) :type show_cfg: bool, optional :raises ValueError: The task name has not been defined, neither by the function parameter nor by the command line arguments :raises RuntimeError: The isaacgymenvs package is not installed or the path is wrong :return: Isaac Gym environment (preview 4) :rtype: isaacgymenvs.tasks.base.vec_task.VecTask """ return load_isaacgym_env_preview3(task_name, isaacgymenvs_path, show_cfg)
[docs]def load_omniverse_isaacgym_env(task_name: str = "", omniisaacgymenvs_path: str = "", show_cfg: bool = True, multi_threaded: bool = False, timeout: int = 30): """Load an Omniverse Isaac Gym environment Omniverse Isaac Gym benchmark environments: https://github.com/NVIDIA-Omniverse/OmniIsaacGymEnvs :param task_name: The name of the task (default: ""). If not specified, the task name is taken from the command line argument (``task=TASK_NAME``). Command line argument has priority over function parameter if both are specified :type task_name: str, optional :param omniisaacgymenvs_path: The path to the ``rofunc.learning.RofuncRL.tasks.omniisaacgym`` directory (default: ""). If empty, the path will obtained from rofunc.learning.RofuncRL.tasks.omniisaacgym package metadata :type omniisaacgymenvs_path: str, optional :param show_cfg: Whether to print the configuration (default: True) :type show_cfg: bool, optional :param multi_threaded: Whether to use multi-threaded environment (default: False) :type multi_threaded: bool, optional :param timeout: Seconds to wait for data when queue is empty in multi-threaded environment (default: 30) :type timeout: int, optional :raises ValueError: The task name has not been defined, neither by the function parameter nor by the command line arguments :raises RuntimeError: The rofunc.learning.RofuncRL.tasks.omniisaacgym package is not installed or the path is wrong :return: Omniverse Isaac Gym environment :rtype: omni.isaac.gym.vec_env.vec_env_base.VecEnvBase or omni.isaac.gym.vec_env.vec_env_mt.VecEnvMT """ import torch from hydra.types import RunMode from hydra._internal.hydra import Hydra from hydra._internal.utils import create_automatic_config_search_path, get_args_parser from omegaconf import OmegaConf from omni.isaac.gym.vec_env import VecEnvBase, VecEnvMT, TaskStopException from omni.isaac.gym.vec_env.vec_env_mt import TrainerMT import rofunc.learning.RofuncRL.tasks.omniisaacgym # check task from command line arguments defined = False for arg in sys.argv: if arg.startswith("task="): defined = True break # get task name from command line arguments if defined: if task_name and task_name != arg.split("task=")[1].split(" ")[0]: print("[WARNING] Overriding task name ({}) with command line argument ({})" \ .format(task_name, arg.split("task=")[1].split(" ")[0])) # get task name from function arguments else: if task_name: sys.argv.append("task={}".format(task_name)) else: raise ValueError("No task name defined. Set task_name parameter or use task=<task_name> as command line argument") # get rofunc.learning.RofuncRL.tasks.omniisaacgym path from rofunc.learning.RofuncRL.tasks.omniisaacgym package metadata if omniisaacgymenvs_path == "": if not hasattr(rofunc.learning.RofuncRL.tasks.omniisaacgym, "__path__"): raise RuntimeError("rofunc.learning.RofuncRL.tasks.omniisaacgym package is not installed") omniisaacgymenvs_path = list(rofunc.learning.RofuncRL.tasks.omniisaacgym.__path__)[0] config_path = os.path.join(omniisaacgymenvs_path, "cfg") # set omegaconf resolvers OmegaConf.register_new_resolver('eq', lambda x, y: x.lower() == y.lower()) OmegaConf.register_new_resolver('contains', lambda x, y: x.lower() in y.lower()) OmegaConf.register_new_resolver('if', lambda condition, a, b: a if condition else b) OmegaConf.register_new_resolver('resolve_default', lambda default, arg: default if arg == '' else arg) # get hydra config without use @hydra.main config_file = "config" args = get_args_parser().parse_args() search_path = create_automatic_config_search_path(config_file, None, config_path) hydra_object = Hydra.create_main_hydra2(task_name='load_omniisaacgymenv', config_search_path=search_path) config = hydra_object.compose_config(config_file, args.overrides, run_mode=RunMode.RUN) del config.hydra cfg = _omegaconf_to_dict(config) cfg["train"] = {} # print config if show_cfg: print("\nOmniverse Isaac Gym environment ({})".format(config.task.name)) _print_cfg(cfg) # internal classes class _OmniIsaacGymVecEnv(VecEnvBase): def step(self, actions): actions = torch.clamp(actions, -self._task.clip_actions, self._task.clip_actions).to(self._task.device).clone() self._task.pre_physics_step(actions) for _ in range(self._task.control_frequency_inv): self._world.step(render=self._render) self.sim_frame_count += 1 observations, rewards, dones, info = self._task.post_physics_step() return {"obs": torch.clamp(observations, -self._task.clip_obs, self._task.clip_obs).to(self._task.rl_device).clone()}, \ rewards.to(self._task.rl_device).clone(), dones.to(self._task.rl_device).clone(), info.copy() def reset(self): self._task.reset() actions = torch.zeros((self.num_envs, self._task.num_actions), device=self._task.device) return self.step(actions)[0] class _OmniIsaacGymTrainerMT(TrainerMT): def run(self): pass def stop(self): pass class _OmniIsaacGymVecEnvMT(VecEnvMT): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.action_queue = queue.Queue(1) self.data_queue = queue.Queue(1) def run(self, trainer=None): super().run(_OmniIsaacGymTrainerMT() if trainer is None else trainer) def _parse_data(self, data): self._observations = torch.clamp(data["obs"], -self._task.clip_obs, self._task.clip_obs).to(self._task.rl_device).clone() self._rewards = data["rew"].to(self._task.rl_device).clone() self._dones = data["reset"].to(self._task.rl_device).clone() self._info = data["extras"].copy() def step(self, actions): if self._stop: raise TaskStopException() actions = torch.clamp(actions, -self._task.clip_actions, self._task.clip_actions).clone() self.send_actions(actions) data = self.get_data() return {"obs": self._observations}, self._rewards, self._dones, self._info def reset(self): self._task.reset() actions = torch.zeros((self.num_envs, self._task.num_actions), device=self._task.device) return self.step(actions)[0] def close(self): # end stop signal to main thread self.send_actions(None) self.stop = True # load environment sys.path.append(omniisaacgymenvs_path) from utils.task_util import initialize_task if config.multi_gpu: rank = int(os.getenv("LOCAL_RANK", "0")) config.device_id = rank config.rl_device = f"cuda:{rank}" enable_viewport = "enable_cameras" in config.task.sim and config.task.sim.enable_cameras if multi_threaded: env = _OmniIsaacGymVecEnvMT(headless=config.headless, sim_device=config.device_id, enable_livestream=config.enable_livestream, enable_viewport=enable_viewport) task = initialize_task(cfg, env, init_sim=False) env.initialize(env.action_queue, env.data_queue, timeout=timeout) else: env = _OmniIsaacGymVecEnv(headless=config.headless, sim_device=config.device_id, enable_livestream=config.enable_livestream, enable_viewport=enable_viewport) task = initialize_task(cfg, env, init_sim=True) return env
[docs]def load_isaac_orbit_env(task_name: str = "", show_cfg: bool = True): """Load an Isaac Orbit environment Isaac Orbit: https://isaac-orbit.github.io/orbit/index.html This function includes the definition and parsing of command line arguments used by Isaac Orbit: - ``--headless``: Force display off at all times - ``--cpu``: Use CPU pipeline - ``--num_envs``: Number of environments to simulate - ``--task``: Name of the task - ``--num_envs``: Seed used for the environment :param task_name: The name of the task (default: ""). If not specified, the task name is taken from the command line argument (``--task TASK_NAME``). Command line argument has priority over function parameter if both are specified :type task_name: str, optional :param show_cfg: Whether to print the configuration (default: True) :type show_cfg: bool, optional :raises ValueError: The task name has not been defined, neither by the function parameter nor by the command line arguments :return: Isaac Orbit environment :rtype: gym.Env """ import gym import atexit import argparse # check task from command line arguments defined = False for arg in sys.argv: if arg.startswith("--task"): defined = True break # get task name from command line arguments if defined: arg_index = sys.argv.index("--task") + 1 if arg_index >= len(sys.argv): raise ValueError("No task name defined. Set the task_name parameter or use --task <task_name> as command line argument") if task_name and task_name != sys.argv[arg_index]: print("[WARNING] Overriding task ({}) with command line argument ({})".format(task_name, sys.argv[arg_index])) # get task name from function arguments else: if task_name: sys.argv.append("--task") sys.argv.append(task_name) else: raise ValueError("No task name defined. Set the task_name parameter or use --task <task_name> as command line argument") # parse arguments parser = argparse.ArgumentParser("Welcome to Orbit: Omniverse Robotics Environments!") parser.add_argument("--headless", action="store_true", default=False, help="Force display off at all times.") parser.add_argument("--cpu", action="store_true", default=False, help="Use CPU pipeline.") parser.add_argument("--num_envs", type=int, default=None, help="Number of environments to simulate.") parser.add_argument("--task", type=str, default=None, help="Name of the task.") parser.add_argument("--seed", type=int, default=None, help="Seed used for the environment") args = parser.parse_args() # load the most efficient kit configuration in headless mode if args.headless: app_experience = f"{os.environ['EXP_PATH']}/omni.isaac.sim.python.gym.headless.kit" else: app_experience = f"{os.environ['EXP_PATH']}/omni.isaac.sim.python.kit" # launch the simulator from omni.isaac.kit import SimulationApp config = {"headless": args.headless} simulation_app = SimulationApp(config, experience=app_experience) @atexit.register def close_the_simulator(): simulation_app.close() # import orbit extensions import omni.isaac.contrib_envs # noqa: F401 import omni.isaac.orbit_envs # noqa: F401 from omni.isaac.orbit_envs.utils import parse_env_cfg cfg = parse_env_cfg(args.task, use_gpu=not args.cpu, num_envs=args.num_envs) # print config if show_cfg: print("\nIsaac Orbit environment ({})".format(args.task)) try: _print_cfg(cfg) except AttributeError as e: pass # load environment env = gym.make(args.task, cfg=cfg, headless=args.headless) return env