Commit 64a8bec6 authored by Jorn Baayen's avatar Jorn Baayen

Introduce OptimizationProblem.der() method, yielding time-independent...

Introduce OptimizationProblem.der() method, yielding time-independent derivative symbols for use in path constraints.
parent b4c89e99
......@@ -40,9 +40,12 @@ class CollocatedIntegratedOptimizationProblem(OptimizationProblem):
'states'] + self.dae_variables['algebraics'] + self.dae_variables['control_inputs']
# Cache names of states
self._differentiated_states = [variable.getName() for variable in self.dae_variables['states']]
self._algebraic_states = [variable.getName() for variable in self.dae_variables['algebraics']]
self._controls = [variable.getName() for variable in self.dae_variables['control_inputs']]
self._differentiated_states = [
variable.getName() for variable in self.dae_variables['states']]
self._algebraic_states = [variable.getName()
for variable in self.dae_variables['algebraics']]
self._controls = [variable.getName()
for variable in self.dae_variables['control_inputs']]
@abstractmethod
def times(self, variable=None):
......@@ -116,7 +119,8 @@ class CollocatedIntegratedOptimizationProblem(OptimizationProblem):
logger.debug("Collocating variables {}".format(
repr(collocated_variables)))
self._path_variable_names = [variable.getName() for variable in self.path_variables]
self._path_variable_names = [variable.getName()
for variable in self.path_variables]
# Initialize control discretization
control_size, discrete_control, lbx_control, ubx_control, x0_control = self.discretize_controls()
......@@ -148,15 +152,18 @@ class CollocatedIntegratedOptimizationProblem(OptimizationProblem):
# Split derivatives into "integrated" and "collocated" lists.
integrated_derivatives = []
collocated_derivatives = []
for k in range(len(self.dae_variables['states'])):
if self.dae_variables['states'][k].getName() in self.integrated_states:
for k, var in enumerate(self.dae_variables['states']):
if var.getName() in self.integrated_states:
integrated_derivatives.append(
self.dae_variables['derivatives'][k])
else:
collocated_derivatives.append(
self.dae_variables['derivatives'][k])
for k in range(len(self.dae_variables['algebraics'] + self.dae_variables['control_inputs'])):
collocated_derivatives.append(MX.sym('dummy{}'.format(k)))
self._algebraic_and_control_derivatives = []
for k, var in enumerate(itertools.chain(self.dae_variables['algebraics'], self.dae_variables['control_inputs'])):
sym = MX.sym('der({})'.format(var.getName()))
self._algebraic_and_control_derivatives.append(sym)
collocated_derivatives.append(sym)
# Delayed feedback
delayed_feedback = self.delayed_feedback()
......@@ -370,14 +377,15 @@ class CollocatedIntegratedOptimizationProblem(OptimizationProblem):
constant_inputs = constant_inputs_interpolated
# Add constraints for initial conditions
initial_residual_with_params = MXFunction('initial_residual', [vertcat(self.dae_variables['states'] + self.dae_variables['algebraics'] + self.dae_variables['control_inputs'] + self.dae_variables[
'derivatives'] + self.dae_variables['constant_inputs'] + self.dae_variables['time'])], [vertcat([dae_residual_with_params, initial_residual_with_params])])
initial_residual_with_params = MXFunction('initial_residual', [vertcat(self.dae_variables['states'] + self.dae_variables['algebraics'] + self.dae_variables[
'control_inputs'] + integrated_derivatives + collocated_derivatives + self.dae_variables['constant_inputs'] + self.dae_variables['time'])], [vertcat([dae_residual_with_params, initial_residual_with_params])])
# Expand to SX for improved performance
initial_residual_with_params = initial_residual_with_params.expand()
# Compute initial residual, avoiding the use of expensive
# state_at().
initial_state = []
initial_derivatives = []
for variable in integrated_variables + collocated_variables:
variable = variable.getName()
value = self.state_vector(
......@@ -386,9 +394,10 @@ class CollocatedIntegratedOptimizationProblem(OptimizationProblem):
if nominal != 1:
value *= nominal
initial_state.append(value)
initial_derivatives.append(self.der_at(
variable, t0, ensemble_member=ensemble_member))
[res] = initial_residual_with_params([vertcat(initial_state
+ [self.der(variable, t0, ensemble_member=ensemble_member)
for variable in self.differentiated_states]
+ initial_derivatives
+ [constant_inputs[variable.getName()][0] for variable in self.dae_variables[
'constant_inputs']]
+ [0.0])], False, True)
......@@ -528,12 +537,14 @@ class CollocatedIntegratedOptimizationProblem(OptimizationProblem):
# Add path constraints to map()
path_constraints = self.path_constraints(ensemble_member)
path_constraints_function = MXFunction('path_constraints',
[vertcat(integrated_variables + collocated_variables + self.dae_variables[
[vertcat(integrated_variables + collocated_variables + integrated_derivatives + collocated_derivatives + self.dae_variables[
'constant_inputs'] + self.dae_variables['time'] + self.path_variables)],
[vertcat([f_constraint for (f_constraint, lb, ub) in path_constraints])])
path_constraints_function = path_constraints_function.expand()
accumulated_Y.extend(path_constraints_function([vertcat([integrated_states_1,
collocated_states_1,
integrated_finite_differences,
collocated_finite_differences,
constant_inputs_1,
collocation_time_1 - t0,
path_variables_1])],
......@@ -729,6 +740,7 @@ class CollocatedIntegratedOptimizationProblem(OptimizationProblem):
variable, ensemble_member=ensemble_member)
initial_path_variables.append(values[0])
initial_path_constraints = path_constraints_function([vertcat(initial_state
+ initial_derivatives
+ [constant_inputs[variable.getName()][0] for variable in self.dae_variables[
'constant_inputs']]
+ [0.0]
......@@ -1444,7 +1456,19 @@ class CollocatedIntegratedOptimizationProblem(OptimizationProblem):
dt = t[1:] - t[:x.size1() - 1]
return sumRows(x_avg * dt)
def der(self, variable, t, ensemble_member=0):
def der(self, variable):
# Look up the derivative variable for the given non-derivative variable
for i, variable in enumerate(self.differentiated_states):
for alias in self.variable_aliases(variable):
if alias.name == variable:
return self.dae_variables['derivatives'][i]
for i, variable in enumerate(itertools.chain(self.algebraic_states, self.controls)):
for alias in self.variable_aliases(variable):
if alias.name == variable:
return self._algebraic_and_control_derivatives[i]
raise KeyError
def der_at(self, variable, t, ensemble_member=0):
# Special case t being t0 for differentiated states
if t == self.initial_time:
# We have a special symbol for t0 derivatives
......@@ -1476,7 +1500,8 @@ class CollocatedIntegratedOptimizationProblem(OptimizationProblem):
if t == history_and_times[0]:
return 0.0
# Handle t being an interior point, or t0 for a non-differentiated state
# Handle t being an interior point, or t0 for a non-differentiated
# state
for i in range(len(history_and_times)):
# Use finite differences when between collocation points, and
# backward finite differences when on one.
......@@ -1487,4 +1512,4 @@ class CollocatedIntegratedOptimizationProblem(OptimizationProblem):
return dx / dt
# t does not belong to any collocation point interval
raise IndexError
\ No newline at end of file
raise IndexError
......@@ -842,9 +842,22 @@ class OptimizationProblem(object):
pass
@abstractmethod
def der(self, variable, t, ensemble_member=0):
def der(self, variable):
"""
Returns an expression for the derivative of the specified variable at time t.
Returns an :class:`MX` symbol for the time derivative given state, not bound to any time.
:param variable: Variable name.
:returns: :class:`MX` symbol for given state.
:raises: KeyError
"""
pass
@abstractmethod
def der_at(self, variable, t, ensemble_member=0):
"""
Returns an expression for the time derivative of the specified variable at time t.
:param variable: Variable name.
:param t: Time.
......
......@@ -342,3 +342,53 @@ class TestGoalProgrammingEnsemble(TestGoalProgramming):
), self.problem.extract_results(0)['x']), 0.1, objective_value_tol)
self.assertAlmostGreaterThan(self.problem.interpolate(0.7, self.problem.times(
), self.problem.extract_results(1)['x']), 0.1, objective_value_tol)
class PathGoalSmoothing(Goal):
def function(self, optimization_problem, ensemble_member):
return optimization_problem.der('u')
function_range = (-1e1, 1e1)
priority = 3
class TestProblemPathGoalsSmoothing(GoalProgrammingMixin, ModelicaMixin, CollocatedIntegratedOptimizationProblem):
def __init__(self):
super(TestProblemPathGoalsSmoothing, self).__init__(input_folder=data_path(
), output_folder=data_path(), model_name='TestModelWithInitial', model_folder=data_path())
def times(self, variable=None):
# Collocation points
return np.linspace(0.0, 1.0, 21)
def delayed_feedback(self):
# Delayed feedback
return [('x', 'x_delayed', 0.1)]
def constant_inputs(self, ensemble_member):
# Constant inputs
return {'constant_input': Timeseries(np.hstack(([self.initial_time, self.times()])), np.hstack(([1.0], np.linspace(1.0, 0.0, 21))))}
def bounds(self):
bounds = super(TestProblemPathGoalsSmoothing, self).bounds()
bounds['u'] = (-2.0, 2.0)
return bounds
def path_goals(self):
return [PathGoal1(), PathGoal2(), PathGoalSmoothing()]
class TestGoalProgrammingSmoothing(TestCase):
def setUp(self):
self.problem = TestProblemPathGoalsSmoothing()
self.problem.optimize()
self.tolerance = 1e-6
def test_x(self):
value_tol = 1e-3
for x in self.problem.extract_results()['x']:
self.assertAlmostGreaterThan(x, 0.0, value_tol)
self.assertAlmostLessThan(x, 1.1, value_tol)
\ No newline at end of file
......@@ -256,12 +256,12 @@ class TestModelicaMixin(TestCase):
self.assertEqual(repr(states), repr(verify))
def test_der(self):
der = self.problem.der('x', 0.05)
der = self.problem.der_at('x', 0.05)
verify = (self.problem.state_at('x', 0.05) -
self.problem.state_at('x', 0.0)) / 0.05
self.assertEqual(repr(der), repr(verify))
der = self.problem.der('x', 0.051)
der = self.problem.der_at('x', 0.051)
verify = (self.problem.state_at('x', 0.1) -
self.problem.state_at('x', 0.05)) / 0.05
self.assertEqual(repr(der), repr(verify))
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment