Commit 68380968 authored by Jesse VanderWees's avatar Jesse VanderWees 🐘

Flake8 fixes for tests

parent f13a19dc
......@@ -22,17 +22,3 @@ exclude =
# TODO files with known flake8 issues
# remove these ignores once they are fixed
src/rtctools/data/interpolation/__init__.py
tests/data/test_csv.py
tests/data/test_data.py
tests/data/test_interpolate.py
tests/data_path.py
tests/optimization/test_control_tree_mixin.py
tests/optimization/test_csv_mixin.py
tests/optimization/test_goal_programming.py
tests/optimization/test_integration.py
tests/optimization/test_modelica_mixin.py
tests/optimization/test_pi_mixin.py
tests/simulation/data_path.py
tests/simulation/test_pi_mixin.py
tests/simulation/test_simulation.py
tests/test_case.py
......@@ -7,4 +7,6 @@ def local_function():
def data_path():
return os.path.join(os.path.dirname(os.path.abspath(inspect.getsourcefile(local_function))), 'data')
return os.path.join(
os.path.dirname(os.path.abspath(inspect.getsourcefile(local_function))), "data"
)
import os
from unittest import TestCase
import rtctools.data.csv as csv
import os
import numpy as np
from datetime import datetime
from .data_path import data_path
......@@ -14,22 +12,46 @@ class TestCSV(TestCase):
pass
def test_semicolon_separated(self):
self.lookup_table_0 = csv.load(os.path.join(
data_path(), 'look_up_table_0.csv'), delimiter=';', with_time=False)
self.timeseries_import_0 = csv.load(os.path.join(
data_path(), 'timeseries_import_0.csv'), delimiter=';', with_time=True)
self.lookup_table_1 = csv.load(os.path.join(
data_path(), 'look_up_table_1.csv'), delimiter=';', with_time=False)
self.timeseries_import_1 = csv.load(os.path.join(
data_path(), 'timeseries_import_1.csv'), delimiter=';', with_time=True)
self.initial_state_0 = csv.load(os.path.join(
data_path(), 'initial_state_0.csv'), delimiter=';', with_time=False)
self.initial_state_1 = csv.load(os.path.join(
data_path(), 'initial_state_1.csv'), delimiter=';', with_time=False)
self.lookup_table_0 = csv.load(
os.path.join(data_path(), "look_up_table_0.csv"),
delimiter=";",
with_time=False,
)
self.timeseries_import_0 = csv.load(
os.path.join(data_path(), "timeseries_import_0.csv"),
delimiter=";",
with_time=True,
)
self.lookup_table_1 = csv.load(
os.path.join(data_path(), "look_up_table_1.csv"),
delimiter=";",
with_time=False,
)
self.timeseries_import_1 = csv.load(
os.path.join(data_path(), "timeseries_import_1.csv"),
delimiter=";",
with_time=True,
)
self.initial_state_0 = csv.load(
os.path.join(data_path(), "initial_state_0.csv"),
delimiter=";",
with_time=False,
)
self.initial_state_1 = csv.load(
os.path.join(data_path(), "initial_state_1.csv"),
delimiter=";",
with_time=False,
)
def test_comma_separated(self):
self.lookup_table_2 = csv.load(os.path.join(
data_path(), 'look_up_table_2.csv'), delimiter=',', with_time=False)
self.timeseries_import_3 = csv.load(os.path.join(
data_path(), 'timeseries_import_2.csv'), delimiter=',', with_time=True)
self.lookup_table_2 = csv.load(
os.path.join(data_path(), "look_up_table_2.csv"),
delimiter=",",
with_time=False,
)
self.timeseries_import_3 = csv.load(
os.path.join(data_path(), "timeseries_import_2.csv"),
delimiter=",",
with_time=True,
)
This diff is collapsed.
......@@ -16,10 +16,10 @@ from .data_path import data_path
class TestDiag(TestCase):
def setUp(self):
self.diag = pi.Diag(data_path(), 'diag')
self.diag = pi.Diag(data_path(), "diag")
def test_read_errors(self):
self.diag = pi.Diag(data_path(), 'diag')
self.diag = pi.Diag(data_path(), "diag")
diag_lines = self.diag.get(self.diag.INFO)
for child in diag_lines:
print(child.tag)
......
from unittest import TestCase, expectedFailure
from unittest import TestCase
from casadi import SX, Function, linspace
from casadi import Function, SX, linspace
import numpy as np
import rtctools.data.interpolation.bspline1d
import rtctools.data.interpolation.bspline2d
import os
import sys
class TestBSpline1DFit(TestCase):
# Only tests for defaut case, k = 3
......@@ -24,10 +22,12 @@ class TestBSpline1DFit(TestCase):
def _y_list(self, monotonicity=0, curvature=0):
y_list = np.empty(self.num_test_points - 1)
tck = rtctools.data.interpolation.bspline1d.BSpline1D.fit(
self.x, self.y, monotonicity=monotonicity, curvature=curvature)
x = SX.sym('x')
f = Function('f', [x], [rtctools.data.interpolation.bspline1d.BSpline1D(
*tck)(x)])
self.x, self.y, monotonicity=monotonicity, curvature=curvature
)
x = SX.sym("x")
f = Function(
"f", [x], [rtctools.data.interpolation.bspline1d.BSpline1D(*tck)(x)]
)
for xi in range(self.num_test_points - 1):
y_list[xi] = f(self.testpoints[xi])[0]
return y_list
......
import inspect
import os
def local_function():
pass
def data_path():
return os.path.join(os.path.dirname(os.path.abspath(inspect.getsourcefile(local_function))), 'data')
\ No newline at end of file
def data_path():
return os.path.join(
os.path.dirname(os.path.abspath(inspect.getsourcefile(local_function))), "data"
)
......@@ -7,4 +7,6 @@ def local_function():
def data_path():
return os.path.join(os.path.dirname(os.path.abspath(inspect.getsourcefile(local_function))), 'data')
return os.path.join(
os.path.dirname(os.path.abspath(inspect.getsourcefile(local_function))), "data"
)
from .data_path import data_path
from test_case import TestCase
import logging
from rtctools.optimization.collocated_integrated_optimization_problem import CollocatedIntegratedOptimizationProblem
from rtctools.optimization.modelica_mixin import ModelicaMixin
import numpy as np
from rtctools.optimization.collocated_integrated_optimization_problem import (
CollocatedIntegratedOptimizationProblem
)
from rtctools.optimization.control_tree_mixin import ControlTreeMixin
from rtctools.optimization.modelica_mixin import ModelicaMixin
from rtctools.optimization.timeseries import Timeseries
from casadi import MX
import numpy as np
import logging
import time
import sys
import os
from test_case import TestCase
from .data_path import data_path
logger = logging.getLogger("rtctools")
logger.setLevel(logging.DEBUG)
class TestProblem(ControlTreeMixin, ModelicaMixin, CollocatedIntegratedOptimizationProblem):
class TestProblem(
ControlTreeMixin, ModelicaMixin, CollocatedIntegratedOptimizationProblem
):
def __init__(self, branching_times):
super().__init__(
model_name='TestModelWithInitial', model_folder=data_path())
super().__init__(model_name="TestModelWithInitial", model_folder=data_path())
self._branching_times = branching_times
def times(self, variable=None):
......@@ -29,7 +31,7 @@ class TestProblem(ControlTreeMixin, ModelicaMixin, CollocatedIntegratedOptimizat
def parameters(self, ensemble_member):
parameters = super().parameters(ensemble_member)
parameters['u_max'] = 2.0
parameters["u_max"] = 2.0
return parameters
def pre(self):
......@@ -41,22 +43,30 @@ class TestProblem(ControlTreeMixin, ModelicaMixin, CollocatedIntegratedOptimizat
return 3
def control_tree_options(self):
return {'forecast_variables': ['constant_input'],
'branching_times': self._branching_times,
'k': 2}
return {
"forecast_variables": ["constant_input"],
"branching_times": self._branching_times,
"k": 2,
}
def constant_inputs(self, ensemble_member):
# Constant inputs
if ensemble_member == 0:
return {'constant_input': Timeseries(self.times(), np.linspace(1.0, 0.0, 21))}
return {
"constant_input": Timeseries(self.times(), np.linspace(1.0, 0.0, 21))
}
elif ensemble_member == 1:
return {'constant_input': Timeseries(self.times(), np.linspace(0.99, 0.5, 21))}
return {
"constant_input": Timeseries(self.times(), np.linspace(0.99, 0.5, 21))
}
else:
return {'constant_input': Timeseries(self.times(), np.linspace(0.98, 1.0, 21))}
return {
"constant_input": Timeseries(self.times(), np.linspace(0.98, 1.0, 21))
}
def bounds(self):
# Variable bounds
return {'u': (-2.0, 2.0)}
return {"u": (-2.0, 2.0)}
def seed(self, ensemble_member):
# No particular seeding
......@@ -64,9 +74,8 @@ class TestProblem(ControlTreeMixin, ModelicaMixin, CollocatedIntegratedOptimizat
def objective(self, ensemble_member):
# Quadratic penalty on state 'x' at final time
xf = self.state_at('x', self.times(
'x')[-1], ensemble_member=ensemble_member)
return xf**2
xf = self.state_at("x", self.times("x")[-1], ensemble_member=ensemble_member)
return xf ** 2
def constraints(self, ensemble_member):
# No additional constraints
......@@ -78,11 +87,12 @@ class TestProblem(ControlTreeMixin, ModelicaMixin, CollocatedIntegratedOptimizat
def compiler_options(self):
compiler_options = super().compiler_options()
compiler_options['cache'] = False
compiler_options["cache"] = False
return compiler_options
class TestControlTreeMixin1(TestCase):
@property
def branching_times(self):
return [0.1, 0.2]
......@@ -93,45 +103,53 @@ class TestControlTreeMixin1(TestCase):
self.tolerance = 1e-6
def test_tree(self):
v = [self.problem.control_vector('u', ensemble_member) for ensemble_member in range(self.problem.ensemble_size)]
v = [
self.problem.control_vector("u", ensemble_member)
for ensemble_member in range(self.problem.ensemble_size)
]
for i, t in enumerate(self.problem.times()):
if t < self.branching_times[0]:
self.assertEqual(len(set([repr(_v[i]) for _v in v])), 1)
self.assertEqual(len({repr(_v[i]) for _v in v}), 1)
elif t < self.branching_times[1]:
self.assertEqual(len(set([repr(_v[i]) for _v in v])), 2)
self.assertEqual(len({repr(_v[i]) for _v in v}), 2)
else:
self.assertEqual(len(set([repr(_v[i]) for _v in v])), 3)
self.assertEqual(len({repr(_v[i]) for _v in v}), 3)
class TestControlTreeMixin2(TestControlTreeMixin1):
@property
def branching_times(self):
return [0.0, 0.1, 0.2]
class TestControlTreeMixin3(TestControlTreeMixin1):
@property
def branching_times(self):
return np.linspace(0.0, 1.0, 21)[:-1]
class TestControlTreeMixin4(TestControlTreeMixin1):
@property
def branching_times(self):
return np.linspace(0.0, 1.0, 21)[1:-1]
class TestControlTreeMixin5(TestControlTreeMixin1):
@property
def branching_times(self):
return np.linspace(0.0, 1.0, 21)[1:]
class TestProblemDijkverruiming(ControlTreeMixin, ModelicaMixin, CollocatedIntegratedOptimizationProblem):
class TestProblemDijkverruiming(
ControlTreeMixin, ModelicaMixin, CollocatedIntegratedOptimizationProblem
):
def __init__(self):
super().__init__(
model_name='TestModelWithInitial', model_folder=data_path())
super().__init__(model_name="TestModelWithInitial", model_folder=data_path())
def times(self, variable=None):
# Collocation points
......@@ -139,7 +157,7 @@ class TestProblemDijkverruiming(ControlTreeMixin, ModelicaMixin, CollocatedInteg
def parameters(self, ensemble_member):
parameters = super().parameters(ensemble_member)
parameters['u_max'] = 2.0
parameters["u_max"] = 2.0
return parameters
def pre(self):
......@@ -151,9 +169,11 @@ class TestProblemDijkverruiming(ControlTreeMixin, ModelicaMixin, CollocatedInteg
return 12
def control_tree_options(self):
return {'forecast_variables': ['constant_input'],
'branching_times': [0.25, 0.5, 0.75],
'k': 3}
return {
"forecast_variables": ["constant_input"],
"branching_times": [0.25, 0.5, 0.75],
"k": 3,
}
def constant_inputs(self, ensemble_member):
# Constant inputs
......@@ -181,11 +201,11 @@ class TestProblemDijkverruiming(ControlTreeMixin, ModelicaMixin, CollocatedInteg
v = [16, 17, 18, 17]
elif ensemble_member == 11:
v = [16, 17, 18, 18]
return {'constant_input': Timeseries(self.times(), np.array(v))}
return {"constant_input": Timeseries(self.times(), np.array(v))}
def bounds(self):
# Variable bounds
return {'u': (-2.0, 2.0)}
return {"u": (-2.0, 2.0)}
def seed(self, ensemble_member):
# No particular seeding
......@@ -193,9 +213,8 @@ class TestProblemDijkverruiming(ControlTreeMixin, ModelicaMixin, CollocatedInteg
def objective(self, ensemble_member):
# Quadratic penalty on state 'x' at final time
xf = self.state_at('x', self.times(
'x')[-1], ensemble_member=ensemble_member)
return xf**2
xf = self.state_at("x", self.times("x")[-1], ensemble_member=ensemble_member)
return xf ** 2
def constraints(self, ensemble_member):
# No additional constraints
......@@ -207,18 +226,22 @@ class TestProblemDijkverruiming(ControlTreeMixin, ModelicaMixin, CollocatedInteg
def compiler_options(self):
compiler_options = super().compiler_options()
compiler_options['cache'] = False
compiler_options["cache"] = False
return compiler_options
class TestDijkverruiming(TestCase):
def setUp(self):
self.problem = TestProblemDijkverruiming()
self.problem.optimize()
self.tolerance = 1e-6
def test_tree(self):
v = [self.problem.control_vector('u', ensemble_member) for ensemble_member in range(self.problem.ensemble_size)]
v = [
self.problem.control_vector("u", ensemble_member)
for ensemble_member in range(self.problem.ensemble_size)
]
# t = 0
for ensemble_member in range(0, self.problem.ensemble_size):
......@@ -250,4 +273,6 @@ class TestDijkverruiming(TestCase):
# t = 0.75
for ensemble_member_1 in range(self.problem.ensemble_size):
for ensemble_member_2 in range(ensemble_member_1):
self.assertTrue(repr(v[ensemble_member_1][3]) != repr(v[ensemble_member_2][3]))
self.assertTrue(
repr(v[ensemble_member_1][3]) != repr(v[ensemble_member_2][3])
)
from .data_path import data_path
from test_case import TestCase
import logging
from rtctools.optimization.collocated_integrated_optimization_problem import CollocatedIntegratedOptimizationProblem
from rtctools.optimization.modelica_mixin import ModelicaMixin
from rtctools.optimization.csv_mixin import CSVMixin
from rtctools.optimization.csv_lookup_table_mixin import CSVLookupTableMixin
from casadi import MX
import numpy as np
import logging
import time
import sys
import os
from rtctools.optimization.collocated_integrated_optimization_problem import (
CollocatedIntegratedOptimizationProblem
)
from rtctools.optimization.csv_lookup_table_mixin import CSVLookupTableMixin
from rtctools.optimization.csv_mixin import CSVMixin
from rtctools.optimization.modelica_mixin import ModelicaMixin
from test_case import TestCase
from .data_path import data_path
logger = logging.getLogger("rtctools")
logger.setLevel(logging.WARNING)
......@@ -20,20 +20,20 @@ logger.setLevel(logging.WARNING)
class TestProblem(CSVMixin, ModelicaMixin, CollocatedIntegratedOptimizationProblem):
def __init__(self, **kwargs):
kwargs['model_name'] = kwargs.get('model_name', 'TestModel')
kwargs['input_folder'] = data_path()
kwargs['output_folder'] = data_path()
kwargs['model_folder'] = data_path()
kwargs["model_name"] = kwargs.get("model_name", "TestModel")
kwargs["input_folder"] = data_path()
kwargs["output_folder"] = data_path()
kwargs["model_folder"] = data_path()
super().__init__(**kwargs)
def delayed_feedback(self):
# Delayed feedback
return [('x', 'x_delayed', 0.1)]
return [("x", "x_delayed", 0.1)]
def objective(self, ensemble_member):
# Quadratic penalty on state 'x' at final time
xf = self.state_at('x', self.times()[-1])
f = xf**2
xf = self.state_at("x", self.times()[-1])
f = xf ** 2
return f
def constraints(self, ensemble_member):
......@@ -42,15 +42,20 @@ class TestProblem(CSVMixin, ModelicaMixin, CollocatedIntegratedOptimizationProbl
def compiler_options(self):
compiler_options = super().compiler_options()
compiler_options['cache'] = False
compiler_options["cache"] = False
return compiler_options
class TestProblemLookup(CSVLookupTableMixin, TestProblem):
def __init__(self):
super().__init__(input_folder=data_path(), output_folder=data_path(
), model_name='TestModel', model_folder=data_path(), lookup_tables=['constant_input'])
super().__init__(
input_folder=data_path(),
output_folder=data_path(),
model_name="TestModel",
model_folder=data_path(),
lookup_tables=["constant_input"],
)
class TestProblemEnsemble(TestProblem):
......@@ -58,8 +63,13 @@ class TestProblemEnsemble(TestProblem):
csv_ensemble_mode = True
def __init__(self):
super().__init__(input_folder=data_path(), output_folder=data_path(
), model_name='TestModel', model_folder=data_path(), lookup_tables=[])
super().__init__(
input_folder=data_path(),
output_folder=data_path(),
model_name="TestModel",
model_folder=data_path(),
lookup_tables=[],
)
class TestCSVMixin(TestCase):
......@@ -72,34 +82,44 @@ class TestCSVMixin(TestCase):
def test_parameter(self):
params = self.problem.parameters(0)
self.assertEqual(params['k'], 1.01)
self.assertEqual(params["k"], 1.01)
def test_initial_state(self):
initial_state = self.problem.initial_state(0)
self.assertAlmostEqual(initial_state['x'], 1.02, self.tolerance)
self.assertAlmostEqual(initial_state["x"], 1.02, self.tolerance)
def test_objective_value(self):
objective_value_tol = 1e-6
self.assertTrue(abs(self.problem.objective_value)
< objective_value_tol)
self.assertTrue(abs(self.problem.objective_value) < objective_value_tol)
def test_output(self):
self.assertAlmostEqual(self.results['x'][
:]**2 + np.sin(self.problem.times()), self.results['z'][:], self.tolerance)
self.assertAlmostEqual(
self.results["x"][:] ** 2 + np.sin(self.problem.times()),
self.results["z"][:],
self.tolerance,
)
def test_algebraic(self):
self.assertAlmostEqual(self.results[
'y'] + self.results['x'], np.ones(len(self.problem.times())) * 3.0, self.tolerance)
self.assertAlmostEqual(
self.results["y"] + self.results["x"],
np.ones(len(self.problem.times())) * 3.0,
self.tolerance,
)
def test_bounds(self):
self.assertAlmostGreaterThan(self.results['u'], -2, self.tolerance)
self.assertAlmostLessThan(self.results['u'], 2, self.tolerance)
self.assertAlmostGreaterThan(self.results["u"], -2, self.tolerance)
self.assertAlmostLessThan(self.results["u"], 2, self.tolerance)
def test_interpolate(self):
for v in ['x', 'y', 'u']:
for v in ["x", "y", "u"]:
for i in [0, int(len(self.problem.times()) / 2), -1]:
a = self.problem.interpolate(
self.problem.times()[i], self.problem.times(), self.results[v], 0.0, 0.0)
self.problem.times()[i],
self.problem.times(),
self.results[v],
0.0,
0.0,
)
b = self.results[v][i]
self.assertAlmostEqual(a, b, self.tolerance)
......@@ -113,10 +133,14 @@ class TestCSVLookupMixin(TestCSVMixin):
self.tolerance = 1e-6
def test_call(self):
self.assertAlmostEqual(self.problem.lookup_tables(
0)['constant_input'](0.2), 2.0, self.tolerance)
self.assertAlmostEqual(self.problem.lookup_tables(0)['constant_input'](
np.array([0.2, 0.3])), np.array([2.0, 3.0]), self.tolerance)
self.assertAlmostEqual(
self.problem.lookup_tables(0)["constant_input"](0.2), 2.0, self.tolerance
)
self.assertAlmostEqual(
self.problem.lookup_tables(0)["constant_input"](np.array([0.2, 0.3])),
np.array([2.0, 3.0]),
self.tolerance,
)
class TestPIMixinEnsemble(TestCase):
......@@ -128,5 +152,4 @@ class TestPIMixinEnsemble(TestCase):
def test_objective_value(self):
objective_value_tol = 1e-6
self.assertTrue(abs(self.problem.objective_value)
< objective_value_tol)
self.assertTrue(abs(self.problem.objective_value) < objective_value_tol)
This diff is collapsed.
from .data_path import data_path
from test_case import TestCase
import logging
from rtctools.optimization.collocated_integrated_optimization_problem import CollocatedIntegratedOptimizationProblem
from rtctools.optimization.modelica_mixin import ModelicaMixin
from rtctools.optimization.timeseries import Timeseries
from casadi import MX
import numpy as np
import logging
import time
import sys
import os
from rtctools.optimization.collocated_integrated_optimization_problem import (
CollocatedIntegratedOptimizationProblem
)
from rtctools.optimization.modelica_mixin import ModelicaMixin
from test_case import TestCase
from .data_path import data_path
logger = logging.getLogger("rtctools")
......@@ -17,8 +17,12 @@ logger = logging.getLogger("rtctools")
class HybridShootingTestProblem(ModelicaMixin, CollocatedIntegratedOptimizationProblem):
def __init__(self, integrated_states):
super().__init__(input_folder=data_path(
), output_folder=data_path(), model_name='HybridShootingTestModel', model_folder=data_path())
super().__init__(
input_folder=data_path(),
output_folder=data_path(),
model_name="HybridShootingTestModel",
model_folder=data_path(),
)
self._integrated_states = integrated_states
......@@ -36,7 +40,7 @@ class HybridShootingTestProblem(ModelicaMixin, CollocatedIntegratedOptimizationP
def bounds(self):
# Variable bounds
return {'u': (-2.0, 2.0)}
return {"u": (-2.0, 2.0)}
def seed(self, ensemble_member):
# No particular seeding
......@@ -44,9 +48,8 @@ class HybridShootingTestProblem(ModelicaMixin, CollocatedIntegratedOptimizationP
def objective(self, ensemble_member):
# Quadratic penalty on state 'x' at final time
xf = self.state_at('x', self.times(
'x')[-1], ensemble_member=ensemble_member)
return xf**2
xf = self.state_at("x", self.times("x")[-1], ensemble_member=ensemble_member)
return xf ** 2
def constraints(self, ensemble_member):
# No additional constraints
......@@ -58,7 +61,7 @@ class HybridShootingTestProblem(ModelicaMixin, CollocatedIntegratedOptimizationP
def compiler_options(self):
compiler_options = super().compiler_options()
compiler_options['cache'] = False
compiler_options["cache"] = False
return compiler_options
......@@ -73,13 +76,14 @@ class TestHybridShooting(TestCase):
def test_objective_value(self):
objective_value_tol = 1e-6
self.assertAlmostLessThan(
abs(self.problem.objective_value), 0.0, objective_value_tol)
abs(self.problem.objective_value), 0.0, objective_value_tol
)
class TestHybridShootingX(TestHybridShooting):
def setUp(self):
self.problem = HybridShootingTestProblem(['x'])
self.problem = HybridShootingTestProblem(["x"])
self.problem.optimize()
self.results = self.problem.extract_results()
self.tolerance = 1e-6
......@@ -88,7 +92,7 @@ class TestHybridShootingX(TestHybridShooting):
class TestHybridShootingW(TestHybridShooting):
def setUp(self):
self.problem = HybridShootingTestProblem(['w'])
self.problem = HybridShootingTestProblem(["w"])
self.problem.optimize()
self.results = self.problem.extract_results()
self.tolerance = 1e-6
......@@ -97,7 +101,7 @@ class TestHybridShootingW(TestHybridShooting):
class TestSingleShooting(TestHybridShooting):
def setUp(self):
self.problem = HybridShootingTestProblem(['x', 'w'])