pi_mixin.py 19.9 KB
Newer Older
1 2 3 4
from datetime import timedelta
from casadi import MX
import numpy as np
import logging
5
import bisect
6 7 8 9

import rtctools.data.rtc as rtc
import rtctools.data.pi as pi

Jorn Baayen's avatar
Jorn Baayen committed
10 11 12 13
from .optimization_problem import OptimizationProblem
from .timeseries import Timeseries
from .alias_tools import AliasDict
from .caching import cached
14 15 16 17 18 19 20 21 22 23 24 25 26 27

logger = logging.getLogger("rtctools")


class PIMixin(OptimizationProblem):
    """
    Adds `Delft-FEWS Published Interface <https://publicwiki.deltares.nl/display/FEWSDOC/The+Delft-Fews+Published+Interface>`_ I/O to your optimization problem.

    During preprocessing, files named ``rtcDataConfig.xml``, ``timeseries_import.xml``, ``rtcParameterConfig.xml``,
    and ``rtcParameterConfig_Numerical.xml`` are read from the ``input`` subfolder.  ``rtcDataConfig.xml`` maps
    tuples of FEWS identifiers, including location and parameter ID, to RTC-Tools time series identifiers.

    During postprocessing, a file named ``timeseries_export.xml`` is written to the ``output`` subfolder.

28 29 30 31
    :cvar pi_binary_timeseries: Whether to use PI binary timeseries format.  Default is ``False``.
    :cvar pi_parameter_config_basenames: List of parameter config file basenames to read. Default is [``rtcParameterConfig``].
    :cvar pi_parameter_config_numerical_basename: Numerical config file basename to read. Default is ``rtcParameterConfig_Numerical``.
    :cvar pi_check_for_duplicate_parameters: Check if duplicate parameters are read. Default is ``False``.
32
    :cvar pi_validate_timeseries: Check consistency of timeseries.  Default is ``True``.
33 34 35 36 37
    """

    #: Whether to use PI binary timeseries format
    pi_binary_timeseries = False

38 39 40
    #: Location of rtcParameterConfig files
    pi_parameter_config_basenames           = ['rtcParameterConfig']
    pi_parameter_config_numerical_basename  = 'rtcParameterConfig_Numerical'
41

42 43 44
    #: Check consistency of timeseries
    pi_validate_timeseries = True

45 46 47
    #: Check for duplicate parameters
    pi_check_for_duplicate_parameters = True

48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64
    def __init__(self, **kwargs):
        # Check arguments
        assert('input_folder' in kwargs)
        assert('output_folder' in kwargs)

        # Save arguments
        self._input_folder = kwargs['input_folder']
        self._output_folder = kwargs['output_folder']

        # Load rtcDataConfig.xml.  We assume this file does not change over the
        # life time of this object.
        self._data_config = rtc.DataConfig(self._input_folder)

        # Additional output variables
        self._output_timeseries = set()

        # Call parent class first for default behaviour.
65
        super().__init__(**kwargs)
66 67 68

    def pre(self):
        # Call parent class first for default behaviour.
69
        super().pre()
70

71 72
        # rtcParameterConfig
        self._parameter_config = []
73
        try:
74 75 76
            for pi_parameter_config_basename in self.pi_parameter_config_basenames:
                self._parameter_config.append(pi.ParameterConfig(
                    self._input_folder, pi_parameter_config_basename))
77 78
        except IOError:
            raise Exception(
79
                "PI: {}.xml not found in {}.".format(pi_parameter_config_basename, self._input_folder))
80 81 82

        try:
            self._parameter_config_numerical = pi.ParameterConfig(
83
                  self._input_folder, self.pi_parameter_config_numerical_basename)
84 85 86 87 88 89 90 91 92 93
        except IOError:
            self._parameter_config_numerical = None

        # timeseries_{import,export}.xml. rtcDataConfig can override (if not
        # falsy)
        basename_import = self._data_config._basename_import or 'timeseries_import'
        basename_export = self._data_config._basename_export or 'timeseries_export'

        try:
            self._timeseries_import = pi.Timeseries(
94
                self._data_config, self._input_folder, basename_import, binary=self.pi_binary_timeseries, pi_validate_times=self.pi_validate_timeseries)
95 96 97 98 99
        except IOError:
            raise Exception("PI: {}.xml not found in {}.".format(
                basename_import, self._input_folder))

        self._timeseries_export = pi.Timeseries(
100
            self._data_config, self._output_folder, basename_export, binary=self.pi_binary_timeseries, pi_validate_times=False, make_new_file=True)
101 102 103 104 105

        # Convert timeseries timestamps to seconds since t0 for internal use
        self._timeseries_import_times = self._datetime_to_sec(
            self._timeseries_import.times)

106 107 108 109 110 111 112 113 114 115 116 117 118 119
        # Timestamp check
        if self.pi_validate_timeseries:
            for i in range(len(self._timeseries_import_times) - 1):
                if self._timeseries_import_times[i] >= self._timeseries_import_times[i + 1]:
                    raise Exception(
                        'PIMixin: Time stamps must be strictly increasing.')

        if self.equidistant:
            # Check if the timeseries are truly equidistant
            if self.pi_validate_timeseries:
                dt = self._timeseries_import_times[
                    1] - self._timeseries_import_times[0]
                for i in range(len(self._timeseries_import_times) - 1):
                    if self._timeseries_import_times[i + 1] - self._timeseries_import_times[i] != dt:
120
                        raise Exception('PIMixin: Expecting equidistant timeseries, the time step towards {} is not the same as the time step(s) before. Set unit to nonequidistant if this is intended.'.format(
121 122
                            self._timeseries_import.times[i + 1]))

123 124 125
        # Stick timeseries into an AliasDict
        self._timeseries_import_dict = [AliasDict(self.alias_relation) for ensemble_member in range(self.ensemble_size)]
        for ensemble_member in range(self.ensemble_size):
126
            for key, value in self._timeseries_import.items(ensemble_member):
127 128
                self._timeseries_import_dict[ensemble_member][key] = value

129
    @cached
130
    def times(self, variable=None):
131
        return self._timeseries_import_times[self._timeseries_import.forecast_index:]
132 133 134

    @property
    def equidistant(self):
135
        if self._timeseries_import._dt != None:
136 137 138
            return True
        else:
            return False
139 140 141

    def solver_options(self):
        # Call parent
142
        options = super().solver_options()
143 144

        # Only do this if we have a rtcParameterConfig_Numerical
145
        if self._parameter_config_numerical is None:
146 147 148
            return options

        # Load solver options from parameter config
149
        for location_id, model, option, value in self._parameter_config_numerical:
150 151 152 153 154
            options[option] = value

        # Done
        return options

155
    @cached
156 157
    def parameters(self, ensemble_member):
        # Call parent class first for default values.
158
        parameters = super().parameters(ensemble_member)
159 160

        # Load parameters from parameter config
161 162 163 164 165 166 167 168
        for parameter_config in self._parameter_config:
            for location_id, model_id, parameter_id, value in parameter_config:
                try:
                    parameter = self._data_config.parameter(parameter_id, location_id, model_id)
                except KeyError:
                    parameter = parameter_id

                if self.pi_check_for_duplicate_parameters:
169 170
                    if parameter in parameters.keys():
                        logger.warning("PIMixin: parameter {} defined in file {} was already present. Using value {}.".format(parameter, parameter_config._path_xml, value))
171 172

                parameters[parameter] = value
173 174 175 176

        # Done
        return parameters

177
    @cached
178 179
    def constant_inputs(self, ensemble_member):
        # Call parent class first for default values.
180
        constant_inputs = super().constant_inputs(ensemble_member)
181 182 183

        # Load bounds from timeseries
        for variable in self.dae_variables['constant_inputs']:
Jorn Baayen's avatar
Jorn Baayen committed
184
            variable = variable.name()
185 186 187 188 189 190 191 192 193 194 195
            try:
                timeseries = Timeseries(
                    self._timeseries_import_times, self._timeseries_import_dict[ensemble_member][variable])
            except KeyError:
                pass
            else:
                if np.any(np.isnan(timeseries.values[self._timeseries_import.forecast_index:])):
                    raise Exception("Constant input {} contains NaN".format(variable))
                constant_inputs[variable] = timeseries
                if logger.getEffectiveLevel() == logging.DEBUG:
                    logger.debug("Read constant input {}".format(variable))
196 197
        return constant_inputs

198
    @cached
199 200
    def bounds(self):
        # Call parent class first for default values.
201
        bounds = super().bounds()
202 203 204

        # Load bounds from timeseries
        for variable in self.dae_variables['free_variables']:
Jorn Baayen's avatar
Jorn Baayen committed
205
            variable = variable.name()
206

207
            m, M = None, None
208

209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228
            timeseries_id = self.min_timeseries_id(variable)
            try:
                m = self._timeseries_import_dict[0][timeseries_id][
                    self._timeseries_import.forecast_index:]
            except KeyError:
                pass
            else:
                if logger.getEffectiveLevel() == logging.DEBUG:
                    logger.debug("Read lower bound for variable {}".format(variable))

            timeseries_id = self.max_timeseries_id(variable)
            try:
                M = self._timeseries_import_dict[0][timeseries_id][
                    self._timeseries_import.forecast_index:]
            except KeyError:
                pass
            else:
                if logger.getEffectiveLevel() == logging.DEBUG:
                    logger.debug("Read upper bound for variable {}".format(variable))

229 230 231 232 233 234 235 236 237 238 239 240
            # Replace NaN with +/- inf, and create Timeseries objects
            if m != None:
                m[np.isnan(m)] = np.finfo(m.dtype).min
                m = Timeseries(self._timeseries_import_times[
                               self._timeseries_import.forecast_index:], m)
            if M != None:
                M[np.isnan(M)] = np.finfo(M.dtype).max
                M = Timeseries(self._timeseries_import_times[
                               self._timeseries_import.forecast_index:], M)

            # Store
            if m != None or M != None:
241
                bounds[variable] = (m, M)
242 243
        return bounds

244
    @cached
245 246
    def history(self, ensemble_member):
        # Load history
247
        history = AliasDict(self.alias_relation)
248
        for variable in self.dae_variables['states'] + self.dae_variables['algebraics'] + self.dae_variables['control_inputs'] + self.dae_variables['constant_inputs']:
Jorn Baayen's avatar
Jorn Baayen committed
249
            variable = variable.name()
250 251 252 253 254 255 256 257
            try:
                history[variable] = Timeseries(self._timeseries_import_times[:self._timeseries_import.forecast_index + 1],
                                                        self._timeseries_import_dict[ensemble_member][variable][:self._timeseries_import.forecast_index + 1])
            except KeyError:
                pass
            else:
                if logger.getEffectiveLevel() == logging.DEBUG:
                    logger.debug("Read history for state {}".format(variable))
258 259 260 261 262 263
        return history

    @property
    def initial_time(self):
        return 0.0

264
    @cached
265 266
    def initial_state(self, ensemble_member):
        history = self.history(ensemble_member)
267
        return AliasDict(self.alias_relation, {variable: timeseries.values[-1] for variable, timeseries in history.items()})
268

269
    @cached
270 271
    def seed(self, ensemble_member):
        # Call parent class first for default values.
272
        seed = super().seed(ensemble_member)
273 274 275

        # Load seeds
        for variable in self.dae_variables['free_variables']:
Jorn Baayen's avatar
Jorn Baayen committed
276
            variable = variable.name()
277 278 279 280 281 282 283 284 285 286
            try:
                s = Timeseries(self._timeseries_import_times, self._timeseries_import_dict[ensemble_member][variable])
            except KeyError:
                pass
            else:
                if logger.getEffectiveLevel() == logging.DEBUG:
                    logger.debug("Seeded free variable {}".format(variable))
                # A seeding of NaN means no seeding
                s.values[np.isnan(s.values)] = 0.0
                seed[variable] = s
287 288 289 290
        return seed

    def post(self):
        # Call parent class first for default behaviour.
291
        super().post()
292

293 294 295 296 297 298 299 300 301 302 303 304 305 306
        # Start of write output
        # Write the time range for the export file.
        self._timeseries_export._times = self._timeseries_import.times[self._timeseries_import.forecast_index:]

        # Write other time settings
        self._timeseries_export._start_datetime = self._timeseries_import._forecast_datetime
        self._timeseries_export._end_datetime  = self._timeseries_import._end_datetime
        self._timeseries_export._forecast_datetime  = self._timeseries_import._forecast_datetime
        self._timeseries_export._dt = self._timeseries_import._dt
        self._timeseries_export._timezone = self._timeseries_import._timezone

        # Write the ensemble properties for the export file.
        self._timeseries_export._ensemble_size = self.ensemble_size
        self._timeseries_export._contains_ensemble = self._timeseries_import.contains_ensemble
307
        while self._timeseries_export._ensemble_size > len(self._timeseries_export._values):
308 309
            self._timeseries_export._values.append({})

310 311 312
        # Transfer units from import timeseries
        self._timeseries_export._units = self._timeseries_import._units

313
        # Start looping over the ensembles for extraction of the output values.
314 315 316 317
        times = self.times()
        for ensemble_member in range(self.ensemble_size):
            results = self.extract_results(ensemble_member)

318 319
            # For all variables that are output variables the values are
            # extracted from the results.
Jorn Baayen's avatar
Jorn Baayen committed
320
            for variable in [sym.name() for sym in self.output_variables]:
321 322 323 324
                try:
                    values = results[variable]
                    if len(values) != len(times):
                        values = self.interpolate(
325
                            times, self.times(variable), values, self.interpolation_method(variable))
326 327 328 329 330 331 332 333 334 335
                except KeyError:
                    try:
                        ts = self.get_timeseries(variable, ensemble_member)
                        if len(ts.times) != len(times):
                            values = self.interpolate(
                                times, ts.times, ts.values)
                        else:
                            values = ts.values
                    except KeyError:
                        logger.error(
336
                            'PIMixin: Output requested for non-existent variable {}. Will not be in output file.'.format(variable))
337 338
                        continue

339
                # Check if ID mapping is present
340
                try:
Olav van Duin's avatar
Olav van Duin committed
341
                    location_parameter_id = self._timeseries_export._data_config.pi_variable_ids(variable)
342
                except KeyError:
343
                    logger.debug('PIMixIn: variable {} has no mapping defined in rtcDataConfig so cannot be added to the output file.'.format(variable))
344 345 346
                    continue

                # Add series to output file
347
                self._timeseries_export.set(variable, values, ensemble_member=ensemble_member)
348

349
        # Write output file to disk
350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366
        self._timeseries_export.write()

    def _datetime_to_sec(self, d):
        # Return the date/timestamps in seconds since t0.
        if hasattr(d, '__iter__'):
            return np.array([(t - self._timeseries_import.forecast_datetime).total_seconds() for t in d])
        else:
            return (d - self._timeseries_import.forecast_datetime).total_seconds()

    def _sec_to_datetime(self, s):
        # Return the date/timestamps in seconds since t0 as datetime objects.
        if hasattr(s, '__iter__'):
            return [self._timeseries_import.forecast_datetime + timedelta(seconds=t) for t in s]
        else:
            return self._timeseries_import.forecast_datetime + timedelta(seconds=s)

    def get_timeseries(self, variable, ensemble_member=0):
367
        return Timeseries(self._timeseries_import_times, self._timeseries_import_dict[ensemble_member][variable])
368

369
    def set_timeseries(self, variable, timeseries, ensemble_member=0, output=True, check_consistency=True, unit=None):
370 371 372

        def stretch_values(values, t_pos):
            # Construct a values range with preceding and possibly following nans
373
            new_values = np.full_like(self._timeseries_import_times, np.nan)
374 375 376
            new_values[t_pos:] = values
            return new_values

377 378
        if output:
            self._output_timeseries.add(variable)
379

380
        if isinstance(timeseries, Timeseries):
381 382 383 384 385
            try:
                assert(len(timeseries.values) == len(timeseries.times))
            except AssertionError:
                raise Exception("PI: Trying to set timeseries with times and values that are of different length (lengths of {} and {}, respectively).".format(len(timeseries.times), len(timeseries.values)))

386 387 388
            if not np.array_equal(self._timeseries_import_times, timeseries.times):
                if check_consistency:
                    if not set(self._timeseries_import_times).issuperset(timeseries.times):
Olav van Duin's avatar
Olav van Duin committed
389
                        raise Exception("PI: Trying to set/append timeseries {} with different times (in seconds) than the imported timeseries. Please make sure the timeseries covers startDate through endDate of the longest imported timeseries with timestep {}..".format(variable, self._timeseries_import._dt))
390 391 392 393 394 395 396 397 398 399

                # Determine position of first times of added timeseries within the
                # import times. For this we assume that both time ranges are ordered,
                # and that the times of the added series is a subset of the import
                # times.
                t_pos = bisect.bisect_left(self._timeseries_import_times, timeseries.times[0])

                # Construct a new values range and reconstruct the Timeseries object
                timeseries = Timeseries(self._timeseries_import_times, stretch_values(timeseries.values, t_pos))

400
        else:
401 402 403 404
            if check_consistency:
                try:
                    assert(len(self.times()) == len(timeseries))
                except AssertionError:
Olav van Duin's avatar
Olav van Duin committed
405
                    raise Exception("PI: Trying to set/append values {} with a different length than the forecast length. Please make sure the values cover forecastDate through endDate with timestep {}.".format(variable, self._timeseries_import._dt))
406 407 408

            # If times is not supplied with the timeseries, we add the
            # forecast times range to a new Timeseries object. Hereby
Olav van Duin's avatar
Olav van Duin committed
409
            # we assume that the supplied values stretch from T0 to end.
410 411 412 413 414
            t_pos = self.get_forecast_index()

            # Construct a new values range and construct the Timeseries objet
            timeseries = Timeseries(self._timeseries_import_times, stretch_values(timeseries, t_pos))

415
        if unit is None:
Olav van Duin's avatar
Olav van Duin committed
416
            unit = self._timeseries_import._get_unit(variable, ensemble_member=ensemble_member)
417
        self._timeseries_import.set(
418
            variable, timeseries.values, ensemble_member=ensemble_member, unit=unit)
419
        self._timeseries_import_dict[ensemble_member][variable] = timeseries.values
420 421 422 423 424

    def get_forecast_index(self):
        return self._timeseries_import.forecast_index

    def timeseries_at(self, variable, t, ensemble_member=0):
425
        return self.interpolate(t, self._timeseries_import_times, self._timeseries_import_dict[ensemble_member][variable])
426

427 428
    @property
    def ensemble_size(self):
429
        return self._timeseries_import.ensemble_size
430

431 432
    @property
    def output_variables(self):
433
        variables = super().output_variables
434 435 436 437 438 439 440 441 442 443 444
        variables.extend([MX.sym(variable)
                          for variable in self._output_timeseries])
        return variables

    def min_timeseries_id(self, variable):
        """
        Returns the name of the lower bound timeseries for the specified variable.

        :param variable: Variable name
        """
        return '_'.join((variable, 'Min'))
445

446 447 448 449 450 451
    def max_timeseries_id(self, variable):
        """
        Returns the name of the upper bound timeseries for the specified variable.

        :param variable: Variable name
        """
452
        return '_'.join((variable, 'Max'))