Source code for yaocptool.estimation.pce_kalman_filter

# coding=utf-8
from math import factorial

from casadi import mtimes, vertcat, solve, DM, vec

from yaocptool.estimation.estimator_abstract import EstimatorAbstract
from yaocptool.modelling import SystemModel, DataSet
from yaocptool.stochastic import sample_parameter_normal_distribution_with_sobol
from yaocptool.stochastic.pce import get_ls_factor


[docs]class PCEKalmanFilter(EstimatorAbstract):
[docs] def __init__(self, model, **kwargs): """ :param SystemModel model: estimator model :param x_mean: a initial guess for the mean :param p_k: a initial guess for the covariance of the estimator :param h_function: a function that receives 3 parameters (x, y_algebraic, u) and returns an measurement. :param c_matrix: if h_function is not give, c_matrix is "C" from the measurement equation: y_meas = C*[x y_alg] + D*u. note that it has to have n_x + n_y (algebraic) columns. :param DM r_v: process noise matrix :param DM r_n: measurement noise matrix :param int pc_order: Polynomial chaos order :param int n_samples: Number of samples to be used """ self.model = model self.h_function = None self.c_matrix = None self.d_matrix = None self.implementation = 'standard' self.x_mean = None self.p_k = None self.r_v = DM(0.) self.r_n = DM(0.) self.p = None self.theta = None self.y_guess = None self.n_samples = None self.n_uncertain = None self.pc_order = 4 self._types_fixed = False self._checked = False EstimatorAbstract.__init__(self, **kwargs) self._fix_types() if self.n_uncertain is None: self.n_uncertain = self.x_mean.numel() if self.n_samples is None: self.n_samples = factorial(self.n_uncertain + self.pc_order) // ( factorial(self.n_uncertain) * factorial(self.pc_order)) if self.n_samples < self.n_pol_parameters: raise ValueError( 'Number of samples has to greater or equal to the number of polynomial parameters' '"n_samples"={}, n_pol_parameter={}'.format( self.n_samples, self.n_pol_parameters)) self._ls_factor, _, _ = get_ls_factor(self.n_uncertain, self.n_samples, self.pc_order) self.dataset = DataSet(name=self.model.name) self.dataset.data['x']['size'] = self.model.n_x self.dataset.data['x']['names'] = [ 'est_' + self.model.x_sym[i].name() for i in range(self.model.n_x) ] self.dataset.data['P']['size'] = self.model.n_x**2 self.dataset.data['P']['names'] = [ 'P_' + str(i) + str(j) for i in range(self.model.n_x) for j in range(self.model.n_x) ]
@property def n_pol_parameters(self): n_pol_parameters = factorial(self.n_uncertain + self.pc_order) / ( factorial(self.n_uncertain) * factorial(self.pc_order)) return n_pol_parameters def _fix_types(self): self.x_mean = vertcat(self.x_mean) self.p_k = vertcat(self.p_k) self.r_v = vertcat(self.r_v) self.r_n = vertcat(self.r_n) if self.p is not None: self.p = vertcat(self.p) def _check(self): if self.x_mean is None: raise ValueError( 'A initial condition for the "x_mean" must be provided') if self.p_k is None: raise ValueError( 'A initial condition for the "p_k" must be provided') if self.h_function is None and self.c_matrix is None: raise ValueError( 'Neither a measurement function "h_function" or a measurement matrix "c_matrix" was given' ) return True
[docs] def estimate(self, t_k, y_k, u_k): if not self._checked: self._check() self._checked = True if not self._types_fixed: self._fix_types() self._types_fixed = True x_mean = self.x_mean x_cov = self.p_k (x_hat_k_minus, p_k_minus, y_hat_k_minus, p_yk_yk, k_gain) = self._priori_update(x_mean, x_cov, u=u_k, p=self.p, theta=self.theta) x_hat_k = x_hat_k_minus + mtimes(k_gain, (y_k - y_hat_k_minus)) p_k = p_k_minus - mtimes(k_gain, mtimes(p_yk_yk, k_gain.T)) self.x_mean = x_hat_k self.p_k = p_k self.dataset.insert_data('x', t_k, self.x_mean) self.dataset.insert_data('P', t_k, vec(self.p_k)) return x_hat_k, p_k
def _get_measurement_from_prediction(self, x, y, u): if self.h_function is not None: measurement_prediction = self.h_function(x, y, u) elif self.c_matrix is not None: measurement_prediction = mtimes(self.c_matrix, vertcat(x, y)) if self.d_matrix is not None: measurement_prediction = measurement_prediction + mtimes( self.d_matrix, u) else: raise ValueError( 'Neither a measurement function "h_function" or a measurement matrix "c_matrix" was given' ) return measurement_prediction def _priori_update(self, x_mean, x_cov, u, p, theta): x_samples = self._get_sampled_states(x_mean, x_cov) # Perform predictions via simulation simulation_results = [] x_cal_x_k_at_k_minus_1 = [] y_alg_cal_x_k_at_k_minus_1 = [] for s in range(self.n_samples): x_0_i = DM(x_samples[s]) simulation_results_i = self.model.simulate(x_0=x_0_i, t_0=self.t, t_f=self.t + self.t_s, u=u, p=p, theta=theta, y_0=self.y_guess) simulation_results.append(simulation_results_i) x_cal_x_k_at_k_minus_1.append( simulation_results_i.final_condition()[0]) y_alg_cal_x_k_at_k_minus_1.append( simulation_results[s].final_condition()[1]) # fit the polynomial for x a_x = [] x_hat_k_minus = [] for i in range(self.model.n_x): x_i_vector = vertcat( *[x_cal_x_k_at_k_minus_1[s][i] for s in range(self.n_samples)]) a_x.append(mtimes(self._ls_factor, x_i_vector)) # get the mean for x for i in range(self.model.n_x): x_hat_k_minus.append(a_x[i][0]) x_hat_k_minus = vertcat(*x_hat_k_minus) # get the covariance for x p_k_minus = DM.zeros(self.model.n_x, self.model.n_x) for i in range(self.model.n_x): for j in range(self.model.n_x): p_k_minus[i, j] = sum( [a_x[i][k] * a_x[j][k] for k in range(1, self.n_samples)]) + self.r_v[i, j] # calculate the measurement for each sample y_cal_k_at_k_minus_1 = [] for s in range(self.n_samples): y_cal_k_at_k_minus_1.append( self._get_measurement_from_prediction( x_cal_x_k_at_k_minus_1[s], y_alg_cal_x_k_at_k_minus_1[s], u)) n_meas = y_cal_k_at_k_minus_1[0].numel() # find the measurements estimate a_meas = [] y_meas_hat_k_minus = [] for i in range(n_meas): y_meas_i_vector = vertcat( *[y_cal_k_at_k_minus_1[s][i] for s in range(self.n_samples)]) a_meas.append(mtimes(self._ls_factor, y_meas_i_vector)) # get the mean for the measurement for i in range(n_meas): y_meas_hat_k_minus.append(a_meas[i][0]) y_meas_hat_k_minus = vertcat(*y_meas_hat_k_minus) # get the covariance for the meas p_yk_yk = DM.zeros(n_meas, n_meas) for i in range(n_meas): for j in range(n_meas): p_yk_yk[i, j] = sum([ a_meas[i][k] * a_meas[j][k] for k in range(1, self.n_samples) ]) p_yk_yk = p_yk_yk + self.r_n # get cross-covariance p_xk_yk = DM.zeros(self.model.n_x, n_meas) for i in range(self.model.n_x): for j in range(n_meas): p_xk_yk[i, j] = sum([ a_x[i][k] * a_meas[j][k] for k in range(1, self.n_samples) ]) # k_gain = mtimes(p_xk_yk, inv(p_yk_yk)) k_gain = solve(p_yk_yk.T, p_xk_yk.T).T return x_hat_k_minus, p_k_minus, y_meas_hat_k_minus, p_yk_yk, k_gain def _get_sampled_states(self, x_mean, x_cov): x_samples = sample_parameter_normal_distribution_with_sobol( x_mean, x_cov, self.n_samples) return [x_samples[:, i] for i in range(self.n_samples)]