...
  View open merge request
Commits (6)
  • Anne Hommelberg's avatar
    Add general DataStore and IOMixin classes · 00c7de3f
    Anne Hommelberg authored
    Large refactoring of all IO mixins (PIMixin and CSVMixin).
    Adds a general DataStore class extended by OptimizationProblem
    and SimulationProblem, which is used to store all data read by
    the IO mixins.
    Adds an optimization IOMixin class which contains methods that
    were previously duplicated in the optimization PIMixin and
    CSVMixin.
    Adds a simulation IOMixin class which does the same for the
    simulation PIMixin and IOMixin.
    00c7de3f
  • Anne Hommelberg's avatar
    Fix unit tests · f2aae576
    Anne Hommelberg authored
    The DataStore should allow the input and output folder to not be
    specified, in case the user chooses to not use any of the provided
    IOMixins.
    f2aae576
  • Anne Hommelberg's avatar
    Add io accessor for all DataStore access · 8f743998
    Anne Hommelberg authored
    To avoid method naming conflicts, all access methods for
    the internal data store have been put into a self.io accessor.
    For example, call self.io.get_times to get the times stored in the
    internal data store.
    8f743998
  • Anne Hommelberg's avatar
    Fix duplicate checks in DataStore · bc0b8320
    Anne Hommelberg authored
    To keep backwards compatibility with the old duplicate parameters
    check in PIMixin, the DataStore should overwrite the old values with
    new values when duplicates occur. If check_duplicates is True, a
    warning will be given each time this happens.
    bc0b8320
  • Anne Hommelberg's avatar
    Add unit tests for new io classes · eb5e4b7f
    Anne Hommelberg authored
    Adds unit tests for the two IOMixin and DataStore classes.
    Also fixes some minor bugs in the set_timeseries method.
    Also adds the get_parameter_ensemble_size method to make parameter
    access in the DataStore consistent with access to the stored time
    series.
    eb5e4b7f
  • Anne Hommelberg's avatar
    Add NetCDFMixin · b8da58b3
    Anne Hommelberg authored
    Adds a NetCDFMixin to import and export data to
    and from NetCDF files.
    b8da58b3
This diff is collapsed.
import logging
from abc import ABCMeta, abstractmethod
from datetime import datetime, timedelta
from typing import Iterable, Set, Union
import numpy as np
from rtctools._internal.alias_tools import AliasDict, AliasRelation
logger = logging.getLogger("rtctools")
class DataStoreAccessor(metaclass=ABCMeta):
    """
    Base class for all problems.

    Adds an internal data store where timeseries and parameters can be stored.
    Access to the internal data store is always done through the io accessor.

    :cvar timeseries_import_basename:
        Import file basename. Default is ``timeseries_import``.
    :cvar timeseries_export_basename:
        Export file basename. Default is ``timeseries_export``.
    """

    #: Import file basename
    timeseries_import_basename = 'timeseries_import'
    #: Export file basename
    timeseries_export_basename = 'timeseries_export'

    def __init__(self, **kwargs):
        # Folders default to 'input'/'output' when the caller does not supply them.
        self._input_folder = kwargs.get('input_folder', 'input')
        self._output_folder = kwargs.get('output_folder', 'output')

        if logger.getEffectiveLevel() == logging.DEBUG:
            logger.debug("Expecting input files to be located in '" + self._input_folder + "'.")
            logger.debug("Writing output files to '" + self._output_folder + "'.")

        # All access to the internal data store goes through this accessor.
        self.io = DataStore(self)

    @property
    @abstractmethod
    def alias_relation(self) -> AliasRelation:
        raise NotImplementedError

    @property
    def initial_time(self) -> float:
        """
        The initial time in seconds.
        """
        stored_times = self.io.get_times()
        if stored_times is None:
            raise RuntimeError("Attempting to access initial_time before setting times")
        return stored_times[self.io.get_forecast_index()]
class DataStore(metaclass=ABCMeta):
    """
    DataStore class used by the DataStoreAccessor.
    Contains all methods needed to access the internal data store.
    """

    def __init__(self, accessor):
        self.__accessor = accessor

        # Should all be set by subclass via setters
        self.__forecast_index = 0
        self.__timeseries_times_sec = None
        self.__timeseries_values = []
        self.__parameters = []
        # todo add support for storing initial states
        # self.__initial_state = []

    def get_times(self) -> np.ndarray:
        """
        Returns the timeseries times in seconds.

        :return: timeseries times in seconds, or None if there has been no call to set_times
        """
        return self.__timeseries_times_sec

    def set_times(self, times_in_sec: np.ndarray) -> None:
        """
        Sets the timeseries times in seconds in the internal data store.
        Must be called in .read() to store the times in the IOMixin before calling
        set_timeseries_values to store the values for an input timeseries.

        :param times_in_sec: np.ndarray containing the times in seconds
        """
        stored = self.__timeseries_times_sec
        if stored is not None and not np.array_equal(times_in_sec, stored):
            raise RuntimeError("Attempting to overwrite the input time series times with different values. "
                               "Please ensure all input time series have the same times.")
        self.__timeseries_times_sec = times_in_sec

    def set_timeseries_values(self,
                              variable: str,
                              values: np.ndarray,
                              ensemble_member: int = 0,
                              check_duplicates: bool = True) -> None:
        """
        Stores input time series values in the internal data store.

        :param variable: Variable name.
        :param values: The values to be stored.
        :param ensemble_member: The ensemble member index.
        :param check_duplicates: If True, a warning will be given when overwriting values.
                                 If False, existing values can be silently overwritten.
        """
        if self.__timeseries_times_sec is None:
            raise RuntimeError("First call set_times before calling set_timeseries_values")

        n_times = len(self.__timeseries_times_sec)
        if n_times != len(values):
            raise ValueError("Length of values ({}) must be the same as length of times ({})"
                             .format(len(values), n_times))

        # Grow the per-ensemble-member storage on demand.
        for _ in range(ensemble_member + 1 - len(self.__timeseries_values)):
            self.__timeseries_values.append(AliasDict(self.__accessor.alias_relation))

        member_store = self.__timeseries_values[ensemble_member]
        if check_duplicates and variable in member_store.keys():
            logger.warning("Time series values for ensemble member {} and variable {} set twice. "
                           "Overwriting old values.".format(ensemble_member, variable))
        member_store[variable] = values

    def get_timeseries_values(self, variable: str, ensemble_member: int = 0) -> np.ndarray:
        """
        Looks up the time series values in the internal data store.

        :raises KeyError: if the ensemble member or variable is unknown.
        """
        if ensemble_member >= len(self.__timeseries_values):
            raise KeyError("ensemble_member {} does not exist".format(ensemble_member))
        return self.__timeseries_values[ensemble_member][variable]

    def get_variables(self, ensemble_member: int = 0) -> Set:
        """
        Returns the variables for which timeseries values are stored in the internal data store.

        :param ensemble_member: The ensemble member index.
        """
        if ensemble_member < len(self.__timeseries_values):
            return self.__timeseries_values[ensemble_member].keys()
        return set()

    def get_ensemble_size(self):
        """
        Returns the number of ensemble members for which timeseries are stored
        in the internal data store.
        """
        return len(self.__timeseries_values)

    def get_forecast_index(self) -> int:
        """
        Looks up the forecast index from the internal data store.

        :return: Current forecast index; values before this index are considered "history".
        """
        return self.__forecast_index

    def set_forecast_index(self, forecast_index: int) -> None:
        """
        Sets the forecast index in the internal data store.
        Values (and times) before this index will be considered "history".

        :param forecast_index: New forecast index.
        """
        self.__forecast_index = forecast_index

    def set_parameter(self,
                      parameter_name: str,
                      value: float,
                      ensemble_member: int = 0,
                      check_duplicates: bool = True) -> None:
        """
        Stores the parameter value in the internal data store.

        :param parameter_name: Parameter name.
        :param value: The value to be stored.
        :param ensemble_member: The ensemble member index.
        :param check_duplicates: If True, a warning will be given when overwriting values.
                                 If False, existing values can be silently overwritten.
        """
        # Grow the per-ensemble-member storage on demand.
        for _ in range(ensemble_member + 1 - len(self.__parameters)):
            self.__parameters.append(AliasDict(self.__accessor.alias_relation))

        member_store = self.__parameters[ensemble_member]
        if check_duplicates and parameter_name in member_store.keys():
            logger.warning("Attempting to set parameter value for ensemble member {} and name {} twice. "
                           "Using new value of {}.".format(ensemble_member, parameter_name, value))
        member_store[parameter_name] = value

    def get_parameter(self, parameter_name: str, ensemble_member: int = 0) -> float:
        """
        Looks up the parameter value in the internal data store.

        :raises KeyError: if the ensemble member or parameter name is unknown.
        """
        if ensemble_member >= len(self.__parameters):
            raise KeyError("ensemble_member {} does not exist".format(ensemble_member))
        return self.__parameters[ensemble_member][parameter_name]

    def get_parameter_names(self, ensemble_member: int = 0) -> Set:
        """
        Returns the names of the parameters stored in the internal data store.

        :param ensemble_member: The ensemble member index.
        """
        if ensemble_member < len(self.__parameters):
            return self.__parameters[ensemble_member].keys()
        return set()

    def get_parameter_ensemble_size(self):
        """
        Returns the number of ensemble members for which parameters are stored
        in the internal data store.
        """
        return len(self.__parameters)

    @staticmethod
    def datetime_to_sec(d: Union[Iterable[datetime], datetime], t0: datetime) -> Union[Iterable[float], float]:
        """
        Returns the date/timestamps in seconds since t0.

        :param d: Iterable of datetimes or a single datetime object.
        :param t0: Reference datetime.
        """
        if hasattr(d, '__iter__'):
            return np.array([(t - t0).total_seconds() for t in d])
        return (d - t0).total_seconds()

    @staticmethod
    def sec_to_datetime(s: Union[Iterable[float], float], t0: datetime) -> Union[Iterable[datetime], datetime]:
        """
        Returns the date/timestamps in seconds since t0 as datetime objects.

        :param s: Iterable of floats or a single float (number of seconds before or after t0).
        :param t0: Reference datetime.
        """
        if hasattr(s, '__iter__'):
            return [t0 + timedelta(seconds=t) for t in s]
        return t0 + timedelta(seconds=s)
import itertools
import logging
from abc import ABCMeta, abstractmethod
from abc import ABCMeta
import casadi as ca
......@@ -78,17 +78,6 @@ class CollocatedIntegratedOptimizationProblem(OptimizationProblem, metaclass=ABC
# Call super
super().__init__(**kwargs)
@abstractmethod
def times(self, variable=None):
    """
    List of time stamps for variable.

    Abstract: must be provided by the implementing (mixin) class.

    :param variable: Variable name.

    :returns: A list of time stamps for the given variable.
    """
    pass
def interpolation_method(self, variable=None):
"""
Interpolation method for variable.
......
This diff is collapsed.
This diff is collapsed.
......@@ -279,8 +279,14 @@ class ModelicaMixin(OptimizationProblem):
M_ = float(M_)
# We take the intersection of all provided bounds
m = max(m, m_)
M = min(M, M_)
def intersect(old_bound, new_bound, intersecter):
if isinstance(old_bound, Timeseries):
return Timeseries(old_bound.times, intersecter(old_bound.values, new_bound))
else:
return intersecter(old_bound, new_bound)
m = intersect(m, m_, np.maximum)
M = intersect(M, M_, np.minimum)
bounds[sym_name] = (m, M)
......@@ -297,7 +303,7 @@ class ModelicaMixin(OptimizationProblem):
# Load seeds
for var in itertools.chain(self.__pymoca_model.states, self.__pymoca_model.alg_states):
if var.fixed:
if var.fixed or var.symbol.name() in seed.keys():
# Values will be set from import timeseries
continue
......
import logging
import os
import rtctools.data.netcdf as netcdf
from rtctools.data import rtc
from rtctools.optimization.io_mixin import IOMixin
logger = logging.getLogger("rtctools")
# todo add support for ensembles
class NetCDFMixin(IOMixin):
    """
    Adds NetCDF I/O to your optimization problem.

    During preprocessing, a file named timeseries_import.nc is read from the ``input`` subfolder.
    During postprocessing a file named timeseries_export.nc is written to the ``output`` subfolder.

    Both the input and output nc files are expected to follow the FEWS format for scalar data in a NetCDF file, i.e.:

    - They must contain a variable with the station id's (location id's) which can be recognized by the attribute
      'cf_role' set to 'timeseries_id'.
    - They must contain a time variable with attributes 'standard_name' = 'time' and 'axis' = 'T'

    From the input file, all 2d variables with dimensions equal to the station id's and time variable are read.

    To determine the rtc-tools variable name, the NetCDF mixin uses the station id (location id) and name of the
    timeseries variable in the file (parameter). An rtcDataConfig.xml file can be given in the input folder to
    configure variable names for specific location and parameter combinations. If this file is present, and contains
    a configured variable name for a read timeseries, this variable name will be used. If the file is present, but does
    not contain a configured variable name, a default variable name is constructed and a warning is given to alert the
    user that the current rtcDataConfig may contain a mistake. To suppress this warning if this is intentional, set the
    check_missing_variable_names attribute to False. Finally, if no file is present, the default variable name will
    always be used, and no warnings will be given. With debug logging enabled, the NetCDF mixin will report the chosen
    variable name for each location and parameter combination.

    To construct the default variable name, the station id is concatenated with the name of the variable in the NetCDF
    file, separated by the location_parameter_delimiter (set to a double underscore - '__' - by default). For example,
    if a NetCDF file contains two stations 'loc_1' and 'loc_2', and a timeseries variable called 'water_level', this
    will result in two rtc-tools variables called 'loc_1__water_level' and 'loc_2__water_level' (with the default
    location_parameter_delimiter of '__').

    :cvar location_parameter_delimiter:
        Delimiter used between location and parameter id when constructing the variable name.
    :cvar check_missing_variable_names:
        Warn if an rtcDataConfig.xml file is given but does not contain a variable name for a read timeseries.
        Default is ``True``
    :cvar netcdf_validate_timeseries:
        Check consistency of timeseries. Default is ``True``
    """

    #: Delimiter used between location and parameter id when constructing the variable name.
    location_parameter_delimiter = '__'

    #: Warn if an rtcDataConfig.xml file is given but does not contain a variable name for a read timeseries.
    check_missing_variable_names = True

    #: Check consistency of timeseries.
    netcdf_validate_timeseries = True

    def __init__(self, **kwargs):
        # call parent class for default behaviour
        super().__init__(**kwargs)

        # Only parse rtcDataConfig.xml when it is actually present; otherwise
        # default variable names are used (and no warnings are given).
        path = os.path.join(self._input_folder, "rtcDataConfig.xml")
        self.__data_config = rtc.DataConfig(self._input_folder) if os.path.isfile(path) else None

    def read(self):
        """
        Reads the timeseries_import NetCDF file from the input folder and stores
        its times, station data and timeseries in the internal data store.
        """
        # Call parent class first for default behaviour
        super().read()

        dataset = netcdf.ImportDataset(self._input_folder, self.timeseries_import_basename)

        # convert and store the import times (seconds relative to the forecast time)
        self.__import_datetimes = dataset.read_import_times()
        times = self.io.datetime_to_sec(self.__import_datetimes, self.__import_datetimes[self.io.get_forecast_index()])
        self.io.set_times(times)

        if self.netcdf_validate_timeseries:
            # check if strictly increasing
            for i in range(len(times) - 1):
                if times[i] >= times[i + 1]:
                    raise Exception('NetCDFMixin: Time stamps must be strictly increasing.')

        # Determine whether the time grid is equidistant; __dt is None when it
        # is not (see the equidistant property).
        self.__dt = times[1] - times[0] if len(times) >= 2 else 0
        for i in range(len(times) - 1):
            if times[i + 1] - times[i] != self.__dt:
                self.__dt = None
                break

        # store the station data for later use
        self.__stations = dataset.read_station_data()

        # read all available timeseries from the dataset
        timeseries_var_keys = dataset.find_timeseries_variables()

        # todo add support for ensembles
        for parameter in timeseries_var_keys:
            for i, location_id in enumerate(self.__stations.station_ids):
                # Default name: '<location><delimiter><parameter>'.
                default_name = location_id + self.location_parameter_delimiter + parameter

                if self.__data_config is not None:
                    try:
                        name = self.__data_config.parameter(parameter, location_id)
                    except KeyError:
                        if self.check_missing_variable_names:
                            logger.warning('No configured variable name found in rtcDataConfig.xml for location id "{}"'
                                           ' and parameter id "{}", using default variable name "{}" instead. '
                                           '(To suppress this warning set check_missing_variable_names to False.)'
                                           .format(location_id, parameter, default_name))
                        name = default_name
                else:
                    name = default_name

                values = dataset.read_timeseries_values(i, parameter)
                self.io.set_timeseries_values(name, values)
                logger.debug('Read timeseries data for location id "{}" and parameter "{}", '
                             'stored under variable name "{}"'
                             .format(location_id, parameter, name))

        logger.debug("NetCDFMixin: Read timeseries")

    def write(self):
        """
        Writes the results (and, as a fallback, stored input timeseries) to the
        timeseries_export NetCDF file in the output folder.
        """
        dataset = netcdf.ExportDataset(self._output_folder, self.timeseries_export_basename)

        times = self.times()
        forecast_index = self.io.get_forecast_index()
        dataset.write_times(times, self.initial_time, self.__import_datetimes[forecast_index])

        output_variables = [sym.name() for sym in self.output_variables]
        output_location_parameter_ids = {var_name: self.extract_station_id(var_name) for var_name in output_variables}
        output_station_ids = {loc_par[0] for loc_par in output_location_parameter_ids.values()}
        dataset.write_station_data(self.__stations, output_station_ids)

        output_parameter_ids = {loc_par[1] for loc_par in output_location_parameter_ids.values()}
        dataset.create_variables(output_parameter_ids)

        for ensemble_member in range(self.ensemble_size):
            results = self.extract_results(ensemble_member)

            for var_name in output_variables:
                # determine the output values
                try:
                    values = results[var_name]
                    if len(values) != len(times):
                        values = self.interpolate(
                            times, self.times(var_name), values, self.interpolation_method(var_name))
                except KeyError:
                    # Not in the results; fall back to the stored input timeseries.
                    try:
                        ts = self.get_timeseries(var_name, ensemble_member)
                        if len(ts.times) != len(times):
                            # NOTE(review): unlike the call above, no explicit
                            # interpolation method is passed here — confirm the
                            # default of self.interpolate is intended.
                            values = self.interpolate(
                                times, ts.times, ts.values)
                        else:
                            values = ts.values
                    except KeyError:
                        logger.error(
                            'NetCDFMixin: Output requested for non-existent variable {}. '
                            'Will not be in output file.'.format(var_name))
                        continue

                # determine where to put this output
                location_parameter_id = output_location_parameter_ids[var_name]
                location_id = location_parameter_id[0]
                parameter_id = location_parameter_id[1]
                dataset.write_output_values(location_id, parameter_id, values)

        dataset.close()

    def extract_station_id(self, variable_name: str) -> tuple:
        """
        Returns the station id corresponding to the given RTC-Tools variable name.

        :param variable_name: The name of the RTC-Tools variable

        :return: the station id
        """
        # NOTE(review): when no rtcDataConfig.xml was found, __data_config is
        # None and this raises AttributeError (not KeyError) — confirm intended.
        try:
            return self.__data_config.pi_variable_ids(variable_name)[:2]
        except KeyError:
            return tuple(variable_name.split(self.location_parameter_delimiter))

    @property
    def equidistant(self):
        # True when the imported time grid had a constant step (computed in read()).
        return self.__dt is not None
......@@ -6,19 +6,23 @@ import casadi as ca
import numpy as np
from rtctools._internal.alias_tools import AliasDict, AliasRelation
from rtctools._internal.alias_tools import AliasDict
from rtctools.data.storage import DataStoreAccessor
from .timeseries import Timeseries
logger = logging.getLogger("rtctools")
class OptimizationProblem(metaclass=ABCMeta):
class OptimizationProblem(DataStoreAccessor, metaclass=ABCMeta):
"""
Base class for all optimization problems.
"""
def __init__(self, **kwargs):
    # Call parent class first for default behaviour.
    super().__init__(**kwargs)

    # Presumably flipped to True elsewhere in this class once discrete
    # (integer/boolean) decision variables are detected — not visible here.
    self.__mixed_integer = False
def optimize(self, preprocessing: bool = True, postprocessing: bool = True,
......@@ -406,10 +410,6 @@ class OptimizationProblem(metaclass=ABCMeta):
{variable: Timeseries(np.array([self.initial_time]), np.array([state]))
for variable, state in initial_state.items()})
@abstractproperty
def alias_relation(self) -> AliasRelation:
raise NotImplementedError
def variable_is_discrete(self, variable: str) -> bool:
"""
Returns ``True`` if the provided variable is discrete.
......
This diff is collapsed.
This diff is collapsed.
import bisect
import logging
from abc import ABCMeta, abstractmethod
import numpy as np
from rtctools._internal.alias_tools import AliasDict
from rtctools._internal.caching import cached
from rtctools.simulation.simulation_problem import SimulationProblem
logger = logging.getLogger("rtctools")
class IOMixin(SimulationProblem, metaclass=ABCMeta):
    """
    Base class for all IO methods of simulation problems.

    Subclasses implement :meth:`read` and :meth:`write` to move data between
    files and the internal data store (``self.io``).
    """

    def __init__(self, **kwargs):
        # Call parent class first for default behaviour.
        super().__init__(**kwargs)

    def pre(self) -> None:
        # Call read method to read all input
        self.read()

    @abstractmethod
    def read(self) -> None:
        """
        Reads input data from files, storing it in the internal data store
        through the various set or add methods.
        """
        pass

    def post(self) -> None:
        # Call write method to write all output
        self.write()

    @abstractmethod
    def write(self) -> None:
        """
        Writes output data to files, getting the data from the data store
        through the various get methods.
        """
        pass

    def initialize(self, config_file=None):
        """
        Sets up the experiment, pushes stored parameters and initial inputs
        into the model, and allocates the output arrays.

        :param config_file: Optional configuration file, passed on to
            super().initialize().
        """
        # Set up experiment
        timeseries_import_times = self.io.get_times()
        # NOTE(review): assumes at least two time stamps and uses the first
        # interval as the fixed step — confirm for non-equidistant input.
        self.__dt = timeseries_import_times[1] - timeseries_import_times[0]
        self.setup_experiment(0, timeseries_import_times[-1], self.__dt)

        parameter_variables = set(self.get_parameter_variables())

        logger.debug("Model parameters are {}".format(parameter_variables))

        # Only transfer parameters that the model actually declares.
        for parameter in self.io.get_parameter_names():
            if parameter in parameter_variables:
                value = self.io.get_parameter(parameter)
                logger.debug("IOMixin: Setting parameter {} = {}".format(parameter, value))
                self.set_var(parameter, value)

        # Load input variable names
        self.__input_variables = set(self.get_input_variables().keys())

        # Set input values at the forecast index
        self.__set_input_variables(self.io.get_forecast_index())

        logger.debug("Model inputs are {}".format(self.__input_variables))

        # Empty output (pre-filled with NaN, one slot per stored time stamp)
        self.__output_variables = self.get_output_variables()
        n_times = len(self.io.get_times())
        self.__output = AliasDict(self.alias_relation)
        self.__output.update({variable: np.full(n_times, np.nan) for variable in self.__output_variables})

        # Call super, which will also initialize the model itself
        super().initialize(config_file)

        # Extract consistent t0 values
        for variable in self.__output_variables:
            self.__output[variable][self.io.get_forecast_index()] = self.get_var(variable)

    def __set_input_variables(self, t_idx):
        # Push the stored timeseries values at time index t_idx into the model
        # inputs; non-finite values are skipped (model keeps its current value).
        for variable in self.get_variables():
            if variable in self.__input_variables:
                value = self.io.get_timeseries_values(variable)[t_idx]
                if np.isfinite(value):
                    self.set_var(variable, value)
                else:
                    logger.debug("IOMixin: Found bad value {} at index [{}] in timeseries aliased to input {}"
                                 .format(value, t_idx, variable))

    def update(self, dt):
        """
        Performs one simulation step and stores the results for the output variables.

        :param dt: Time step in seconds; a negative value means "use the default step".
        """
        # Time step
        if dt < 0:
            dt = self.__dt

        # Current time stamp
        t = self.get_current_time()

        # Get current time index
        t_idx = bisect.bisect_left(self.io.get_times(), t + dt)

        # Set input values
        self.__set_input_variables(t_idx)

        # Call super
        super().update(dt)

        # Extract results
        for variable in self.__output_variables:
            self.__output[variable][t_idx] = self.get_var(variable)

    @property
    def output_variables(self):
        # Variables for which simulation results are collected (see initialize()).
        return self.__output_variables

    @property
    def output(self):
        # AliasDict mapping output variable -> np.ndarray of results, one per time stamp.
        return self.__output

    @cached
    def parameters(self):
        """
        Return a dictionary of parameters, including parameters in the input files.

        :returns: Dictionary of parameters
        """
        # Call parent class first for default values.
        parameters = super().parameters()

        # Load parameters from input files (stored in internal data store)
        for parameter_name in self.io.get_parameter_names():
            parameters[parameter_name] = self.io.get_parameter(parameter_name)

        if logger.getEffectiveLevel() == logging.DEBUG:
            for parameter_name in self.io.get_parameter_names():
                logger.debug("IOMixin: Read parameter {}".format(parameter_name))

        return parameters

    def times(self, variable=None):
        """
        Return a list of all the timesteps in seconds.

        :param variable: Variable name.

        :returns: List of all the timesteps in seconds.
        """
        # Times before the forecast index (the "history") are excluded.
        return self.io.get_times()[self.io.get_forecast_index():]

    def timeseries_at(self, variable, t):
        """
        Return the value of a time series at the given time.

        :param variable: Variable name.
        :param t: Time.

        :returns: The interpolated value of the time series.

        :raises: KeyError
        """
        values = self.io.get_timeseries_values(variable)
        timeseries_times_sec = self.io.get_times()
        t_idx = bisect.bisect_left(timeseries_times_sec, t)
        # NOTE(review): if t is greater than the last stored time stamp,
        # bisect_left returns len(times) and the lookup below raises
        # IndexError — confirm callers never pass such a t.
        if timeseries_times_sec[t_idx] == t:
            return values[t_idx]
        else:
            return np.interp(t, timeseries_times_sec, values)
This diff is collapsed.
......@@ -15,11 +15,12 @@ import pymoca.backends.casadi.api
from rtctools._internal.alias_tools import AliasDict, AliasRelation
from rtctools._internal.caching import cached
from rtctools.data.storage import DataStoreAccessor
logger = logging.getLogger("rtctools")
class SimulationProblem:
class SimulationProblem(DataStoreAccessor):
"""
Implements the `BMI <http://csdms.colorado.edu/wiki/BMI_Description>`_ Interface.
......@@ -199,7 +200,7 @@ class SimulationProblem:
self.__do_step = ca.rootfinder("next_state", "nlpsol", self.__res_vals, options)
# Call parent class for default behaviour.
super().__init__()
super().__init__(**kwargs)
def initialize(self, config_file=None):
"""
......
import os
from datetime import datetime, timedelta
from unittest import TestCase
from netCDF4 import Dataset
import numpy as np
import rtctools.data.netcdf as netcdf
from .data_path import data_path
class TestImportDataset(TestCase):
    """Unit tests for netcdf.ImportDataset, backed by the timeseries_import fixture."""

    def setUp(self):
        self.dataset = netcdf.ImportDataset(data_path(), 'timeseries_import')

    def test_init(self):
        """Time and station variables are detected with the expected attributes."""
        time_var = self.dataset.time_variable
        self.assertEqual(time_var._name, 'time')
        self.assertEqual(time_var.standard_name, 'time')
        self.assertEqual(time_var.long_name, 'time')
        self.assertEqual(time_var.axis, 'T')
        self.assertEqual(time_var.units, 'minutes since 1970-01-01 00:00:00.0 +0000')

        station_var = self.dataset.station_variable
        self.assertEqual(station_var._name, 'station_id')
        self.assertEqual(station_var.long_name, 'station identification code')
        self.assertEqual(station_var.cf_role, 'timeseries_id')

    def test_read_times(self):
        """Imported times are 25 hourly datetimes starting at the forecast date."""
        datetimes = self.dataset.read_import_times()
        forecast_datetime = datetime(2013, 1, 15)
        expected_datetimes = [forecast_datetime + timedelta(hours=i) for i in range(25)]
        self.assertTrue(np.array_equal(datetimes, expected_datetimes))

    def test_find_timeseries_variables(self):
        """Only the 'waterlevel' 2d variable qualifies as a timeseries variable."""
        variables = self.dataset.find_timeseries_variables()
        self.assertEqual(variables, ['waterlevel'])

    def test_stations(self):
        """Station ids and their coordinate attributes are read correctly."""
        stations = self.dataset.read_station_data()
        ids = stations.station_ids
        self.assertEqual(len(ids), 3)
        self.assertTrue('LocA' in ids)
        self.assertTrue('LocB' in ids)
        self.assertTrue('LocC' in ids)
        for id in ids:
            read_attributes = stations.attributes[id].keys()
            # Bug fix: this was assertTrue(len(read_attributes), 5), which
            # treats 5 as the msg argument and always passes for a non-empty
            # dict; the intent is an equality check on the attribute count.
            self.assertEqual(len(read_attributes), 5)
            self.assertTrue('lat' in read_attributes)
            self.assertTrue('lon' in read_attributes)
            self.assertTrue('x' in read_attributes)
            self.assertTrue('y' in read_attributes)
            self.assertTrue('z' in read_attributes)
        self.assertEqual(stations.attributes['LocA']['lat'], 53.0)
class TestExportDataset(TestCase):
    """Unit tests for netcdf.ExportDataset."""

    def setUp(self):
        self.dataset = netcdf.ExportDataset(data_path(), 'timeseries_export')

    def get_exported_dataset(self):
        """Open and return the NetCDF file that the export dataset wrote."""
        return Dataset(os.path.join(data_path(), 'timeseries_export.nc'))

    def test_write_times(self):
        """The time variable is written relative to the forecast reference time."""
        times = np.array([-120, -300, -60, 300, 360])
        self.dataset.write_times(times, -180.0, datetime(2018, 12, 21, 17, 30))
        self.dataset.close()

        exported = self.get_exported_dataset()
        self.assertTrue('time' in exported.variables)
        time_var = exported.variables['time']
        self.assertEqual(time_var.units, 'seconds since 2018-12-21 17:28:00')
        self.assertEqual(time_var.axis, 'T')
        self.assertEqual(time_var.standard_name, 'time')
        self.assertTrue(np.array_equal(time_var[:], times + 300))
# todo create tests for write_station_data, create_variables and write_output_values
import logging
from unittest import TestCase
import numpy as np
from pymoca.backends.casadi.alias_relation import AliasRelation
from rtctools.data.storage import DataStoreAccessor
logger = logging.getLogger("rtctools")
logger.setLevel(logging.WARNING)
class DummyDataStore(DataStoreAccessor):
    """Minimal concrete DataStoreAccessor used to test the data store in isolation."""

    @property
    def alias_relation(self):
        # No aliases are needed for these tests; an empty relation suffices.
        return AliasRelation()
class TestDummyDataStore(TestCase):
def setUp(self):
self.datastore = DummyDataStore(input_folder='dummyInput', output_folder='dummyOutput')
self.tolerance = 1e-6
def test_times(self):
expected_times = np.array([-7200, -3600, 0, 3600, 7200, 9800])
self.datastore.io.set_times(expected_times)
actual_times = self.datastore.io.get_times()
self.assertTrue(np.array_equal(actual_times, expected_times))
def test_forecast_index(self):
forecast_index = self.datastore.io.get_forecast_index()
self.assertEqual(forecast_index, 0) # default forecast_index should be 0
times = np.array([-7200, -3600, 0, 3600, 7200, 9800])
self.datastore.io.set_times(times)
initial_time = self.datastore.initial_time
self.assertEqual(initial_time, -7200)
self.datastore.io.set_forecast_index(3)
self.assertEqual(self.datastore.io.get_forecast_index(), 3)
self.assertEqual(self.datastore.initial_time, 3600)
def test_timeseries(self):
# expect a KeyError when getting a timeseries that has not been set
with self.assertRaises(KeyError):
self.datastore.io.get_timeseries_values('someNoneExistentVariable')
# expect a RunTimeError when setting timeseries values before setting times
with self.assertRaises(RuntimeError):
self.datastore.io.set_timeseries_values('myNewVariable', np.array([3.1, 2.4, 2.5]))
self.datastore.io.set_times(np.array([-3600, 0, 7200]))
expected_values = np.array([3.1, 2.4, 2.5])
self.datastore.io.set_timeseries_values('myNewVariable', expected_values)
actual_values = self.datastore.io.get_timeseries_values('myNewVariable')
self.assertTrue(np.array_equal(actual_values, expected_values))
# expect a KeyError when getting timeseries for an ensemble member that doesn't exist
with self.assertRaises(KeyError):
self.datastore.io.get_timeseries_values('myNewVariable', 1)
expected_values = np.array([1.1, 1.4, 1.5])
self.datastore.io.set_timeseries_values('ensembleVariable', expected_values, ensemble_member=1)
with self.assertRaises(KeyError):
self.datastore.io.get_timeseries_values('ensembleVariable', 0)
self.assertTrue(np.array_equal(self.datastore.io.get_timeseries_values('ensembleVariable', 1), expected_values))
# expect a warning when overwriting a timeseries with check_duplicates=True (default)
new_values = np.array([2.1, 1.1, 0.1])
with self.assertLogs(logger, level='WARN') as cm:
self.datastore.io.set_timeseries_values('myNewVariable', new_values)
self.assertEqual(cm.output,
['WARNING:rtctools:Time series values for ensemble member 0 and variable '
'myNewVariable set twice. Overwriting old values.'])
self.assertTrue(np.array_equal(self.datastore.io.get_timeseries_values('myNewVariable'), new_values))
# disable check to allow overwriting old values
newest_values = np.array([-0.4, 2.14, 29.1])
with self.assertLogs(logger, level='WARN') as cm:
self.datastore.io.set_timeseries_values('myNewVariable', newest_values, check_duplicates=False)
self.assertEqual(cm.output, [])
logger.warning('All is well') # if no log message occurs, assertLogs will throw an AssertionError
self.assertTrue(np.array_equal(self.datastore.io.get_timeseries_values('myNewVariable'), newest_values))
def test_parameters(self):
    """Parameter get/set round-trips, per-ensemble isolation, and duplicate checks."""
    store = self.datastore.io

    # reading a parameter that was never set must raise KeyError
    with self.assertRaises(KeyError):
        store.get_parameter('someNoneExistentParameter')

    store.set_parameter('myNewParameter', 1.4)
    self.assertEqual(store.get_parameter('myNewParameter'), 1.4)

    # ensemble member 1 has no parameters yet
    with self.assertRaises(KeyError):
        store.get_parameter('myNewParameter', 1)

    # a parameter stored for member 1 must be invisible to member 0
    store.set_parameter('ensembleParameter', 1.2, ensemble_member=1)
    with self.assertRaises(KeyError):
        store.get_parameter('ensembleParameter', 0)
    self.assertEqual(store.get_parameter('ensembleParameter', 1), 1.2)

    # overwriting with check_duplicates=True (the default) logs a warning
    with self.assertLogs(logger, level='WARN') as captured:
        store.set_parameter('myNewParameter', 2.5)
        self.assertEqual(
            captured.output,
            ['WARNING:rtctools:Attempting to set parameter value for ensemble member 0 '
             'and name myNewParameter twice. Using new value of 2.5.'])
    self.assertEqual(store.get_parameter('myNewParameter'), 2.5)

    # with the check disabled the overwrite must be silent
    with self.assertLogs(logger, level='WARN') as captured:
        store.set_parameter('myNewParameter', 2.2, check_duplicates=False)
        self.assertEqual(captured.output, [])
        logger.warning('All is well')  # if no log message occurs, assertLogs will throw an AssertionError
    self.assertEqual(store.get_parameter('myNewParameter'), 2.2)
def test_variables(self):
    """Variables appear only in the ensemble member whose series were set.

    Uses ``assertIn`` instead of ``assertTrue(x in y)`` so a failure
    reports the missing key and the container contents.
    """
    # empty store: no variables at all
    self.assertEqual(len(self.datastore.io.get_variables()), 0)

    self.datastore.io.set_times(np.array([0, 1, 2]))
    self.datastore.io.set_timeseries_values('var1', np.array([1.0, 2.0, 3.0]))
    self.datastore.io.set_timeseries_values('var2', np.array([2.0, 3.0, 4.0]))
    variables = self.datastore.io.get_variables()
    self.assertEqual(len(variables), 2)
    self.assertIn('var1', variables)
    self.assertIn('var2', variables)

    # ensemble member 1 starts empty and only sees its own variables
    self.assertEqual(len(self.datastore.io.get_variables(ensemble_member=1)), 0)
    self.datastore.io.set_timeseries_values('var3', np.array([0.1, 0.2, 0.3]), ensemble_member=1)
    variables = self.datastore.io.get_variables(ensemble_member=1)
    self.assertEqual(len(variables), 1)
    self.assertIn('var3', variables)
def test_ensemble_size(self):
    """The ensemble size is one past the highest member index in use."""
    store = self.datastore.io
    self.assertEqual(store.get_ensemble_size(), 0)

    store.set_times(np.array([0, 1, 2]))
    store.set_timeseries_values('var1', np.array([1.0, 2.0, 3.0]))
    store.set_timeseries_values('var2', np.array([2.0, 3.0, 4.0]))
    # both series live in member 0, so the size is 1
    self.assertEqual(store.get_ensemble_size(), 1)

    store.set_timeseries_values('var3', np.array([0.1, 0.2, 0.3]), ensemble_member=1)
    self.assertEqual(store.get_ensemble_size(), 2)

    # member indices need not be contiguous: size follows the maximum index
    store.set_timeseries_values('var4', np.array([1.1, 2.2, 3.3]), ensemble_member=100)
    self.assertEqual(store.get_ensemble_size(), 101)
def test_parameter_names(self):
    """Parameter names are tracked per ensemble member.

    Uses ``assertIn`` instead of ``assertTrue(x in y)`` so a failure
    reports the missing key and the container contents.
    """
    # empty store: no parameter names
    self.assertEqual(len(self.datastore.io.get_parameter_names()), 0)

    self.datastore.io.set_parameter('par1', 1.0)
    self.datastore.io.set_parameter('par2', 2.3)
    names = self.datastore.io.get_parameter_names()
    self.assertEqual(len(names), 2)
    self.assertIn('par1', names)
    self.assertIn('par2', names)

    # ensemble member 1 starts empty and only sees its own parameters
    self.assertEqual(len(self.datastore.io.get_parameter_names(ensemble_member=1)), 0)
    self.datastore.io.set_parameter('par3', 3.1, ensemble_member=1)
    names = self.datastore.io.get_parameter_names(ensemble_member=1)
    self.assertEqual(len(names), 1)
    self.assertIn('par3', names)
def test_parameter_ensemble_size(self):
    """Parameter ensemble size mirrors the time-series behaviour: max index + 1."""
    store = self.datastore.io
    self.assertEqual(store.get_parameter_ensemble_size(), 0)

    store.set_parameter('par1', 1.0)
    store.set_parameter('par2', 2.3)
    # both parameters belong to member 0
    self.assertEqual(store.get_parameter_ensemble_size(), 1)

    store.set_parameter('par3', 3.1, ensemble_member=1)
    self.assertEqual(store.get_parameter_ensemble_size(), 2)

    # non-contiguous member indices are allowed
    store.set_parameter('par4', 4.5, ensemble_member=100)
    self.assertEqual(store.get_parameter_ensemble_size(), 101)
// Test model for the NetCDF mixin tests. Variable names follow the
// '<station>__<quantity>' convention (loc_a, loc_b, loc_c); the NetCDF
// export tests below check that these are split into a station dimension
// and a per-quantity variable — TODO confirm against NetCDFMixin itself.
model NetcdfModel
Real loc_a__x(start=1.1);
Real loc_a__w(start=0.0);
// bound to loc_a__x by the 'alias = loc_a__x' equation below
Real alias;
parameter Real k = 1.0;
input Real loc_b__u(fixed=false);
output Real loc_c__y;
output Real loc_a__z;
input Real loc_a__x_delayed(fixed=false);
output Real loc_c__switched;
input Real loc_a__constant_input(fixed=true);
output Real loc_a__constant_output;
equation
der(loc_a__x) = k * loc_a__x + loc_b__u;
der(loc_a__w) = loc_a__x;
alias = loc_a__x;
loc_c__y + loc_a__x = 3.0;
loc_a__z = alias^2 + sin(time);
// loc_a__x delayed by 0.1 time units
loc_a__x_delayed = delay(loc_a__x, 0.1);
// discontinuous output switching on the state value
if loc_a__x > 0.5 then
loc_c__switched = 1.0;
else
loc_c__switched = 2.0;
end if;
// input is passed through unchanged
loc_a__constant_output = loc_a__constant_input;
end NetcdfModel;
\ No newline at end of file
......@@ -3,7 +3,7 @@ import logging
import numpy as np
from rtctools.optimization.collocated_integrated_optimization_problem import (
CollocatedIntegratedOptimizationProblem,
CollocatedIntegratedOptimizationProblem
)
from rtctools.optimization.csv_mixin import CSVMixin
from rtctools.optimization.modelica_mixin import ModelicaMixin
......@@ -105,7 +105,7 @@ class TestCSVMixin(TestCase):
self.assertAlmostEqual(a, b, self.tolerance)
class TestPIMixinEnsemble(TestCase):
class TestCSVMixinEnsemble(TestCase):
def setUp(self):
self.problem = ModelEnsemble()
self.problem.optimize()
......
import logging
from unittest import TestCase
import casadi as ca
import numpy as np
from rtctools.optimization.collocated_integrated_optimization_problem import (
CollocatedIntegratedOptimizationProblem
)
from rtctools.optimization.io_mixin import IOMixin
from rtctools.optimization.modelica_mixin import ModelicaMixin
from rtctools.optimization.timeseries import Timeseries
from .data_path import data_path
logger = logging.getLogger("rtctools")
logger.setLevel(logging.WARNING)
class DummyIOMixin(IOMixin):
    """IOMixin stub that injects fixed in-memory data instead of reading files."""

    def read(self):
        """Fill the internal data store with hard-coded dummy data."""
        self.io.set_times(np.array([-7200, -3600, 0, 3600, 7200, 9800]))
        self.io.set_forecast_index(2)  # forecast time is t = 0

        series = {
            'constant_input': [1.1, 1.4, 0.9, 1.2, 1.5, 1.7],
            'u_Min': [0.5, 0.2, 0.3, 0.1, 0.4, 0.0],
            'u_Max': [2.1, 2.2, 2.0, 2.4, 2.5, 2.3],
            'alias': [3.1, 3.2, 3.3, 3.4, 3.5, 3.6],  # alias of 'x'
        }
        for name, data in series.items():
            self.io.set_timeseries_values(name, np.array(data))

    def write(self):
        """No-op: these tests never write results back."""
        pass
class Model(DummyIOMixin, ModelicaMixin, CollocatedIntegratedOptimizationProblem):
    """Small optimization problem driven by the dummy I/O data above."""

    def __init__(self, **kwargs):
        kwargs.setdefault("model_name", "Model")
        folder = data_path()
        kwargs["input_folder"] = folder
        kwargs["output_folder"] = folder
        kwargs["model_folder"] = folder
        super().__init__(**kwargs)

    def objective(self, ensemble_member):
        # Quadratic penalty on state 'x' at final time
        final_x = self.state_at("x", self.times()[-1])
        return final_x ** 2

    def constraints(self, ensemble_member):
        # No additional constraints
        return []

    def compiler_options(self):
        options = super().compiler_options()
        options["cache"] = False  # always recompile the model
        return options
class TestOptimizationProblem(TestCase):
    """
    Tests the default methods from OptimizationProblem.
    """

    def setUp(self):
        self.problem = Model()
        self.problem.read()
        self.tolerance = 1e-6

    def test_get_timeseries(self):
        """Stored series come back unchanged; alias lookup resolves to 'x'."""
        timeseries = self.problem.get_timeseries('constant_input')
        expected_times = [-7200, -3600, 0, 3600, 7200, 9800]
        self.assertTrue(np.array_equal(timeseries.times, expected_times))
        expected_values = [1.1, 1.4, 0.9, 1.2, 1.5, 1.7]
        self.assertTrue(np.array_equal(timeseries.values, expected_values))

        # 'x' was stored under the name 'alias' in DummyIOMixin
        timeseries_x = self.problem.get_timeseries('x')
        self.assertTrue(np.array_equal(timeseries_x.times, expected_times))
        expected_values = [3.1, 3.2, 3.3, 3.4, 3.5, 3.6]
        self.assertTrue(np.array_equal(timeseries_x.values, expected_values))

    def test_set_timeseries_with_timeseries(self):
        """set_timeseries with a Timeseries object stores it in the data store."""
        times = self.problem.io.get_times()
        values = [0.1, 1.1, 2.1, 3.1, 4.1, 5.1]
        self.problem.set_timeseries('newVar', Timeseries(times, values))
        actual_series = self.problem.get_timeseries('newVar')
        self.assertTrue(np.array_equal(actual_series.values, values))
        self.assertTrue(np.array_equal(actual_series.times, times))

        # test if it was actually stored in the internal data store
        actual_values = self.problem.io.get_timeseries_values('newVar')
        self.assertTrue(np.array_equal(actual_values, values))

        # now let's do this again but only give part of the values
        values = [1.1, 2.1, 3.1]
        # with check_consistency=True (default) we should get a ValueError
        with self.assertRaises(ValueError):
            self.problem.set_timeseries('partialSeries', Timeseries(times[-3:], values))
        self.problem.set_timeseries('partialSeries', Timeseries(times[-3:], values), check_consistency=False)
        actual_series = self.problem.get_timeseries('partialSeries')
        self.assertTrue(np.array_equal(actual_series.times, times))
        # missing leading entries must be padded with NaN
        self.assertTrue(np.array_equal(actual_series.values[-3:], values))
        self.assertTrue(np.all(np.isnan(actual_series.values[:-3])))

    def test_set_timeseries_with_array(self):
        """A bare array covers times() only; history before the forecast is NaN."""
        times = self.problem.times()
        values = np.ones(times.shape)
        self.problem.set_timeseries('newVar', values)
        actual_series = self.problem.get_timeseries('newVar')
        forecast_index = self.problem.io.get_forecast_index()
        self.assertTrue(np.array_equal(actual_series.values[forecast_index:], values))
        self.assertTrue(np.all(np.isnan(actual_series.values[:forecast_index])))

    def test_timeseries_at(self):
        """timeseries_at interpolates linearly between stored samples."""
        times = self.problem.io.get_times()
        values = times.astype(dtype=np.float64) / 10
        self.problem.set_timeseries('myVar', Timeseries(times, values))
        # exactly on a sample
        actual = self.problem.timeseries_at('myVar', times[0])
        self.assertEqual(actual, times[0] / 10)
        # halfway between two samples
        actual = self.problem.timeseries_at('myVar', (times[0] + times[1]) / 2)
        self.assertEqual(actual, (values[0] + values[1]) / 2)

    def test_bounds(self):
        """u_Min/u_Max series become bounds on 'u', clipped to t >= forecast time."""
        bounds = self.problem.bounds()
        self.assertEqual(bounds['x'], [float("-inf"), float("inf")])
        min_u = bounds['u'][0]
        max_u = bounds['u'][1]
        expected_times = [0, 3600, 7200, 9800]
        self.assertTrue(np.array_equal(min_u.times, expected_times))
        self.assertTrue(np.array_equal(max_u.times, expected_times))
        expected_min_values = [0.3, 0.1, 0.4, 0.0]
        self.assertTrue(np.array_equal(min_u.values, expected_min_values))
        expected_max_values = [2.0, 2.4, 2.5, 2.3]
        self.assertTrue(np.array_equal(max_u.values, expected_max_values))

    def test_history(self):
        """history() returns the samples up to and including the forecast time."""
        history = self.problem.history(0)
        expected_times = [-7200, -3600, 0]
        self.assertTrue(np.array_equal(history['x'].times, expected_times))
        self.assertTrue(np.array_equal(history['constant_input'].times, expected_times))
        expected_history_x = [3.1, 3.2, 3.3]
        self.assertTrue(np.array_equal(history['x'].values, expected_history_x))
        expected_history_u = [1.1, 1.4, 0.9]
        self.assertTrue(np.array_equal(history['constant_input'].values, expected_history_u))

    def test_seed(self):
        """seed() uses stored series (via aliases) and replaces NaN with 0."""
        # add another variable containing some nans
        self.problem.io.set_timeseries_values(
            'some_missing', np.array([np.nan, 0.1, 0.2, np.nan, 3.1, np.nan]))
        # use the MX.sym factory directly; the original went through a
        # throwaway empty instance (ca.MX().sym(...)) just to reach it
        self.problem.dae_variables['free_variables'].append(ca.MX.sym('some_missing'))
        seed = self.problem.seed(0)
        self.assertTrue(np.array_equal(seed['x'].values, [3.1, 3.2, 3.3, 3.4, 3.5, 3.6]))
        self.assertTrue(np.array_equal(seed['alias'].values, [3.1, 3.2, 3.3, 3.4, 3.5, 3.6]))
        self.assertTrue(np.array_equal(seed['some_missing'].values, [0, 0.1, 0.2, 0, 3.1, 0]))

    def test_constant_inputs(self):
        """Constant inputs are passed through over the full time range."""
        constant_inputs = self.problem.constant_inputs(0)
        self.assertTrue(np.array_equal(constant_inputs['constant_input'].values, [1.1, 1.4, 0.9, 1.2, 1.5, 1.7]))
import os
from unittest import TestCase
from netCDF4 import Dataset, chartostring
import numpy as np
import numpy.ma as ma
from rtctools.optimization.collocated_integrated_optimization_problem import CollocatedIntegratedOptimizationProblem
from rtctools.optimization.modelica_mixin import ModelicaMixin
from rtctools.optimization.netcdf_mixin import NetCDFMixin
from .data_path import data_path
class NetcdfModel(NetCDFMixin, ModelicaMixin, CollocatedIntegratedOptimizationProblem):
    """Optimization problem whose time series are read from / written to NetCDF."""

    def __init__(self):
        folder = data_path()
        super().__init__(
            input_folder=folder,
            output_folder=folder,
            model_name="NetcdfModel",
            model_folder=folder
        )

    def read(self):
        super().read()
        # just add the parameters ourselves for now (values taken from test_pi_mixin)
        params = {
            'k': 1.01, 'x': 1.02, 'SV_V_y': 22.02, 'j': 12.01,
            'b': 13.01, 'y': 12.02, 'SV_H_y': 22.02,
        }
        for name, value in params.items():
            self.io.set_parameter(name, value)

    def objective(self, ensemble_member):
        # Quadratic penalty on state 'loc_a__x' at final time
        return self.state_at("loc_a__x", self.times()[-1]) ** 2

    def constraints(self, ensemble_member):
        # No additional constraints
        return []

    def compiler_options(self):
        options = super().compiler_options()
        options["cache"] = False  # always recompile the model
        return options
class TestNetCDFMixin(TestCase):
    """Tests reading input from and writing results to NetCDF files."""

    def setUp(self):
        self.problem = NetcdfModel()
        self.tolerance = 1e-5

    def test_read(self):
        """All series from the input file must land in the internal data store."""
        self.problem.read()
        datastore = self.problem.io
        self.assertTrue(np.all(datastore.get_timeseries_values('loc_a__u_min') == -3.0))
        self.assertTrue(np.all(datastore.get_timeseries_values('loc_b__u_min') == -2.0))
        self.assertTrue(np.all(datastore.get_timeseries_values('loc_a__u_max') == 3.0))
        self.assertTrue(np.all(datastore.get_timeseries_values('loc_b__u_max') == 2.0))

        expected_values = np.zeros((22,), dtype=float)
        expected_values[0] = 1.02
        expected_values[2] = 0.03
        self.assertTrue(np.array_equal(datastore.get_timeseries_values('loc_a__x'), expected_values))
        # series absent for a station read back as all-NaN
        self.assertTrue(np.all(np.isnan(datastore.get_timeseries_values('loc_b__x'))))

        expected_values = np.zeros((22,), dtype=float)
        expected_values[2] = 0.03
        self.assertTrue(np.array_equal(datastore.get_timeseries_values('loc_a__w'), expected_values))
        self.assertTrue(np.all(np.isnan(datastore.get_timeseries_values('loc_b__w'))))

        self.assertTrue(np.all(datastore.get_timeseries_values('loc_a__constant_input') == 1.0))
        self.assertTrue(np.all(datastore.get_timeseries_values('loc_b__constant_input') == 1.5))

    def test_write(self):
        """Optimize, then verify the structure and contents of the exported file."""
        self.problem.optimize()
        self.results = self.problem.extract_results()

        # open the exported file
        filename = os.path.join(
            data_path(),
            self.problem.timeseries_export_basename + ".nc"
        )
        # use Dataset as a context manager so the file handle is always
        # closed, even when an assertion fails (the original leaked it)
        with Dataset(filename) as dataset:
            written_variables = dataset.variables.keys()
            self.assertEqual(len(written_variables), 10)
            for name in ('time', 'station_id', 'lon', 'lat', 'y',
                         'constant_output', 'u', 'z', 'switched', 'x_delayed'):
                self.assertIn(name, written_variables)

            ids_var = dataset.variables['station_id']
            self.assertEqual(ids_var.shape, (3, 5))
            self.assertEqual(ids_var.cf_role, 'timeseries_id')
            station_ids = [str(chartostring(ids_var[i])) for i in range(3)]
            self.assertIn('loc_a', station_ids)
            self.assertIn('loc_b', station_ids)
            self.assertIn('loc_c', station_ids)

            # order of location ids is random each time the test runs...
            loc_a_index = station_ids.index('loc_a')
            loc_b_index = station_ids.index('loc_b')
            loc_c_index = station_ids.index('loc_c')
            self.assertAlmostEqual(dataset.variables['lon'][loc_a_index], 4.3780269, delta=self.tolerance)

            # 'y' only has data for loc_c; other stations are all-NaN
            y = dataset.variables['y']
            self.assertEqual(y.shape, (22, 3))
            for i in range(3):
                data = ma.filled(y[:, i], np.nan)
                if i == loc_c_index:
                    self.assertAlmostEqual(data[0], 1.98, delta=self.tolerance)
                    for j in range(1, 22):
                        self.assertAlmostEqual(data[j], 3.0, delta=self.tolerance)
                else:
                    self.assertTrue(np.all(np.isnan(data)))

            # 'u' only has data for loc_b
            u = dataset.variables['u']
            self.assertEqual(u.shape, (22, 3))
            for i in range(3):
                data = ma.filled(u[:, i], np.nan)
                if i == loc_b_index:
                    self.assertTrue(np.all(~np.isnan(data)))
                else:
                    self.assertTrue(np.all(np.isnan(data)))

            # 'constant_output' only has data for loc_a
            constant_output = dataset.variables['constant_output']
            self.assertEqual(constant_output.shape, (22, 3))
            for i in range(3):
                data = ma.filled(constant_output[:, i], np.nan)
                if i == loc_a_index:
                    self.assertTrue(np.all(data == 1.0))
                else:
                    self.assertTrue(np.all(np.isnan(data)))

            time = dataset.variables['time']
            self.assertEqual(time.units, 'seconds since 2013-05-09 22:00:00')
            self.assertEqual(time.standard_name, 'time')
            self.assertEqual(time.axis, 'T')
            self.assertTrue(np.allclose(time[:], np.arange(0, 22*3600, 3600, dtype=float)))
......@@ -92,24 +92,24 @@ class TestPIMixin(TestCase):
def test_interpolation(self):
t = (
self.problem.get_timeseries("x", 0).times[
self.problem.get_forecast_index() + 1
self.problem.io.get_forecast_index() + 1
]
+ (
self.problem.get_timeseries("x", 0).times[
self.problem.get_forecast_index() + 2
self.problem.io.get_forecast_index() + 2
]
- self.problem.get_timeseries("x", 0).times[
self.problem.get_forecast_index() + 1
self.problem.io.get_forecast_index() + 1
]
)
/ 2
)
x_ref = (
self.problem.get_timeseries("x", 0).values[
self.problem.get_forecast_index() + 1
self.problem.io.get_forecast_index() + 1
]
+ self.problem.get_timeseries("x", 0).values[
self.problem.get_forecast_index() + 2
self.problem.io.get_forecast_index() + 2
]
) / 2
self.assertAlmostEqual(
......
import numpy as np
from rtctools.simulation.io_mixin import IOMixin
from rtctools.simulation.simulation_problem import SimulationProblem
from test_case import TestCase
from .data_path import data_path
class DummyIOMixin(IOMixin):
def read(self):
# fill with dummy data
times = np.array([-7200, -3600, 0, 3600, 7200, 9800])
self.io.set_times(times)
forecast_index = 2
self.io.set_forecast_index(forecast_index)
values = {
'constant_input': [1.1, 1.4, 0.9, 1.2, 1.5, 1.7],
'u': [0.5, 0.2, 0.3, 0.1, 0.4, 0.0]
}
for key, value in values.items():
self.io.set_timeseries_values(key, np.array(value))
# set some parameters as well
self.io.set_parameter('k',</