Commit b8da58b3 authored by Anne Hommelberg's avatar Anne Hommelberg

Add NetCDFMixin

Adds a NetCDFMixin to import and export data to
and from NetCDF files.
parent eb5e4b7f
import os
from collections import OrderedDict
from datetime import datetime, timedelta
from typing import Iterable, List, Set, Union
from netCDF4 import Dataset, Variable, chartostring, num2date
import numpy as np
class Stations:
    """
    Holds the station (location) ids and per-station attribute data read
    from an input NetCDF dataset.
    """

    def __init__(self, dataset: Dataset, station_variable: Variable):
        self.__station_variable = station_variable
        station_dimension = station_variable.dimensions[0]

        # Collect every other 1d variable defined on the station dimension as
        # a station attribute (e.g. lat, lon, x, y, z).
        # todo make this a bit smarter, right now variables like station_name would be forgotten
        self.__attribute_variables = {
            name: var
            for name, var in dataset.variables.items()
            if var != station_variable and var.dimensions == (station_dimension,)
        }

        # Map each station id (decoded from the char array) to a dict of its
        # attribute values, preserving the order of the stations in the file.
        self.__attributes = OrderedDict()
        for index in range(station_variable.shape[0]):
            station_id = str(chartostring(station_variable[index]))
            self.__attributes[station_id] = {
                name: dataset.variables[name][index]
                for name in self.__attribute_variables
            }

    @property
    def station_ids(self) -> Iterable:
        """
        :return: An ordered iterable of the station ids (location ids) for which station data is available.
        """
        return self.__attributes.keys()

    @property
    def attributes(self) -> OrderedDict:
        """
        :return: An OrderedDict containing dicts containing the values for all station attributes of the input dataset.
        """
        return self.__attributes

    @property
    def attribute_variables(self) -> dict:
        """
        :return: A dict containing the station attribute variables of the input dataset.
        """
        return self.__attribute_variables
class ImportDataset:
    """
    A class used to open and import the data from a NetCDF file.

    Uses the NetCDF4 library. Contains various methods for reading the data in the file.
    """

    def __init__(self, folder: str, basename: str):
        """
        :param folder: Folder the file is located in.
        :param basename: Basename of the file, extension ".nc" will be appended to this
        :raises Exception: If the file contains no recognizable time or station id variable.
        """
        # Load the content of a NetCDF file into a Dataset.
        self.__filename = os.path.join(
            folder,
            basename + ".nc"
        )
        self.__dataset = Dataset(self.__filename)

        # Find the time and station id variables
        self.__time_variable = self.__find_time_variable()
        if self.__time_variable is None:
            raise Exception('No time variable found in file ' + self.__filename + '. '
                            'Please ensure the file contains a time variable with standard_name "time" and axis "T".')

        self.__station_variable = self.__find_station_variable()
        if self.__station_variable is None:
            raise Exception('No station variable found in file ' + self.__filename + '. '
                            'Please ensure the file contains a variable with cf_role "timeseries_id".')

    def __str__(self):
        return self.__filename

    def __find_time_variable(self) -> Union[Variable, None]:
        """
        Find the variable containing the times in the dataset.

        :return: a netCDF4.Variable object of the time variable (or None if none found)
        """
        for variable in self.__dataset.variables.values():
            attributes = variable.ncattrs()
            if ('standard_name' in attributes and 'axis' in attributes
                    and variable.standard_name == 'time' and variable.axis == 'T'):
                return variable
        return None

    def __find_station_variable(self) -> Union[Variable, None]:
        """
        Find the variable containing station id's (location id's) in the dataset.

        :return: a netCDF4.Variable object of the station id variable (or None if none found)
        """
        for variable in self.__dataset.variables.values():
            if 'cf_role' in variable.ncattrs() and variable.cf_role == 'timeseries_id':
                return variable
        return None

    def read_import_times(self) -> np.ndarray:
        """
        Reads the import times in the time variable of the dataset.

        :return: an array containing the input times as datetime objects
        """
        time_values = self.__time_variable[:]
        time_unit = self.__time_variable.units
        # The calendar attribute is optional; fall back to the CF default.
        time_calendar = getattr(self.__time_variable, 'calendar', u'gregorian')
        return num2date(time_values, units=time_unit, calendar=time_calendar)

    def read_station_data(self) -> Stations:
        """
        :return: a Stations object holding the station ids and attributes of the dataset.
        """
        return Stations(self.__dataset, self.__station_variable)

    def find_timeseries_variables(self) -> List[str]:
        """
        Find the keys of all 2d variables with dimensions (station, time) or (time, station),
        where station is the dimension of the station variable and time the dimension of the time variable.

        :return: a list of strings containing all keys found.
        """
        station_dim = self.__station_variable.dimensions[0]
        time_dim = self.__time_variable.dimensions[0]
        expected_dims = [(station_dim, time_dim), (time_dim, station_dim)]

        timeseries_variables = []
        for var_key, variable in self.__dataset.variables.items():
            if variable.dimensions in expected_dims:
                timeseries_variables.append(var_key)
        return timeseries_variables

    def read_timeseries_values(self, station_index: int, variable_name: str) -> np.ndarray:
        """
        Reads the specified timeseries from the input file.

        :param station_index: The index of the station for which the values should be read
        :param variable_name: The name of the variable for which the values should be read
        :return: an array of values
        """
        station_dim = self.__station_variable.dimensions[0]

        timeseries_variable = self.__dataset.variables[variable_name]
        # The variable may be stored either (station, time) or (time, station);
        # pick the slice orientation accordingly.
        assert timeseries_variable.dimensions[0] == station_dim or timeseries_variable.dimensions[1] == station_dim
        if timeseries_variable.dimensions[0] == station_dim:
            values = timeseries_variable[station_index, :]
        else:
            values = timeseries_variable[:, station_index]

        # NetCDF4 reads the values as a numpy masked array,
        # convert to a normal array with nan where mask == True
        return np.ma.filled(values, np.nan)

    def close(self) -> None:
        """
        Closes the NetCDF4 Dataset to release the underlying file handle.

        Added for symmetry with ExportDataset.close; existing callers that
        never close keep working unchanged.
        """
        self.__dataset.close()

    @property
    def time_variable(self):
        # The netCDF4.Variable holding the import times.
        return self.__time_variable

    @property
    def station_variable(self):
        # The netCDF4.Variable holding the station (location) ids.
        return self.__station_variable
class ExportDataset:
    """
    A class used to write data to a NetCDF file.

    Creates a new file or overwrites an old file. The file metadata is written upon initialization. Data such as
    times, station data and timeseries data are handed to the ExportDataset through the various write methods.
    Once all data has been written, close() must be called to flush the changes from local memory to the actual
    file on disk.
    """

    def __init__(self, folder: str, basename: str):
        """
        :param folder: Folder the file will be located in.
        :param basename: Basename of the file, extension ".nc" will be appended to this
        """
        # Create the file and open a Dataset to access it
        self.__filename = os.path.join(
            folder,
            basename + ".nc"
        )
        # use same write format as FEWS
        self.__dataset = Dataset(self.__filename, mode='w', format='NETCDF3_CLASSIC')

        # write metadata to the file
        self.__dataset.title = 'RTC-Tools Output Data'
        self.__dataset.institution = 'Deltares'
        self.__dataset.source = 'RTC-Tools'
        self.__dataset.history = 'Generated on {}'.format(datetime.now())
        self.__dataset.Conventions = 'CF-1.6'
        self.__dataset.featureType = 'timeseries'

        # dimensions are created when writing times and station data, must be created before writing variables
        self.__time_dim = None
        self.__station_dim = None
        self.__station_id_to_index_mapping = None
        self.__timeseries_variables = {}

    def __str__(self):
        return self.__filename

    def write_times(self, times: np.ndarray, forecast_time: float, forecast_date: datetime) -> None:
        """
        Writes the time variable to the dataset.

        :param times: The times that are to be written, in seconds.
        :param forecast_time: The forecast time in seconds corresponding to the forecast date.
        :param forecast_date: The datetime corresponding with the time in seconds at the forecast index.
        """
        # NetCDF stores times relative to a reference date, and the stored
        # values may never be negative. Shift the reference date back to the
        # earliest time when needed so all written values are non-negative.
        earliest = np.min(times)
        if earliest < 0:
            reference_date = forecast_date - timedelta(seconds=forecast_time - earliest)
            times = times - earliest
        else:
            reference_date = forecast_date

        self.__time_dim = self.__dataset.createDimension('time', None)
        time_variable = self.__dataset.createVariable('time', 'f8', ('time',))
        time_variable.standard_name = 'time'
        time_variable.units = 'seconds since {}'.format(reference_date)
        time_variable.axis = 'T'
        time_variable[:] = times

    def write_station_data(self, stations: Stations, output_station_ids: Set[str]) -> None:
        """
        Writes the station ids and additional station information to the dataset.

        :param stations: The stations data read from the input file.
        :param output_station_ids: The set of station ids for which output will be written.
        """
        self.__station_dim = self.__dataset.createDimension('station', len(output_station_ids))

        # first write the ids
        longest_id = max(len(station_id) for station_id in output_station_ids)
        self.__dataset.createDimension('char_leng_id', longest_id)
        station_id_var = self.__dataset.createVariable('station_id', 'c', ('station', 'char_leng_id'))
        station_id_var.long_name = 'station identification code'
        station_id_var.cf_role = 'timeseries_id'

        # remember which row each station id occupies, so that timeseries data
        # can later be written at the correct index
        self.__station_id_to_index_mapping = {
            station_id: index for index, station_id in enumerate(output_station_ids)
        }
        for station_id, index in self.__station_id_to_index_mapping.items():
            station_id_var[index, :] = list(station_id)

        # now copy the stored attribute variables from the input
        for var_name, attr_var in stations.attribute_variables.items():
            variable = self.__dataset.createVariable(var_name, attr_var.datatype, ('station',))
            # copy all attributes from the original input variable
            variable.setncatts(attr_var.__dict__)
            for station_id, index in self.__station_id_to_index_mapping.items():
                if station_id in stations.attributes:
                    variable[index] = stations.attributes[station_id][var_name]

    def create_variables(self, variable_names: Set[str]) -> None:
        """
        Creates variables in the dataset for each of the provided parameter ids.

        The write_times and write_station_data methods must be called first, to ensure the necessary dimensions have
        already been created in the output NetCDF file.

        :param variable_names: The parameter ids for which variables must be created.
        """
        assert self.__time_dim is not None, 'First call write_times to ensure the time dimension has been created.'
        assert self.__station_dim is not None, 'First call write_station_data to ensure ' \
                                               'the station dimension has been created'
        assert self.__station_id_to_index_mapping is not None  # should also be created in write_station_data

        for variable_name in variable_names:
            self.__dataset.createVariable(variable_name, 'f8', ('time', 'station'), fill_value=np.nan)

    def write_output_values(self, station_id: str, variable_name: str, values: np.ndarray) -> None:
        """
        Writes the given data to the dataset. The variable must have already been created through the
        create_variables method. After all calls to write_output_values, the close method must be called to flush all
        changes.

        :param station_id: The id of the station the data is written for.
        :param variable_name: The name of the variable the data is written to (must have already been created).
        :param values: The values that are to be written to the file
        """
        assert self.__station_id_to_index_mapping is not None, 'First call write_station_data and create_variables.'
        station_index = self.__station_id_to_index_mapping[station_id]
        self.__dataset.variables[variable_name][:, station_index] = values

    def close(self) -> None:
        """
        Closes the NetCDF4 Dataset to ensure all changes made are written to the file.

        This method must be called after writing all data through the various write methods.
        """
        self.__dataset.close()
import logging
import os
import rtctools.data.netcdf as netcdf
from rtctools.data import rtc
from rtctools.optimization.io_mixin import IOMixin
# Module-level logger shared by the rtc-tools package.
logger = logging.getLogger("rtctools")
# todo add support for ensembles
class NetCDFMixin(IOMixin):
    """
    Adds NetCDF I/O to your optimization problem.

    During preprocessing, a file named timeseries_import.nc is read from the ``input`` subfolder.
    During postprocessing a file named timeseries_export.nc is written to the ``output`` subfolder.

    Both the input and output nc files are expected to follow the FEWS format for scalar data in a Netcdf file, i.e.:

    - They must contain a variable with the station id's (location id's) which can be recognized by the attribute
      'cf_role' set to 'timeseries_id'.
    - They must contain a time variable with attributes 'standard_name' = 'time' and 'axis' = 'T'

    From the input file, all 2d variables with dimensions equal to the station id's and time variable are read.

    To determine the rtc-tools variable name, the NetCDF mixin uses the station id (location id) and name of the
    timeseries variable in the file (parameter). An rtcDataConfig.xml file can be given in the input folder to
    configure variable names for specific location and parameter combinations. If this file is present, and contains
    a configured variable name for a read timeseries, this variable name will be used. If the file is present, but does
    not contain a configured variable name, a default variable name is constructed and a warning is given to alert the
    user that the current rtcDataConfig may contain a mistake. To suppress this warning if this is intentional, set the
    check_missing_variable_names attribute to False. Finally, if no file is present, the default variable name will
    always be used, and no warnings will be given. With debug logging enabled, the NetCDF mixin will report the chosen
    variable name for each location and parameter combination.

    To construct the default variable name, the station id is concatenated with the name of the variable in the NetCDF
    file, separated by the location_parameter_delimiter (set to a double underscore - '__' - by default). For example,
    if a NetCDF file contains two stations 'loc_1' and 'loc_2', and a timeseries variable called 'water_level', this
    will result in two rtc-tools variables called 'loc_1__water_level' and 'loc_2__water_level' (with the default
    location_parameter_delimiter of '__').

    :cvar location_parameter_delimiter:
        Delimiter used between location and parameter id when constructing the variable name.
    :cvar check_missing_variable_names:
        Warn if an rtcDataConfig.xml file is given but does not contain a variable name for a read timeseries.
        Default is ``True``
    :cvar netcdf_validate_timeseries:
        Check consistency of timeseries. Default is ``True``
    """

    #: Delimiter used between location and parameter id when constructing the variable name.
    location_parameter_delimiter = '__'

    #: Warn if an rtcDataConfig.xml file is given but does not contain a variable name for a read timeseries.
    check_missing_variable_names = True

    #: Check consistency of timeseries.
    netcdf_validate_timeseries = True

    def __init__(self, **kwargs):
        # call parent class for default behaviour
        super().__init__(**kwargs)

        # Only parse rtcDataConfig.xml when it is actually present; without it,
        # the default variable names are always used (see class docstring).
        path = os.path.join(self._input_folder, "rtcDataConfig.xml")
        self.__data_config = rtc.DataConfig(self._input_folder) if os.path.isfile(path) else None

    def read(self):
        # Call parent class first for default behaviour
        super().read()

        dataset = netcdf.ImportDataset(self._input_folder, self.timeseries_import_basename)

        # convert and store the import times
        self.__import_datetimes = dataset.read_import_times()
        times = self.io.datetime_to_sec(self.__import_datetimes, self.__import_datetimes[self.io.get_forecast_index()])
        self.io.set_times(times)

        if self.netcdf_validate_timeseries:
            # check if strictly increasing
            for i in range(len(times) - 1):
                if times[i] >= times[i + 1]:
                    raise Exception('NetCDFMixin: Time stamps must be strictly increasing.')

        # Determine whether the time axis is equidistant; __dt stays None when
        # the step size varies (see the equidistant property).
        self.__dt = times[1] - times[0] if len(times) >= 2 else 0
        for i in range(len(times) - 1):
            if times[i + 1] - times[i] != self.__dt:
                self.__dt = None
                break

        # store the station data for later use
        self.__stations = dataset.read_station_data()

        # read all available timeseries from the dataset
        timeseries_var_keys = dataset.find_timeseries_variables()

        # todo add support for ensembles
        for parameter in timeseries_var_keys:
            for i, location_id in enumerate(self.__stations.station_ids):
                name = self.__resolve_variable_name(location_id, parameter)
                values = dataset.read_timeseries_values(i, parameter)
                self.io.set_timeseries_values(name, values)
                logger.debug('Read timeseries data for location id "{}" and parameter "{}", '
                             'stored under variable name "{}"'
                             .format(location_id, parameter, name))

        logger.debug("NetCDFMixin: Read timeseries")

    def __resolve_variable_name(self, location_id: str, parameter: str) -> str:
        """
        Determine the rtc-tools variable name for a location/parameter pair.

        Uses the rtcDataConfig.xml mapping when available; otherwise (or when
        the pair is not configured) the default
        '<location><delimiter><parameter>' name is used.
        """
        default_name = location_id + self.location_parameter_delimiter + parameter
        if self.__data_config is None:
            return default_name
        try:
            return self.__data_config.parameter(parameter, location_id)
        except KeyError:
            if self.check_missing_variable_names:
                logger.warning('No configured variable name found in rtcDataConfig.xml for location id "{}"'
                               ' and parameter id "{}", using default variable name "{}" instead. '
                               '(To suppress this warning set check_missing_variable_names to False.)'
                               .format(location_id, parameter, default_name))
            return default_name

    def write(self):
        dataset = netcdf.ExportDataset(self._output_folder, self.timeseries_export_basename)

        times = self.times()
        forecast_index = self.io.get_forecast_index()
        dataset.write_times(times, self.initial_time, self.__import_datetimes[forecast_index])

        output_variables = [sym.name() for sym in self.output_variables]
        output_location_parameter_ids = {var_name: self.extract_station_id(var_name) for var_name in output_variables}
        output_station_ids = {loc_par[0] for loc_par in output_location_parameter_ids.values()}
        dataset.write_station_data(self.__stations, output_station_ids)

        output_parameter_ids = {loc_par[1] for loc_par in output_location_parameter_ids.values()}
        dataset.create_variables(output_parameter_ids)

        for ensemble_member in range(self.ensemble_size):
            results = self.extract_results(ensemble_member)

            for var_name in output_variables:
                # determine the output values
                try:
                    values = results[var_name]
                    if len(values) != len(times):
                        values = self.interpolate(
                            times, self.times(var_name), values, self.interpolation_method(var_name))
                except KeyError:
                    try:
                        ts = self.get_timeseries(var_name, ensemble_member)
                        if len(ts.times) != len(times):
                            # NOTE(review): unlike the results branch above, this
                            # interpolation uses the default method — confirm intentional.
                            values = self.interpolate(
                                times, ts.times, ts.values)
                        else:
                            values = ts.values
                    except KeyError:
                        logger.error(
                            'NetCDFMixin: Output requested for non-existent variable {}. '
                            'Will not be in output file.'.format(var_name))
                        continue

                # determine where to put this output
                location_parameter_id = output_location_parameter_ids[var_name]
                location_id = location_parameter_id[0]
                parameter_id = location_parameter_id[1]
                dataset.write_output_values(location_id, parameter_id, values)

        dataset.close()

    def extract_station_id(self, variable_name: str) -> tuple:
        """
        Returns the station id and parameter id corresponding to the given RTC-Tools variable name.

        :param variable_name: The name of the RTC-Tools variable
        :return: a tuple of the station id and parameter id
        """
        # Bug fix: when no rtcDataConfig.xml is present, __data_config is None
        # and the old code raised AttributeError (not caught by except
        # KeyError). Fall back to splitting the default variable name instead,
        # as documented in the class docstring.
        if self.__data_config is not None:
            try:
                return self.__data_config.pi_variable_ids(variable_name)[:2]
            except KeyError:
                pass
        return tuple(variable_name.split(self.location_parameter_delimiter))

    @property
    def equidistant(self):
        # True when all consecutive time steps in the import file are equal.
        return self.__dt is not None
import os
from datetime import datetime, timedelta
from unittest import TestCase
from netCDF4 import Dataset
import numpy as np
import rtctools.data.netcdf as netcdf
from .data_path import data_path
class TestImportDataset(TestCase):
    """Tests reading a FEWS-format NetCDF file through netcdf.ImportDataset."""

    def setUp(self):
        self.dataset = netcdf.ImportDataset(data_path(), 'timeseries_import')

    def test_init(self):
        # The time and station variables must be recognized by their attributes.
        time_var = self.dataset.time_variable
        self.assertEqual(time_var._name, 'time')
        self.assertEqual(time_var.standard_name, 'time')
        self.assertEqual(time_var.long_name, 'time')
        self.assertEqual(time_var.axis, 'T')
        self.assertEqual(time_var.units, 'minutes since 1970-01-01 00:00:00.0 +0000')

        station_var = self.dataset.station_variable
        self.assertEqual(station_var._name, 'station_id')
        self.assertEqual(station_var.long_name, 'station identification code')
        self.assertEqual(station_var.cf_role, 'timeseries_id')

    def test_read_times(self):
        datetimes = self.dataset.read_import_times()
        forecast_datetime = datetime(2013, 1, 15)
        expected_datetimes = [forecast_datetime + timedelta(hours=i) for i in range(25)]
        self.assertTrue(np.array_equal(datetimes, expected_datetimes))

    def test_find_timeseries_variables(self):
        variables = self.dataset.find_timeseries_variables()
        self.assertEqual(variables, ['waterlevel'])

    def test_stations(self):
        stations = self.dataset.read_station_data()
        ids = stations.station_ids
        self.assertEqual(len(ids), 3)
        self.assertTrue('LocA' in ids)
        self.assertTrue('LocB' in ids)
        self.assertTrue('LocC' in ids)
        for id in ids:
            read_attributes = stations.attributes[id].keys()
            # Bug fix: this was assertTrue(len(...), 5), where the 5 was
            # silently treated as the assertion *message*, so the check
            # always passed for any non-empty attribute set.
            self.assertEqual(len(read_attributes), 5)
            self.assertTrue('lat' in read_attributes)
            self.assertTrue('lon' in read_attributes)
            self.assertTrue('x' in read_attributes)
            self.assertTrue('y' in read_attributes)
            self.assertTrue('z' in read_attributes)
        self.assertEqual(stations.attributes['LocA']['lat'], 53.0)
class TestExportDataset(TestCase):
    """Tests writing data to a NetCDF file through netcdf.ExportDataset."""

    def get_exported_dataset(self):
        # Re-open the file that ExportDataset produced, for inspection.
        filename = os.path.join(
            data_path(),
            'timeseries_export.nc'
        )
        return Dataset(filename)

    def setUp(self):
        self.dataset = netcdf.ExportDataset(data_path(), 'timeseries_export')

    def test_write_times(self):
        # Earliest time is -300 s, so the reference date must be shifted
        # 120 s before the forecast date and all times shifted by +300.
        times = np.array([-120, -300, -60, 300, 360])
        self.dataset.write_times(times, -180.0, datetime(2018, 12, 21, 17, 30))
        self.dataset.close()

        written = self.get_exported_dataset()
        self.assertTrue('time' in written.variables)
        time_var = written.variables['time']
        self.assertEqual(time_var.units, 'seconds since 2018-12-21 17:28:00')
        self.assertEqual(time_var.axis, 'T')
        self.assertEqual(time_var.standard_name, 'time')
        self.assertTrue(np.array_equal(time_var[:], times + 300))
# todo create tests for write_station_data, create_variables and write_output_values
// Test model for the NetCDF I/O mixin. Variable names follow the
// <location>__<parameter> convention used by NetCDFMixin (e.g. station
// "loc_a" and parameter "x" map to the variable "loc_a__x").
model NetcdfModel
  // Differential state, driven by the free input loc_b__u.
  Real loc_a__x(start=1.1);
  // Integrator of loc_a__x.
  Real loc_a__w(start=0.0);
  // Alias variable, constrained equal to loc_a__x below.
  Real alias;
  parameter Real k = 1.0;
  // Free control input.
  input Real loc_b__u(fixed=false);
  output Real loc_c__y;
  output Real loc_a__z;
  // Delayed copy of loc_a__x (delay time 0.1).
  input Real loc_a__x_delayed(fixed=false);
  // Output switching between two values depending on loc_a__x.
  output Real loc_c__switched;
  // Fixed exogenous input, copied straight to loc_a__constant_output.
  input Real loc_a__constant_input(fixed=true);
  output Real loc_a__constant_output;
equation
  der(loc_a__x) = k * loc_a__x + loc_b__u;
  der(loc_a__w) = loc_a__x;
  alias = loc_a__x;
  loc_c__y + loc_a__x = 3.0;
  loc_a__z = alias^2 + sin(time);
  loc_a__x_delayed = delay(loc_a__x, 0.1);
  if loc_a__x > 0.5 then
    loc_c__switched = 1.0;
  else
    loc_c__switched = 2.0;
  end if;
  loc_a__constant_output = loc_a__constant_input;
end NetcdfModel;
\ No newline at end of file
import os
from unittest import TestCase
from netCDF4 import Dataset, chartostring
import numpy as np
import numpy.ma as ma
from rtctools.optimization.collocated_integrated_optimization_problem import CollocatedIntegratedOptimizationProblem
from rtctools.optimization.modelica_mixin import ModelicaMixin
from rtctools.optimization.netcdf_mixin import NetCDFMixin
from .data_path import data_path
class NetcdfModel(NetCDFMixin, ModelicaMixin, CollocatedIntegratedOptimizationProblem):
    """Small optimization problem used to exercise the NetCDFMixin."""

    def __init__(self):
        super().__init__(
            input_folder=data_path(),
            output_folder=data_path(),
            model_name="NetcdfModel",
            model_folder=data_path(),
        )

    def read(self):
        super().read()

        # just add the parameters ourselves for now (values taken from test_pi_mixin)
        params = {'k': 1.01, 'x': 1.02, 'SV_V_y': 22.02, 'j': 12.01, 'b': 13.01, 'y': 12.02, 'SV_H_y': 22.02}
        for name, value in params.items():
            self.io.set_parameter(name, value)

    def objective(self, ensemble_member):
        # Quadratic penalty on state 'x' at final time
        final_x = self.state_at("loc_a__x", self.times()[-1])
        return final_x ** 2

    def constraints(self, ensemble_member):
        # No additional constraints
        return []

    def compiler_options(self):
        options = super().compiler_options()
        options["cache"] = False
        return options
class TestNetCDFMixin(TestCase):
def setUp(self):
self.problem = NetcdfModel()
self.tolerance = 1e-5
def test_read(self):
self.problem.read()
datastore = self.problem.io
self.assertTrue(np.all(datastore.get_timeseries_values('loc_a__u_min') == -3.0))
self.assertTrue(np.all(datastore.get_timeseries_values('loc_b__u_min') == -2.0))
self.assertTrue(np.all(datastore.get_timeseries_values('loc_a__u_max') == 3.0))
self.assertTrue(np.all(datastore.get_timeseries_values('loc_b__u_max') == 2.0))
expected_values = np.zeros((22,), dtype=float)
expected_values[0] = 1.02
expected_values[2] = 0.03
self.assertTrue(np.array_equal(datastore.get_timeseries_values('loc_a__x'), expected_values))