Source code for rofunc.learning.RofuncRL.tasks.isaacgymenv.hotu.motion_lib

# Copyright 2023, Junjia LIU, jjliu@mae.cuhk.edu.hk
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import os
import time

import yaml
from isaacgym.torch_utils import *

import rofunc as rf
from rofunc.learning.RofuncRL.tasks.utils import torch_jit_utils as torch_utils
from rofunc.utils.datalab.poselib.poselib.core.rotation3d import *
from rofunc.utils.datalab.poselib.poselib.skeleton.skeleton3d import SkeletonMotion

USE_CACHE = True
print("MOVING MOTION DATA TO GPU, USING CACHE:", USE_CACHE)

if not USE_CACHE:
    old_numpy = torch.Tensor.numpy


    class Patch:
        def numpy(self):
            if self.is_cuda:
                return self.to("cpu").numpy()
            else:
                return old_numpy(self)


    torch.Tensor.numpy = Patch.numpy


[docs]class DeviceCache: def __init__(self, obj, device): self.obj = obj self.device = device keys = dir(obj) num_added = 0 for k in keys: try: out = getattr(obj, k) except: # print("Error for key=", k) continue if isinstance(out, torch.Tensor): if out.is_floating_point(): out = out.to(self.device, dtype=torch.float32) else: out.to(self.device) setattr(self, k, out) num_added += 1 elif isinstance(out, np.ndarray): out = torch.tensor(out) if out.is_floating_point(): out = out.to(self.device, dtype=torch.float32) else: out.to(self.device) setattr(self, k, out) num_added += 1 # print("Total added", num_added) def __getattr__(self, string): out = getattr(self.obj, string) return out
[docs]class MotionLib: def __init__(self, motion_file, dof_body_ids, dof_offsets, key_body_ids, device): """ Args: motion_file: dof_body_ids: dof_offsets: key_body_ids: device: """ self._dof_body_ids = dof_body_ids self._dof_offsets = dof_offsets self._num_dof = dof_offsets[-1] self._key_body_ids = torch.tensor(key_body_ids, device=device) self._device = device self._object_poses = torch.zeros((), device=device) self._load_motions(motion_file) motions = self._motions self.humanoid_height_offsets = [] for motion in motions: tar_global_pos = motion.global_translation min_h = torch.min(tar_global_pos[..., 2]) motion.global_translation[..., 2] -= min_h self.humanoid_height_offsets.append(min_h) self.gts = torch.cat([m.global_translation for m in motions], dim=0).float() self.grs = torch.cat([m.global_rotation for m in motions], dim=0).float() self.lrs = torch.cat([m.local_rotation for m in motions], dim=0).float() self.grvs = torch.cat([m.global_root_velocity for m in motions], dim=0).float() self.gravs = torch.cat( [m.global_root_angular_velocity for m in motions], dim=0 ).float() self.dvs = torch.cat([m.dof_vels for m in motions], dim=0).float() lengths = self._motion_num_frames lengths_shifted = lengths.roll(1) lengths_shifted[0] = 0 self.length_starts = lengths_shifted.cumsum(0) self.motion_ids = torch.arange( len(self._motions), dtype=torch.long, device=self._device ) self.init_local_rotation = torch.Tensor( np.load(os.path.join(rf.oslab.get_rofunc_path(), "utils/datalab/poselib/local_orientation.npy"))).to(device)
[docs] def num_motions(self): return len(self._motions)
[docs] def get_total_length(self): return sum(self._motion_lengths)
[docs] def get_motion(self, motion_id): return self._motions[motion_id]
[docs] def sample_motions(self, n): motion_ids = torch.multinomial( self._motion_weights, num_samples=n, replacement=True ) # m = self.num_motions() # motion_ids = np.random.choice(m, size=n, replace=True, p=self._motion_weights) # motion_ids = torch.tensor(motion_ids, device=self._device, dtype=torch.long) return motion_ids
[docs] def sample_time(self, motion_ids, truncate_time=None): n = len(motion_ids) phase = torch.rand(motion_ids.shape, device=self._device) motion_len = self._motion_lengths[motion_ids] if truncate_time is not None: assert truncate_time >= 0.0 motion_len -= truncate_time motion_time = phase * motion_len return motion_time
[docs] def get_motion_length(self, motion_ids): return self._motion_lengths[motion_ids]
[docs] def get_object_pose(self, frame_id): """Return object pose at frame=id, where id is recorded in motion_ids Args: frame_id [list]: Same frame id * num_envs, where num_envs is the environment number. Returns: """ if self._object_poses.ndim == 0: return None object_pose = self._object_poses[frame_id][0] return object_pose
[docs] def get_motion_state(self, motion_ids, motion_times): """ Args: motion_ids: motion_times: Returns: """ n = len(motion_ids) num_bodies = self._get_num_bodies() num_key_bodies = self._key_body_ids.shape[0] motion_len = self._motion_lengths[motion_ids] num_frames = self._motion_num_frames[motion_ids] dt = self._motion_dt[motion_ids] frame_idx0, frame_idx1, blend = self._calc_frame_blend( motion_times, motion_len, num_frames, dt ) f0l = frame_idx0 + self.length_starts[motion_ids] f1l = frame_idx1 + self.length_starts[motion_ids] root_pos0 = self.gts[f0l, 0] root_pos1 = self.gts[f1l, 0] root_rot0 = self.grs[f0l, 0] root_rot1 = self.grs[f1l, 0] local_rot0 = self.lrs[f0l] local_rot1 = self.lrs[f1l] root_vel = self.grvs[f0l] root_ang_vel = self.gravs[f0l] key_pos0 = self.gts[f0l.unsqueeze(-1), self._key_body_ids.unsqueeze(0)] key_pos1 = self.gts[f1l.unsqueeze(-1), self._key_body_ids.unsqueeze(0)] dof_vel = self.dvs[f0l] vals = [ root_pos0, root_pos1, local_rot0, local_rot1, root_vel, root_ang_vel, key_pos0, key_pos1, ] for v in vals: assert v.dtype != torch.float64 blend = blend.unsqueeze(-1) root_pos = (1.0 - blend) * root_pos0 + blend * root_pos1 root_rot = torch_utils.slerp(root_rot0, root_rot1, blend) blend_exp = blend.unsqueeze(-1) key_pos = (1.0 - blend_exp) * key_pos0 + blend_exp * key_pos1 local_rot = torch_utils.slerp( local_rot0, local_rot1, torch.unsqueeze(blend, axis=-1) ) dof_pos = self._local_rotation_to_dof(local_rot) return root_pos, root_rot, dof_pos, root_vel, root_ang_vel, dof_vel, key_pos, f0l, f1l
def _load_motions(self, motion_file): self._motions = [] self._motion_lengths = [] self._motion_weights = [] self._motion_fps = [] self._motion_dt = [] self._motion_num_frames = [] self._motion_files = [] total_len = 0.0 motion_files, motion_weights = self._fetch_motion_files(motion_file) num_motion_files = len(motion_files) import tqdm with tqdm.trange(num_motion_files, ncols=100, colour="green") as t_bar: for f in t_bar: curr_file = motion_files[f] t_bar.set_postfix_str("Loading: {:s}".format(curr_file.split("/")[-1])) curr_motion = SkeletonMotion.from_file(curr_file) motion_fps = curr_motion.fps curr_dt = 1.0 / motion_fps num_frames = curr_motion.tensor.shape[0] curr_len = 1.0 / motion_fps * (num_frames - 1) self._motion_fps.append(motion_fps) self._motion_dt.append(curr_dt) self._motion_num_frames.append(num_frames) curr_dof_vels = self._compute_motion_dof_vels(curr_motion) curr_motion.dof_vels = curr_dof_vels # Moving motion tensors to the GPU if USE_CACHE: curr_motion = DeviceCache(curr_motion, self._device) else: curr_motion.tensor = curr_motion.tensor.to(self._device) curr_motion._skeleton_tree._parent_indices = ( curr_motion._skeleton_tree._parent_indices.to(self._device) ) curr_motion._skeleton_tree._local_translation = ( curr_motion._skeleton_tree._local_translation.to(self._device) ) curr_motion._rotation = curr_motion._rotation.to(self._device) self._motions.append(curr_motion) self._motion_lengths.append(curr_len) curr_weight = motion_weights[f] self._motion_weights.append(curr_weight) self._motion_files.append(curr_file) for motion in self._motions: if motion.object_poses is not None: self._object_poses = motion.object_poses self._object_poses = torch.tensor( self._object_poses, device=self._device, dtype=torch.float32 ) break self._motion_lengths = torch.tensor( self._motion_lengths, device=self._device, dtype=torch.float32 ) self._motion_weights = torch.tensor( self._motion_weights, dtype=torch.float32, device=self._device ) self._motion_weights /= self._motion_weights.sum() self._motion_fps = torch.tensor( self._motion_fps, device=self._device, dtype=torch.float32 ) self._motion_dt = torch.tensor( self._motion_dt, device=self._device, dtype=torch.float32 ) self._motion_num_frames = torch.tensor( self._motion_num_frames, device=self._device ) num_motions = self.num_motions() total_len = self.get_total_length() print( "Loaded {:d} motions with a total length of {:.3f}s.".format( num_motions, total_len ) ) @staticmethod def _fetch_motion_files(motion_file): ext = os.path.splitext(motion_file)[1] if ext == ".yaml": dir_name = os.path.dirname(motion_file) motion_files = [] motion_weights = [] with open(os.path.join(os.getcwd(), motion_file), "r") as f: motion_config = yaml.load(f, Loader=yaml.SafeLoader) motion_list = motion_config["motions"] for motion_entry in motion_list: curr_file = motion_entry["file"] curr_weight = motion_entry["weight"] assert curr_weight >= 0 curr_file = os.path.join(dir_name, curr_file) motion_weights.append(curr_weight) motion_files.append(curr_file) else: motion_files = [motion_file] motion_weights = [1.0] return motion_files, motion_weights def _calc_frame_blend(self, time, len, num_frames, dt): phase = time / len phase = torch.clip(phase, 0.0, 1.0) frame_idx0 = (phase * (num_frames - 1)).long() frame_idx1 = torch.min(frame_idx0 + 1, num_frames - 1) blend = (time - frame_idx0 * dt) / dt return frame_idx0, frame_idx1, blend def _get_num_bodies(self): motion = self.get_motion(0) num_bodies = motion.num_joints return num_bodies def _compute_motion_dof_vels(self, motion): num_frames = motion.tensor.shape[0] dt = 1.0 / motion.fps dof_vels = [] for f in range(num_frames - 1): local_rot0 = motion.local_rotation[f] local_rot1 = motion.local_rotation[f + 1] frame_dof_vel = self._local_rotation_to_dof_vel(local_rot0, local_rot1, dt) frame_dof_vel = frame_dof_vel dof_vels.append(frame_dof_vel) dof_vels.append(dof_vels[-1]) dof_vels = torch.stack(dof_vels, dim=0) return dof_vels def _local_rotation_to_dof(self, local_rot): body_ids = self._dof_body_ids dof_offsets = self._dof_offsets n = local_rot.shape[0] dof_pos = torch.zeros( (n, self._num_dof), dtype=torch.float, device=self._device ) for j in range(len(body_ids)): body_id = body_ids[j] joint_offset = dof_offsets[j] joint_size = dof_offsets[j + 1] - joint_offset if joint_size == 3: joint_q = local_rot[:, body_id] joint_exp_map = torch_utils.quat_to_exp_map(joint_q) if body_id is 5: # Right hand new_joint_q = rf.robolab.quaternion_multiply_tensor_multirow2([0, 1, 0, 0], joint_q) joint_exp_map = torch_utils.quat_to_exp_map(new_joint_q) elif body_id is 23: # Left hand new_joint_q = rf.robolab.quaternion_multiply_tensor_multirow2([1, 0, 0, 0], joint_q) joint_exp_map = torch_utils.quat_to_exp_map(new_joint_q) dof_pos[:, joint_offset: (joint_offset + joint_size)] = joint_exp_map elif joint_size == 1: # TODO: check this if body_id in [*[i for i in range(10, 21)], *[i for i in range(28, 39)]]: # Right and left fingers except thumbs joint_q = local_rot[:, body_id] joint_theta, joint_axis = torch_utils.quat_to_angle_axis(joint_q) joint_theta = -(joint_theta * joint_axis[..., 2]) # assume joint is always along y axis elif body_id in [6, 24]: # right and left thumbs knuckles link joint_q = local_rot[:, body_id] joint_theta, joint_axis = torch_utils.quat_to_angle_axis(joint_q) joint_theta = -(joint_theta * joint_axis[..., 0]) # assume joint is always along y axis else: joint_q = local_rot[:, body_id] joint_theta, joint_axis = torch_utils.quat_to_angle_axis(joint_q) joint_theta = (joint_theta * joint_axis[..., 1]) # assume joint is always along y axis joint_theta = normalize_angle(joint_theta) dof_pos[:, joint_offset] = joint_theta else: print("Unsupported joint type") assert False return dof_pos def _local_rotation_to_dof_vel(self, local_rot0, local_rot1, dt): body_ids = self._dof_body_ids dof_offsets = self._dof_offsets dof_vel = torch.zeros([self._num_dof], device=self._device) diff_quat_data = quat_mul_norm(quat_inverse(local_rot0), local_rot1) diff_angle, diff_axis = quat_angle_axis(diff_quat_data) local_vel = diff_axis * diff_angle.unsqueeze(-1) / dt for j in range(len(body_ids)): body_id = body_ids[j] joint_offset = dof_offsets[j] joint_size = dof_offsets[j + 1] - joint_offset if joint_size == 3: joint_vel = local_vel[body_id] dof_vel[joint_offset: (joint_offset + joint_size)] = joint_vel elif joint_size == 1: assert joint_size == 1 joint_vel = local_vel[body_id] dof_vel[joint_offset] = joint_vel[ 1 ] # assume joint is always along y axis else: print("Unsupported joint type") assert False return dof_vel
[docs]class ObjectMotionLib: def __init__(self, object_motion_file, object_names, device, humanoid_start_time=None, height_offset=0.0): """ Object motion library for the IsaacGym humanoid series environments and process the motion data from Optitrack. :param object_motion_file: optitrack motion file path, should be .csv files, can be a list of files or a single file :param object_names: list of object names you want to load :param device: device same as the env/task device :param humanoid_start_time: start time of the humanoid motion, for the temporal synchronization :param height_offset: height offset of the object, for the spatial alignment """ self.object_motion_file = object_motion_file self.object_names = object_names self.object_poses_w_time = [] self.device = device self.humanoid_start_time = humanoid_start_time # in second, for the motion sync if self.humanoid_start_time is not None: assert len(self.humanoid_start_time) == len(self.object_motion_file) self.height_offset = height_offset if isinstance(self.object_motion_file, list): # Multiple files self.num_motions = len(self.object_motion_file) elif isinstance(self.object_motion_file, str): # Single file self.num_motions = 1 else: raise NotImplementedError self._load_motions() def _load_motions(self): objs_list, meta_list = rf.optitrack.get_objects(self.object_motion_file) self.scales = self._get_scale(meta_list) self.dts = self._get_dt(meta_list) if self.humanoid_start_time is not None: self.tds = self._get_time_difference(meta_list) else: self.tds = [0] * self.num_motions for i in range(self.num_motions): # data is a numpy array of shape (n_samples, n_features) # labels is a list of strings corresponding to the name of the features data, labels = rf.optitrack.data_clean(self.object_motion_file, legacy=False, objs=objs_list[i])[i] # Accessing the position and attitude of an object over all samples: # Coordinates names and order: ['x', 'y', 'z', 'qx', 'qy', 'qz', 'qw'] object_poses_dict = {} for object_name in self.object_names: data_ptr = labels.index(f'{object_name}.pose.x') assert data_ptr + 6 == labels.index(f'{object_name}.pose.qw') pose = data[:, data_ptr:data_ptr + 7] pose[:, :3] *= self.scales[i] # convert to meter pose = self._motion_transform(torch.tensor(pose, dtype=torch.float)) pose_w_time = torch.hstack((torch.tensor(data[:, 1], dtype=torch.float).unsqueeze(-1), pose)) object_poses_dict[object_name] = pose_w_time.to(self.device) self.object_poses_w_time.append(object_poses_dict) # [num_motions, num_objects, num_samples, 7] def _get_scale(self, meta_list): """ Get the scale of the motion data, convert to meter :param meta_list: list of meta data obtained from motion files :return: list of scale for each motion file """ scales = [] for i in range(len(meta_list)): if meta_list[i]["Length Units"] == "Centimeters": scales.append(0.01) elif meta_list[i]["Length Units"] == "Meters": scales.append(1.0) elif meta_list[i]["Length Units"] == "Millimeters": scales.append(0.001) else: raise NotImplementedError return scales def _get_dt(self, meta_list): """ Get the time interval of the motion data, convert to second :param meta_list: list of meta data obtained from motion files :return: list of dt for each motion file """ dts = [] for i in range(len(meta_list)): dts.append(1.0 / float(meta_list[i]["Export Frame Rate"])) return dts def _get_time_difference(self, meta_list): """ Get the time difference between the motion data and the humanoid motion by comparing their start time, convert to second :param meta_list: list of meta data obtained from motion files :return: list of time difference for each motion file """ time_diffs = [] for i in range(len(meta_list)): object_start_time = meta_list[i]["Capture Start Time"] y, t = object_start_time.split(' ')[0], object_start_time.split(' ')[1] if meta_list[i]["Take Name"].split(" ")[-1] == "PM": t = t.split(".") t[0] = str(int(t[0]) + 12) t = ":".join(t[:3]) timestamp = time.mktime(time.strptime(y + " " + t, "%Y-%m-%d %H:%M:%S")) time_diffs.append(float(timestamp - self.humanoid_start_time[i])) return time_diffs def _motion_transform(self, pose): """ Coordinate transformation from optitrack to IsaacGym :param pose: [num_samples, 7] :return: pose: [num_samples, 7] """ # y-up in the optitrack to z-up in IsaacGym # pose[:, 0] = pose[:, 0] # tmp = pose[:, 1].clone() # pose[:, 1] = -pose[:, 2] # pose[:, 2] = tmp - self.height_offset # pose[:, 3:] = rf.robolab.quaternion_multiply_tensor_multirow2(torch.tensor([0.5, 0.5, 0.5, 0.5]), # torch.tensor(pose[:, 3:])) homo_matrix = torch.tensor(rf.robolab.homo_matrix_from_quaternion([0.5, 0.5, 0.5, 0.5]), dtype=torch.float32) num_samples = len(pose) raw_position = pose[:, :3] new_position = torch.ones((num_samples, 4, 1)) new_position[:, :3] = raw_position.resize(num_samples, 3, 1) new_position = torch.bmm(homo_matrix.expand(num_samples, 4, 4), new_position) new_pose = torch.zeros((num_samples, 7)) # new_pose[:, 3:] = torch.tensor([0, 0, 0, 1], dtype=torch.float32) new_pose[:, :3] = new_position[:, :3].resize(num_samples, 3) new_pose[:, 2] -= self.height_offset.cpu() # pose[:, :3] = new_position[:, :3].resize(num_samples, 3) new_pose[:, 3:] = rf.robolab.quaternion_multiply_tensor_multirow2(torch.tensor([0.5, 0.5, 0.5, 0.5]), torch.tensor(pose[:, 3:])) # new_pose[:, 3:] = rf.robolab.quaternion_multiply_tensor_multirow2(torch.tensor([0, 0, -0.383, 0.924]), new_pose[:, 3:] = rf.robolab.quaternion_multiply_tensor_multirow2(torch.tensor([0, 0, 0.924, 0.383]), torch.tensor(new_pose[:, 3:])) return new_pose
[docs] def get_motion_state(self, motion_ids, motion_times): """ Get the object pose at the given time :param motion_ids: the id indicating which motion file to use :param motion_times: the time to get the object pose :return: """ approx_index = torch.round(motion_times[motion_ids] / self.dts[motion_ids] + self.tds[motion_ids])[0].long() if approx_index < 0: approx_index = 0 elif approx_index >= self.object_poses_w_time[motion_ids][self.object_names[0]].shape[0]: approx_index = self.object_poses_w_time[motion_ids][self.object_names[0]].shape[0] - 1 object_poses = {} for object_name in self.object_poses_w_time[motion_ids].keys(): object_poses[object_name] = self.object_poses_w_time[motion_ids][object_name][approx_index][1:] return object_poses