Commit eb5e4b7f authored by Anne Hommelberg's avatar Anne Hommelberg

Add unit tests for new io classes

Adds unit tests for the two IOMixin and DataStore classes.
Also fixes some minor bugs in the set_timeseries method.
Also adds the get_parameter_ensemble_size method to make parameter
access in the DataStore consistent with access to the stored time
series.
parent bc0b8320
......@@ -204,6 +204,12 @@ class DataStore(metaclass=ABCMeta):
return set()
return self.__parameters[ensemble_member].keys()
def get_parameter_ensemble_size(self):
"""
Returns the number of ensemble members for which parameters are stored in the internal data store
"""
return len(self.__parameters)
@staticmethod
def datetime_to_sec(d: Union[Iterable[datetime], datetime], t0: datetime) -> Union[Iterable[float], float]:
"""
......
......@@ -75,7 +75,7 @@ class IOMixin(OptimizationProblem, metaclass=ABCMeta):
def stretch_values(values, t_pos):
# Construct a values range with preceding and possibly following nans
new_values = np.full_like(self.io.__timeseries_times_sec, np.nan)
new_values = np.full(self.io.get_times().shape, np.nan)
new_values[t_pos:] = values
return new_values
......@@ -105,28 +105,25 @@ class IOMixin(OptimizationProblem, metaclass=ABCMeta):
# times.
t_pos = bisect.bisect_left(timeseries_times_sec, timeseries.times[0])
# Construct a new values range with length of self.__timeseries_times_sec
# Construct a new values range with length of self.io.get_times()
values = stretch_values(timeseries.values, t_pos)
else:
values = timeseries.values
else:
if len(self.times()) == len(timeseries):
values = timeseries
else:
if check_consistency:
raise ValueError('IOMixin: Trying to set values for {} with a different '
'length ({}) than the forecast length. Please make sure the '
'values covers all timesteps of the longest imported timeseries (length {}).'
.format(variable, len(timeseries), len(self.times())))
# If times is not supplied with the timeseries, we add the
# forecast times range to a new Timeseries object. Hereby
# we assume that the supplied values stretch from T0 to end.
t_pos = self.io.get_forecast_index()
# Construct a new values range with length of self.__timeseries_times_sec
values = stretch_values(timeseries, t_pos)
if check_consistency and len(self.times()) != len(timeseries):
raise ValueError('IOMixin: Trying to set values for {} with a different '
'length ({}) than the forecast length. Please make sure the '
'values covers all timesteps of the longest imported timeseries (length {}).'
.format(variable, len(timeseries), len(self.times())))
# If times is not supplied with the timeseries, we add the
# forecast times range to a new Timeseries object. Hereby
# we assume that the supplied values stretch from T0 to end.
t_pos = self.io.get_forecast_index()
# Construct a new values range with length of self.io.get_times()
values = stretch_values(timeseries, t_pos)
self.io.set_timeseries_values(variable, values, ensemble_member)
......
......@@ -116,4 +116,71 @@ class TestDummyDataStore(TestCase):
logger.warning('All is well') # if no log message occurs, assertLogs will throw an AssertionError
self.assertEqual(self.datastore.io.get_parameter('myNewParameter'), 2.2)
# todo add tests that use newly added methods: get_variables, get_ensemble_size and get_parameter_names()
def test_variables(self):
self.assertEqual(len(self.datastore.io.get_variables()), 0)
self.datastore.io.set_times(np.array([0, 1, 2]))
self.datastore.io.set_timeseries_values('var1', np.array([1.0, 2.0, 3.0]))
self.datastore.io.set_timeseries_values('var2', np.array([2.0, 3.0, 4.0]))
variables = self.datastore.io.get_variables()
self.assertEqual(len(variables), 2)
self.assertTrue('var1' in variables)
self.assertTrue('var2' in variables)
self.assertEqual(len(self.datastore.io.get_variables(ensemble_member=1)), 0)
self.datastore.io.set_timeseries_values('var3', np.array([0.1, 0.2, 0.3]), ensemble_member=1)
variables = self.datastore.io.get_variables(ensemble_member=1)
self.assertEqual(len(variables), 1)
self.assertTrue('var3' in variables)
def test_ensemble_size(self):
self.assertEqual(self.datastore.io.get_ensemble_size(), 0)
self.datastore.io.set_times(np.array([0, 1, 2]))
self.datastore.io.set_timeseries_values('var1', np.array([1.0, 2.0, 3.0]))
self.datastore.io.set_timeseries_values('var2', np.array([2.0, 3.0, 4.0]))
self.assertEqual(self.datastore.io.get_ensemble_size(), 1)
self.datastore.io.set_timeseries_values('var3', np.array([0.1, 0.2, 0.3]), ensemble_member=1)
self.assertEqual(self.datastore.io.get_ensemble_size(), 2)
self.datastore.io.set_timeseries_values('var4', np.array([1.1, 2.2, 3.3]), ensemble_member=100)
self.assertEqual(self.datastore.io.get_ensemble_size(), 101)
def test_parameter_names(self):
self.assertEqual(len(self.datastore.io.get_parameter_names()), 0)
self.datastore.io.set_parameter('par1', 1.0)
self.datastore.io.set_parameter('par2', 2.3)
names = self.datastore.io.get_parameter_names()
self.assertEqual(len(names), 2)
self.assertTrue('par1' in names)
self.assertTrue('par2' in names)
self.assertEqual(len(self.datastore.io.get_parameter_names(ensemble_member=1)), 0)
self.datastore.io.set_parameter('par3', 3.1, ensemble_member=1)
names = self.datastore.io.get_parameter_names(ensemble_member=1)
self.assertEqual(len(names), 1)
self.assertTrue('par3' in names)
def test_parameter_ensemble_size(self):
self.assertEqual(self.datastore.io.get_parameter_ensemble_size(), 0)
self.datastore.io.set_parameter('par1', 1.0)
self.datastore.io.set_parameter('par2', 2.3)
self.assertEqual(self.datastore.io.get_parameter_ensemble_size(), 1)
self.datastore.io.set_parameter('par3', 3.1, ensemble_member=1)
self.assertEqual(self.datastore.io.get_parameter_ensemble_size(), 2)
self.datastore.io.set_parameter('par4', 4.5, ensemble_member=100)
self.assertEqual(self.datastore.io.get_parameter_ensemble_size(), 101)
import logging
from unittest import TestCase
import casadi as ca
import numpy as np
from rtctools.optimization.collocated_integrated_optimization_problem import (
CollocatedIntegratedOptimizationProblem
)
from rtctools.optimization.io_mixin import IOMixin
from rtctools.optimization.modelica_mixin import ModelicaMixin
from rtctools.optimization.timeseries import Timeseries
from .data_path import data_path
logger = logging.getLogger("rtctools")
logger.setLevel(logging.WARNING)
class DummyIOMixin(IOMixin):
def read(self):
# fill with dummy data
times = np.array([-7200, -3600, 0, 3600, 7200, 9800])
self.io.set_times(times)
forecast_index = 2
self.io.set_forecast_index(forecast_index)
values = {
'constant_input': [1.1, 1.4, 0.9, 1.2, 1.5, 1.7],
'u_Min': [0.5, 0.2, 0.3, 0.1, 0.4, 0.0],
'u_Max': [2.1, 2.2, 2.0, 2.4, 2.5, 2.3],
'alias': [3.1, 3.2, 3.3, 3.4, 3.5, 3.6] # alias of 'x'
}
for key, value in values.items():
self.io.set_timeseries_values(key, np.array(value))
def write(self):
pass
class Model(DummyIOMixin, ModelicaMixin, CollocatedIntegratedOptimizationProblem):
def __init__(self, **kwargs):
kwargs["model_name"] = kwargs.get("model_name", "Model")
kwargs["input_folder"] = data_path()
kwargs["output_folder"] = data_path()
kwargs["model_folder"] = data_path()
super().__init__(**kwargs)
def objective(self, ensemble_member):
# Quadratic penalty on state 'x' at final time
xf = self.state_at("x", self.times()[-1])
f = xf ** 2
return f
def constraints(self, ensemble_member):
# No additional constraints
return []
def compiler_options(self):
compiler_options = super().compiler_options()
compiler_options["cache"] = False
return compiler_options
class TestOptimizationProblem(TestCase):
"""
Tests the default methods from OptimizationProblem
"""
def setUp(self):
self.problem = Model()
self.problem.read()
self.tolerance = 1e-6
def test_get_timeseries(self):
timeseries = self.problem.get_timeseries('constant_input')
expected_times = [-7200, -3600, 0, 3600, 7200, 9800]
self.assertTrue(np.array_equal(timeseries.times, expected_times))
expected_values = [1.1, 1.4, 0.9, 1.2, 1.5, 1.7]
self.assertTrue(np.array_equal(timeseries.values, expected_values))
timeseries_x = self.problem.get_timeseries('x')
self.assertTrue(np.array_equal(timeseries_x.times, expected_times))
expected_values = [3.1, 3.2, 3.3, 3.4, 3.5, 3.6]
self.assertTrue(np.array_equal(timeseries_x.values, expected_values))
def test_set_timeseries_with_timeseries(self):
times = self.problem.io.get_times()
values = [0.1, 1.1, 2.1, 3.1, 4.1, 5.1]
self.problem.set_timeseries('newVar', Timeseries(times, values))
actual_series = self.problem.get_timeseries('newVar')
self.assertTrue(np.array_equal(actual_series.values, values))
self.assertTrue(np.array_equal(actual_series.times, times))
# test if it was actually stored in the internal data store
actual_values = self.problem.io.get_timeseries_values('newVar')
self.assertTrue(np.array_equal(actual_values, values))
# now let's do this again but only give part of the values
values = [1.1, 2.1, 3.1]
# with check_consistency=True (default) we should get a ValueError
with self.assertRaises(ValueError):
self.problem.set_timeseries('partialSeries', Timeseries(times[-3:], values))
self.problem.set_timeseries('partialSeries', Timeseries(times[-3:], values), check_consistency=False)
actual_series = self.problem.get_timeseries('partialSeries')
self.assertTrue(np.array_equal(actual_series.times, times))
self.assertTrue(np.array_equal(actual_series.values[-3:], values))
self.assertTrue(np.all(np.isnan(actual_series.values[:-3])))
def test_set_timeseries_with_array(self):
times = self.problem.times()
values = np.ones(times.shape)
self.problem.set_timeseries('newVar', values)
actual_series = self.problem.get_timeseries('newVar')
forecast_index = self.problem.io.get_forecast_index()
self.assertTrue(np.array_equal(actual_series.values[forecast_index:], values))
self.assertTrue(np.all(np.isnan(actual_series.values[:forecast_index])))
def test_timeseries_at(self):
times = self.problem.io.get_times()
values = times.astype(dtype=np.float64) / 10
self.problem.set_timeseries('myVar', Timeseries(times, values))
actual = self.problem.timeseries_at('myVar', times[0])
self.assertEqual(actual, times[0] / 10)
actual = self.problem.timeseries_at('myVar', (times[0] + times[1]) / 2)
self.assertEqual(actual, (values[0] + values[1]) / 2)
def test_bounds(self):
bounds = self.problem.bounds()
self.assertEqual(bounds['x'], [float("-inf"), float("inf")])
min_u = bounds['u'][0]
max_u = bounds['u'][1]
expected_times = [0, 3600, 7200, 9800]
self.assertTrue(np.array_equal(min_u.times, expected_times))
self.assertTrue(np.array_equal(max_u.times, expected_times))
expected_min_values = [0.3, 0.1, 0.4, 0.0]
self.assertTrue(np.array_equal(min_u.values, expected_min_values))
expected_max_values = [2.0, 2.4, 2.5, 2.3]
self.assertTrue(np.array_equal(max_u.values, expected_max_values))
def test_history(self):
history = self.problem.history(0)
expected_times = [-7200, -3600, 0]
self.assertTrue(np.array_equal(history['x'].times, expected_times))
self.assertTrue(np.array_equal(history['constant_input'].times, expected_times))
expected_history_x = [3.1, 3.2, 3.3]
self.assertTrue(np.array_equal(history['x'].values, expected_history_x))
expected_history_u = [1.1, 1.4, 0.9]
self.assertTrue(np.array_equal(history['constant_input'].values, expected_history_u))
def test_seed(self):
# add another variable containing some nans
self.problem.io.set_timeseries_values('some_missing', np.array([np.nan, 0.1, 0.2, np.nan, 3.1, np.nan]))
self.problem.dae_variables['free_variables'].append(ca.MX().sym('some_missing'))
seed = self.problem.seed(0)
self.assertTrue(np.array_equal(seed['x'].values, [3.1, 3.2, 3.3, 3.4, 3.5, 3.6]))
self.assertTrue(np.array_equal(seed['alias'].values, [3.1, 3.2, 3.3, 3.4, 3.5, 3.6]))
self.assertTrue(np.array_equal(seed['some_missing'].values, [0, 0.1, 0.2, 0, 3.1, 0]))
def test_constant_inputs(self):
constant_inputs = self.problem.constant_inputs(0)
self.assertTrue(np.array_equal(constant_inputs['constant_input'].values, [1.1, 1.4, 0.9, 1.2, 1.5, 1.7]))
import numpy as np
from rtctools.simulation.io_mixin import IOMixin
from rtctools.simulation.simulation_problem import SimulationProblem
from test_case import TestCase
from .data_path import data_path
class DummyIOMixin(IOMixin):
def read(self):
# fill with dummy data
times = np.array([-7200, -3600, 0, 3600, 7200, 9800])
self.io.set_times(times)
forecast_index = 2
self.io.set_forecast_index(forecast_index)
values = {
'constant_input': [1.1, 1.4, 0.9, 1.2, 1.5, 1.7],
'u': [0.5, 0.2, 0.3, 0.1, 0.4, 0.0]
}
for key, value in values.items():
self.io.set_timeseries_values(key, np.array(value))
# set some parameters as well
self.io.set_parameter('k', 1.01)
self.io.set_parameter('x_start', 1.02)
def write(self):
pass
class Model(DummyIOMixin, SimulationProblem):
_force_zero_delay = True
def __init__(self, **kwargs):
super().__init__(
input_folder=data_path(),
output_folder=data_path(),
model_name="Model",
model_folder=data_path()
)
def compiler_options(self):
compiler_options = super().compiler_options()
compiler_options["cache"] = False
return compiler_options
class TestDummyIOMixin(TestCase):
def setUp(self):
self.problem = Model()
self.problem.read()
def test_initialize(self):
self.assertTrue(np.isnan(self.problem.get_var('u')))
self.assertTrue(np.isnan(self.problem.get_var('constant_input')))
self.assertTrue(np.isnan(self.problem.get_var('k')))
self.assertTrue(np.isnan(self.problem.get_var('x_start')))
self.problem.initialize()
self.assertEqual(self.problem.get_var('u'), 0.3)
self.assertEqual(self.problem.get_var('constant_input'), 0.9)
self.assertEqual(self.problem.get_var('k'), 1.01)
self.assertEqual(self.problem.get_var('x_start'), 1.02)
self.assertEqual(self.problem.get_var('x'), 1.02) # x should start equal to x_start
self.assertEqual(self.problem.get_var('alias'), 1.02) # x = alias
self.assertEqual(self.problem.get_var('w'), 0.0) # w should start at 0.0
self.assertEqual(self.problem.get_var('y'), 1.98) # x + y = 3.0
self.assertEqual(self.problem.get_var('z'), 1.0404) # z = x^2 + sin(time)
self.assertEqual(self.problem.get_var('u_out'), 1.3) # u_out = u + 1
self.assertEqual(self.problem.get_var('switched'), 1.0) # 1.0 if x > 0.5 else 2.0
self.assertEqual(self.problem.get_var('constant_output'), 0.9) # constant_output = constant_input
# todo add check for x_delayed once delay is properly implemented
for output_variable in self.problem.output_variables:
self.assertEqual(
self.problem.output[output_variable][self.problem.io.get_forecast_index()],
self.problem.get_var(output_variable)
)
def test_update(self):
self.problem.initialize()
self.problem.update(-1)
self.assertEqual(self.problem.get_var('u'), 0.1)
self.assertEqual(self.problem.get_var('constant_input'), 1.2)
self.assertEqual(self.problem.get_var('k'), 1.01)
self.assertEqual(self.problem.get_var('x_start'), 1.02)
self.assertEqual(self.problem.get_var('u_out'), 1.1) # u_out = u + 1
self.assertEqual(self.problem.get_var('switched'), 2.0) # 1.0 if x > 0.5 else 2.0
self.assertEqual(self.problem.get_var('constant_output'), 1.2) # constant_output = constant_input
for output_variable in self.problem.output_variables:
self.assertEqual(
self.problem.output[output_variable][self.problem.io.get_forecast_index() + 1],
self.problem.get_var(output_variable)
)
def test_times(self):
self.assertTrue(np.array_equal(self.problem.times(), [0, 3600, 7200, 9800]))
def test_parameters(self):
parameters = self.problem.parameters()
self.assertEqual(len(parameters), 2)
self.assertEqual(parameters['k'], 1.01)
self.assertEqual(parameters['x_start'], 1.02)
def test_timeseries_at(self):
self.assertEqual(self.problem.timeseries_at('u', 3600), 0.1) # no interpolation needed
self.assertEqual(self.problem.timeseries_at('u', 1800), (0.3 + 0.1) / 2) # interpolation
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment