Source code for rofunc.learning.ml.tphsmm

import copy
from typing import Union, List, Tuple

import numpy as np
import pbdlib as pbd

import rofunc as rf
from rofunc.utils.logger.beauty_logger import beauty_print


class TPHSMM:
    """
    Task-parameterized Hidden Semi-Markov Model (TP-HSMM).

    Demonstrations are augmented with finite-difference velocities, expressed in every task frame, and
    modelled with a ``pbd.HSMM``. Trajectories are retrieved by combining the per-frame marginal models
    (product of Gaussians) and tracking the result with ``rf.lqr.PoGLQR``.
    """

    def __init__(self, demos: Union[List, np.ndarray], nb_states: int = 4, reg: float = 1e-3, horizon: int = 150,
                 plot: bool = False, task_params: Union[List, Tuple, np.ndarray] = None, dt: float = 0.01):
        """
        Task-parameterized Hidden Semi-Markov Model (TP-HSMM)

        :param demos: demo displacement, one array per demonstration
        :param nb_states: number of states in the HSMM
        :param reg: regularization coefficient
        :param horizon: horizon of the reproduced trajectory (stored only; the methods of this class take
            their horizon from the demonstration length or from an explicit argument)
        :param plot: whether to plot the result
        :param task_params: one ``(A, b)`` pair per demonstration; required although it defaults to None
        :param dt: time step used to differentiate the demonstrations
        :raises TypeError: if ``task_params`` is not provided
        """
        if task_params is None:
            # The default exists only to keep the signature keyword friendly; without frames the model
            # cannot be built, so fail here with a message that names the culprit.
            raise TypeError("task_params is required: provide one (A, b) pair per demonstration")

        self.demos_x = demos
        self.nb_states = nb_states
        self.reg = reg
        self.horizon = horizon
        self.plot = plot
        self.task_params = task_params
        self.dt = dt
        self.hsmm = None

        # Augment every demonstration with its velocity: [x, dx] along the last axis.
        self.demos_xdx = [np.concatenate([x, dx], axis=-1)
                          for x, dx in zip(self.demos_x, self.get_dx(self.demos_x))]
        self.nb_dim = self.demos_xdx[0].shape[1]

        # TODO Handle case of moving task parameters ?
        # Number of task frames, read from the leading axis of the first demonstration's A.
        self.n_tp = task_params[0][0].shape[0]
        self.demos_tp_f, self.demos_tp = self._task_parametrize(self.demos_xdx, self.task_params)

    def get_dx(self, demos_x):
        """
        Differentiate every demonstration with respect to time.

        Central differences are used for interior samples and one-sided differences at both ends, then the
        result is divided by ``self.dt``. A demonstration with fewer than two samples gets zero velocity.

        :param demos_x: iterable of demonstrations, each indexed by time along its first axis
        :return: list with one velocity array per demonstration
        """
        demos_dx = []
        for demo in demos_x:
            demo = np.asarray(demo)
            if len(demo) < 2:
                # No neighbour to difference against.
                demos_dx.append(np.zeros_like(demo, dtype=float))
            else:
                # With unit spacing np.gradient applies the central / one-sided stencil described above.
                demos_dx.append(np.gradient(demo, axis=0) / self.dt)
        return demos_dx

    def _task_parametrize(self, demos_xdx, task_params):
        """
        Express every demonstration in each task frame.

        The layout is decided from the first demonstration's ``A``: rank 4 is treated as time-varying frames
        (einsum axes ``atij``: frame, time, row, column), anything else as constant frames (``aij``).

        :param demos_xdx: list of augmented demonstrations
        :param task_params: list of ``(A, b)`` pairs, one per demonstration
        :return: ``(demos_tp_f, demos_tp)``; per-frame arrays indexed (time, frame, dim) and the same data
            reshaped to rows of length ``nb_dim * n_tp``
        """
        beauty_print(f'[{self.__class__.__name__}] Task parametrization')
        if len(task_params[0][0].shape) == 4:
            demos_tp_f = [np.einsum('atij,atj->tai', _A, _x - _b)
                          for _x, (_A, _b) in zip(demos_xdx, task_params)]
        else:
            demos_tp_f = [np.einsum('taij,taj->tai', _A[None, :], _x[:, None] - _b)
                          for _x, (_A, _b) in zip(demos_xdx, task_params)]
        demos_tp = [d.reshape(-1, self.nb_dim * self.n_tp) for d in demos_tp_f]
        return demos_tp_f, demos_tp

    def hsmm_learning(self) -> "pbd.HSMM":
        """
        Learn the task-parameterized HSMM

        The model is initialised with ``init_hmm_kbins`` and refined with EM on the task-parameterized
        demonstrations.

        :return: the learned model, also stored in ``self.hsmm``
        """
        beauty_print(f'[{self.__class__.__name__}] Learning...', type='info')
        self.hsmm = pbd.HSMM(nb_states=self.nb_states, nb_dim=self.nb_dim)
        self.hsmm.init_hmm_kbins(self.demos_tp)
        self.hsmm.em(self.demos_tp, reg=self.reg)

        if self.plot:
            rf.ml.hmm_plot(self.nb_dim, self.demos_tp_f, self.hsmm)
        return self.hsmm

    def poe(self, show_demo_idx: int, task_params: tuple = None) -> "pbd.GMM":
        """
        Product of Expert/Gaussian (PoE), which calculates the mixture distribution from multiple coordinates

        :param show_demo_idx: index of the demonstration whose stored frames are used when ``task_params``
            is None; also forwarded to the plot
        :param task_params: optional ``(A, b)`` pair, read as ``task_params[0]`` and ``task_params[1]``
        :return: The product of experts
        """
        # Get the transformation for the requested situation.
        if task_params is not None:
            _A, _b = task_params[0], task_params[1]
        else:
            _A, _b = self.task_params[show_demo_idx]

        # Time-varying (4-D) frames are reduced to their first time step.
        # Are they always constant ? (No)
        if len(_A.shape) == 4:
            _A, _b = _A[:, 0, :, :], _b[:, 0, :]

        marginal_models = []
        # Get product of marginals: one marginal per frame, mapped back with that frame's (A, b).
        for o in range(_A.shape[0]):
            mod = self.hsmm.marginal_model(slice(o * self.nb_dim, (o + 1) * self.nb_dim)).lintrans(_A[o].T, _b[o])
            marginal_models.append(mod)
            if o == 0:
                prod = copy.deepcopy(mod)
            else:
                prod = prod * mod

        if self.plot:
            # NOTE(review): only the first two frames are plotted, so this needs at least two frames.
            rf.ml.poe_plot(self.nb_dim, [marginal_models[0], marginal_models[1]], prod, self.demos_x, show_demo_idx)
        return prod

    def _reproduce(self, prod: "pbd.GMM", show_demo_idx: int, start_xdx: np.ndarray, dt: float = None) -> np.ndarray:
        """
        Reproduce the specific demo_idx from the learned model

        :param prod: result of PoE
        :param show_demo_idx: index of the specific demo to be reproduced
        :param start_xdx: start state
        :param dt: time step of the reproduction, defaults to ``self.dt``; ``self.dt / dt`` is rounded to
            the nearest integer up-sampling factor
        :return: the state sequence computed by the LQR
        :raises ValueError: if ``dt`` is so much larger than ``self.dt`` that the factor rounds below 1
        """
        if dt is None:
            dt = self.dt

        # Round instead of truncating: float ratios such as 0.3 / 0.1 evaluate to 2.999..., not 3.
        repeats = int(round(self.dt / dt))
        if repeats < 1:
            raise ValueError(f"dt={dt} is too coarse for the demonstration time step {self.dt}")

        # get the most probable sequence of state for this demonstration
        sq = self.hsmm.viterbi(self.demos_tp[show_demo_idx])
        sq = np.repeat(np.array(sq), repeats)

        # solving LQR with Product of Gaussian, see notebook on LQR
        # The horizon uses the same factor as the state sequence so that both always agree.
        # nb_dim counts position and velocity, hence the halving.
        lqr = rf.lqr.PoGLQR(nb_dim=self.nb_dim // 2, dt=dt,
                            horizon=self.demos_xdx[show_demo_idx].shape[0] * repeats)
        lqr.mvn_xi = prod.concatenate_gaussian(sq)  # augmented version of gaussian
        lqr.mvn_u = -4  # NOTE(review): control cost setting of PoGLQR; meaning of the value to be confirmed
        lqr.x0 = start_xdx

        xi = lqr.seq_xi
        if self.plot:
            rf.ml.gen_plot(self.nb_dim, xi, prod, self.demos_x, show_demo_idx)
        return xi

    def _generate(self, prod: "pbd.GMM", ref_demo_idx: int, start_x: np.ndarray, task_params: tuple, horizon: int,
                  dt: float = None) -> np.ndarray:
        """
        Generate a trajectory for new task parameters from the learned model

        :param prod: result of PoE
        :param ref_demo_idx: index of the reference demo, only used for plotting
        :param start_x: start displacement; iterated like a list of demonstrations when it is expressed in
            the task frames (presumably with a leading axis of length 1 -- to be confirmed with callers)
        :param task_params: ``(A, b)`` pair of the new situation
        :param horizon: number of time steps to generate
        :param dt: time step of the generated trajectory, defaults to ``self.dt``
        :return: the state sequence computed by the LQR
        """
        if dt is None:
            dt = self.dt

        start_xdx = np.concatenate([start_x, self.get_dx([start_x])[0]], axis=-1)
        _, start_xdx_tp = self._task_parametrize(start_xdx, task_params=[task_params])

        # Initial state distribution from the start state; fall back to the model prior if it is undefined.
        p0 = self.hsmm.forward_variable(demo=start_xdx_tp[0])[:, 0]
        if np.isnan(p0).any():
            p0 = None

        # Most likely state at each step of the forward recursion.
        sq = self.hsmm.forward_variable_ts(horizon, p0=p0)
        sq = np.argmax(sq, axis=0)

        # solving LQR with Product of Gaussian, see notebook on LQR
        lqr = rf.lqr.PoGLQR(nb_dim=self.nb_dim // 2, dt=dt, horizon=horizon)
        lqr.mvn_xi = prod.concatenate_gaussian(sq)  # augmented version of gaussian
        lqr.mvn_u = -4  # NOTE(review): control cost setting of PoGLQR; meaning of the value to be confirmed
        lqr.x0 = start_xdx[0]

        xi = lqr.seq_xi
        if self.plot:
            rf.ml.gen_plot(self.nb_dim, xi, prod, self.demos_x, ref_demo_idx)
        return xi

    def fit(self) -> "pbd.HSMM":
        """
        Learning the single arm/agent trajectory representation from demonstration via TP-HSMM.

        :return: the learned model
        """
        beauty_print('Learning the trajectory representation from demonstration via TP-HSMM')

        self.hsmm_learning()
        return self.hsmm

    def reproduce(self, show_demo_idx: int, dt: float = None) -> Tuple[np.ndarray, "pbd.GMM"]:
        """
        Reproduce the specific demo_idx from the learned model

        :param show_demo_idx: index of the specific demo to be reproduced
        :param dt: time step of the reproduction, defaults to ``self.dt``
        :return: the reproduced trajectory and the product of experts it was computed from
        """
        beauty_print('reproduce {}-th demo from learned representation'.format(show_demo_idx), type='info')

        if dt is None:
            dt = self.dt

        prod = self.poe(show_demo_idx)
        # The reproduction starts from the first augmented sample of the demonstration.
        traj = self._reproduce(prod, show_demo_idx, self.demos_xdx[show_demo_idx][0], dt=dt)
        return traj, prod

    def generate(self, ref_demo_idx: int, task_params: tuple, start_state: np.ndarray, horizon: int = 100,
                 dt: float = None) -> Tuple[np.ndarray, "pbd.GMM"]:
        """
        Generate a new trajectory from the learned model

        :param ref_demo_idx: index of the reference demo, only used for plotting
        :param task_params: ``(A, b)`` pair of the new situation
        :param start_state: start displacement of the new trajectory
        :param horizon: number of time steps to generate
        :param dt: time step of the generated trajectory, defaults to ``self.dt``
        :return: the generated trajectory and the product of experts it was computed from
        """
        beauty_print('generate a new demo from learned representation', type='info')

        if dt is None:
            dt = self.dt

        beauty_print(f"[{self.__class__.__name__}] Start state: {start_state}")
        beauty_print(f"[{self.__class__.__name__}] Start state shape: {start_state.shape}")

        prod = self.poe(ref_demo_idx, task_params)
        traj = self._generate(prod, ref_demo_idx, start_state, task_params, horizon, dt=dt)
        return traj, prod