...
  View open merge request
Commits (3)
  • Anne Hommelberg's avatar
    Refactor of IO Mixins · d5dd1182
    Anne Hommelberg authored
    Large refactoring of the IO mixins (PIMixin and CSVMixin for both
    SimulationProblems and OptimizationProblems.
    
    We add a general DataStoreAccessor class extended by OptimizationProblem
    and SimulationProblem, which is used to store all data read by the IO
    mixins in a DataStore object. Higher lever mixins like PIMixin/CSVMixin
    do not store their data themselves anymore. Of course users are still
    free to override get_timeseries/set_timeseries and store data themselves
    in their own mixins.
    
    For both simulation and optimization we also add an IOMixin class for
    which contains methods that were previously duplicated in their
    respective PIMixin and CSVMixin classes. There is still quite a bit of
    duplicate code between the IOMixin for optimization and the IOMixin for
    simulation (and similarly for the respective PIMixins and CSVMixins),
    but that is something a future commit will address.
    d5dd1182
  • Tjerk Vreeken's avatar
    PIMixin: Remove reimplementation of initial_state() · 4e38744c
    Tjerk Vreeken authored
    OptimizationProblem already implements an initial_state() method that
    does the same thing.
    4e38744c
  • Tjerk Vreeken's avatar
    CSVMixin: Expose all parameters read from file · 36f8834d
    Tjerk Vreeken authored
    Instead of only exposing the parameters in the Modelica model, we expose
    all parameters read from the CSV file. This makes the behavior equal to
    that of PIMixin, and as such can be moved to the common IOMixin.
    36f8834d
This diff is collapsed.
import itertools
import logging
from abc import ABCMeta, abstractmethod
from abc import ABCMeta
import casadi as ca
......@@ -78,17 +78,6 @@ class CollocatedIntegratedOptimizationProblem(OptimizationProblem, metaclass=ABC
# Call super
super().__init__(**kwargs)
@abstractmethod
def times(self, variable=None):
"""
List of time stamps for variable.
:param variable: Variable name.
:returns: A list of time stamps for the given variable.
"""
pass
def interpolation_method(self, variable=None):
"""
Interpolation method for variable.
......
This diff is collapsed.
This diff is collapsed.
......@@ -279,8 +279,14 @@ class ModelicaMixin(OptimizationProblem):
M_ = float(M_)
# We take the intersection of all provided bounds
m = max(m, m_)
M = min(M, M_)
def intersect(old_bound, new_bound, intersecter):
if isinstance(old_bound, Timeseries):
return Timeseries(old_bound.times, intersecter(old_bound.values, new_bound))
else:
return intersecter(old_bound, new_bound)
m = intersect(m, m_, np.maximum)
M = intersect(M, M_, np.minimum)
bounds[sym_name] = (m, M)
......@@ -297,7 +303,7 @@ class ModelicaMixin(OptimizationProblem):
# Load seeds
for var in itertools.chain(self.__pymoca_model.states, self.__pymoca_model.alg_states):
if var.fixed:
if var.fixed or var.symbol.name() in seed.keys():
# Values will be set from import timeseries
continue
......
......@@ -6,19 +6,23 @@ import casadi as ca
import numpy as np
from rtctools._internal.alias_tools import AliasDict, AliasRelation
from rtctools._internal.alias_tools import AliasDict
from rtctools.data.storage import DataStoreAccessor
from .timeseries import Timeseries
logger = logging.getLogger("rtctools")
class OptimizationProblem(metaclass=ABCMeta):
class OptimizationProblem(DataStoreAccessor, metaclass=ABCMeta):
"""
Base class for all optimization problems.
"""
def __init__(self, **kwargs):
# Call parent class first for default behaviour.
super().__init__(**kwargs)
self.__mixed_integer = False
def optimize(self, preprocessing: bool = True, postprocessing: bool = True,
......@@ -406,10 +410,6 @@ class OptimizationProblem(metaclass=ABCMeta):
{variable: Timeseries(np.array([self.initial_time]), np.array([state]))
for variable, state in initial_state.items()})
@abstractproperty
def alias_relation(self) -> AliasRelation:
raise NotImplementedError
def variable_is_discrete(self, variable: str) -> bool:
"""
Returns ``True`` if the provided variable is discrete.
......
This diff is collapsed.
This diff is collapsed.
import bisect
import logging
from abc import ABCMeta, abstractmethod
import numpy as np
from rtctools._internal.alias_tools import AliasDict
from rtctools._internal.caching import cached
from rtctools.simulation.simulation_problem import SimulationProblem
logger = logging.getLogger("rtctools")
class IOMixin(SimulationProblem, metaclass=ABCMeta):
"""
Base class for all IO methods of optimization problems.
"""
def __init__(self, **kwargs):
# Call parent class first for default behaviour.
super().__init__(**kwargs)
def pre(self) -> None:
# Call read method to read all input
self.read()
@abstractmethod
def read(self) -> None:
"""
Reads input data from files, storing it in the internal data store through the various set or add methods
"""
pass
def post(self) -> None:
# Call write method to write all output
self.write()
@abstractmethod
def write(self) -> None:
""""
Writes output data to files, getting the data from the data store through the various get methods
"""
pass
def initialize(self, config_file=None):
# Set up experiment
timeseries_import_times = self.io.times_sec
self.__dt = timeseries_import_times[1] - timeseries_import_times[0]
self.setup_experiment(0, timeseries_import_times[-1], self.__dt)
parameter_variables = set(self.get_parameter_variables())
logger.debug("Model parameters are {}".format(parameter_variables))
for parameter, value in self.io.parameters().items():
if parameter in parameter_variables:
logger.debug("IOMixin: Setting parameter {} = {}".format(parameter, value))
self.set_var(parameter, value)
# Load input variable names
self.__input_variables = set(self.get_input_variables().keys())
# Set input values
t_idx = bisect.bisect_left(timeseries_import_times, 0.0)
self.__set_input_variables(t_idx)
logger.debug("Model inputs are {}".format(self.__input_variables))
# Empty output
self.__output_variables = self.get_output_variables()
n_times = len(self.io.times_sec)
self.__output = AliasDict(self.alias_relation)
self.__output.update({variable: np.full(n_times, np.nan) for variable in self.__output_variables})
# Call super, which will also initialize the model itself
super().initialize(config_file)
# Extract consistent t0 values
for variable in self.__output_variables:
self.__output[variable][t_idx] = self.get_var(variable)
def __set_input_variables(self, t_idx):
for variable in self.get_variables():
if variable in self.__input_variables:
_, values = self.io.get_timeseries_sec(variable)
value = values[t_idx]
if np.isfinite(value):
self.set_var(variable, value)
else:
logger.debug("IOMixin: Found bad value {} at index [{}] in timeseries aliased to input {}"
.format(value, t_idx, variable))
def update(self, dt):
# Time step
if dt < 0:
dt = self.__dt
# Current time stamp
t = self.get_current_time()
# Get current time index
t_idx = bisect.bisect_left(self.io.times_sec, t + dt)
# Set input values
self.__set_input_variables(t_idx)
# Call super
super().update(dt)
# Extract results
for variable in self.__output_variables:
self.__output[variable][t_idx] = self.get_var(variable)
@property
def output_variables(self):
return self.__output_variables
@property
def output(self):
return self.__output
@cached
def parameters(self):
"""
Return a dictionary of parameters, including parameters in the input files files.
:returns: Dictionary of parameters
"""
# Call parent class first for default values.
parameters = super().parameters()
# Load parameters from input files (stored in internal data store)
for parameter_name, value in self.io.parameters().items():
parameters[parameter_name] = value
if logger.getEffectiveLevel() == logging.DEBUG:
for parameter_name in self.io.parameters().keys():
logger.debug("IOMixin: Read parameter {}".format(parameter_name))
return parameters
def times(self, variable=None):
"""
Return a list of all the timesteps in seconds.
:param variable: Variable name.
:returns: List of all the timesteps in seconds.
"""
idx = bisect.bisect_left(self.io.datetimes, self.io.reference_datetime)
return self.io.times_sec[idx:]
def timeseries_at(self, variable, t):
"""
Return the value of a time series at the given time.
:param variable: Variable name.
:param t: Time.
:returns: The interpolated value of the time series.
:raises: KeyError
"""
timeseries_times_sec, values = self.io.get_timeseries_sec(variable)
t_idx = bisect.bisect_left(timeseries_times_sec, t)
if timeseries_times_sec[t_idx] == t:
return values[t_idx]
else:
return np.interp(t, timeseries_times_sec, values)
This diff is collapsed.
......@@ -16,11 +16,12 @@ import pymoca.backends.casadi.api
from rtctools._internal.alias_tools import AliasDict
from rtctools._internal.caching import cached
from rtctools.data.storage import DataStoreAccessor
logger = logging.getLogger("rtctools")
class SimulationProblem:
class SimulationProblem(DataStoreAccessor):
"""
Implements the `BMI <http://csdms.colorado.edu/wiki/BMI_Description>`_ Interface.
......@@ -209,7 +210,7 @@ class SimulationProblem:
self.__do_step = ca.rootfinder("next_state", "nlpsol", self.__res_vals, options)
# Call parent class for default behaviour.
super().__init__()
super().__init__(**kwargs)
def initialize(self, config_file=None):
"""
......
import logging
from datetime import datetime, timedelta
from unittest import TestCase
import numpy as np
from pymoca.backends.casadi.alias_relation import AliasRelation
from rtctools.data.storage import DataStoreAccessor
logger = logging.getLogger("rtctools")
logger.setLevel(logging.WARNING)
class DummyDataStore(DataStoreAccessor):
@property
def alias_relation(self):
return AliasRelation()
class TestDummyDataStore(TestCase):
def setUp(self):
self.datastore = DummyDataStore(input_folder='dummyInput', output_folder='dummyOutput')
self.tolerance = 1e-6
def test_times(self):
# Set a reference datetime
ref_datetime = datetime(2000, 1, 1)
self.datastore.io.reference_datetime = ref_datetime
expected_times_sec = np.array([-7200, -3600, 0, 3600, 7200, 9800], dtype=np.float64)
expected_datetimes = [ref_datetime + timedelta(seconds=x) for x in expected_times_sec]
self.datastore.io.set_timeseries(expected_datetimes, np.zeros((6,)), 'dummyVar')
actual_datetimes = self.datastore.io.datetimes
self.assertEqual(actual_datetimes, expected_datetimes)
actual_times = self.datastore.io.times_sec
self.assertTrue(np.array_equal(actual_times, expected_times_sec))
def test_timeseries(self):
# expect a KeyError when getting a timeseries that has not been set
with self.assertRaises(KeyError):
self.datastore.io.get_timeseries('someNoneExistentVariable')
# Set a reference datetime
ref_datetime = datetime(2000, 1, 1)
self.datastore.io.reference_datetime = ref_datetime
# Make a timeseries
times_sec = np.array([-3600, 0, 7200], dtype=np.float64)
datetimes = [ref_datetime + timedelta(seconds=x) for x in times_sec]
expected_values = np.array([3.1, 2.4, 2.5])
self.datastore.io.set_timeseries(datetimes, expected_values, 'myNewVariable')
_, actual_values = self.datastore.io.get_timeseries('myNewVariable')
self.assertTrue(np.array_equal(actual_values, expected_values))
# Also check using the seconds interface
actual_times, actual_values = self.datastore.io.get_timeseries_sec('myNewVariable')
self.assertTrue(np.array_equal(actual_times, times_sec))
self.assertTrue(np.array_equal(actual_values, expected_values))
# Check that we can no longer overwrite the reference datetime,
# because we called get_timeseries_sec/set_timeseries_sec.
with self.assertRaisesRegex(RuntimeError, "Cannot change reference datetime after times in seconds"):
self.datastore.io.reference_datetime = datetime(2010, 1, 1)
# expect a KeyError when getting timeseries for an ensemble member that doesn't exist
with self.assertRaisesRegex(KeyError, "ensemble_member 1 does not exist"):
self.datastore.io.get_timeseries('myNewVariable', 1)
# Set timeseries with times in seconds
expected_values = np.array([1.1, 1.4, 1.5])
self.datastore.io.set_timeseries_sec(times_sec, expected_values, 'ensembleVariable', ensemble_member=1)
with self.assertRaises(KeyError):
self.datastore.io.get_timeseries('ensembleVariable', 0)
_, actual_values = self.datastore.io.get_timeseries('ensembleVariable', 1)
self.assertTrue(np.array_equal(actual_values, expected_values))
# expect a warning when overwriting a timeseries with check_duplicates=True
new_values = np.array([2.1, 1.1, 0.1])
with self.assertLogs(logger, level='WARN') as cm:
self.datastore.io.set_timeseries(datetimes, new_values, 'myNewVariable', check_duplicates=True)
self.assertEqual(cm.output,
['WARNING:rtctools:Time series values for ensemble member 0 and variable '
'myNewVariable set twice. Overwriting old values.'])
_, actual_values = self.datastore.io.get_timeseries('myNewVariable')
self.assertTrue(np.array_equal(actual_values, new_values))
# By default we expect no warning when verwriting old values
newest_values = np.array([-0.4, 2.14, 29.1])
with self.assertLogs(logger, level='WARN') as cm:
self.datastore.io.set_timeseries(datetimes, newest_values, 'myNewVariable')
self.assertEqual(cm.output, [])
logger.warning('All is well') # if no log message occurs, assertLogs will throw an AssertionError
_, actual_values = self.datastore.io.get_timeseries('myNewVariable')
self.assertTrue(np.array_equal(actual_values, newest_values))
def test_parameters(self):
# expect a KeyError when getting a parameter that has not been set
with self.assertRaises(KeyError):
self.datastore.io.get_parameter('someNoneExistentParameter')
self.datastore.io.set_parameter('myNewParameter', 1.4)
self.assertEqual(self.datastore.io.get_parameter('myNewParameter'), 1.4)
self.assertEqual(self.datastore.io.parameters()['myNewParameter'], 1.4)
# expect a KeyError when getting parameters for an ensemble member that doesn't exist
with self.assertRaises(KeyError):
self.datastore.io.get_parameter('myNewParameter', 1)
with self.assertRaises(KeyError):
self.datastore.io.parameters(1)['myNewParameter']
self.datastore.io.set_parameter('ensembleParameter', 1.2, ensemble_member=1)
with self.assertRaises(KeyError):
self.datastore.io.get_parameter('ensembleParameter', 0)
with self.assertRaises(KeyError):
self.datastore.io.parameters(0)['ensembleParameter']
self.assertEqual(self.datastore.io.get_parameter('ensembleParameter', 1), 1.2)
self.assertEqual(self.datastore.io.parameters(1)['ensembleParameter'], 1.2)
# expect a warning when overwriting a parameter with check_duplicates=True
with self.assertLogs(logger, level='WARN') as cm:
self.datastore.io.set_parameter('myNewParameter', 2.5, check_duplicates=True)
self.assertEqual(cm.output,
['WARNING:rtctools:Attempting to set parameter value for ensemble member 0 '
'and name myNewParameter twice. Using new value of 2.5.'])
self.assertEqual(self.datastore.io.get_parameter('myNewParameter'), 2.5)
self.assertEqual(self.datastore.io.parameters()['myNewParameter'], 2.5)
# By default we expect no warning when overwriting old values
with self.assertLogs(logger, level='WARN') as cm:
self.datastore.io.set_parameter('myNewParameter', 2.2)
self.assertEqual(cm.output, [])
logger.warning('All is well') # if no log message occurs, assertLogs will throw an AssertionError
self.assertEqual(self.datastore.io.get_parameter('myNewParameter'), 2.2)
self.assertEqual(self.datastore.io.parameters()['myNewParameter'], 2.2)
......@@ -3,7 +3,7 @@ import logging
import numpy as np
from rtctools.optimization.collocated_integrated_optimization_problem import (
CollocatedIntegratedOptimizationProblem,
CollocatedIntegratedOptimizationProblem
)
from rtctools.optimization.csv_mixin import CSVMixin
from rtctools.optimization.modelica_mixin import ModelicaMixin
......@@ -105,7 +105,7 @@ class TestCSVMixin(TestCase):
self.assertAlmostEqual(a, b, self.tolerance)
class TestPIMixinEnsemble(TestCase):
class TestCSVMixinEnsemble(TestCase):
def setUp(self):
self.problem = ModelEnsemble()
self.problem.optimize()
......
import bisect
import logging
from datetime import datetime, timedelta
from unittest import TestCase
import casadi as ca
import numpy as np
from rtctools.optimization.collocated_integrated_optimization_problem import (
CollocatedIntegratedOptimizationProblem
)
from rtctools.optimization.io_mixin import IOMixin
from rtctools.optimization.modelica_mixin import ModelicaMixin
from rtctools.optimization.timeseries import Timeseries
from .data_path import data_path
logger = logging.getLogger("rtctools")
logger.setLevel(logging.WARNING)
class DummyIOMixin(IOMixin):
def read(self):
# fill with dummy data
ref_datetime = datetime(2000, 1, 1)
self.io.reference_datetime = ref_datetime
times_sec = [-7200, -3600, 0, 3600, 7200, 9800]
datetimes = [ref_datetime + timedelta(seconds=x) for x in times_sec]
values = {
'constant_input': [1.1, 1.4, 0.9, 1.2, 1.5, 1.7],
'u_Min': [0.5, 0.2, 0.3, 0.1, 0.4, 0.0],
'u_Max': [2.1, 2.2, 2.0, 2.4, 2.5, 2.3],
'alias': [3.1, 3.2, 3.3, 3.4, 3.5, 3.6] # alias of 'x'
}
for key, value in values.items():
self.io.set_timeseries(datetimes, np.array(value), key)
def write(self):
pass
class Model(DummyIOMixin, ModelicaMixin, CollocatedIntegratedOptimizationProblem):
def __init__(self, **kwargs):
kwargs["model_name"] = kwargs.get("model_name", "Model")
kwargs["input_folder"] = data_path()
kwargs["output_folder"] = data_path()
kwargs["model_folder"] = data_path()
super().__init__(**kwargs)
def objective(self, ensemble_member):
# Quadratic penalty on state 'x' at final time
xf = self.state_at("x", self.times()[-1])
f = xf ** 2
return f
def constraints(self, ensemble_member):
# No additional constraints
return []
def compiler_options(self):
compiler_options = super().compiler_options()
compiler_options["cache"] = False
return compiler_options
class TestOptimizationProblem(TestCase):
"""
Tests the default methods from OptimizationProblem
"""
def setUp(self):
self.problem = Model()
self.problem.read()
self.tolerance = 1e-6
def test_get_timeseries(self):
timeseries = self.problem.get_timeseries('constant_input')
expected_times = [-7200, -3600, 0, 3600, 7200, 9800]
self.assertTrue(np.array_equal(timeseries.times, expected_times))
expected_values = [1.1, 1.4, 0.9, 1.2, 1.5, 1.7]
self.assertTrue(np.array_equal(timeseries.values, expected_values))
timeseries_x = self.problem.get_timeseries('x')
self.assertTrue(np.array_equal(timeseries_x.times, expected_times))
expected_values = [3.1, 3.2, 3.3, 3.4, 3.5, 3.6]
self.assertTrue(np.array_equal(timeseries_x.values, expected_values))
def test_set_timeseries_with_timeseries(self):
times = self.problem.io.times_sec
values = [0.1, 1.1, 2.1, 3.1, 4.1, 5.1]
self.problem.set_timeseries('newVar', Timeseries(times, values))
actual_series = self.problem.get_timeseries('newVar')
self.assertTrue(np.array_equal(actual_series.values, values))
self.assertTrue(np.array_equal(actual_series.times, times))
# test if it was actually stored in the internal data store
_, actual_values = self.problem.io.get_timeseries_sec('newVar')
self.assertTrue(np.array_equal(actual_values, values))
# now let's do this again but only give part of the values
values = [1.1, 2.1, 3.1]
# with check_consistency=True (default) we should get a ValueError
with self.assertRaises(ValueError):
self.problem.set_timeseries('partialSeries', Timeseries(times[-3:], values))
self.problem.set_timeseries('partialSeries', Timeseries(times[-3:], values), check_consistency=False)
actual_series = self.problem.get_timeseries('partialSeries')
self.assertTrue(np.array_equal(actual_series.times, times))
self.assertTrue(np.array_equal(actual_series.values[-3:], values))
self.assertTrue(np.all(np.isnan(actual_series.values[:-3])))
def test_set_timeseries_with_array(self):
times = self.problem.times()
values = np.ones(times.shape)
self.problem.set_timeseries('newVar', values)
actual_series = self.problem.get_timeseries('newVar')
forecast_index = bisect.bisect_left(self.problem.io.datetimes, self.problem.io.reference_datetime)
self.assertTrue(np.array_equal(actual_series.values[forecast_index:], values))
self.assertTrue(np.all(np.isnan(actual_series.values[:forecast_index])))
def test_timeseries_at(self):
times = self.problem.io.times_sec
values = times.astype(dtype=np.float64) / 10
self.problem.set_timeseries('myVar', Timeseries(times, values))
actual = self.problem.timeseries_at('myVar', times[0])
self.assertEqual(actual, times[0] / 10)
actual = self.problem.timeseries_at('myVar', (times[0] + times[1]) / 2)
self.assertEqual(actual, (values[0] + values[1]) / 2)
def test_bounds(self):
bounds = self.problem.bounds()
self.assertEqual(bounds['x'], [float("-inf"), float("inf")])
min_u = bounds['u'][0]
max_u = bounds['u'][1]
expected_times = [0, 3600, 7200, 9800]
self.assertTrue(np.array_equal(min_u.times, expected_times))
self.assertTrue(np.array_equal(max_u.times, expected_times))
expected_min_values = [0.3, 0.1, 0.4, 0.0]
self.assertTrue(np.array_equal(min_u.values, expected_min_values))
expected_max_values = [2.0, 2.4, 2.5, 2.3]
self.assertTrue(np.array_equal(max_u.values, expected_max_values))
def test_history(self):
history = self.problem.history(0)
expected_times = [-7200, -3600, 0]
self.assertTrue(np.array_equal(history['x'].times, expected_times))
self.assertTrue(np.array_equal(history['constant_input'].times, expected_times))
expected_history_x = [3.1, 3.2, 3.3]
self.assertTrue(np.array_equal(history['x'].values, expected_history_x))
expected_history_u = [1.1, 1.4, 0.9]
self.assertTrue(np.array_equal(history['constant_input'].values, expected_history_u))
def test_seed(self):
# add another variable containing some nans
self.problem.io.set_timeseries_sec(self.problem.io.times_sec,
np.array([np.nan, 0.1, 0.2, np.nan, 3.1, np.nan]),
'some_missing')
self.problem.dae_variables['free_variables'].append(ca.MX().sym('some_missing'))
seed = self.problem.seed(0)
self.assertTrue(np.array_equal(seed['x'].values, [3.1, 3.2, 3.3, 3.4, 3.5, 3.6]))
self.assertTrue(np.array_equal(seed['alias'].values, [3.1, 3.2, 3.3, 3.4, 3.5, 3.6]))
self.assertTrue(np.array_equal(seed['some_missing'].values, [0, 0.1, 0.2, 0, 3.1, 0]))
def test_constant_inputs(self):
constant_inputs = self.problem.constant_inputs(0)
self.assertTrue(np.array_equal(constant_inputs['constant_input'].values, [1.1, 1.4, 0.9, 1.2, 1.5, 1.7]))
import bisect
import numpy as np
from rtctools.optimization.collocated_integrated_optimization_problem import (
......@@ -90,26 +92,28 @@ class TestPIMixin(TestCase):
self.assertAlmostLessThan(self.results["u"], 2, self.tolerance)
def test_interpolation(self):
t_idx = bisect.bisect_left(self.problem.io.times_sec, 0.0)
t = (
self.problem.get_timeseries("x", 0).times[
self.problem.get_forecast_index() + 1
t_idx + 1
]
+ (
self.problem.get_timeseries("x", 0).times[
self.problem.get_forecast_index() + 2
t_idx + 2
]
- self.problem.get_timeseries("x", 0).times[
self.problem.get_forecast_index() + 1
t_idx + 1
]
)
/ 2
)
x_ref = (
self.problem.get_timeseries("x", 0).values[
self.problem.get_forecast_index() + 1
t_idx + 1
]
+ self.problem.get_timeseries("x", 0).values[
self.problem.get_forecast_index() + 2
t_idx + 2
]
) / 2
self.assertAlmostEqual(
......
import bisect
from datetime import datetime, timedelta
import numpy as np
from rtctools.simulation.io_mixin import IOMixin
from rtctools.simulation.simulation_problem import SimulationProblem
from test_case import TestCase
from .data_path import data_path
class DummyIOMixin(IOMixin):
def read(self):
# fill with dummy data
ref_datetime = datetime(2000, 1, 1)
self.io.reference_datetime = ref_datetime
times_sec = [-7200, -3600, 0, 3600, 7200, 9800]
datetimes = [ref_datetime + timedelta(seconds=x) for x in times_sec]
values = {
'constant_input': [1.1, 1.4, 0.9, 1.2, 1.5, 1.7],
'u': [0.5, 0.2, 0.3, 0.1, 0.4, 0.0]
}
for key, value in values.items():
self.io.set_timeseries(datetimes, np.array(value), key)
# set some parameters as well
self.io.set_parameter('k', 1.01)
self.io.set_parameter('x_start', 1.02)
def write(self):
pass
class Model(DummyIOMixin, SimulationProblem):
_force_zero_delay = True
def __init__(self, **kwargs):
super().__init__(
input_folder=data_path(),
output_folder=data_path(),
model_name="Model",
model_folder=data_path()
)
def compiler_options(self):
compiler_options = super().compiler_options()
compiler_options["cache"] = False
return compiler_options
class TestDummyIOMixin(TestCase):
def setUp(self):
self.problem = Model()
self.problem.read()
def test_initialize(self):
self.assertTrue(np.isnan(self.problem.get_var('u')))
self.assertTrue(np.isnan(self.problem.get_var('constant_input')))
self.assertTrue(np.isnan(self.problem.get_var('k')))
self.assertTrue(np.isnan(self.problem.get_var('x_start')))
self.problem.initialize()
self.assertEqual(self.problem.get_var('u'), 0.3)
self.assertEqual(self.problem.get_var('constant_input'), 0.9)
self.assertEqual(self.problem.get_var('k'), 1.01)
self.assertEqual(self.problem.get_var('x_start'), 1.02)
self.assertEqual(self.problem.get_var('x'), 1.02) # x should start equal to x_start
self.assertEqual(self.problem.get_var('alias'), 1.02) # x = alias
self.assertEqual(self.problem.get_var('w'), 0.0) # w should start at 0.0
self.assertEqual(self.problem.get_var('y'), 1.98) # x + y = 3.0
self.assertEqual(self.problem.get_var('z'), 1.0404) # z = x^2 + sin(time)
self.assertEqual(self.problem.get_var('u_out'), 1.3) # u_out = u + 1
self.assertEqual(self.problem.get_var('switched'), 1.0) # 1.0 if x > 0.5 else 2.0
self.assertEqual(self.problem.get_var('constant_output'), 0.9) # constant_output = constant_input
# todo add check for x_delayed once delay is properly implemented
t_idx = bisect.bisect_left(self.problem.io.datetimes, self.problem.io.reference_datetime)
for output_variable in self.problem.output_variables:
self.assertEqual(
self.problem.output[output_variable][t_idx],
self.problem.get_var(output_variable)
)
def test_update(self):
self.problem.initialize()
self.problem.update(-1)
self.assertEqual(self.problem.get_var('u'), 0.1)
self.assertEqual(self.problem.get_var('constant_input'), 1.2)
self.assertEqual(self.problem.get_var('k'), 1.01)
self.assertEqual(self.problem.get_var('x_start'), 1.02)
self.assertEqual(self.problem.get_var('u_out'), 1.1) # u_out = u + 1
self.assertEqual(self.problem.get_var('switched'), 2.0) # 1.0 if x > 0.5 else 2.0
self.assertEqual(self.problem.get_var('constant_output'), 1.2) # constant_output = constant_input
t_idx = bisect.bisect_left(self.problem.io.datetimes, self.problem.io.reference_datetime)
for output_variable in self.problem.output_variables:
self.assertEqual(
self.problem.output[output_variable][t_idx + 1],
self.problem.get_var(output_variable)
)
def test_times(self):
self.assertTrue(np.array_equal(self.problem.times(), [0, 3600, 7200, 9800]))
def test_parameters(self):
parameters = self.problem.parameters()
self.assertEqual(len(parameters), 2)
self.assertEqual(parameters['k'], 1.01)
self.assertEqual(parameters['x_start'], 1.02)
def test_timeseries_at(self):
self.assertEqual(self.problem.timeseries_at('u', 3600), 0.1) # no interpolation needed
self.assertEqual(self.problem.timeseries_at('u', 1800), (0.3 + 0.1) / 2) # interpolation