Source code for rofunc.learning.ml.tpgmr

# Copyright 2023, Junjia LIU, jjliu@mae.cuhk.edu.hk
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import Tuple

import numpy as np
import pbdlib as pbd
from numpy import ndarray
from scipy.linalg import block_diag

import rofunc as rf
from rofunc.learning.ml.tpgmm import TPGMM
from rofunc.utils.logger.beauty_logger import beauty_print


[docs]class TPGMR(TPGMM): def __init__(self, demos_x, task_params, nb_states: int = 4, reg: float = 1e-3, plot=False): """ Task-parameterized Gaussian Mixture Regression (TP-GMR) :param demos_x: demo displacement :param task_params: task parameters :param nb_states: number of states in the HMM :param reg: regularization term :param plot: whether to plot the result """ super().__init__(demos_x, task_params, nb_states=nb_states, reg=reg, plot=plot) self.gmr = rf.learning.gmr.GMR(self.demos_x, self.demos_dx, self.demos_xdx, nb_states=nb_states, reg=reg, plot=False)
[docs] def gmm_learning(self): # Learn the time-dependent GMR from demonstration t = np.linspace(0, 10, self.demos_x[0].shape[0]) demos = [np.hstack([t[:, None], d]) for d in self.demos_xdx_augm] self.gmr.demos = demos model = self.gmr.gmm_learning() mu_gmr, sigma_gmr = self.gmr.estimate(model, t[:, None], dim_in=slice(0, 1), dim_out=slice(1, 4 * len(self.demos_x[0]) + 1)) model = pbd.GMM(mu=mu_gmr, sigma=sigma_gmr) return model
def _reproduce(self, model: pbd.HMM, prod: pbd.GMM, show_demo_idx: int, start_xdx: np.ndarray) -> np.ndarray: """ Reproduce the specific demo_idx from the learned model :param model: learned model :param prod: result of PoE :param show_demo_idx: index of the specific demo to be reproduced :return: """ lqr = pbd.PoGLQR(nb_dim=len(self.demos_x[0][0]), dt=0.01, horizon=self.demos_xdx[show_demo_idx].shape[0]) mvn = pbd.MVN() mvn.mu = np.concatenate([i for i in prod.mu]) mvn._sigma = block_diag(*[i for i in prod.sigma]) lqr.mvn_xi = mvn lqr.mvn_u = -4 lqr.x0 = start_xdx xi = lqr.seq_xi if self.plot: if len(self.demos_x[0][0]) == 2: rf.ml.generate_plot(xi, prod, self.demos_x, show_demo_idx) elif len(self.demos_x[0][0]) > 2: rf.ml.generate_plot_3d(xi, prod, self.demos_x, show_demo_idx, scale=0.1) else: raise Exception('Dimension is less than 2, cannot plot') return xi
[docs] def fit(self): beauty_print('Learning the trajectory representation from demonstration via TP-GMR') model = self.gmm_learning() return model
[docs] def reproduce(self, model, show_demo_idx): beauty_print('reproduce {}-th demo from learned representation'.format(show_demo_idx), type='info') prod = self.poe(model, show_demo_idx) traj = self._reproduce(model, prod, show_demo_idx, self.demos_xdx[show_demo_idx][0]) return traj, prod
[docs] def generate(self, model: pbd.HMM, ref_demo_idx: int, task_params: dict) -> np.ndarray: beauty_print('generate new demo from learned representation', type='info') self.get_A_b() prod = self.poe(model, ref_demo_idx) traj = self._reproduce(model, prod, ref_demo_idx, self.task_params['frame_origins'][0][0]) return traj, prod
[docs]class TPGMRBi(TPGMR): def __init__(self, demos_left_x, demos_right_x, task_params, plot=False): self.demos_left_x = demos_left_x self.demos_right_x = demos_right_x self.plot = plot self.repr_l = TPGMR(demos_left_x, task_params['left'], plot=plot) self.repr_r = TPGMR(demos_right_x, task_params['right'], plot=plot)
[docs] def fit(self): beauty_print('Learning the trajectory representation from demonstration via TP-GMR') model_l = self.repr_l.gmm_learning() model_r = self.repr_r.gmm_learning() return model_l, model_r
[docs] def reproduce(self, models, show_demo_idx): beauty_print('reproduce {}-th demo from learned representation'.format(show_demo_idx), type='info') model_l, model_r = models prod_l = self.repr_l.poe(model_l, show_demo_idx) prod_r = self.repr_r.poe(model_r, show_demo_idx) traj_l = self.repr_l._reproduce(model_l, prod_l, show_demo_idx, self.repr_l.demos_xdx[show_demo_idx][0]) traj_r = self.repr_r._reproduce(model_r, prod_r, show_demo_idx, self.repr_r.demos_xdx[show_demo_idx][0]) if self.plot: nb_dim = int(traj_l.shape[1] / 2) data_lst = [traj_l[:, :nb_dim], traj_r[:, :nb_dim]] rf.visualab.traj_plot(data_lst, title='Reproduced bimanual trajectories') return traj_l, traj_r, prod_l, prod_r
[docs] def generate(self, models, ref_demo_idx: int, task_params: dict) -> \ Tuple[ndarray, ndarray]: beauty_print('generate new demo from learned representation', type='info') model_l, model_r = models self.repr_l.task_params = self.task_params['left'] self.repr_r.task_params = self.task_params['right'] self.repr_l.get_A_b() self.repr_r.get_A_b() prod_l = self.repr_l.poe(model_l, ref_demo_idx) prod_r = self.repr_r.poe(model_r, ref_demo_idx) traj_l = self.repr_l._reproduce(model_l, prod_l, ref_demo_idx, self.repr_l.task_params['frame_origins'][0][0]) traj_r = self.repr_r._reproduce(model_r, prod_r, ref_demo_idx, self.repr_l.task_params['frame_origins'][0][0]) if self.plot: nb_dim = int(traj_l.shape[1] / 2) data_lst = [traj_l[:, :nb_dim], traj_r[:, :nb_dim]] rf.visualab.traj_plot(data_lst, title='Generated bimanual trajectories') return traj_l, traj_r, prod_l, prod_r