import numpy as np
from pbdlib.utils.utils import lifted_transfer_matrix
import pbdlib as pbd
from pbdlib import MVN
from tqdm import tqdm
[docs]class LQR(object):
def __init__(self, A=None, B=None, nb_dim=2, dt=0.01, horizon=50):
self._horizon = horizon
self.A = A
self.B = B
self.dt = dt
self.nb_dim = nb_dim
self._s_xi, self._s_u = None, None
self._x0 = None
self._gmm_xi, self._gmm_u = None, None
self._mvn_sol_xi, self._mvn_sol_u = None, None
self._seq_xi, self._seq_u = None, None
self._S, self._v, self._K, self._Kv, self._ds, self._cs, self._Qc = \
None, None, None, None, None, None, None
self._Q, self._z = None, None
@property
def seq_xi(self):
return self._seq_xi
@seq_xi.setter
def seq_xi(self, value):
self._seq_xi = value
@property
def K(self):
assert self._K is not None, "Solve Ricatti before"
return self._K
@property
def Q(self):
return self._Q
@Q.setter
def Q(self, value):
"""
value :
(ndim_xi, ndim_xi) or
((N, ndim_xi, ndim_xi), (nb_timestep, )) or
(nb_timestep, ndim_xi, ndim_xi)
"""
self._Q = value
@property
def z(self):
return self._z
@z.setter
def z(self, value):
"""
value :
(ndim_xi, ) or
((N, ndim_xi, ), (nb_timestep, )) or
(nb_timestep, ndim_xi)
"""
self._z = value
@property
def Qc(self):
assert self._Qc is not None, "Solve Ricatti before"
return self._Qc
@property
def cs(self):
"""
Return c list where control command u is
u = -K x + c
:return:
"""
if self._cs is None:
self._cs = self.get_feedforward()
return self._cs
@property
def ds(self):
"""
Return c list where control command u is
u = K(d - x)
:return:
"""
if self._ds is None:
self._ds = self.get_target()
return self._ds
@property
def horizon(self):
return self._horizon
@horizon.setter
def horizon(self, value):
# self.reset_params()
self._horizon = value
@property
def u_dim(self):
"""
Number of dimension of input
:return:
"""
if self.B is not None:
return self.B.shape[1]
else:
return self.nb_dim
@property
def xi_dim(self):
"""
Number of dimension of state
:return:
"""
if self.A is not None:
return self.A.shape[0]
else:
return self.nb_dim * 2
@property
def gmm_xi(self):
"""
Distribution of state
:return:
"""
return self._gmm_xi
@gmm_xi.setter
def gmm_xi(self, value):
"""
:param value [pbd.GMM] or [(pbd.GMM, list)]
"""
# resetting solution
self._mvn_sol_xi = None
self._mvn_sol_u = None
self._seq_u = None
self._seq_xi = None
self._gmm_xi = value
@property
def gmm_u(self):
"""
Distribution of control input
:return:
"""
return self._gmm_u
@gmm_u.setter
def gmm_u(self, value):
"""
:param value [float] or [pbd.MVN] or [pbd.GMM] or [(pbd.GMM, list)]
"""
# resetting solution
self._mvn_sol_xi = None
self._mvn_sol_u = None
self._seq_u = None
# self._seq_xi = None
if isinstance(value, float):
self._gmm_u = pbd.MVN(
mu=np.zeros(self.u_dim), lmbda=10 ** value * np.eye(self.u_dim))
else:
self._gmm_u = value
@property
def x0(self):
return self._x0
@x0.setter
def x0(self, value):
# resetting solution
self._mvn_sol_xi = None
self._mvn_sol_u = None
self._x0 = value
[docs] def get_Q_z(self, t):
"""
get Q and target z for time t
:param t:
:return:
"""
if self._gmm_xi is None:
z, Q = None, None
if self._z is None:
z = np.zeros(self.A.shape[-1])
elif isinstance(self._z, tuple):
z = self._z[0][self._z[1][t]]
elif isinstance(self._z, np.ndarray):
if self._z.ndim == 1:
z = self._z
elif self._z.ndim == 2:
if self._seq_xi is None:
z = self._z[t]
else:
z = self._z[self._seq_xi[t]]
if isinstance(self._Q, tuple):
Q = self._Q[0][self._Q[1][t]]
elif isinstance(self._Q, np.ndarray):
if self._Q.ndim == 2:
Q = self._Q
elif self._Q.ndim == 3:
if self._seq_xi is None:
Q = self._Q[t]
else:
Q = self._Q[self._seq_xi[t]]
return Q, z
else:
if isinstance(self._gmm_xi, tuple):
gmm, seq = self._gmm_xi
return gmm.lmbda[seq[t]], gmm.mu[seq[t]]
elif isinstance(self._gmm_xi, pbd.GMM):
return self._gmm_xi.lmbda[t], self._gmm_xi.mu[t]
elif isinstance(self._gmm_xi, pbd.MVN):
return self._gmm_xi.lmbda, self._gmm_xi.mu
else:
raise ValueError("Not supported gmm_xi")
[docs] def get_R(self, t):
if isinstance(self._gmm_u, pbd.MVN):
return self._gmm_u.lmbda
elif isinstance(self._gmm_u, tuple):
gmm, seq = self._gmm_u
return gmm.lmbda[seq[t]]
elif isinstance(self._gmm_u, pbd.GMM):
return self._gmm_u.lmbda[t]
else:
raise ValueError("Not supported gmm_u")
[docs] def get_A(self, t):
if self.A.ndim == 2:
return self.A
else:
return self.A[t]
[docs] def get_B(self, t):
if self.B.ndim == 2:
return self.B
else:
return self.B[t]
[docs] def ricatti(self):
"""
http://web.mst.edu/~bohner/papers/tlqtots.pdf
:return:
"""
Q, z = self.get_Q_z(-1)
#
_S = [None for i in range(self._horizon)]
_v = [None for i in range(self._horizon)]
_K = [None for i in range(self._horizon - 1)]
_Kv = [None for i in range(self._horizon - 1)]
_Qc = [None for i in range(self._horizon - 1)]
_S[-1] = Q
_v[-1] = Q.dot(z)
for t in tqdm(range(self.horizon - 2, -1, -1)):
Q, z = self.get_Q_z(t)
R = self.get_R(t)
A = self.get_A(t)
B = self.get_B(t)
_Qc[t] = np.linalg.inv(R + B.T.dot(_S[t + 1]).dot(B))
_Kv[t] = _Qc[t].dot(B.T)
_K[t] = _Kv[t].dot(_S[t + 1]).dot(A)
AmBK = A - B.dot(_K[t])
_S[t] = A.T.dot(_S[t + 1]).dot(AmBK) + Q
_v[t] = AmBK.T.dot(_v[t + 1]) + Q.dot(z)
self._S = _S
self._v = _v
self._K = _K
self._Kv = _Kv
self._Qc = _Qc
self._ds = None
self._cs = None
[docs] def get_target(self):
ds = []
for t in range(0, self.horizon - 1):
ds += [np.linalg.inv(self._S[t].dot(self.A)).dot(self._v[t])]
return np.array(ds)
[docs] def get_feedforward(self):
cs = []
for t in range(0, self.horizon - 1):
cs += [self._Kv[t].dot(self._v[t + 1])]
return np.array(cs)
[docs] def get_command(self, xi, i):
if xi.ndim == 1:
return -self._K[i].dot(xi) + self._Kv[i].dot(self._v[i])
else:
return np.einsum('ij,aj->ai', -self.K[i], xi) + self._Kv[i].dot(self._v[i + 1])
[docs] def policy(self, xi, t):
"""
Time-dependent and linear in state policy as MVN distribution.
:param xi: Current state
:return:
"""
loc = self.get_command(xi, t)
Qc = self.Qc[t]
try:
np.linalg.cholesky(Qc)
invertible = True
except:
invertible = False
phi = 1e-8
while not invertible:
Qc = Qc + np.eye(self.u_dim) * phi
try:
np.linalg.cholesky(Qc)
invertible = True
except:
invertible = False
phi *= 1E1
cov = np.tile(Qc[None], (loc.shape[0], 1, 1))
return MVN(mu=loc, sigma=cov)
[docs] def get_sample(self, xi, i, sample_size=1):
"""
:param xi:
:param i:
:param sample_size:
:return:
"""
return self.policy(xi, i).sample(sample_size)
[docs] def trajectory_distribution(self, xi, u, t):
pass
[docs] def get_seq(self, xi0, return_target=False):
xis = [xi0]
us = [-self._K[0].dot(xi0) + self._Kv[0].dot(self._v[0])]
ds = []
for t in range(1, self.horizon - 1):
A = self.get_A(t)
B = self.get_B(t)
xis += [A.dot(xis[-1]) + B.dot(us[-1])]
if return_target:
d = np.linalg.inv(self._S[t].dot(A)).dot(self._v[t + 1])
ds += [d]
us += [self._K[t].dot(d - xis[-1])]
else:
us += [-self._K[t].dot(xis[-1]) + self._Kv[t].dot(self._v[t + 1])]
if return_target:
return np.array(xis), np.array(us), np.array(ds)
else:
return np.array(xis), np.array(us)
[docs] def make_rollout_samples(self, x0):
T = self.horizon
xs = [None for i in range(T)]
us = [None for i in range(T - 1)]
xs[0] = x0
next_xs = x0
n = x0.shape[0]
for i in range(T - 1):
B = self.get_B(i)
A = self.get_A(i)
loc = self.get_command(next_xs, i)
cov = self.Qc[i]
eps = np.random.normal(size=(n, self.u_dim))
next_us = loc + np.einsum('ij,aj->ai ', np.linalg.cholesky(cov), eps)
next_xs = np.einsum('ij,aj->ai', A, next_xs) + np.einsum('ij,aj->ai', B, next_us)
xs[i + 1] = next_xs
us[i] = next_us
return np.transpose(np.stack(xs), (1, 0, 2)), np.transpose(np.stack(us), (1, 0, 2))
[docs] def make_rollout(self, x0):
T = self.horizon
xs = [None for i in range(T)]
us = [None for i in range(T - 1)]
xs[0] = x0
for i in range(T - 1):
B = self.get_B(i)
A = self.get_A(i)
next_us = self.get_command(xs[i], i)
next_xs = np.einsum('ij,aj->ai', A, xs[i]) + np.einsum('ij,aj->ai', B, next_us)
xs[i + 1] = next_xs
us[i] = next_us
return np.transpose(np.stack(xs), (1, 0, 2)), np.transpose(np.stack(us), (1, 0, 2))
[docs] def rollout_policy(self, dist_policy, x0):
"""
Rollout of the stochastic policy.
:param dist_policy: A policy distribution which takes x and t as input.
:param x0: initial state
:return:
"""
T = self.horizon
xs = [None for i in range(T)]
us = [None for i in range(T - 1)]
xs[0] = x0
for i in range(T - 1):
B = self.get_B(i)
A = self.get_A(i)
next_us = dist_policy(xs[i], i).sample()
next_xs = np.einsum('ij,aj->ai', A, xs[i]) + np.einsum('ij,aj->ai', B, next_us)
xs[i + 1] = next_xs
us[i] = next_us
return np.transpose(np.stack(xs), (1, 0, 2)), np.transpose(np.stack(us), (1, 0, 2))
[docs]class GMMLQR(LQR):
"""
LQR with a GMM cost on the state, approximation to be checked
"""
def __init__(self, *args, **kwargs):
self._full_gmm_xi = None
LQR.__init__(self, *args, **kwargs)
@property
def full_gmm_xi(self):
"""
Distribution of state
:return:
"""
return self._full_gmm_xi
@full_gmm_xi.setter
def full_gmm_xi(self, value):
"""
:param value [pbd.GMM] or [(pbd.GMM, list)]
"""
self._full_gmm_xi = value
[docs] def ricatti(self, x0, n_best=None):
costs = []
if isinstance(self._full_gmm_xi, pbd.MTMM):
full_gmm = self.full_gmm_xi.get_matching_gmm()
else:
full_gmm = self.full_gmm_xi
if n_best is not None:
log_prob_components = self.full_gmm_xi.log_prob_components(x0)
a = np.sort(log_prob_components, axis=0)[-n_best - 1][0]
for i in range(self.full_gmm_xi.nb_states):
if n_best is not None and log_prob_components[i] < a:
costs += [-np.inf]
else:
self.gmm_xi = full_gmm, [i for j in range(self.horizon)]
LQR.ricatti(self)
xis, us = self.get_seq(x0)
costs += [np.sum(self.gmm_u.log_prob(us) + self.full_gmm_xi.log_prob(xis))]
max_lqr = np.argmax(costs)
self.gmm_xi = full_gmm, [max_lqr for j in range(self.horizon)]
LQR.ricatti(self)
[docs]class PoGLQR(LQR):
"""
Implementation of LQR with Product of Gaussian as described in
http://calinon.ch/papers/Calinon - HFR2016.pdf
"""
def __init__(self, A=None, B=None, nb_dim=2, dt=0.01, horizon=50):
self._horizon = horizon
self.A = A
self.B = B
self.nb_dim = nb_dim
self.dt = dt
self._s_xi, self._s_u = None, None
self._x0 = None
self._mvn_xi, self._mvn_u = None, None
self._mvn_sol_xi, self._mvn_sol_u = None, None
self._seq_xi, self._seq_u = None, None
@property
def A(self):
return self._A
@A.setter
def A(self, value):
self.reset_params() # reset params
self._A = value
@property
def B(self):
return self._B
@B.setter
def B(self, value):
self.reset_params() # reset params
self._B = value
@property
def mvn_u_dim(self):
"""
Number of dimension of input sequence lifted form
:return:
"""
if self.B is not None:
return self.B.shape[1] * self.horizon
else:
return self.nb_dim * self.horizon
@property
def mvn_xi_dim(self):
"""
Number of dimension of state sequence lifted form
:return:
"""
if self.A is not None:
return self.A.shape[0] * self.horizon
else:
return self.nb_dim * self.horizon * 2
@property
def mvn_sol_u(self):
"""
Distribution of control input after solving LQR
:return:
"""
assert self.x0 is not None, "Please specify a starting state"
assert self.mvn_xi is not None, "Please specify a target distribution"
assert self.mvn_u is not None, "Please specify a control input distribution"
if self._mvn_sol_u is None:
self._mvn_sol_u = self.mvn_xi.inv_trans_s(
self.s_u, self.s_xi.dot(self.x0)) % self.mvn_u
return self._mvn_sol_u
@property
def seq_xi(self):
if self._seq_xi is None:
self._seq_xi = self.mvn_sol_xi.mu.reshape(self.horizon, self.xi_dim)
return self._seq_xi
@property
def seq_u(self):
if self._seq_u is None:
self._seq_u = self.mvn_sol_u.mu.reshape(self.horizon, self.u_dim)
return self._seq_u
@property
def mvn_sol_xi(self):
"""
Distribution of state after solving LQR
:return:
"""
if self._mvn_sol_xi is None:
# MU: self._mvn_sol_xi.mu -> self.s_u * mvn_sol_u.mu + self.s_xi.dot(self.x0)
# it means: zeta = S_u * u + S_zeta * zeta_0
# SIGMA: self._mvn_sol_xi.sigma -> self.s_u * mvn_sol_u.sigma * self.s_u^T
# it means: Sigma_zeta = S_u * Sigma_u * S_u^T
# Calinon, Stochastic learning and control in multiple coordinate systems
self._mvn_sol_xi = self.mvn_sol_u.transform(
self.s_u, self.s_xi.dot(self.x0))
return self._mvn_sol_xi
@property
def mvn_xi(self):
"""
Distribution of state
:return:
"""
return self._mvn_xi
@mvn_xi.setter
def mvn_xi(self, value):
"""
:param value [float] or [pbd.MVN]
"""
# resetting solution
self._mvn_sol_xi = None
self._mvn_sol_u = None
self._seq_u = None
self._seq_xi = None
self._mvn_xi = value
@property
def mvn_u(self):
"""
Distribution of control input
:return:
"""
return self._mvn_u
@mvn_u.setter
def mvn_u(self, value):
"""
:param value [float] or [pbd.MVN]
"""
# resetting solution
self._mvn_sol_xi = None
self._mvn_sol_u = None
self._seq_u = None
self._seq_xi = None
if isinstance(value, pbd.MVN):
self._mvn_u = value
else:
self._mvn_u = pbd.MVN(
mu=np.zeros(self.mvn_u_dim), lmbda=10 ** value * np.eye(self.mvn_u_dim))
@property
def xis(self):
return self.mvn_sol_xi.mu.reshape(self.horizon, self.xi_dim / self.horizon)
@property
def k(self):
# return self.mvn_sol_u.sigma.dot(self.s_u.T.dot(self.mvn_xi.lmbda)).dot(self.s_xi).reshape(
return self.mvn_sol_u.sigma.dot(self.s_u.T.dot(self.mvn_xi.lmbda)).dot(self.s_xi).reshape(
(self.horizon, self.mvn_u_dim / self.horizon, self.mvn_xi_dim / self.horizon))
@property
def s_u(self):
if self._s_u is None:
# Calinon, Stochastic learning and control in multiple coordinate systems, Appendix II
self._s_xi, self._s_u = lifted_transfer_matrix(self.A, self.B,
horizon=self.horizon, dt=self.dt, nb_dim=self.nb_dim)
return self._s_u
@property
def s_xi(self):
if self._s_xi is None:
# Calinon, Stochastic learning and control in multiple coordinate systems, Appendix II
self._s_xi, self._s_u = lifted_transfer_matrix(self.A, self.B,
horizon=self.horizon, dt=self.dt, nb_dim=self.nb_dim)
return self._s_xi
[docs] def reset_params(self):
# reset everything
self._s_xi, self._s_u = None, None
self._x0 = None
# self._mvn_xi, self._mvn_u = None, None
self._mvn_sol_xi, self._mvn_sol_u = None, None
self._seq_xi, self._seq_u = None, None
@property
def horizon(self):
return self._horizon
@horizon.setter
def horizon(self, value):
self.reset_params()
self._horizon = value
[docs]class PoGLQRBi(LQR):
"""
Implementation of LQR with Product of Gaussian as described in
http://calinon.ch/papers/Calinon - HFR2016.pdf
"""
def __init__(self, A=None, B=None, nb_dim=2, dt=0.01, horizon=50):
# xi => zeta, x0 => zeta_1
self._horizon = horizon # T
self.A = A
self.B = B
self.nb_dim = nb_dim # D
self.dt = dt
self._s_xi, self._s_U = None, None
self._x0_l, self._x0_r, self._x0_c = None, None, None
self._mvn_xi_l, self._mvn_xi_r, self.mvn_xi_c, self._mvn_u = None, None, None, None
self._mvn_sol_xi_l, self._mvn_sol_xi_r, self._mvn_sol_U = None, None, None
self._seq_xi, self._seq_U = None, None
self._C, self._C_l, self._C_r = None, None, None # Coordination matrix
# <editor-fold desc="Setter A, B, C, horizon, x0_l, x0_r, x0_c">
@property
def A(self):
return self._A
@A.setter
def A(self, value):
self.reset_params() # reset params
self._A = value
@property
def B(self):
return self._B
@B.setter
def B(self, value):
self.reset_params() # reset params
self._B = value
@property
def C(self):
self._C = np.hstack((np.eye(self.mvn_U_dim), -1 * np.eye(self.mvn_U_dim))) # DT * DTK (100, 200)
return self._C
@property
def C_l(self):
self._C_l = np.hstack((np.eye(self.mvn_U_dim), 0 * np.eye(self.mvn_U_dim)))
return self._C_l
@property
def C_r(self):
self._C_r = np.hstack((0 * np.eye(self.mvn_U_dim), np.eye(self.mvn_U_dim)))
return self._C_r
@property
def horizon(self):
return self._horizon
@horizon.setter
def horizon(self, value):
self.reset_params()
self._horizon = value
@property
def x0_l(self):
return self._x0_l
@x0_l.setter
def x0_l(self, value):
# resetting solution
self._mvn_sol_xi = None
self._mvn_sol_u = None
self._x0_l = value
@property
def x0_r(self):
return self._x0_r
@x0_r.setter
def x0_r(self, value):
# resetting solution
self._mvn_sol_xi = None
self._mvn_sol_u = None
self._x0_r = value
@property
def x0_c(self):
return self._x0_c
@x0_c.setter
def x0_c(self, value):
# resetting solution
self._mvn_sol_xi = None
self._mvn_sol_u = None
self._x0_c = value
#
# @x0_c.setter
# def x0_c(self, value):
# # resetting solution
# self._mvn_sol_xi = None
# self._mvn_sol_u = None
#
# self._x0_c = value
# </editor-fold>
# <editor-fold desc="U and xi dim">
@property
def mvn_U_dim(self):
"""
Number of dimension of input sequence lifted form
:return:
"""
if self.B is not None:
return self.B.shape[1] * self.horizon
else:
return self.nb_dim * self.horizon
@property
def mvn_xi_dim(self):
"""
Number of dimension of state sequence lifted form
:return:
"""
if self.A is not None:
return self.A.shape[0] * self.horizon
else:
return self.nb_dim * self.horizon * 2
# </editor-fold>
@property
def mvn_sol_U(self):
"""
Distribution of control input after solving LQR
:return:
"""
assert self.x0_l is not None, "Please specify a starting state"
assert self.x0_r is not None, "Please specify a starting state"
assert self.mvn_xi_l is not None, "Please specify a target distribution"
assert self.mvn_xi_r is not None, "Please specify a target distribution"
assert self.mvn_xi_c is not None, "Please specify a target distribution"
assert self.mvn_u is not None, "Please specify a control input distribution"
if self._mvn_sol_U is None:
# self._mvn_sol_U = self.mvn_xi.inv_trans_s(self.s_U, self.s_xi.dot(self.x0)) % self.mvn_u
a = self.mvn_xi_l.inv_trans_sC(self.s_U, self.s_xi.dot(self.x0_l), self.C_l)
b = self.mvn_xi_r.inv_trans_sC(self.s_U, self.s_xi.dot(self.x0_r), self.C_r)
c = self.mvn_xi_c.inv_trans_sC(self.s_U, self.s_xi.dot(self.x0_c), self.C)
d = self.mvn_u.inv_trans_sR(self.s_U, self.C_l)
e = self.mvn_u.inv_trans_sR(self.s_U, self.C_r)
self._mvn_sol_U = self.get_sigma_mu(
a // d // b // e // c // c // c // c // c // c // c // c // c // c // c // c // c // c // c // c // c
// c // c // c // c // c // c // c // c // c // c // c // c // c // c // c // c // c // c // c // c
// c // c // c // c // c // c // c // c // c // c // c // c // c // c // c // c // c // c // c // c
// c // c // c // c // c // c // c // c // c // c // c // c // c // c // c // c // c // c // c // c
// c // c // c // c // c // c // c // c // c // c // c // c // c // c // c // c // c // c // c // c)
# self._mvn_sol_U = self.get_sigma_mu(
# self.mvn_xi_l.inv_trans_sC(self.s_U, self.s_xi.dot(self.x0_l), self.C_l) % \
# self.mvn_xi_r.inv_trans_sC(self.s_U, self.s_xi.dot(self.x0_r), self.C_r) % \
# self.mvn_xi_c.inv_trans_sC(self.s_U, self.s_xi.dot(self.x0_c), self.C) \
# % self.mvn_u.inv_trans_sR(self.C))
return self._mvn_sol_U
[docs] def get_sigma_mu(self, prod):
prod.sigma = np.linalg.pinv(prod.lmbda)
prod.mu = prod.sigma.dot(prod.lmbda_mu)
# sigma_U * \SUM s_U^T * Q_s * s_U * [C]^-1 (mu_U - s_xi*x0) + 0
return prod
@property
def mvn_sol_xi(self):
"""
Distribution of state after solving LQR
:return:
"""
if self._mvn_sol_xi is None:
# MU: self._mvn_sol_xi.mu -> self.s_u * mvn_sol_u.mu + self.s_xi.dot(self.x0)
# it means: zeta = S_u * u + S_zeta * zeta_0
# SIGMA: self._mvn_sol_xi.sigma -> self.s_u * mvn_sol_u.sigma * self.s_u^T
# it means: Sigma_zeta = S_u * Sigma_u * S_u^T
# Calinon, Stochastic learning and control in multiple coordinate systems
mvn_sol_U = self.mvn_sol_U
mvn_sol_u_l = pbd.MVN(
mu=mvn_sol_U.mu[:self.mvn_U_dim], sigma=mvn_sol_U.sigma[:self.mvn_U_dim, :self.mvn_U_dim])
mvn_sol_u_r = pbd.MVN(
mu=mvn_sol_U.mu[self.mvn_U_dim:], sigma=mvn_sol_U.sigma[self.mvn_U_dim:, self.mvn_U_dim:])
self._mvn_sol_xi_l = mvn_sol_u_l.transform(self.s_U, self.s_xi.dot(self.x0_l))
self._mvn_sol_xi_r = mvn_sol_u_r.transform(self.s_U, self.s_xi.dot(self.x0_r))
# self._mvn_sol_xi_l = None
return self._mvn_sol_xi_l, self._mvn_sol_xi_r
@property
def seq_xi(self):
if self._seq_xi is None:
mvn_sol_xi_l, mvn_sol_xi_r = self.mvn_sol_xi
self._seq_xi_l = mvn_sol_xi_l.mu.reshape(self.horizon, self.xi_dim)
self._seq_xi_r = mvn_sol_xi_r.mu.reshape(self.horizon, self.xi_dim)
# self._seq_xi_l = None
return self._seq_xi_l, self._seq_xi_r
@property
def seq_U(self):
if self._seq_U is None:
self._seq_U = self.mvn_sol_U.mu.reshape(self.horizon, self.U_dim)
return self._seq_U
# <editor-fold desc="Setter of mvn_xi and mvn_u">
@property
def mvn_xi_l(self):
"""
Distribution of state
:return:
"""
return self._mvn_xi_l
@mvn_xi_l.setter
def mvn_xi_l(self, value):
"""
:param value [float] or [pbd.MVN]
"""
# resetting solution
self._mvn_sol_xi = None
self._mvn_sol_U = None
self._seq_U = None
self._seq_xi = None
self._mvn_xi_l = value
@property
def mvn_xi_r(self):
"""
Distribution of state
:return:
"""
return self._mvn_xi_r
@mvn_xi_r.setter
def mvn_xi_r(self, value):
"""
:param value [float] or [pbd.MVN]
"""
# resetting solution
self._mvn_sol_xi = None
self._mvn_sol_U = None
self._seq_U = None
self._seq_xi = None
self._mvn_xi_r = value
@property
def mvn_xi_c(self):
"""
Distribution of state
:return:
"""
return self._mvn_xi_c
@mvn_xi_c.setter
def mvn_xi_c(self, value):
"""
:param value [float] or [pbd.MVN]
"""
# resetting solution
self._mvn_sol_xi = None
self._mvn_sol_U = None
self._seq_U = None
self._seq_xi = None
self._mvn_xi_c = value
@property
def mvn_u(self):
"""
Distribution of control input
:return:
"""
return self._mvn_u
@mvn_u.setter
def mvn_u(self, value):
"""
:param value [float] or [pbd.MVN]
"""
# resetting solution
self._mvn_sol_xi = None
self._mvn_sol_U = None
self._seq_U = None
self._seq_xi = None
if isinstance(value, pbd.MVN):
self._mvn_u = value
else:
self._mvn_u = pbd.MVN(
mu=np.zeros(self.mvn_U_dim), lmbda=10 ** value * np.eye(self.mvn_U_dim))
# </editor-fold>
@property
def xis(self):
return self.mvn_sol_xi.mu.reshape(self.horizon, self.xi_dim / self.horizon)
@property
def k(self):
# return self.mvn_sol_u.sigma.dot(self.s_u.T.dot(self.mvn_xi.lmbda)).dot(self.s_xi).reshape(
return self.mvn_sol_U.sigma.dot(self.s_U.T.dot(self.mvn_xi.lmbda)).dot(self.s_xi).reshape(
(self.horizon, self.mvn_U_dim / self.horizon, self.mvn_xi_dim / self.horizon))
@property
def s_U(self):
if self._s_U is None:
# Calinon, Stochastic learning and control in multiple coordinate systems, Appendix II
self._s_xi, self._s_U = lifted_transfer_matrix(self.A, self.B,
horizon=self.horizon, dt=self.dt, nb_dim=self.nb_dim)
return self._s_U
@property
def s_xi(self):
if self._s_xi is None:
# Calinon, Stochastic learning and control in multiple coordinate systems, Appendix II
self._s_xi, self._s_U = lifted_transfer_matrix(self.A, self.B,
horizon=self.horizon, dt=self.dt, nb_dim=self.nb_dim)
return self._s_xi
@property
def U_dim(self):
"""
Number of dimension of input
:return:
"""
if self.B is not None:
return self.B.shape[1]
else:
return self.nb_dim
[docs] def reset_params(self):
# reset everything
self._s_xi, self._s_U = None, None
self._x0 = None
# self._mvn_xi, self._mvn_u = None, None
self._mvn_sol_xi, self._mvn_sol_U = None, None
self._seq_xi, self._seq_U = None, None