Source code for rofunc.learning.ml.hmm

# Copyright 2023, Junjia LIU, jjliu@mae.cuhk.edu.hk
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from pbdlib.model import *

from rofunc.learning.ml.gmm import GMM
from rofunc.utils.logger.beauty_logger import beauty_print


[docs]class HMM(GMM): def __init__(self, nb_states, nb_dim=2): GMM.__init__(self, nb_states, nb_dim) self._trans = None self._init_priors = None @property def init_priors(self): if self._init_priors is None: beauty_print("HMM init priors not defined, initializing to uniform", type="warning") self._init_priors = np.ones(self.nb_states) / self.nb_states return self._init_priors @init_priors.setter def init_priors(self, value): self._init_priors = value @property def trans(self): if self._trans is None: beauty_print("HMM transition matrix not defined, initializing to uniform", type="warning") self._trans = np.ones((self.nb_states, self.nb_states)) / self.nb_states return self._trans @trans.setter def trans(self, value): self._trans = value @property def Trans(self): return self.trans @Trans.setter def Trans(self, value): self.trans = value
[docs] def make_finish_state(self, demos, dep_mask=None): self.has_finish_state = True self.nb_states += 1 data = np.concatenate([d[-3:] for d in demos]) mu = np.mean(data, axis=0) # Update covariances if data.shape[0] > 1: sigma = np.einsum('ai,aj->ij', data - mu, data - mu) / (data.shape[0] - 1) + self.reg else: sigma = self.reg # if cov_type == 'diag': # self.sigma *= np.eye(self.nb_dim) if dep_mask is not None: sigma *= dep_mask self.mu = np.concatenate([self.mu, mu[None]], axis=0) self.sigma = np.concatenate([self.sigma, sigma[None]], axis=0) self.init_priors = np.concatenate([self.init_priors, np.zeros(1)], axis=0) self.priors = np.concatenate([self.priors, np.zeros(1)], axis=0) pass
[docs] def viterbi(self, demo, reg=True): """ Compute most likely sequence of state given observations :param demo: [np.array([nb_timestep, nb_dim])] :return: """ nb_data, dim = demo.shape if isinstance(demo, np.ndarray) else demo['x'].shape logB = np.zeros((self.nb_states, nb_data)) logDELTA = np.zeros((self.nb_states, nb_data)) PSI = np.zeros((self.nb_states, nb_data)).astype(int) _, logB = self.obs_likelihood(demo) # forward pass logDELTA[:, 0] = np.log(self.init_priors + realmin * reg) + logB[:, 0] for t in range(1, nb_data): for i in range(self.nb_states): # get index of maximum value : most probables PSI[i, t] = np.argmax(logDELTA[:, t - 1] + np.log(self.Trans[:, i] + realmin * reg)) logDELTA[i, t] = np.max(logDELTA[:, t - 1] + np.log(self.Trans[:, i] + realmin * reg)) + logB[i, t] assert not np.any(np.isnan(logDELTA)), "Nan values" # backtracking q = [0 for i in range(nb_data)] q[-1] = np.argmax(logDELTA[:, -1]) for t in range(nb_data - 2, -1, -1): q[t] = PSI[q[t + 1], t + 1] return q
[docs] def split_kbins(self, demos): t_sep = [] t_resp = [] for demo in demos: t_sep += [map(int, np.round( np.linspace(0, demo.shape[0], self.nb_states + 1)))] resp = np.zeros((demo.shape[0], self.nb_states)) # print t_sep for i in range(self.nb_states): resp[t_sep[-1][i]:t_sep[-1][i + 1], i] = 1.0 # print resp t_resp += [resp] return np.concatenate(t_resp)
[docs] def obs_likelihood(self, demo=None, dep=None, marginal=None, sample_size=200, demo_idx=None): sample_size = demo.shape[0] # 50 # emission probabilities B = np.ones((self.nb_states, sample_size)) # (4, 50) if marginal != []: for i in range(self.nb_states): mu, sigma = (self.mu, self.sigma) # mu: (4, 8), sigma: (4, 8, 8) if marginal is not None: mu, sigma = self.get_marginal(marginal) if dep is None: B[i, :] = multi_variate_normal(demo, mu[i], sigma[i], log=True) else: # block diagonal computation B[i, :] = 0. for d in dep: if isinstance(d, list): dGrid = np.ix_([i], d, d) B[[i], :] += multi_variate_normal(demo[:, d], mu[i, d], sigma[dGrid][0], log=True) elif isinstance(d, slice): B[[i], :] += multi_variate_normal(demo[:, d], mu[i, d], sigma[i, d, d], log=True) return np.exp(B), B
[docs] def online_forward_message(self, x, marginal=None, reset=False): """ :param x: :param marginal: slice :param reset: :return: """ if (not hasattr(self, '_marginal_tmp') or reset) and marginal is not None: self._marginal_tmp = self.marginal_model(marginal) if marginal is not None: B, _ = self._marginal_tmp.obs_likelihood(x[None]) else: B, _ = self.obs_likelihood(x[None]) if not hasattr(self, '_alpha_tmp') or reset: self._alpha_tmp = self.init_priors * B[:, 0] else: self._alpha_tmp = self._alpha_tmp.dot(self.Trans) * B[:, 0] self._alpha_tmp /= np.sum(self._alpha_tmp, keepdims=True) return self._alpha_tmp
[docs] def compute_messages(self, demo=None, dep=None, table=None, marginal=None, sample_size=200, demo_idx=None): """ :param demo: [np.array([nb_timestep, nb_dim])] :param dep: [A x [B x [int]]] A list of list of dimensions Each list of dimensions indicates a dependence of variables in the covariance matrix E.g. [[0],[1],[2]] indicates a diagonal covariance matrix E.g. [[0, 1], [2]] indicates a full covariance matrix between [0, 1] and no covariance with dim [2] :param table: np.array([nb_states, nb_demos]) - composed of 0 and 1 A mask that avoid some demos to be assigned to some states :param marginal: [slice(dim_start, dim_end)] or [] If not None, compute messages with marginals probabilities If [] compute messages without observations, use size (can be used for time-series regression) :return: """ if isinstance(demo, np.ndarray): sample_size = demo.shape[0] elif isinstance(demo, dict): sample_size = demo['x'].shape[0] B, _ = self.obs_likelihood(demo, dep, marginal, sample_size) # if table is not None: # B *= table[:, [n]] self._B = B # forward variable alpha (rescaled) alpha = np.zeros((self.nb_states, sample_size)) alpha[:, 0] = self.init_priors * B[:, 0] c = np.zeros(sample_size) c[0] = 1.0 / np.sum(alpha[:, 0] + realmin) alpha[:, 0] = alpha[:, 0] * c[0] for t in range(1, sample_size): alpha[:, t] = alpha[:, t - 1].dot(self.Trans) * B[:, t] # Scaling to avoid underflow issues c[t] = 1.0 / np.sum(alpha[:, t] + realmin) alpha[:, t] = alpha[:, t] * c[t] # backward variable beta (rescaled) beta = np.zeros((self.nb_states, sample_size)) beta[:, -1] = np.ones(self.nb_states) * c[-1] # Rescaling for t in range(sample_size - 2, -1, -1): beta[:, t] = np.dot(self.Trans, beta[:, t + 1] * B[:, t + 1]) beta[:, t] = np.minimum(beta[:, t] * c[t], realmax) # Smooth node marginals, gamma gamma = (alpha * beta) / np.tile(np.sum(alpha * beta, axis=0) + realmin, (self.nb_states, 1)) # Smooth edge marginals. zeta (fast version, considers the scaling factor) zeta = np.zeros((self.nb_states, self.nb_states, sample_size - 1)) for i in range(self.nb_states): for j in range(self.nb_states): zeta[i, j, :] = self.Trans[i, j] * alpha[i, 0:-1] * B[j, 1:] * beta[ j, 1:] return alpha, beta, gamma, zeta, c
[docs] def init_params_random(self, data, left_to_right=False, self_trans=0.9): """ :param data: :param left_to_right: if True, init with left to right. All observations pre_trained_models will be the same, and transition matrix will be set to l_t_r :type left_to_right: bool :param self_trans: if left_to_right, self transition value to fill :type self_trans: float :return: """ mu = np.mean(data, axis=0) sigma = np.cov(data.T) if sigma.ndim == 0: sigma = np.ones((1, 1)) * sigma if left_to_right: self.mu = np.array([mu for i in range(self.nb_states)]) else: self.mu = np.array([np.random.multivariate_normal(mu * 1, sigma) for i in range(self.nb_states)]) self.sigma = np.array([sigma + self.reg for i in range(self.nb_states)]) self.priors = np.ones(self.nb_states) / self.nb_states if left_to_right: self.Trans = np.zeros((self.nb_states, self.nb_states)) for i in range(self.nb_states): if i < self.nb_states - 1: self.Trans[i, i] = self_trans self.Trans[i, i + 1] = 1. - self_trans else: self.Trans[i, i] = 1. self.init_priors = np.zeros(self.nb_states) / self.nb_states else: self.Trans = np.ones((self.nb_states, self.nb_states)) * (1. - self_trans) / (self.nb_states - 1) # remove diagonal self.Trans *= (1. - np.eye(self.nb_states)) self.Trans += self_trans * np.eye(self.nb_states) self.init_priors = np.ones(self.nb_states) / self.nb_states
[docs] def gmm_init(self, data, **kwargs): if isinstance(data, list): data = np.concatenate(data, axis=0) GMM.em(self, data, **kwargs) self.init_priors = np.ones(self.nb_states) / self.nb_states self.Trans = np.ones((self.nb_states, self.nb_states)) / self.nb_states
[docs] def init_loop(self, demos): self.Trans = 0.98 * np.eye(self.nb_states) for i in range(self.nb_states - 1): self.Trans[i, i + 1] = 0.02 self.Trans[-1, 0] = 0.02 data = np.concatenate(demos, axis=0) _mu = np.mean(data, axis=0) _cov = np.cov(data.T) self.mu = np.array([_mu for i in range(self.nb_states)]) self.sigma = np.array([_cov for i in range(self.nb_states)]) self.init_priors = np.array([1.] + [0. for i in range(self.nb_states - 1)])
[docs] def em(self, demos, dep=None, reg=1e-8, table=None, end_cov=False, cov_type='full', dep_mask=None, reg_finish=None, left_to_right=False, nb_max_steps=40, loop=False, obs_fixed=False, trans_reg=None): """ :param demos: [list of np.array([nb_timestep, nb_dim])] or [lisf of dict({})] :param dep: [A x [B x [int]]] A list of list of dimensions or slices Each list of dimensions indicates a dependence of variables in the covariance matrix !!! dimensions should not overlap eg : [[0], [0, 1]] should be [[0, 1]], [[0, 1], [1, 2]] should be [[0, 1, 2]] E.g. [[0],[1],[2]] indicates a diagonal covariance matrix E.g. [[0, 1], [2]] indicates a full covariance matrix between [0, 1] and no covariance with dim [2] E.g. [slice(0, 2), [2]] indicates a full covariance matrix between [0, 1] and no covariance with dim [2] :param reg: [float] or list [nb_dim x float] for different regularization in different dimensions Regularization term used in M-step for covariance matrices :param table: np.array([nb_states, nb_demos]) - composed of 0 and 1 A mask that avoid some demos to be assigned to some states :param end_cov: [bool] If True, compute covariance matrix without regularization after convergence :param cov_type: [string] in ['full', 'diag', 'spherical'] :return: """ if reg_finish is not None: end_cov = True nb_min_steps = 2 # min num iterations max_diff_ll = 1e-4 # max log-likelihood increase nb_samples = len(demos) data = np.concatenate(demos).T nb_data = data.shape[0] s = [{} for d in demos] # stored log-likelihood LL = np.zeros(nb_max_steps) if dep is not None: dep_mask = self.get_dep_mask(dep) self.reg = reg if self.mu is None or self.sigma is None: self.init_params_random(data.T, left_to_right=left_to_right) # create regularization matrix if left_to_right or loop: mask = np.eye(self.Trans.shape[0]) for i in range(self.Trans.shape[0] - 1): mask[i, i + 1] = 1. if loop: mask[-1, 0] = 1. if dep_mask is not None: self.sigma *= dep_mask for it in range(nb_max_steps): for n, demo in enumerate(demos): s[n]['alpha'], s[n]['beta'], s[n]['gamma'], s[n]['zeta'], s[n]['c'] = HMM.compute_messages(self, demo, dep, table) # concatenate intermediary vars gamma = np.hstack([s[i]['gamma'] for i in range(nb_samples)]) zeta = np.dstack([s[i]['zeta'] for i in range(nb_samples)]) gamma_init = np.hstack([s[i]['gamma'][:, 0:1] for i in range(nb_samples)]) gamma_trk = np.hstack([s[i]['gamma'][:, 0:-1] for i in range(nb_samples)]) gamma2 = gamma / (np.sum(gamma, axis=1, keepdims=True) + realmin) # M-step if not obs_fixed: for i in range(self.nb_states): # Update centers self.mu[i] = np.einsum('a,ia->i', gamma2[i], data) # Update covariances Data_tmp = data - self.mu[i][:, None] self.sigma[i] = np.einsum('ij,jk->ik', np.einsum('ij,j->ij', Data_tmp, gamma2[i, :]), Data_tmp.T) # Regularization self.sigma[i] = self.sigma[i] + self.reg if cov_type == 'diag': self.sigma[i] *= np.eye(self.sigma.shape[1]) if dep_mask is not None: self.sigma *= dep_mask # Update initial state probablility vector self.init_priors = np.mean(gamma_init, axis=1) # Update transition probabilities self.Trans = np.sum(zeta, axis=2) / (np.sum(gamma_trk, axis=1) + realmin) if trans_reg is not None: self.Trans += trans_reg self.Trans /= np.sum(self.Trans, axis=1, keepdims=True) if left_to_right or loop: self.Trans *= mask self.Trans /= np.sum(self.Trans, axis=1, keepdims=True) # print self.Trans # Compute avarage log-likelihood using alpha scaling factors LL[it] = 0 for n in range(nb_samples): LL[it] -= sum(np.log(s[n]['c'])) LL[it] = LL[it] / nb_samples self._gammas = [s_['gamma'] for s_ in s] # Check for convergence if it > nb_min_steps and LL[it] - LL[it - 1] < max_diff_ll: print("EM converges") if end_cov: for i in range(self.nb_states): # recompute covariances without regularization Data_tmp = data - self.mu[i][:, None] self.sigma[i] = np.einsum('ij,jk->ik', np.einsum('ij,j->ij', Data_tmp, gamma2[i, :]), Data_tmp.T) if reg_finish is not None: self.reg = reg_finish self.sigma += self.reg[None] if cov_type == 'diag': self.sigma[i] *= np.eye(self.sigma.shape[1]) # print "EM converged after " + str(it) + " iterations" # print LL[it] if dep_mask is not None: self.sigma *= dep_mask return True print("EM did not converge") return False
[docs] def score(self, demos): """ :param demos: [list of np.array([nb_timestep, nb_dim])] :return: """ ll = [] for n, demo in enumerate(demos): _, _, _, _, c = HMM.compute_messages(self, demo) ll += [np.sum(np.log(c))] return ll
[docs] def condition(self, data_in, dim_in, dim_out, h=None, return_gmm=False): if return_gmm: return super().condition(data_in, dim_in, dim_out, return_gmm=return_gmm) else: if dim_in == slice(0, 1): dim_in_msg = [] else: dim_in_msg = dim_in a, _, _, _, _ = self.compute_messages(data_in, marginal=dim_in_msg) return super().condition(data_in, dim_in, dim_out, h=a)
""" To ensure compatibility """ @property def Trans(self): return self.trans @Trans.setter def Trans(self, value): self.trans = value