Source code for rofunc.config.utils

import copy
import os
from typing import Dict

import hydra
from hydra import compose, initialize
from hydra._internal.hydra import Hydra
from hydra._internal.utils import create_automatic_config_search_path
from hydra.core.hydra_config import HydraConfig
from hydra.types import RunMode
from omegaconf import DictConfig

import rofunc as rf
from rofunc.config import *
from rofunc.utils.oslab.path import get_rofunc_path

"""
Config loading rules:
1. If config_path and config_name are both None, load the default config file (Depreciated, designed for LQT)
2. Configs for RL/IL contains two parts: task and train
    - With the config_path and config_name, the config file `config.yaml` will be loaded and some params will be 
      rewritten by the args passed in.
    - args[0].split('=')[1] is the task name, args[1].split('=')[1] is the train name
"""


[docs]def get_sim_config(sim_name: str): """ Load the configs stored in agent_name.yaml. :param sim_name: Name of the config file under config/simulator for the agent. :return: A dict of configs. """ agent_config_file_path = os.path.join(os.path.dirname(__file__), f"simulator/{sim_name}.yaml") if not os.path.exists(agent_config_file_path): raise FileNotFoundError(f"{agent_config_file_path} does not exist") return OmegaConf.load(agent_config_file_path)
[docs]def get_config(config_path=None, config_name=None, args=None, debug=False, absl_config_path=None) -> DictConfig: """ Load config file for RofuncRL and rewrite some params by args. :param config_path: relative path to the config file (only for rofunc package) :param config_name: name of the config file (without .yaml) :param args: custom args to rewrite some params in the config file :param debug: if True, print the config :param absl_config_path: absolute path to the folder contains config file (for external user) :return: """ # reset current hydra config if already parsed (but not passed in here) if HydraConfig.initialized(): hydra.core.global_hydra.GlobalHydra.instance().clear() if (config_path is not None and config_name is not None) or ( absl_config_path is not None and config_name is not None ): assert None in [ config_path, absl_config_path, ], "config_path and absl_config_path cannot be set simultaneously" if args is None: with initialize(config_path=config_path, version_base=None): cfg = compose(config_name=config_name) else: if absl_config_path is None: rofunc_path = get_rofunc_path() absl_config_path = os.path.join(rofunc_path, "config/{}".format(config_path)) search_path = create_automatic_config_search_path(config_name, None, absl_config_path) hydra_object = Hydra.create_main_hydra2(task_name='load_isaacgymenv', config_search_path=search_path) # Find the available task and train config files try: cfg = hydra_object.compose_config(config_name, args, run_mode=RunMode.RUN) rf.logger.beauty_print('Use task config: {}.yaml'.format(args[0].split('=')[1]), type='info') rf.logger.beauty_print('Use train config: {}.yaml'.format(args[1].split('=')[1]), type='info') except Exception as e: rf.logger.beauty_print(e, type='warning') original_task = args[0].split('=')[1] if args[0].split('=')[1].split('_')[0] == 'Gym': task = 'GymBaseTask' args[0] = 'task={}'.format(task) rf.logger.beauty_print('Use task config: {}.yaml'.format(task), type='warning') try: cfg = hydra_object.compose_config(config_name, args, run_mode=RunMode.RUN) rf.logger.beauty_print('Use train config: {}.yaml'.format(args[1].split('=')[1]), type='info') except Exception as e: rf.logger.beauty_print(e, type='warning') train = 'BaseTask' + args[1].split('=')[1].split(original_task)[1] args[1] = 'train={}'.format(train) cfg = hydra_object.compose_config(config_name, args, run_mode=RunMode.RUN) rf.logger.beauty_print('Use train config: {}.yaml'.format(train), type='warning') else: with initialize(config_path="./", version_base=None): cfg = compose(config_name="lqt") if debug: print_config(cfg) return cfg
[docs]def process_omni_config(cfg): """ Load the configs stored in agent_name.yaml. :param cfg: :return: """ import carb import omni.usd default_physx_params = { ### Per-scene settings "use_gpu": False, "worker_thread_count": 4, "solver_type": 1, # 0: PGS, 1:TGS "bounce_threshold_velocity": 0.2, "friction_offset_threshold": 0.04, # A threshold of contact separation distance used to decide if a contact # point will experience friction forces. "friction_correlation_distance": 0.025, # Contact points can be merged into a single friction anchor if the # distance between the contacts is smaller than correlation distance. # disabling these can be useful for debugging "enable_sleeping": True, "enable_stabilization": True, # GPU buffers "gpu_max_rigid_contact_count": 512 * 1024, "gpu_max_rigid_patch_count": 80 * 1024, "gpu_found_lost_pairs_capacity": 1024, "gpu_found_lost_aggregate_pairs_capacity": 1024, "gpu_total_aggregate_pairs_capacity": 1024, "gpu_max_soft_body_contacts": 1024 * 1024, "gpu_max_particle_contacts": 1024 * 1024, "gpu_heap_capacity": 64 * 1024 * 1024, "gpu_temp_buffer_capacity": 16 * 1024 * 1024, "gpu_max_num_partitions": 8, ### Per-actor settings ( can override in actor_options ) "solver_position_iteration_count": 4, "solver_velocity_iteration_count": 1, "sleep_threshold": 0.0, # Mass-normalized kinetic energy threshold below which an actor may go to sleep. # Allowed range [0, max_float). "stabilization_threshold": 0.0, # Mass-normalized kinetic energy threshold below which an actor may # participate in stabilization. Allowed range [0, max_float). ### Per-body settings ( can override in actor_options ) "enable_gyroscopic_forces": False, "density": 1000.0, # density to be used for bodies that do not specify mass or density "max_depenetration_velocity": 100.0, ### Per-shape settings ( can override in actor_options ) "contact_offset": 0.02, "rest_offset": 0.001 } default_physics_material = { "static_friction": 1.0, "dynamic_friction": 1.0, "restitution": 0.0 } default_sim_params = { "gravity": [0.0, 0.0, -9.81], "dt": 1.0 / 60.0, "substeps": 1, "use_gpu_pipeline": True, "add_ground_plane": True, "add_distant_light": True, "use_flatcache": True, "enable_scene_query_support": False, "enable_cameras": False, "disable_contact_processing": False, "default_physics_material": default_physics_material } default_actor_options = { # -1 means use authored value from USD or default values from default_sim_params if not explicitly authored in USD. # If an attribute value is not explicitly authored in USD, add one with the value given here, # which overrides the USD default. "override_usd_defaults": False, "make_kinematic": -1, "enable_self_collisions": -1, "enable_gyroscopic_forces": -1, "solver_position_iteration_count": -1, "solver_velocity_iteration_count": -1, "sleep_threshold": -1, "stabilization_threshold": -1, "max_depenetration_velocity": -1, "density": -1, "mass": -1, "contact_offset": -1, "rest_offset": -1 } class OmniConfig: def __init__(self, config: dict = None): if config is None: config = dict() self._config = config self._cfg = config.get("task", dict()) self._parse_config() if self._config["inference"]: self._sim_params["enable_scene_query_support"] = True from omni.isaac.core.utils.extensions import enable_extension if self._config["headless"] == True and not self._sim_params["enable_cameras"] and not self._config[ "enable_livestream"]: self._sim_params["use_flatcache"] = False self._sim_params["enable_viewport"] = False else: self._sim_params["enable_viewport"] = True enable_extension("omni.kit.viewport.bundle") enable_extension("omni.replicator.isaac") if self._sim_params["disable_contact_processing"]: carb.settings.get_settings().set_bool("/physics/disableContactProcessing", True) carb.settings.get_settings().set_bool("/physics/physxDispatcher", True) def _parse_config(self): # general sim parameter self._sim_params = copy.deepcopy(default_sim_params) self._default_physics_material = copy.deepcopy(default_physics_material) sim_cfg = self._cfg.get("sim", None) if sim_cfg is not None: for opt in sim_cfg.keys(): if opt in self._sim_params: if opt == "default_physics_material": for material_opt in sim_cfg[opt]: self._default_physics_material[material_opt] = sim_cfg[opt][material_opt] else: self._sim_params[opt] = sim_cfg[opt] else: print("Sim params does not have attribute: ", opt) self._sim_params["default_physics_material"] = self._default_physics_material # physx parameters self._physx_params = copy.deepcopy(default_physx_params) if sim_cfg is not None and "physx" in sim_cfg: for opt in sim_cfg["physx"].keys(): if opt in self._physx_params: self._physx_params[opt] = sim_cfg["physx"][opt] else: print("Physx sim params does not have attribute: ", opt) self._sanitize_device() def _sanitize_device(self): if self._sim_params["use_gpu_pipeline"]: self._physx_params["use_gpu"] = True # device should be in sync with pipeline if self._sim_params["use_gpu_pipeline"]: self._config["sim_device"] = f"cuda:{self._config['device_id']}" else: self._config["sim_device"] = "cpu" # also write to physics params for setting sim device self._physx_params["sim_device"] = self._config["sim_device"] print("Pipeline: ", "GPU" if self._sim_params["use_gpu_pipeline"] else "CPU") print("Pipeline Device: ", self._config["sim_device"]) print("Sim Device: ", "GPU" if self._physx_params["use_gpu"] else "CPU") def parse_actor_config(self, actor_name): actor_params = copy.deepcopy(default_actor_options) if "sim" in self._cfg and actor_name in self._cfg["sim"]: actor_cfg = self._cfg["sim"][actor_name] for opt in actor_cfg.keys(): if actor_cfg[opt] != -1 and opt in actor_params: actor_params[opt] = actor_cfg[opt] elif opt not in actor_params: print("Actor params does not have attribute: ", opt) return actor_params def _get_actor_config_value(self, actor_name, attribute_name, attribute=None): actor_params = self.parse_actor_config(actor_name) if attribute is not None: if attribute_name not in actor_params: return attribute.Get() if actor_params[attribute_name] != -1: return actor_params[attribute_name] elif actor_params["override_usd_defaults"] and not attribute.IsAuthored(): return self._physx_params[attribute_name] else: if actor_params[attribute_name] != -1: return actor_params[attribute_name] @property def sim_params(self): return self._sim_params @property def config(self): return self._config @property def task_config(self): return self._cfg @property def physx_params(self): return self._physx_params def get_physics_params(self): return {**self.sim_params, **self.physx_params} def _get_physx_collision_api(self, prim): from pxr import PhysxSchema physx_collision_api = PhysxSchema.PhysxCollisionAPI(prim) if not physx_collision_api: physx_collision_api = PhysxSchema.PhysxCollisionAPI.Apply(prim) return physx_collision_api def _get_physx_rigid_body_api(self, prim): from pxr import PhysxSchema physx_rb_api = PhysxSchema.PhysxRigidBodyAPI(prim) if not physx_rb_api: physx_rb_api = PhysxSchema.PhysxRigidBodyAPI.Apply(prim) return physx_rb_api def _get_physx_articulation_api(self, prim): from pxr import PhysxSchema arti_api = PhysxSchema.PhysxArticulationAPI(prim) if not arti_api: arti_api = PhysxSchema.PhysxArticulationAPI.Apply(prim) return arti_api def set_contact_offset(self, name, prim, value=None): physx_collision_api = self._get_physx_collision_api(prim) contact_offset = physx_collision_api.GetContactOffsetAttr() # if not contact_offset: # contact_offset = physx_collision_api.CreateContactOffsetAttr() if value is None: value = self._get_actor_config_value(name, "contact_offset", contact_offset) if value != -1: contact_offset.Set(value) def set_rest_offset(self, name, prim, value=None): physx_collision_api = self._get_physx_collision_api(prim) rest_offset = physx_collision_api.GetRestOffsetAttr() # if not rest_offset: # rest_offset = physx_collision_api.CreateRestOffsetAttr() if value is None: value = self._get_actor_config_value(name, "rest_offset", rest_offset) if value != -1: rest_offset.Set(value) def set_position_iteration(self, name, prim, value=None): physx_rb_api = self._get_physx_rigid_body_api(prim) solver_position_iteration_count = physx_rb_api.GetSolverPositionIterationCountAttr() if value is None: value = self._get_actor_config_value(name, "solver_position_iteration_count", solver_position_iteration_count) if value != -1: solver_position_iteration_count.Set(value) def set_velocity_iteration(self, name, prim, value=None): physx_rb_api = self._get_physx_rigid_body_api(prim) solver_velocity_iteration_count = physx_rb_api.GetSolverVelocityIterationCountAttr() if value is None: value = self._get_actor_config_value(name, "solver_velocity_iteration_count", solver_position_iteration_count) if value != -1: solver_velocity_iteration_count.Set(value) def set_max_depenetration_velocity(self, name, prim, value=None): physx_rb_api = self._get_physx_rigid_body_api(prim) max_depenetration_velocity = physx_rb_api.GetMaxDepenetrationVelocityAttr() if value is None: value = self._get_actor_config_value(name, "max_depenetration_velocity", max_depenetration_velocity) if value != -1: max_depenetration_velocity.Set(value) def set_sleep_threshold(self, name, prim, value=None): physx_rb_api = self._get_physx_rigid_body_api(prim) sleep_threshold = physx_rb_api.GetSleepThresholdAttr() if value is None: value = self._get_actor_config_value(name, "sleep_threshold", sleep_threshold) if value != -1: sleep_threshold.Set(value) def set_stabilization_threshold(self, name, prim, value=None): physx_rb_api = self._get_physx_rigid_body_api(prim) stabilization_threshold = physx_rb_api.GetStabilizationThresholdAttr() if value is None: value = self._get_actor_config_value(name, "stabilization_threshold", stabilization_threshold) if value != -1: stabilization_threshold.Set(value) def set_gyroscopic_forces(self, name, prim, value=None): physx_rb_api = self._get_physx_rigid_body_api(prim) enable_gyroscopic_forces = physx_rb_api.GetEnableGyroscopicForcesAttr() if value is None: value = self._get_actor_config_value(name, "enable_gyroscopic_forces", enable_gyroscopic_forces) if value != -1: enable_gyroscopic_forces.Set(value) def set_density(self, name, prim, value=None): physx_rb_api = self._get_physx_rigid_body_api(prim) density = physx_rb_api.GetDensityAttr() if value is None: value = self._get_actor_config_value(name, "density", density) if value != -1: density.Set(value) # auto-compute mass self.set_mass(prim, 0.0) def set_mass(self, name, prim, value=None): physx_rb_api = self._get_physx_rigid_body_api(prim) mass = physx_rb_api.GetMassAttr() if value is None: value = self._get_actor_config_value(name, "mass", mass) if value != -1: mass.Set(value) def retain_acceleration(self, prim): # retain accelerations if running with more than one substep physx_rb_api = self._get_physx_rigid_body_api(prim) if self._sim_params["substeps"] > 1: physx_rb_api.GetRetainAccelerationsAttr().Set(True) def make_kinematic(self, name, prim, cfg, value=None): # make rigid body kinematic (fixed base and no collision) from pxr import UsdPhysics stage = omni.usd.get_context().get_stage() if value is None: value = self._get_actor_config_value(name, "make_kinematic") if value: # parse through all children prims prims = [prim] while len(prims) > 0: cur_prim = prims.pop(0) rb = UsdPhysics.RigidBodyAPI.Get(stage, cur_prim.GetPath()) if rb: rb.CreateKinematicEnabledAttr().Set(True) children_prims = cur_prim.GetPrim().GetChildren() prims = prims + children_prims def set_articulation_position_iteration(self, name, prim, value=None): arti_api = self._get_physx_articulation_api(prim) solver_position_iteration_count = arti_api.GetSolverPositionIterationCountAttr() if value is None: value = self._get_actor_config_value(name, "solver_position_iteration_count", solver_position_iteration_count) if value != -1: solver_position_iteration_count.Set(value) def set_articulation_velocity_iteration(self, name, prim, value=None): arti_api = self._get_physx_articulation_api(prim) solver_velocity_iteration_count = arti_api.GetSolverVelocityIterationCountAttr() if value is None: value = self._get_actor_config_value(name, "solver_velocity_iteration_count", solver_position_iteration_count) if value != -1: solver_velocity_iteration_count.Set(value) def set_articulation_sleep_threshold(self, name, prim, value=None): arti_api = self._get_physx_articulation_api(prim) sleep_threshold = arti_api.GetSleepThresholdAttr() if value is None: value = self._get_actor_config_value(name, "sleep_threshold", sleep_threshold) if value != -1: sleep_threshold.Set(value) def set_articulation_stabilization_threshold(self, name, prim, value=None): arti_api = self._get_physx_articulation_api(prim) stabilization_threshold = arti_api.GetStabilizationThresholdAttr() if value is None: value = self._get_actor_config_value(name, "stabilization_threshold", stabilization_threshold) if value != -1: stabilization_threshold.Set(value) def apply_rigid_body_settings(self, name, prim, cfg, is_articulation): from pxr import UsdPhysics, PhysxSchema stage = omni.usd.get_context().get_stage() rb_api = UsdPhysics.RigidBodyAPI.Get(stage, prim.GetPath()) physx_rb_api = PhysxSchema.PhysxRigidBodyAPI.Get(stage, prim.GetPath()) if not physx_rb_api: physx_rb_api = PhysxSchema.PhysxRigidBodyAPI.Apply(prim) # if it's a body in an articulation, it's handled at articulation root if not is_articulation: self.make_kinematic(name, prim, cfg, cfg["make_kinematic"]) self.set_position_iteration(name, prim, cfg["solver_position_iteration_count"]) self.set_velocity_iteration(name, prim, cfg["solver_velocity_iteration_count"]) self.set_max_depenetration_velocity(name, prim, cfg["max_depenetration_velocity"]) self.set_sleep_threshold(name, prim, cfg["sleep_threshold"]) self.set_stabilization_threshold(name, prim, cfg["stabilization_threshold"]) self.set_gyroscopic_forces(name, prim, cfg["enable_gyroscopic_forces"]) # density and mass mass_api = UsdPhysics.MassAPI.Get(stage, prim.GetPath()) if mass_api is None: mass_api = UsdPhysics.MassAPI.Apply(prim) mass_attr = mass_api.GetMassAttr() density_attr = mass_api.GetDensityAttr() if not mass_attr: mass_attr = mass_api.CreateMassAttr() if not density_attr: density_attr = mass_api.CreateDensityAttr() if cfg["density"] != -1: density_attr.Set(cfg["density"]) mass_attr.Set(0.0) # mass is to be computed elif cfg["override_usd_defaults"] and not density_attr.IsAuthored() and not mass_attr.IsAuthored(): density_attr.Set(self._physx_params["density"]) self.retain_acceleration(prim) def apply_rigid_shape_settings(self, name, prim, cfg): from pxr import UsdPhysics, PhysxSchema stage = omni.usd.get_context().get_stage() # collision APIs collision_api = UsdPhysics.CollisionAPI(prim) if not collision_api: collision_api = UsdPhysics.CollisionAPI.Apply(prim) physx_collision_api = PhysxSchema.PhysxCollisionAPI(prim) if not physx_collision_api: physx_collision_api = PhysxSchema.PhysxCollisionAPI.Apply(prim) self.set_contact_offset(name, prim, cfg["contact_offset"]) self.set_rest_offset(name, prim, cfg["rest_offset"]) def apply_articulation_settings(self, name, prim, cfg): from pxr import UsdPhysics, PhysxSchema stage = omni.usd.get_context().get_stage() is_articulation = False # check if is articulation prims = [prim] while len(prims) > 0: prim_tmp = prims.pop(0) articulation_api = UsdPhysics.ArticulationRootAPI.Get(stage, prim_tmp.GetPath()) physx_articulation_api = PhysxSchema.PhysxArticulationAPI.Get(stage, prim_tmp.GetPath()) if articulation_api or physx_articulation_api: is_articulation = True children_prims = prim_tmp.GetPrim().GetChildren() prims = prims + children_prims # parse through all children prims prims = [prim] while len(prims) > 0: cur_prim = prims.pop(0) rb = UsdPhysics.RigidBodyAPI.Get(stage, cur_prim.GetPath()) collision_body = UsdPhysics.CollisionAPI.Get(stage, cur_prim.GetPath()) articulation = UsdPhysics.ArticulationRootAPI.Get(stage, cur_prim.GetPath()) if rb: self.apply_rigid_body_settings(name, cur_prim, cfg, is_articulation) if collision_body: self.apply_rigid_shape_settings(name, cur_prim, cfg) if articulation: articulation_api = UsdPhysics.ArticulationRootAPI.Get(stage, cur_prim.GetPath()) physx_articulation_api = PhysxSchema.PhysxArticulationAPI.Get(stage, cur_prim.GetPath()) # enable self collisions enable_self_collisions = physx_articulation_api.GetEnabledSelfCollisionsAttr() if cfg["enable_self_collisions"] != -1: enable_self_collisions.Set(cfg["enable_self_collisions"]) self.set_articulation_position_iteration(name, cur_prim, cfg["solver_position_iteration_count"]) self.set_articulation_velocity_iteration(name, cur_prim, cfg["solver_velocity_iteration_count"]) self.set_articulation_sleep_threshold(name, cur_prim, cfg["sleep_threshold"]) self.set_articulation_stabilization_threshold(name, cur_prim, cfg["stabilization_threshold"]) children_prims = cur_prim.GetPrim().GetChildren() prims = prims + children_prims cfg_dict = omegaconf_to_dict(cfg) omni_config = OmniConfig(cfg_dict) return omni_config
[docs]def load_view_motion_config(config_name): """Load the configs stored in config_name.yaml. Args: config_name (str): Name of the config file for viewing motion. Returns: (dict): A dict of configs. """ config_file_path = os.path.join( os.path.dirname(__file__), f"view_motion/{config_name}.yaml" ) if not os.path.exists(config_file_path): raise FileNotFoundError(f"{config_file_path} does not exist") return OmegaConf.load(config_file_path)
[docs]def load_ikea_config(ikea_name): """Load the configs stored in ikea_name.yaml. Args: ikea_name (str): Name of the config file under config/ikea for the ikea furniture. Returns: (dict): A dict of configs. """ agent_config_file_path = os.path.join( os.path.dirname(__file__), f"ikea/{ikea_name}.yaml" ) if not os.path.exists(agent_config_file_path): raise FileNotFoundError(f"{agent_config_file_path} does not exist") return OmegaConf.load(agent_config_file_path)
[docs]def omegaconf_to_dict(config: DictConfig) -> Dict: """ Converts an omegaconf DictConfig to a python Dict, respecting variable interpolation. """ d = {} for k, v in config.items(): try: d[k] = omegaconf_to_dict(v) if isinstance(v, DictConfig) else v except: pass return d
[docs]def dict_to_omegaconf(d: Dict, save_path: str = None) -> DictConfig: """ Converts a python Dict to an omegaconf DictConfig, respecting variable interpolation. """ conf = OmegaConf.create(d) if save_path is not None: with open(save_path, "w") as fp: OmegaConf.save(config=conf, f=fp.name) loaded = OmegaConf.load(fp.name) assert conf == loaded else: return conf
if __name__ == "__main__": TD3_DEFAULT_CONFIG = { "gradient_steps": 1, # gradient steps "batch_size": 64, # training batch size "discount_factor": 0.99, # discount factor (gamma) "polyak": 0.005, # soft update hyperparameter (tau) "actor_learning_rate": 1e-3, # actor learning rate "critic_learning_rate": 1e-3, # critic learning rate "learning_rate_scheduler": None, # learning rate scheduler class (see torch.optim.lr_scheduler) "learning_rate_scheduler_kwargs": {}, # learning rate scheduler's kwargs (e.g. {"step_size": 1e-3}) "state_preprocessor": None, # state preprocessor class (see skrl.resources.preprocessors) "state_preprocessor_kwargs": {}, # state preprocessor's kwargs (e.g. {"size": env.observation_space}) "random_timesteps": 0, # random exploration steps "learning_starts": 0, # learning starts after this many steps "exploration": { "noise": None, # exploration noise "initial_scale": 1.0, # initial scale for noise "final_scale": 1e-3, # final scale for noise "timesteps": None, # timesteps for noise decay }, "policy_delay": 2, # policy delay update with respect to critic update "smooth_regularization_noise": None, # smooth noise for regularization "smooth_regularization_clip": 0.5, # clip for smooth regularization "rewards_shaper": None, # rewards shaping function: Callable(reward, timestep, timesteps) -> reward "experiment": { "directory": "", # experiment's parent directory "experiment_name": "", # experiment name "write_interval": 250, # TensorBoard writing interval (timesteps) "checkpoint_interval": 1000, # interval for checkpoints (timesteps) "store_separately": False, # whether to store checkpoints separately }, } dict_to_omegaconf( TD3_DEFAULT_CONFIG, save_path="/home/ubuntu/Github/Knowledge-Universe/Robotics/Roadmap-for-robot-science/rofunc/config/learning/rl/agent/td3_default_config_skrl.yaml", )