# Copyright 2023, Junjia LIU, jjliu@mae.cuhk.edu.hk
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from pbdlib.model import *
from pbdlib.mvn import MVN
from scipy.linalg import block_diag
from scipy.special import logsumexp
from termcolor import colored
[docs]class GMM(Model):
def __init__(self, nb_states=1, nb_dim=None, init_zeros=False, mu=None, lmbda=None, sigma=None, priors=None,
log_priors=None):
if mu is not None:
nb_states = mu.shape[0]
nb_dim = mu.shape[-1]
super().__init__(nb_states, nb_dim)
# flag to indicate that publishing was not init
self.publish_init = False
self._mu = mu
self._lmbda = lmbda
self._sigma = sigma
self._priors = priors
self._log_priors = log_priors
if init_zeros:
self.init_zeros()
[docs] def get_matching_mvn(self, max=False, mass=None):
if max:
priors = (self.priors == np.max(self.priors)).astype(np.float32)
priors /= np.sum(priors)
elif mass is not None:
prior_lim = np.sort(self.priors)[::-1][np.max(
[0, np.argmin(np.cumsum(np.sort(self.priors)[::-1]) < mass)])]
priors = (self.priors >= prior_lim) * self.priors
priors /= np.sum(priors)
else:
priors = self.priors
# print priors, self.priors
mus, sigmas = self.moment_matching(priors)
mvn = MVN(nb_dim=self.nb_dim, mu=mus, sigma=sigmas)
return mvn
[docs] def moment_matching(self, h):
"""
Perform moment matching to approximate a mixture of Gaussian as a Gaussian
:param h: np.array([nb_timesteps, nb_states])
Activations of each states for different timesteps
:return:
"""
if h.ndim == 1:
h = h[None]
if self.mu.ndim == 2:
mus = np.einsum('ak,ki->ai', h, self.mu)
dmus = self.mu[None] - mus[:, None] # nb_timesteps, nb_states, nb_dim
sigmas = np.einsum('ak,kij->aij', h, self.sigma) + \
np.einsum('ak,akij->aij', h, np.einsum('aki,akj->akij', dmus, dmus))
else:
mus = np.einsum('ak,aki->ai', h, self.mu)
dmus = self.mu - mus[:, None] # nb_timesteps, nb_states, nb_dim
sigmas = np.einsum('ak,akij->aij', h, self.sigma) + \
np.einsum('ak,akij->aij', h, np.einsum('aki,akj->akij', dmus, dmus))
return mus, sigmas
def __add__(self, other):
if isinstance(other, MVN):
gmm = GMM(nb_dim=self.nb_dim, nb_states=self.nb_states)
gmm.priors = self.priors
gmm.mu = self.mu + other.mu[None]
gmm.sigma = self.sigma + other.sigma[None]
return gmm
else:
raise NotImplementedError
def __mul__(self, other):
"""
Renormalized product of Gaussians, component by component
:param other:
:return:
"""
if isinstance(other, MVN):
gmm = GMM(nb_dim=self.nb_dim, nb_states=self.nb_states)
gmm.mu = np.einsum('aij,aj->ai', self.lmbda, self.mu) + \
np.einsum('ij,j->i', other.lmbda, other.mu)[None]
gmm.lmbda = self.lmbda + other.lmbda[None]
gmm.mu = np.einsum('aij,aj->ai', gmm.sigma, gmm.mu)
Z = np.linalg.slogdet(self.lmbda)[1] \
+ np.linalg.slogdet(other.lmbda)[1] \
- 0.5 * np.linalg.slogdet(gmm.lmbda)[1] \
- self.nb_dim / 2. * np.log(2 * np.pi) \
+ 0.5 * (np.einsum('ai,aj->a',
np.einsum('ai,aij->aj', gmm.mu, gmm.lmbda), gmm.mu)
- np.einsum('ai,aj->a',
np.einsum('ai,aij->aj', self.mu, self.lmbda), self.mu)
- np.sum(np.einsum('i,ij->j', other.mu, other.lmbda) * other.mu)
)
gmm.priors = np.exp(Z) * self.priors
gmm.priors /= np.sum(gmm.priors)
else:
# component wise
gmm = GMM(nb_dim=self.nb_dim, nb_states=self.nb_states)
gmm.priors = self.priors
gmm.mu = np.einsum('aij,aj->ai', self.lmbda, self.mu) + \
np.einsum('aij,aj->ai', other.lmbda, other.mu)
gmm.lmbda = self.lmbda + other.lmbda
gmm.mu = np.einsum('aij,aj->ai', gmm.sigma, gmm.mu)
return gmm
def __mod__(self, other):
"""
Renormalized product of Gaussians, component by component
:param other:
:return:
"""
gmm = GMM(nb_dim=self.nb_dim, nb_states=self.nb_states * other.nb_states)
# gmm.priors = self.priors
gmm.mu = np.einsum('aij,aj->ai', self.lmbda, self.mu)[:, None] + \
np.einsum('aij,aj->ai', other.lmbda, other.mu)[None]
gmm.lmbda = self.lmbda[:, None] + other.lmbda[None]
gmm.sigma = np.linalg.inv(gmm.lmbda)
gmm.mu = np.einsum('abij,abj->abi', gmm.sigma, gmm.mu)
return gmm
[docs] def marginal_model(self, dims):
"""
Get a GMM of a slice of this GMM
:param dims:
:type dims: slice
:return:
"""
gmm = GMM(nb_dim=dims.stop - dims.start, nb_states=self.nb_states)
gmm.priors = self.priors
gmm.mu = self.mu[:, dims]
gmm.sigma = self.sigma[:, dims, dims]
return gmm
[docs] def lintrans(self, A, b):
"""
Linear transformation of a GMM
:param A: np.array(nb_dim, nb_dim)
:param b: np.array(nb_dim)
:return:
"""
gmm = GMM(nb_dim=self.nb_dim, nb_states=self.nb_states)
gmm.priors = self.priors
gmm.mu = np.einsum('ij,aj->ai', A, self.mu) + b
gmm.lmbda = np.einsum('aij,kj->aik',
np.einsum('ij,ajk->aik', A, self.lmbda), A)
return gmm
[docs] def lintrans_dyna(self, A, b):
"""
Linear transformation of a GMM
:param A: np.array(nb_states, nb_dim, nb_dim)
:param b: np.array(nb_states, nb_dim)
:return:
"""
gmm = GMM(nb_dim=self.nb_dim, nb_states=self.nb_states)
gmm.priors = self.priors
gmm.mu = np.einsum('aij,aj->ai', A, self.mu) + b
gmm.lmbda = np.einsum('aij,akj->aik',
np.einsum('aij,ajk->aik', A, self.lmbda), A)
return gmm
[docs] def concatenate_gaussian(self, q, get_mvn=True, reg=None):
"""
Get a concatenated-block-diagonal replication of the GMM with sequence of state
given by q.
:param q: [list of int]
:param get_mvn: [bool]
:return:
"""
if reg is None:
if not get_mvn:
return np.concatenate([self.mu[i] for i in q]), block_diag(*[self.sigma[i] for i in q])
else:
mvn = MVN()
mvn.mu = np.concatenate([self.mu[i] for i in q])
mvn._sigma = block_diag(*[self.sigma[i] for i in q])
mvn._lmbda = block_diag(*[self.lmbda[i] for i in q])
return mvn
else:
if not get_mvn:
return np.concatenate([self.mu[i] for i in q]), block_diag(
*[self.sigma[i] + reg for i in q])
else:
mvn = MVN()
mvn.mu = np.concatenate([self.mu[i] for i in q])
mvn._sigma = block_diag(*[self.sigma[i] + reg for i in q])
mvn._lmbda = block_diag(*[np.linalg.inv(self.sigma[i] + reg) for i in q])
return mvn
[docs] def compute_resp(self, demo=None, dep=None, table=None, marginal=None, norm=True):
sample_size = demo.shape[0]
B = np.ones((self.nb_states, sample_size))
if marginal != []:
for i in range(self.nb_states):
mu, sigma = (self.mu, self.sigma)
if marginal is not None:
mu, sigma = self.get_marginal(marginal)
if dep is None:
B[i, :] = multi_variate_normal(demo,
mu[i],
sigma[i], log=False)
else: # block diagonal computation
B[i, :] = 1.0
for d in dep:
dGrid = np.ix_([i], d, d)
B[[i], :] *= multi_variate_normal(demo, mu[i, d],
sigma[dGrid][:, :, 0], log=False)
B *= self.priors[:, None]
if norm:
return B / np.sum(B, axis=0)
else:
return B
[docs] def init_params_scikit(self, data, cov_type='full'):
from sklearn.mixture import GaussianMixture
gmm_init = GaussianMixture(self.nb_states, cov_type, n_init=5, init_params='random')
gmm_init.fit(data)
self.mu = gmm_init.means_
if cov_type == 'diag':
self.sigma = np.array([np.diag(gmm_init.covariances_[i]) for i in range(self.nb_states)])
else:
self.sigma = gmm_init.covariances_
self.priors = gmm_init.weights_
self.Trans = np.ones((self.nb_states, self.nb_states)) * 0.01
self.init_priors = np.ones(self.nb_states) * 1. / self.nb_states
[docs] def init_params_kmeans(self, data):
from sklearn.cluster import KMeans
km_init = KMeans(n_clusters=self.nb_states)
km_init.fit(data)
self.mu = km_init.cluster_centers_
self.priors = np.ones(self.nb_states) / self.nb_states
self.sigma = np.array([np.eye(self.nb_dim) for i in range(self.nb_states)])
self.Trans = np.ones((self.nb_states, self.nb_states)) * 0.01
self.init_priors = np.ones(self.nb_states) * 1. / self.nb_states
[docs] def init_params_random(self, data):
mu = np.mean(data, axis=0)
sigma = np.dot((data - mu).T, (data - mu)) / \
(data.shape[0] - 1)
self.mu = np.array([np.random.multivariate_normal(mu, sigma)
for i in range(self.nb_states)])
self.sigma = np.array([sigma + self.reg for i in range(self.nb_states)])
self.priors = np.ones(self.nb_states) / self.nb_states
[docs] def em(self, data, reg=1e-8, maxiter=100, minstepsize=1e-5, diag=False, reg_finish=False,
kmeans_init=False, random_init=True, dep_mask=None, verbose=False, only_scikit=False,
no_init=False):
"""
:param data: [np.array([nb_timesteps, nb_dim])]
:param reg: [list([nb_dim]) or float]
Regulariazation for EM
:param maxiter:
:param minstepsize:
:param diag: [bool]
Use diagonal covariance matrices
:param reg_finish: [np.array([nb_dim]) or float]
Regulariazation for finish step
:param kmeans_init: [bool]
Init components with k-means.
:param random_init: [bool]
Init components randomely.
:param dep_mask: [np.array([nb_dim, nb_dim])]
Composed of 0 and 1. Mask given the dependencies in the covariance matrices
:return:
"""
if self.nb_dim is None:
self.nb_dim = data.shape[-1]
self.reg = reg
nb_min_steps = 5 # min num iterations
nb_max_steps = maxiter # max iterations
max_diff_ll = minstepsize # max log-likelihood increase
nb_samples = data.shape[0]
if not no_init:
if random_init and not only_scikit:
self.init_params_random(data)
elif kmeans_init and not only_scikit:
self.init_params_kmeans(data)
else:
if diag:
self.init_params_scikit(data, 'diag')
else:
self.init_params_scikit(data, 'full')
if only_scikit: return
data = data.T
LL = np.zeros(nb_max_steps)
for it in range(nb_max_steps):
# E - step
L = np.zeros((self.nb_states, nb_samples))
L_log = np.zeros((self.nb_states, nb_samples))
for i in range(self.nb_states):
L_log[i, :] = np.log(self.priors[i]) + multi_variate_normal(data.T, self.mu[i],
self.sigma[i], log=True)
L = np.exp(L_log)
GAMMA = L / np.sum(L, axis=0)
GAMMA2 = GAMMA / np.sum(GAMMA, axis=1)[:, np.newaxis]
# M-step
self.mu = np.einsum('ac,ic->ai', GAMMA2,
data) # a states, c sample, i dim
dx = data[None, :] - self.mu[:, :, None] # nb_dim, nb_states, nb_samples
self.sigma = np.einsum('acj,aic->aij', np.einsum('aic,ac->aci', dx, GAMMA2),
dx) # a states, c sample, i-j dim
self.sigma += self.reg
if diag:
self.sigma *= np.eye(self.nb_dim)
if dep_mask is not None:
self.sigma *= dep_mask
# print self.Sigma[:,u :, i]
# Update initial state probablility vector
self.priors = np.mean(GAMMA, axis=1)
LL[it] = np.mean(np.log(np.sum(L, axis=0)))
# Check for convergence
if it > nb_min_steps:
if LL[it] - LL[it - 1] < max_diff_ll:
if reg_finish is not False:
self.sigma = np.einsum(
'acj,aic->aij', np.einsum('aic,ac->aci', dx, GAMMA2), dx) + reg_finish
if verbose:
print(colored('Converged after %d iterations: %.3e' % (it, LL[it]), 'red', 'on_white'))
return GAMMA
if verbose:
print(
"GMM did not converge before reaching max iteration. Consider augmenting the number of max iterations.")
return GAMMA
[docs] def init_hmm_kbins(self, demos, dep=None, reg=1e-8, dep_mask=None):
"""
Init HMM by splitting each demos in K bins along time. Each K states of the HMM will
be initialized with one of the bin. It corresponds to a left-to-right HMM.
:param demos: [list of np.array([nb_timestep, nb_dim])]
:param dep:
:param reg: [float]
:return:
"""
# delimit the cluster bins for first demonstration
self.nb_dim = demos[0].shape[1]
self.init_zeros()
t_sep = []
for demo in demos:
t_sep += [list(map(int, np.round(np.linspace(0, demo.shape[0], self.nb_states + 1))))]
# print t_sep
for i in range(self.nb_states):
data_tmp = np.empty((0, self.nb_dim))
inds = []
states_nb_data = 0 # number of datapoints assigned to state i
# Get bins indices for each demonstration
for n, demo in enumerate(demos):
inds = range(t_sep[n][i], t_sep[n][i + 1])
data_tmp = np.concatenate([data_tmp, demo[inds]], axis=0)
states_nb_data += t_sep[n][i + 1] - t_sep[n][i]
self.priors[i] = states_nb_data
self.mu[i] = np.mean(data_tmp, axis=0)
if dep_mask is not None:
self.sigma *= dep_mask
if dep is None:
self.sigma[i] = np.cov(data_tmp.T) + np.eye(self.nb_dim) * reg
else:
for d in dep:
dGrid = np.ix_([i], d, d)
self.sigma[dGrid] = (np.cov(data_tmp[:, d].T) + np.eye(
len(d)) * reg)[:, :, np.newaxis]
# print self.Sigma[:,:,i]
# normalize priors
self.priors = self.priors / np.sum(self.priors)
# Hmm specific init
self.Trans = np.ones((self.nb_states, self.nb_states)) * 0.01
nb_data = np.mean([d.shape[0] for d in demos])
for i in range(self.nb_states - 1):
self.Trans[i, i] = 1.0 - float(self.nb_states) / nb_data
self.Trans[i, i + 1] = float(self.nb_states) / nb_data
self.Trans[-1, -1] = 1.0
self.init_priors = np.ones(self.nb_states) * 1. / self.nb_states
[docs] def add_trash_component(self, data, scale=2.):
if isinstance(data, list):
data = np.concatenate(data, axis=0)
mu_new = np.mean(data, axis=0)
sigma_new = scale ** 2 * np.cov(data.T)
self.priors = np.concatenate([self.priors, 0.01 * np.ones(1)])
self.priors /= np.sum(self.priors)
self.mu = np.concatenate([self.mu, mu_new[None]], axis=0)
self.sigma = np.concatenate([self.sigma, sigma_new[None]], axis=0)
[docs] def mvn_pdf(self, x, reg=None):
"""
:param x: np.array([nb_samples, nb_dim])
samples
:param mu: np.array([nb_states, nb_dim])
mean vector
:param sigma_chol: np.array([nb_states, nb_dim, nb_dim])
cholesky decomposition of covariance matrices
:param lmbda: np.array([nb_states, nb_dim, nb_dim])
precision matrices
:return: np.array([nb_states, nb_samples])
log mvn
"""
mu, lmbda_, sigma_chol_ = self.mu, self.lmbda, self.sigma_chol
if x.ndim > 1 and mu.ndim == 2:
dx = mu[None] - x[:, None] # nb_timesteps, nb_states, nb_dim
eins_idx = ('baj,baj->ba', 'ajk,baj->bak')
elif x.ndim > 1 and mu.ndim == 3:
dx = mu - x[:, None] # nb_timesteps, nb_states, nb_dim
eins_idx = ('baj,baj->ba', 'bajk,baj->bak')
else:
dx = mu - x
eins_idx = ('aj,aj->a', 'ajk,aj->ak')
if lmbda_.ndim == 4:
cov_part = np.sum(np.log(sigma_chol_.diagonal(axis1=2, axis2=3)), axis=-1)
else:
cov_part = np.sum(np.log(sigma_chol_.diagonal(axis1=1, axis2=2)), axis=1)
return -0.5 * np.einsum(eins_idx[0], dx, np.einsum(eins_idx[1], lmbda_, dx)) \
- (mu.shape[-1] / 2.) * np.log(2 * np.pi) - cov_part
[docs] def log_prob(self, x):
return logsumexp(self.log_priors + self.mvn_pdf(x), -1)