Commit 00c7de3f authored by Anne Hommelberg's avatar Anne Hommelberg

Add general DataStore and IOMixin classes

Large refactoring of all IO mixins (PIMixin and CSVMixin).
Adds a general DataStore class extended by OptimizationProblem
and SimulationProblem, which is used to store all data read by
the IO mixins.
Adds an optimization IOMixin class which contains methods that
were previously duplicated in the optimization PIMixin and
CSVMixin.
Adds a simulation IOMixin class which does the same for the
simulation PIMixin and IOMixin.
parent 8b20fccf
import logging
from abc import ABCMeta, abstractmethod
from datetime import datetime, timedelta
from typing import Iterable, Set, Union
import numpy as np
from rtctools._internal.alias_tools import AliasDict, AliasRelation
logger = logging.getLogger("rtctools")
class DataStore(metaclass=ABCMeta):
"""
Base class for all problems.
Adds an internal data store where which timeseries, parameters and initial states can be stored and read from.
:cvar timeseries_import_basename:
Import file basename. Default is ``timeseries_import``.
:cvar timeseries_export_basename:
Export file basename. Default is ``timeseries_export``.
"""
#: Import file basename
timeseries_import_basename = 'timeseries_import'
#: Export file basename
timeseries_export_basename = 'timeseries_export'
def __init__(self, **kwargs):
# Check arguments
assert ('input_folder' in kwargs)
assert ('output_folder' in kwargs)
# Save arguments
self._input_folder = kwargs['input_folder']
self._output_folder = kwargs['output_folder']
# Should all be set by subclass via setters
self.__forecast_index = 0
self.__timeseries_times_sec = None
self.__timeseries_values = []
self.__parameters = []
# todo add support for storing initial states
# self.__initial_state = []
def get_times(self) -> np.ndarray:
""""
Returns the timeseries times in seconds.
:return timseries times in seconds, or None if there has been no call to set_times
"""
return self.__timeseries_times_sec
def set_times(self, times_in_sec: np.ndarray) -> None:
"""
Sets the timeseries times in seconds in the internal data store.
Must be called in .read() to store the times in the IOMixin before calling set_timeseries_values
to store the values for an input timeseries.
:param times_in_sec: np.ndarray containing the times in seconds
"""
if self.__timeseries_times_sec is not None and not np.array_equal(times_in_sec, self.__timeseries_times_sec):
raise RuntimeError("Attempting to overwrite the input time series times with different values. "
"Please ensure all input time series have the same times.")
self.__timeseries_times_sec = times_in_sec
def set_timeseries_values(self,
variable: str,
values: np.ndarray,
ensemble_member: int = 0,
check_duplicates: bool = True) -> None:
"""
Stores input time series values in the internal data store.
:param variable: Variable name.
:param values: The values to be stored.
:param ensemble_member: The ensemble member index.
:param check_duplicates: If True, a warning will be given when attempting to overwrite values.
If False, existing values can be silently overwritten with new values.
"""
if self.__timeseries_times_sec is None:
raise RuntimeError("First call set_times before calling set_timeseries_values")
if len(self.__timeseries_times_sec) != len(values):
raise ValueError("Length of values ({}) must be the same as length of times ({})"
.format(len(values), len(self.__timeseries_times_sec)))
while ensemble_member >= len(self.__timeseries_values):
self.__timeseries_values.append(AliasDict(self.alias_relation))
if check_duplicates and variable in self.__timeseries_values[ensemble_member].keys():
logger.warning("Attempting to set time series values for ensemble member {} and variable {} twice. "
"Ignoring second set of values.".format(ensemble_member, variable))
return
self.__timeseries_values[ensemble_member][variable] = values
def get_timeseries_values(self, variable: str, ensemble_member: int = 0) -> np.ndarray:
"""
Looks up the time series values in the internal data store.
"""
if ensemble_member >= len(self.__timeseries_values):
raise KeyError("ensemble_member {} does not exist".format(ensemble_member))
return self.__timeseries_values[ensemble_member][variable]
def get_variables(self, ensemble_member: int = 0) -> Set:
"""
Returns a set of variables for which timeseries values are stored in the internal data store
:param ensemble_member: The ensemble member index.
"""
if ensemble_member >= len(self.__timeseries_values):
return set()
return self.__timeseries_values[ensemble_member].keys()
def get_ensemble_size(self):
"""
Returns the number of ensemble members for which timeseries are stored in the internal data store
"""
return len(self.__timeseries_values)
def get_forecast_index(self) -> int:
""""
Looks up the forecast index from the internal data store
:return: Current forecast index, values before this index will be considered "history".
"""
return self.__forecast_index
def set_forecast_index(self, forecast_index: int) -> None:
"""
Sets the forecast index in the internal data store.
Values (and times) before this index will be considered "history"
:param forecast_index: New forecast index.
"""
self.__forecast_index = forecast_index
def set_parameter(self,
parameter_name: str,
value: float,
ensemble_member: int = 0,
check_duplicates: bool = True) -> None:
"""
Stores the parameter value in the internal data store.
:param parameter_name: Parameter name.
:param value: The values to be stored.
:param ensemble_member: The ensemble member index.
:param check_duplicates: If True, a warning will be given when attempting to overwrite values.
If False, existing values can be silently overwritten with new values.
"""
while ensemble_member >= len(self.__parameters):
self.__parameters.append(AliasDict(self.alias_relation))
if check_duplicates and parameter_name in self.__parameters[ensemble_member].keys():
logger.warning("Attempting to set parameter value for ensemble member {} and name {} twice. "
"Ignoring second set of values.".format(ensemble_member, parameter_name))
return
self.__parameters[ensemble_member][parameter_name] = value
def get_parameter(self, parameter_name: str, ensemble_member: int = 0) -> float:
"""
Looks up the parameter value in the internal data store.
"""
if ensemble_member >= len(self.__parameters):
raise KeyError("ensemble_member {} does not exist".format(ensemble_member))
return self.__parameters[ensemble_member][parameter_name]
def get_parameter_names(self, ensemble_member: int = 0) -> Set:
"""
Returns a set of variables for which timeseries values are stored in the internal data store
:param ensemble_member: The ensemble member index.
"""
if ensemble_member >= len(self.__parameters):
return set()
return self.__parameters[ensemble_member].keys()
@property
def initial_time(self) -> float:
"""
The initial time in seconds.
"""
if self.__timeseries_times_sec is None:
raise RuntimeError("Attempting to access initial_time before setting times")
return self.__timeseries_times_sec[self.__forecast_index]
@staticmethod
def datetime_to_sec(d: Union[Iterable[datetime], datetime], t0: datetime) -> Union[Iterable[float], float]:
"""
Returns the date/timestamps in seconds since t0.
:param d: Iterable of datetimes or a single datetime object.
:param t0: Reference datetime.
"""
if hasattr(d, '__iter__'):
return np.array([(t - t0).total_seconds() for t in d])
else:
return (d - t0).total_seconds()
@staticmethod
def sec_to_datetime(s: Union[Iterable[float], float], t0: datetime) -> Union[Iterable[datetime], datetime]:
"""
Return the date/timestamps in seconds since t0 as datetime objects.
:param s: Iterable of ints or a single int (number of seconds before or after t0).
:param t0: Reference datetime.
"""
if hasattr(s, '__iter__'):
return [t0 + timedelta(seconds=t) for t in s]
else:
return t0 + timedelta(seconds=s)
@property
@abstractmethod
def alias_relation(self) -> AliasRelation:
raise NotImplementedError
import itertools
import logging
from abc import ABCMeta, abstractmethod
from abc import ABCMeta
import casadi as ca
......@@ -78,17 +78,6 @@ class CollocatedIntegratedOptimizationProblem(OptimizationProblem, metaclass=ABC
# Call super
super().__init__(**kwargs)
@abstractmethod
def times(self, variable=None):
"""
List of time stamps for variable.
:param variable: Variable name.
:returns: A list of time stamps for the given variable.
"""
pass
def interpolation_method(self, variable=None):
"""
Interpolation method for variable.
......
This diff is collapsed.
This diff is collapsed.
......@@ -279,8 +279,14 @@ class ModelicaMixin(OptimizationProblem):
M_ = float(M_)
# We take the intersection of all provided bounds
m = max(m, m_)
M = min(M, M_)
def intersect(old_bound, new_bound, intersecter):
if isinstance(old_bound, Timeseries):
return Timeseries(old_bound.times, intersecter(old_bound.values, new_bound))
else:
return intersecter(old_bound, new_bound)
m = intersect(m, m_, np.maximum)
M = intersect(M, M_, np.minimum)
bounds[sym_name] = (m, M)
......@@ -297,7 +303,7 @@ class ModelicaMixin(OptimizationProblem):
# Load seeds
for var in itertools.chain(self.__pymoca_model.states, self.__pymoca_model.alg_states):
if var.fixed:
if var.fixed or var.symbol.name() in seed.keys():
# Values will be set from import timeseries
continue
......
......@@ -6,19 +6,23 @@ import casadi as ca
import numpy as np
from rtctools._internal.alias_tools import AliasDict, AliasRelation
from rtctools._internal.alias_tools import AliasDict
from rtctools.data.storage import DataStore
from .timeseries import Timeseries
logger = logging.getLogger("rtctools")
class OptimizationProblem(metaclass=ABCMeta):
class OptimizationProblem(DataStore, metaclass=ABCMeta):
"""
Base class for all optimization problems.
"""
def __init__(self, **kwargs):
# Call parent class first for default behaviour.
super().__init__(**kwargs)
self.__mixed_integer = False
def optimize(self, preprocessing: bool = True, postprocessing: bool = True,
......@@ -406,10 +410,6 @@ class OptimizationProblem(metaclass=ABCMeta):
{variable: Timeseries(np.array([self.initial_time]), np.array([state]))
for variable, state in initial_state.items()})
@abstractproperty
def alias_relation(self) -> AliasRelation:
raise NotImplementedError
def variable_is_discrete(self, variable: str) -> bool:
"""
Returns ``True`` if the provided variable is discrete.
......
This diff is collapsed.
This diff is collapsed.
import bisect
import logging
from abc import ABCMeta, abstractmethod
import numpy as np
from rtctools._internal.alias_tools import AliasDict
from rtctools._internal.caching import cached
from rtctools.simulation.simulation_problem import SimulationProblem
logger = logging.getLogger("rtctools")
class IOMixin(SimulationProblem, metaclass=ABCMeta):
"""
Base class for all IO methods of optimization problems.
"""
def __init__(self, **kwargs):
# Call parent class first for default behaviour.
super().__init__(**kwargs)
def pre(self) -> None:
# Call read method to read all input
self.read()
@abstractmethod
def read(self) -> None:
"""
Reads input data from files, storing it in the internal data store through the various set or add methods
"""
pass
def post(self) -> None:
# Call write method to write all output
self.write()
@abstractmethod
def write(self) -> None:
""""
Writes output data to files, getting the data from the data store through the various get methods
"""
pass
def initialize(self, config_file=None):
# Set up experiment
timeseries_import_times = self.get_times()
self.__dt = timeseries_import_times[1] - timeseries_import_times[0]
self.setup_experiment(0, timeseries_import_times[-1], self.__dt)
parameter_variables = set(self.get_parameter_variables())
logger.debug("Model parameters are {}".format(parameter_variables))
for parameter in self.get_parameter_names():
if parameter in parameter_variables:
value = self.get_parameter(parameter)
logger.debug("IOMixin: Setting parameter {} = {}".format(parameter, value))
self.set_var(parameter, value)
# Load input variable names
self.__input_variables = set(self.get_input_variables().keys())
# Set input values
self.__set_input_variables(self.get_forecast_index())
logger.debug("Model inputs are {}".format(self.__input_variables))
# Empty output
self.__output_variables = self.get_output_variables()
n_times = len(self.get_times())
self.__output = AliasDict(self.alias_relation)
self.__output.update({variable: np.full(n_times, np.nan) for variable in self.__output_variables})
# Call super, which will also initialize the model itself
super().initialize(config_file)
# Extract consistent t0 values
for variable in self.__output_variables:
self.__output[variable][self.get_forecast_index()] = self.get_var(variable)
def __set_input_variables(self, t_idx):
for variable in self.get_variables():
if variable in self.__input_variables:
value = self.get_timeseries_values(variable)[t_idx]
if np.isfinite(value):
self.set_var(variable, value)
else:
logger.debug("IOMixin: Found bad value {} at index [{}] in timeseries aliased to input {}"
.format(value, t_idx, variable))
def update(self, dt):
# Time step
if dt < 0:
dt = self.__dt
# Current time stamp
t = self.get_current_time()
# Get current time index
t_idx = bisect.bisect_left(self.get_times(), t + dt)
# Set input values
self.__set_input_variables(t_idx)
# Call super
super().update(dt)
# Extract results
for variable in self.__output_variables:
self.__output[variable][t_idx] = self.get_var(variable)
@property
def output_variables(self):
return self.__output_variables
@property
def output(self):
return self.__output
@cached
def parameters(self):
"""
Return a dictionary of parameters, including parameters in the input files files.
:returns: Dictionary of parameters
"""
# Call parent class first for default values.
parameters = super().parameters()
# Load parameters from input files (stored in internal data store)
for parameter_name in self.get_parameter_names():
parameters[parameter_name] = self.get_parameter(parameter_name)
if logger.getEffectiveLevel() == logging.DEBUG:
for parameter_name in self.get_parameter_names():
logger.debug("IOMixin: Read parameter {}".format(parameter_name))
return parameters
def times(self, variable=None):
"""
Return a list of all the timesteps in seconds.
:param variable: Variable name.
:returns: List of all the timesteps in seconds.
"""
return self.get_times()[self.get_forecast_index():]
def timeseries_at(self, variable, t):
"""
Return the value of a time series at the given time.
:param variable: Variable name.
:param t: Time.
:returns: The interpolated value of the time series.
:raises: KeyError
"""
values = self.get_timeseries_values(variable)
timeseries_times_sec = self.get_times()
t_idx = bisect.bisect_left(timeseries_times_sec, t)
if timeseries_times_sec[t_idx] == t:
return values[t_idx]
else:
return np.interp(t, timeseries_times_sec, values)
This diff is collapsed.
......@@ -15,11 +15,12 @@ import pymoca.backends.casadi.api
from rtctools._internal.alias_tools import AliasDict, AliasRelation
from rtctools._internal.caching import cached
from rtctools.data.storage import DataStore
logger = logging.getLogger("rtctools")
class SimulationProblem:
class SimulationProblem(DataStore):
"""
Implements the `BMI <http://csdms.colorado.edu/wiki/BMI_Description>`_ Interface.
......@@ -199,7 +200,7 @@ class SimulationProblem:
self.__do_step = ca.rootfinder("next_state", "nlpsol", self.__res_vals, options)
# Call parent class for default behaviour.
super().__init__()
super().__init__(**kwargs)
def initialize(self, config_file=None):
"""
......
import logging
from unittest import TestCase
import numpy as np
from pymoca.backends.casadi.alias_relation import AliasRelation
from rtctools.data.storage import DataStore
logger = logging.getLogger("rtctools")
logger.setLevel(logging.WARNING)
class DummyDataStore(DataStore):
def read(self):
pass
def write(self):
pass
@property
def alias_relation(self):
return AliasRelation()
class TestDummyDataStore(TestCase):
def setUp(self):
self.datastore = DummyDataStore(input_folder='dummyInput', output_folder='dummyOutput')
self.tolerance = 1e-6
def test_times(self):
expected_times = np.array([-7200, -3600, 0, 3600, 7200, 9800])
self.datastore.set_times(expected_times)
actual_times = self.datastore.get_times()
self.assertTrue(np.array_equal(actual_times, expected_times))
def test_forecast_index(self):
forecast_index = self.datastore.get_forecast_index()
self.assertEqual(forecast_index, 0) # default forecast_index should be 0
times = np.array([-7200, -3600, 0, 3600, 7200, 9800])
self.datastore.set_times(times)
initial_time = self.datastore.initial_time
self.assertEqual(initial_time, -7200)
self.datastore.set_forecast_index(3)
self.assertEqual(self.datastore.get_forecast_index(), 3)
self.assertEqual(self.datastore.initial_time, 3600)
def test_timeseries(self):
# expect a KeyError when getting a timeseries that has not been set
with self.assertRaises(KeyError):
self.datastore.get_timeseries_values('someNoneExistentVariable')
# expect a RunTimeError when setting timeseries values before setting times
with self.assertRaises(RuntimeError):
self.datastore.set_timeseries_values('myNewVariable', np.array([3.1, 2.4, 2.5]))
self.datastore.set_times(np.array([-3600, 0, 7200]))
expected_values = np.array([3.1, 2.4, 2.5])
self.datastore.set_timeseries_values('myNewVariable', expected_values)
actual_values = self.datastore.get_timeseries_values('myNewVariable')
self.assertTrue(np.array_equal(actual_values, expected_values))
# expect a KeyError when getting timeseries for an ensemble member that doesn't exist
with self.assertRaises(KeyError):
self.datastore.get_timeseries_values('myNewVariable', 1)
expected_values = np.array([1.1, 1.4, 1.5])
self.datastore.set_timeseries_values('ensembleVariable', expected_values, ensemble_member=1)
with self.assertRaises(KeyError):
self.datastore.get_timeseries_values('ensembleVariable', 0)
self.assertTrue(np.array_equal(self.datastore.get_timeseries_values('ensembleVariable', 1), expected_values))
# expect a warning when overwriting a timeseries with check_duplicates=True (default)
new_values = np.array([2.1, 1.1, 0.1])
with self.assertLogs(logger, level='WARN') as cm:
self.datastore.set_timeseries_values('myNewVariable', new_values)
self.assertEqual(cm.output,
['WARNING:rtctools:Attempting to set time series values for ensemble member 0 '
'and variable myNewVariable twice. Ignoring second set of values.'])
self.assertFalse(np.array_equal(self.datastore.get_timeseries_values('myNewVariable'), new_values))
# disable check to allow overwriting old values
self.datastore.set_timeseries_values('myNewVariable', new_values, check_duplicates=False)
self.assertTrue(np.array_equal(self.datastore.get_timeseries_values('myNewVariable'), new_values))
def test_parameters(self):
# expect a KeyError when getting a parameter that has not been set
with self.assertRaises(KeyError):
self.datastore.get_parameter('someNoneExistentParameter')
self.datastore.set_parameter('myNewParameter', 1.4)
self.assertEqual(self.datastore.get_parameter('myNewParameter'), 1.4)
# expect a KeyError when getting parameters for an ensemble member that doesn't exist
with self.assertRaises(KeyError):
self.datastore.get_parameter('myNewParameter', 1)
self.datastore.set_parameter('ensembleParameter', 1.2, ensemble_member=1)
with self.assertRaises(KeyError):
self.datastore.get_parameter('ensembleParameter', 0)
self.assertEqual(self.datastore.get_parameter('ensembleParameter', 1), 1.2)
# expect a warning when overwriting a parameter with check_duplicates=True (default)
with self.assertLogs(logger, level='WARN') as cm:
self.datastore.set_parameter('myNewParameter', 2.5)
self.assertEqual(cm.output,
['WARNING:rtctools:Attempting to set parameter value for ensemble member 0 '
'and name myNewParameter twice. Ignoring second set of values.'])
self.assertEqual(self.datastore.get_parameter('myNewParameter'), 1.4)
# disable check to allow overwriting old values
self.datastore.set_parameter('myNewParameter', 2.2, check_duplicates=False)
self.assertEqual(self.datastore.get_parameter('myNewParameter'), 2.2)
# todo add tests that use newly added methods: get_variables, get_ensemble_size and get_parameter_names()
......@@ -3,7 +3,7 @@ import logging
import numpy as np
from rtctools.optimization.collocated_integrated_optimization_problem import (
CollocatedIntegratedOptimizationProblem,
CollocatedIntegratedOptimizationProblem
)
from rtctools.optimization.csv_mixin import CSVMixin
from rtctools.optimization.modelica_mixin import ModelicaMixin
......@@ -105,7 +105,7 @@ class TestCSVMixin(TestCase):
self.assertAlmostEqual(a, b, self.tolerance)
class TestPIMixinEnsemble(TestCase):
class TestCSVMixinEnsemble(TestCase):
def setUp(self):
self.problem = ModelEnsemble()
self.problem.optimize()
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment