Source code for yaocptool.methods.classic.multipleshooting

# -*- coding: utf-8 -*-
"""
Created on Thu Jul 13 17:08:34 2017

@author: marco
"""
from collections import defaultdict

from casadi import DM, vertcat, Function, repmat, is_equal, inf, vec, horzcat

from yaocptool.methods import OptimizationResult
from yaocptool.methods.base.discretizationschemebase import DiscretizationSchemeBase
from yaocptool.optimization import NonlinearOptimizationProblem


[docs]class MultipleShootingScheme(DiscretizationSchemeBase): def _create_nlp_symbolic_variables(self, nlp): """ Create the symbolic variables that will be used by the NLP problem :param NonlinearOptimizationProblem nlp: :rtype: (MX, List[List[MX]], List[List[MX]], List[MX], MX, MX, List[MX) """ x, y, u = [], [], [] for el in range(self.finite_elements + 1): x_k = [ nlp.create_variable('mx_x_' + str(el), self.model.n_x, lb=self.problem.x_min, ub=self.problem.x_max) ] x.append(x_k) for el in range(self.finite_elements): y_k = [ nlp.create_variable('mx_y_' + str(el), self.model.n_y, lb=self.problem.y_min, ub=self.problem.y_max) ] y.append(y_k) for el in range(self.finite_elements): u_k = [] if self.model.n_u_par > 0: for n in range(self.degree_control): u_k.append( nlp.create_variable('mx_u_' + str(el) + '_' + str(n), self.model.n_u, lb=self.problem.u_min, ub=self.problem.u_max)) u.append(u_k) eta = nlp.create_variable('mx_eta', self.problem.n_eta) p_opt = nlp.create_variable('mx_p_opt', self.problem.n_p_opt, lb=self.problem.p_opt_min, ub=self.problem.p_opt_max) theta_opt = [] for el in range(self.finite_elements): theta_opt.append( nlp.create_variable('mx_theta_opt_' + str(el), self.problem.n_theta_opt, lb=self.problem.theta_opt_min, ub=self.problem.theta_opt_max)) v_x = self.vectorize(x) v_y = self.vectorize(y) v_u = self.vectorize(u) v_theta_opt = vertcat(*theta_opt) v = vertcat(v_x, v_y, v_u, eta, p_opt, v_theta_opt) return v, x, y, u, eta, p_opt, theta_opt
[docs] def unpack_decision_variables(self, decision_variables): """Return a structured data from the decision variables vector Returns: (x_data, y_data, u_data, p_opt, eta, theta_opt) :param decision_variables: DM :return: tuple """ x, y, u, eta, p_opt = [], [], [], [], [] offset = 0 for el in range(self.finite_elements + 1): x.append([decision_variables[offset:offset + self.model.n_x]]) offset = offset + self.model.n_x for el in range(self.finite_elements): y.append([decision_variables[offset:offset + self.model.n_y]]) offset = offset + self.model.n_y for el in range(self.finite_elements): u_k = [] if self.model.n_u_par > 0: for i in range(self.degree_control): u_k.append(decision_variables[offset:offset + self.model.n_u]) offset += self.model.n_u u.append(u_k) eta = decision_variables[offset:offset + self.problem.n_eta] offset += self.problem.n_eta p_opt = decision_variables[offset:offset + self.problem.n_p_opt] offset += self.problem.n_p_opt theta_opt = [] for el in range(self.finite_elements): theta_opt.append(decision_variables[offset:offset + self.problem.n_theta_opt]) offset += self.problem.n_theta_opt assert offset == decision_variables.numel() return x, y, u, eta, p_opt, theta_opt
[docs] def discretize(self, x_0=None, p=None, theta=None, last_u=None): """Discretize the OCP, returning a Optimization Problem :param x_0: initial condition :param p: parameters :param theta: theta parameters :param last_u: last applied control :rtype: NonlinearOptimizationProblem """ if p is None: p = [] if theta is None: theta = dict([(i, []) for i in range(self.finite_elements)]) if x_0 is None: x_0 = self.problem.x_0 # Create nlp object nlp = NonlinearOptimizationProblem(name='multiple_shooting_' + self.problem.name) # Create NLP symbolic variables all_decision_vars, x_var, y_var, u_var, eta, p_opt, theta_opt = self._create_nlp_symbolic_variables( nlp) cost = 0 # Put the symbolic optimization parameters in the parameter vector for i, p_opt_index in enumerate(self.problem.get_p_opt_indices()): p[p_opt_index] = p_opt[i] # Put the symbolic theta_opt in the theta vector for i, theta_opt_index in enumerate( self.problem.get_theta_opt_indices()): for el in range(self.finite_elements): theta[el][theta_opt_index] = theta_opt[el][i] # Create "simulations" time_dict, a dict informing the simulation points in each finite elem. for each variable time_dict = self._create_time_dict_for_multiple_shooting() # Initial time constraint/initial condition f_h_initial = Function('h_initial', [self.model.x, self.model.x_0], [self.problem.h_initial]) nlp.include_equality(f_h_initial(x_var[0][0], x_0)) # Create functions to be evaluated functions = defaultdict(dict) for i in range(self.problem.n_g_ineq): functions['g_ineq_' + str(i)] = self._create_function_from_expression( 'f_g_ineq_' + str(i), self.problem.g_ineq[i]) for i in range(self.problem.n_g_eq): functions['g_eq_' + str(i)] = self._create_function_from_expression( 'f_g_eq_' + str(i), self.problem.g_eq[i]) functions['f_s_cost'] = self._create_function_from_expression( 'f_s_cost', self.problem.S) # Multiple Shooting "simulation" results = self.get_system_at_given_times(x_var, y_var, u_var, time_dict, p, theta, functions=functions) # Build the NLP s_cost = 0 for el in range(self.finite_elements): x_at_el_p_1 = results[el]['x'][0] y_end_el = results[el]['y'][0] # cost function as a sum of cost at each element if self.solution_method.solution_class == 'direct' and self.solution_method.cost_as_a_sum: x_at_el_p_1 = [x_at_el_p_1[n] for n in range(self.model.n_x)] x_at_el_p_1[-1] = 0 x_at_el_p_1 = vertcat(*x_at_el_p_1) cost += results[el]['x'][0][-1] # Continuity constraint for defining x nlp.include_equality(x_at_el_p_1 - x_var[el + 1][0]) # Defining y nlp.include_equality(y_end_el - y_var[el][0]) # S cost s_cost += results[el]['f_s_cost'][-1] # Implement the constraint on delta_u if self.problem.has_delta_u: if el > 0: for i in range(self.model.n_u): if not is_equal(self.problem.delta_u_max[i], inf) or not is_equal( self.problem.delta_u_min[i], -inf): nlp.include_inequality( u_var[el][0][i] - u_var[el - 1][0][i], lb=self.problem.delta_u_min[i], ub=self.problem.delta_u_max[i]) elif el == 0 and last_u is not None: for i in range(self.model.n_u): if (not is_equal(self.problem.delta_u_max[i], inf) or not is_equal(self.problem.delta_u_min[i], -inf)): nlp.include_inequality( u_var[el][0][i] - last_u[i], lb=self.problem.delta_u_min[i], ub=self.problem.delta_u_max[i]) # Time dependent inequalities for i in range(self.problem.n_g_ineq): nlp.include_inequality(results[el]['g_ineq_' + str(i)][0], ub=0) # Time dependent equalities for i in range(self.problem.n_g_eq): nlp.include_equality(results[el]['g_eq_' + str(i)][0]) # Final time constraint f_h_final = Function('h_final', [self.model.x, self.problem.eta, self.model.p], [self.problem.h_final]) nlp.include_equality(f_h_final(x_var[-1][0], eta, p)) # Time independent constraints f_h = Function('h', [self.model.p], [self.problem.h]) nlp.include_equality(f_h(p)) if self.solution_method.solution_class == 'direct': if not self.solution_method.cost_as_a_sum: f_final_cost = Function('FinalCost', [self.model.x, self.model.p], [self.problem.V]) cost = f_final_cost(x_var[-1][0], p) cost += s_cost nlp.set_objective(cost) return nlp
[docs] def get_system_at_given_times(self, x_var, y_var, u_var, time_dict=None, p=None, theta=None, functions=None, start_at_t_0=False): """ :param x_var: List[List[MX]] :param y_var: List[List[MX]] :param u_var: List[List[MX]] :type time_dict: Dict(int, List(float)) Dictionary of simulations times, where the KEY is the finite_element and the VALUE list a list of desired times example : {1:{'t_0': 0.0, 'x':[0.0, 0.1, 0.2], y:[0.2]}} :param p: list :param theta: dict :param start_at_t_0: bool If TRUE the simulations in each finite_element will start at the element t_0, Otherwise the simulation will start the end of the previous element :param functions: Dict[str, Function|Dict[int] dictionary of Functions to be evaluated, KEY is the function identifier, VALUE is a CasADi Function with model.all_sym as input """ if theta is None: theta = {} if p is None: p = [] if time_dict is None: time_dict = {} if functions is None: functions = {} results = defaultdict(lambda: defaultdict(list)) # for el in range(finite_element): for el in time_dict: t_0 = t_init = time_dict[el]['t_0'] t_f = time_dict[el]['t_f'] x_init = x_var[el][0] # Create dae_sys and the control function dae_sys = self.model.get_dae_system() dae_sys.convert_from_tau_to_time(self.time_breakpoints[el], self.time_breakpoints[el + 1]) u_func = self.model.convert_expr_from_tau_to_time( self.model.u_expr, t_0, t_f) if self.solution_method.solution_class == 'direct': f_u = Function('f_u_pol', [self.model.t, self.model.u_par], [u_func]) elif self.solution_method.solution_class == 'indirect': f_u = Function('f_u_pol', list(self.model.all_sym), [u_func]) else: raise NotImplemented # Find the times that need to be evaluated element_breakpoints = set() for key in ['x', 'y', 'u'] + list(functions.keys()): if key in time_dict[el]: element_breakpoints = element_breakpoints.union( time_dict[el][key]) element_breakpoints = list(element_breakpoints) element_breakpoints.sort() # If values are needed from t_0, get it if 'x' in time_dict[el] and t_0 in time_dict[el]['x']: results[el]['x'].append(x_var[el][0]) if 'y' in time_dict[el] and t_0 in time_dict[el]['y']: raise NotImplementedError if 'u' in time_dict[el] and t_0 in time_dict[el]['u']: if self.solution_method.solution_class == 'direct': results[el]['u'].append(f_u(t_0, self.vectorize(u_var[el]))) elif self.solution_method.solution_class == 'indirect': raise NotImplementedError else: raise NotImplementedError for f_name in functions: if t_0 in time_dict[el][f_name]: raise NotImplementedError # Remove t_0 from the list of times that need to be evaluated if t_0 in element_breakpoints: element_breakpoints.remove(t_0) for t_ind, t in enumerate(element_breakpoints): if self.solution_method.solution_class == 'direct': p_i = vertcat(p, theta[el], self.vectorize(u_var[el])) else: p_i = vertcat(p, theta[el]) # Do the simulation sim_result = dae_sys.simulate( x_init, t_0=t_init, t_f=t, p=p_i, y_0=self.problem.y_guess, integrator_type=self.solution_method.integrator_type, integrator_options={ 'name': 'integrator_' + str(el) + '_' + str(t_ind) }) # Fetch values from results x_t, y_t = sim_result['xf'], sim_result['zf'] # Save to the result vector if 'x' in time_dict[el] and t in time_dict[el]['x']: results[el]['x'].append(x_t) if 'y' in time_dict[el] and t in time_dict[el]['y']: results[el]['y'].append(y_t) if 'u' in time_dict[el] and t in time_dict[el]['u']: if self.solution_method.solution_class == 'direct': results[el]['u'].append( f_u(t, self.vectorize(u_var[el]))) elif self.solution_method.solution_class == 'indirect': results[el]['u'].append( f_u(*self.model.put_values_in_all_sym_format( t, x=x_t, y=y_t, p=p, theta=theta[el], u_par=u_var[el]))) for f_name in functions: if t in time_dict[el][f_name]: f = functions[f_name][el] val = f(*self.model.put_values_in_all_sym_format( t=t, x=x_t, y=y_t, p=p, theta=theta[el], u_par=self.vectorize(u_var[el]))) results[el][f_name].append(val) # If the simulation should start from the begin of the simulation interval, do not change the t_init if not start_at_t_0: t_init = t x_init = x_t return results
def _create_time_dict_for_multiple_shooting(self): time_dict = {} for el in range(self.finite_elements): time_dict[el] = {} time_dict[el]['t_0'] = self.time_breakpoints[el] time_dict[el]['t_f'] = self.time_breakpoints[el + 1] time_dict[el]['x'] = [self.time_breakpoints[el + 1]] time_dict[el]['y'] = [self.time_breakpoints[el + 1]] # time_dict[el]['u'] = self.time_interpolation_controls[el] time_dict[el]['f_s_cost'] = [self.time_breakpoints[el + 1]] for i in range(self.problem.n_g_ineq): if self.problem.time_g_ineq[i] == 'start': time_dict[el]['g_ineq_' + str(i)] = [self.time_breakpoints[el]] if self.problem.time_g_ineq[i] == 'end': time_dict[el]['g_ineq_' + str(i)] = [self.time_breakpoints[el + 1]] if self.problem.time_g_ineq[i] == 'default': time_dict[el]['g_ineq_' + str(i)] = [self.time_breakpoints[el + 1]] for i in range(self.problem.n_g_eq): if self.problem.time_g_eq[i] == 'start': time_dict[el]['g_eq_' + str(i)] = [self.time_breakpoints[el]] if self.problem.time_g_eq[i] == 'end': time_dict[el]['g_eq_' + str(i)] = [self.time_breakpoints[el + 1]] if self.problem.time_g_eq[i] == 'default': time_dict[el]['g_eq_' + str(i)] = [self.time_breakpoints[el + 1]] return time_dict
[docs] def create_initial_guess(self, p=None, theta=None): """Create an initial guess for the optimal control problem using problem.x_0, problem.y_guess, problem.u_guess, and a given p and theta (for p_opt and theta_opt) if they are given. If y_guess or u_guess are None the initial guess uses a vector of zeros of appropriate size. :param p: Optimization parameters :param theta: Optimization theta :return: """ x_init = repmat(self.problem.x_0, self.finite_elements + 1) if self.problem.y_guess is not None: y_init = repmat(self.problem.y_guess, self.finite_elements) else: y_init = repmat(DM.zeros(self.model.n_y), self.finite_elements) if self.model.n_u_par > 0: if self.problem.u_guess is not None: u_init = repmat(self.problem.u_guess, self.degree_control * self.finite_elements) else: u_init = repmat(DM.zeros(self.model.n_u), self.degree_control * self.finite_elements) else: u_init = [] eta_init = DM.zeros(self.problem.n_eta, 1) p_opt_init = DM.zeros(self.problem.n_p_opt, 1) theta_opt_init = DM.zeros( self.problem.n_theta_opt * self.finite_elements, 1) if p is not None: for k, ind in enumerate(self.problem.get_p_opt_indices()): p_opt_init[k] = p[ind] if theta is not None: for el in range(self.finite_elements): for k, ind in enumerate(self.problem.get_theta_opt_indices()): theta_opt_init[k + el * self.problem.n_theta_opt] = theta[el][ind] return vertcat(x_init, y_init, u_init, eta_init, p_opt_init, theta_opt_init)
[docs] def create_initial_guess_with_simulation(self, u=None, p=None, theta=None): """Create an initial guess for the optimal control problem using by simulating with a given control u, and a given p and theta (for p_opt and theta_opt) if they are given. If no u is given the value of problem.u_guess is used, or problem.last_u, then a vector of zeros of appropriate size is used. If no p or theta is given, an vector of zeros o appropriate size is used. :param u: Control initial guess :param p: Optimization parameters :param theta: Optimization theta :return: """ x_init = [] y_init = [] u_init = [] # Simulation if u is None: if self.problem.u_guess is not None: u = self.problem.u_guess elif self.problem.last_u is not None: u = self.problem.last_u else: u = DM.zeros(self.model.n_u) if self.model.n_u_par > 0: u = vec(horzcat(*[u] * self.degree_control)) else: u = [] x_0 = self.problem.x_0 y_guess = self.problem.y_guess x_init.append([x_0]) for el in range(self.finite_elements): el_x = [] el_y = [] # get DAE system and remove tau dae_sys = self.model.get_dae_system() dae_sys.convert_from_tau_to_time(t_k=self.time_breakpoints[el], t_kp1=self.time_breakpoints[el + 1]) # Prepare for loop t_init = self.time_breakpoints[el] p_el = vertcat(p, theta[el], u) for t in [self.time_breakpoints[el + 1]]: res = dae_sys.simulate(x_0=x_0, t_0=t_init, t_f=t, p=p_el, y_0=y_guess) el_x.append(res['xf']) el_y.append(res['zf']) t_init = t x_0 = res['xf'] x_init.append(el_x) y_init.append(el_y) u_init.append(u) x_init = self.vectorize(x_init) y_init = self.vectorize(y_init) u_init = self.vectorize(u_init) # Other variables eta_init = DM.zeros(self.problem.n_eta, 1) p_opt_init = DM.zeros(self.problem.n_p_opt, 1) theta_opt_init = DM.zeros( self.problem.n_theta_opt * self.finite_elements, 1) if p is not None: for k, ind in enumerate(self.problem.get_p_opt_indices()): p_opt_init[k] = p[ind] if theta is not None: for el in range(self.finite_elements): for k, ind in enumerate(self.problem.get_theta_opt_indices()): theta_opt_init[k + el * self.problem.n_theta_opt] = theta[el][ind] return vertcat(x_init, y_init, u_init, eta_init, p_opt_init, theta_opt_init)
[docs] def set_data_to_optimization_result_from_raw_data(self, optimization_result, raw_solution_dict): """ Set the raw data received from the solver and put it in the Optimization Result object :param OptimizationResult optimization_result: :param dict raw_solution_dict: """ optimization_result.raw_solution_dict = raw_solution_dict optimization_result.raw_decision_variables = raw_solution_dict['x'] optimization_result.objective_opt_problem = raw_solution_dict['f'] optimization_result.constraint_values = raw_solution_dict['g'] x_values, y_values, u_values, eta, p_opt, theta_opt = self.unpack_decision_variables( raw_solution_dict['x']) optimization_result.p[self.problem.get_p_opt_indices()] = p_opt for el in optimization_result.theta: optimization_result.theta[el][ self.problem.get_theta_opt_indices()] = theta_opt[el] # if u_values are all empty, u is not based on the u_par directly if len(u_values[0]) == 0: time_dict = dict([(el, { 'u': self.time_interpolation_controls[el], 't_0': self.time_breakpoints[el], 't_f': self.time_breakpoints[el + 1] }) for el in range(self.finite_elements)]) result = self.get_system_at_given_times( x_values, y_values, u_values, time_dict=time_dict, p=optimization_result.p, theta=optimization_result.theta) u_values = [result[el]['u'] for el in range(self.finite_elements)] optimization_result.p_opt = p_opt optimization_result.eta = eta optimization_result.p_opt = p_opt optimization_result.theta_opt = theta_opt optimization_result.x_data['values'] = x_values optimization_result.y_data['values'] = y_values optimization_result.u_data['values'] = u_values optimization_result.x_data['time'] = [[t] for t in self.time_breakpoints] optimization_result.y_data['time'] = [ [t] for t in self.time_breakpoints[1:] ] if self.degree_control == 1: optimization_result.u_data['time'] = [ [t] for t in self.time_breakpoints[:-1] ] else: optimization_result.u_data['time'] = [[ t + self.delta_t * col for col in self.solution_method.collocation_points(self.degree_control) ] for t in self.time_breakpoints[:-1]]