Commit 1c596b67 authored by Tjerk Vreeken's avatar Tjerk Vreeken

Add convenience method for merging bounds

A common desire is to update bounds on variables based on some
additional source of data. When doing so, one wants to derive the
intersection of the existing bounds and the bounds following from the
additional data. When the types of these bounds do not match, this
becomes a difficult task, resulting in users setting bounds looser than
might be possible.

The convience method added in this commit handles such type conversions
for the user. It handles upcasting floats to Timeseries, and also
handles various combinations of vector bounds (as 2D Timeseries or
1D NumPy arrays).

Note that this convenience does come at the cost of performance. A
simple min/max merge on bounds which are known to be scalar can be done
in about 350 ns, whereas the method added in this commit takes about 10
times as long with 3.5 us. Merging a combination Timeseries and NumPy
arrays can easily take 30-40 us per call.

Closes #1074
parent c5e61101
......@@ -13,6 +13,10 @@ from .timeseries import Timeseries
logger = logging.getLogger("rtctools")
# Typical type for a bound on a variable
BT = Union[float, np.ndarray, Timeseries]
class OptimizationProblem(metaclass=ABCMeta):
"""
Base class for all optimization problems.
......@@ -376,6 +380,97 @@ class OptimizationProblem(metaclass=ABCMeta):
"""
return AliasDict(self.alias_relation)
@staticmethod
def merge_bounds(a: Tuple[BT, BT], b: Tuple[BT, BT]) -> Tuple[BT, BT]:
"""
Returns a pair of bounds which is the intersection of the two pairs of
bounds given as input.
:param a: First pair ``(upper, lower)`` bounds
:param b: Second pair ``(upper, lower)`` bounds
:returns: A pair of ``(upper, lower)`` bounds which is the
intersection of the two input bounds.
"""
a, A = a
b, B = b
# Make sure we are dealing with the correct types
if __debug__:
for v in (a, A, b, B):
if isinstance(v, np.ndarray):
assert v.ndim == 1
assert np.issubdtype(v.dtype, np.number)
else:
assert isinstance(v, (float, int, Timeseries))
all_bounds = [a, A, b, B]
# First make sure that we treat single element vectors as scalars
for i, v in enumerate(all_bounds):
if isinstance(v, np.ndarray) and np.prod(v.shape) == 1:
all_bounds[i] = v.item()
# Upcast lower bounds to be of equal type, and upper bounds as well.
for i, j in [(0, 2), (2, 0), (1, 3), (3, 1)]:
v1 = all_bounds[i]
v2 = all_bounds[j]
# We only check for v1 being of a "smaller" type than v2, as we
# know we will encounter the reverse as well.
if isinstance(v1, type(v2)):
# Same type, nothing to do.
continue
elif isinstance(v1, (int, float)) and isinstance(v2, Timeseries):
all_bounds[i] = Timeseries(v2.times, np.full_like(v2.values, v1))
elif isinstance(v1, np.ndarray) and isinstance(v2, Timeseries):
if v2.values.ndim != 2 or len(v1) != v2.values.shape[1]:
raise Exception(
"Mismatching vector size when upcasting to Timeseries, {} vs. {}.".format(v1, v2))
all_bounds[i] = Timeseries(v2.times, np.broadcast_to(v1, v2.values.shape))
elif isinstance(v1, (int, float)) and isinstance(v2, np.ndarray):
all_bounds[i] = np.full_like(v2, v1)
a, A, b, B = all_bounds
assert isinstance(a, type(b))
assert isinstance(A, type(B))
# Merge the bounds
m, M = None, None
if isinstance(a, np.ndarray):
if not a.shape == b.shape:
raise Exception("Cannot merge vector minimum bounds of non-equal size")
m = np.maximum(a, b)
elif isinstance(a, Timeseries):
if len(a.times) != len(b.times):
raise Exception("Cannot merge Timeseries minimum bounds with different lengths")
elif not np.all(a.times == b.times):
raise Exception("Cannot merge Timeseries minimum bounds with non-equal times")
elif not a.values.shape == b.values.shape:
raise Exception("Cannot merge vector Timeseries minimum bounds of non-equal size")
m = Timeseries(a.times, np.maximum(a.values, b.values))
else:
m = max(a, b)
if isinstance(A, np.ndarray):
if not A.shape == B.shape:
raise Exception("Cannot merge vector maximum bounds of non-equal size")
M = np.minimum(A, B)
elif isinstance(A, Timeseries):
if len(A.times) != len(B.times):
raise Exception("Cannot merge Timeseries maximum bounds with different lengths")
elif not np.all(A.times == B.times):
raise Exception("Cannot merge Timeseries maximum bounds with non-equal times")
elif not A.values.shape == B.values.shape:
raise Exception("Cannot merge vector Timeseries maximum bounds of non-equal size")
M = Timeseries(A.times, np.minimum(A.values, B.values))
else:
M = min(A, B)
return m, M
def bounds(self) -> AliasDict:
"""
Returns variable bounds as a dictionary mapping variable names to a pair of bounds.
......
import unittest
import numpy as np
from rtctools.optimization.optimization_problem import OptimizationProblem
from rtctools.optimization.timeseries import Timeseries
class TestMergeBounds(unittest.TestCase):
def test_merge_equal_types(self):
m, M = OptimizationProblem.merge_bounds((1, 3), (2, 4))
self.assertEqual(m, 2)
self.assertEqual(M, 3)
m, M = OptimizationProblem.merge_bounds((np.array([2, 1, 0.5]), 3),
(np.array([1, 2, 0.0]), 4))
self.assertTrue(np.array_equal(m, np.array([2, 2, 0.5])))
self.assertEqual(M, 3)
m, M = OptimizationProblem.merge_bounds((1, Timeseries([1, 2, 3], [1, 4, 1])),
(2, Timeseries([1, 2, 3], [1.5, 2, 3])))
self.assertEqual(m, 2)
self.assertTrue(np.array_equal(M.times, np.array([1, 2, 3])))
self.assertTrue(np.array_equal(M.values, np.array([1, 2, 1])))
def test_simple_merge_with_upcast(self):
m, M = OptimizationProblem.merge_bounds((2, 3), (Timeseries([1, 2, 3], [1, 4, 1]), 4))
self.assertTrue(np.array_equal(m.values, np.array([2, 4, 2])))
self.assertEqual(M, 3)
m, M = OptimizationProblem.merge_bounds((2, 3), (np.array([1, 4, 1]), 4))
self.assertTrue(np.array_equal(m, np.array([2, 4, 2])))
self.assertEqual(M, 3)
def test_checks_and_exceptions(self):
# Timeseries times/values lengths are not equal
with self.assertRaisesRegex(Exception, "different lengths"):
m, M = OptimizationProblem.merge_bounds((1, Timeseries([1, 2, 3], [1, 4, 1])),
(2, Timeseries([2, 3], [2, 3])))
# Timeseries times are not equal
with self.assertRaisesRegex(Exception, "non-equal times"):
m, M = OptimizationProblem.merge_bounds((1, Timeseries([1, 2, 3], [1, 4, 1])),
(2, Timeseries([2, 3, 4], [2, 3, 1])))
# Mismatching vector sizes
with self.assertRaisesRegex(Exception, "non-equal size"):
m, M = OptimizationProblem.merge_bounds((np.array([2, 1, 0.5]), 2),
(np.array([1, 2]), 4))
# 1D Timeseries merge with 2D Timeseries
with self.assertRaisesRegex(Exception, "non-equal size"):
m, M = OptimizationProblem.merge_bounds((1, Timeseries([1, 2, 3], [1, 4, 1])),
(2, Timeseries([1, 2, 3], [[2, 3, 1],
[2, 3, 1]])))
# 2D Timeseries merge with differently shaped other 2D Timeseries
with self.assertRaisesRegex(Exception, "non-equal size"):
m, M = OptimizationProblem.merge_bounds((1, Timeseries([1, 2, 3], [[1, 4, 1],
[1, 4, 1]])),
(2, Timeseries([1, 2, 3], [[2, 3, 1],
[2, 3, 1],
[2, 3, 1]])))
# Vector with 2D Timeseries mismatch
with self.assertRaisesRegex(Exception, "vector size when upcasting"):
m, M = OptimizationProblem.merge_bounds((np.array([2, 1, 3]), 2),
(Timeseries([1, 2, 3], [[1, 4],
[1.5, 2],
[0, 0]]), 4))
def test_upcast_2d(self):
m, M = OptimizationProblem.merge_bounds((np.array([2, 1]), 2),
(Timeseries([1, 2, 3], [[1, 4],
[1.5, 2],
[0, 0]]), 4))
self.assertTrue(np.array_equal(m.values, np.array([[2, 4],
[2, 2],
[2, 1]])))
def test_upcast_single_element_vector_as_float(self):
m, M = OptimizationProblem.merge_bounds((np.array([2]), 2),
(Timeseries([1, 2, 3], [[1, 4],
[1.5, 2],
[0, 0]]), 4))
self.assertTrue(np.array_equal(m.values, np.array([[2, 4],
[2, 2],
[2, 2]])))
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment