Skip to content

Commit 5c1a5e9

Browse files
authored
Support for unequal length timeseries in itakura parallelogram (aeon-toolkit#2647)
1 parent 55f5724 commit 5c1a5e9

File tree

2 files changed

+82
-42
lines changed

2 files changed

+82
-42
lines changed

aeon/distances/elastic/_bounding_matrix.py

+51-34
Original file line numberDiff line numberDiff line change
@@ -63,44 +63,61 @@ def create_bounding_matrix(
6363
def _itakura_parallelogram(x_size: int, y_size: int, max_slope_percent: float):
6464
"""Itakura parallelogram bounding matrix.
6565
66-
This code was adapted from tslearn. This link to the original code line 974:
67-
https://github.com/tslearn-team/tslearn/blob/main/tslearn/metrics/dtw_variants.py
66+
This code was adapted from pyts. This link to the original code:
67+
https://pyts.readthedocs.io/en/latest/_modules/pyts/metrics/dtw.html#itakura_parallelogram
6868
"""
69-
if x_size != y_size:
70-
raise ValueError(
71-
"""Itakura parallelogram does not support unequal length time series.
72-
Please consider using a full bounding matrix or a sakoe chiba bounding matrix
73-
instead."""
74-
)
7569
one_percent = min(x_size, y_size) / 100
7670
max_slope = math.floor((max_slope_percent * one_percent) * 100)
7771
min_slope = 1 / float(max_slope)
78-
max_slope *= float(x_size) / float(y_size)
79-
min_slope *= float(x_size) / float(y_size)
80-
81-
lower_bound = np.empty((2, y_size))
82-
lower_bound[0] = min_slope * np.arange(y_size)
83-
lower_bound[1] = (
84-
(x_size - 1) - max_slope * (y_size - 1) + max_slope * np.arange(y_size)
85-
)
86-
lower_bound_ = np.empty(y_size)
87-
for i in range(y_size):
88-
lower_bound_[i] = max(round(lower_bound[0, i], 2), round(lower_bound[1, i], 2))
89-
lower_bound_ = np.ceil(lower_bound_)
90-
91-
upper_bound = np.empty((2, y_size))
92-
upper_bound[0] = max_slope * np.arange(y_size)
93-
upper_bound[1] = (
94-
(x_size - 1) - min_slope * (y_size - 1) + min_slope * np.arange(y_size)
95-
)
96-
upper_bound_ = np.empty(y_size)
97-
for i in range(y_size):
98-
upper_bound_[i] = min(round(upper_bound[0, i], 2), round(upper_bound[1, i], 2))
99-
upper_bound_ = np.floor(upper_bound_ + 1)
100-
101-
bounding_matrix = np.full((x_size, y_size), False)
102-
for i in range(y_size):
103-
bounding_matrix[int(lower_bound_[i]) : int(upper_bound_[i]), i] = True
72+
max_slope *= float(y_size - 1) / float(x_size - 2)
73+
max_slope = max(max_slope, 1.0)
74+
75+
min_slope *= float(y_size - 2) / float(x_size - 1)
76+
min_slope = min(min_slope, 1.0)
77+
78+
centered_scale = np.arange(x_size) - x_size + 1
79+
80+
lower_bound = np.empty(x_size, dtype=np.float64)
81+
upper_bound = np.empty(x_size, dtype=np.float64)
82+
83+
for i in range(x_size):
84+
lb0 = min_slope * i
85+
lb1 = max_slope * centered_scale[i] + y_size - 1
86+
lower_bound[i] = math.ceil(max(round(lb0, 2), round(lb1, 2)))
87+
88+
ub0 = max_slope * i + 1
89+
ub1 = min_slope * centered_scale[i] + y_size
90+
upper_bound[i] = math.floor(min(round(ub0, 2), round(ub1, 2)))
91+
92+
if max_slope == 1.0:
93+
if y_size > x_size:
94+
for i in range(x_size - 1):
95+
upper_bound[i] = lower_bound[i + 1]
96+
else:
97+
for i in range(x_size):
98+
upper_bound[i] = lower_bound[i] + 1
99+
100+
for i in range(x_size):
101+
if lower_bound[i] < 0:
102+
lower_bound[i] = 0
103+
if lower_bound[i] > y_size:
104+
lower_bound[i] = y_size
105+
if upper_bound[i] < 0:
106+
upper_bound[i] = 0
107+
if upper_bound[i] > y_size:
108+
upper_bound[i] = y_size
109+
110+
bounding_matrix = np.empty((x_size, y_size), dtype=np.bool_)
111+
for i in range(x_size):
112+
for j in range(y_size):
113+
bounding_matrix[i, j] = False
114+
115+
for i in range(x_size):
116+
start = int(lower_bound[i])
117+
end = int(upper_bound[i])
118+
for j in range(start, end):
119+
bounding_matrix[i, j] = True
120+
104121
return bounding_matrix
105122

106123

aeon/distances/elastic/tests/test_bounding.py

+31-8
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
"""Test for bounding matrix."""
22

33
import numpy as np
4-
import pytest
54

65
from aeon.distances import create_bounding_matrix
76

@@ -37,10 +36,34 @@ def test_itakura_parallelogram():
3736
matrix = create_bounding_matrix(10, 10, itakura_max_slope=0.2)
3837
assert isinstance(matrix, np.ndarray)
3938

40-
with pytest.raises(
41-
ValueError,
42-
match="""Itakura parallelogram does not support unequal length time series.
43-
Please consider using a full bounding matrix or a sakoe chiba bounding matrix
44-
instead.""",
45-
):
46-
create_bounding_matrix(5, 10, itakura_max_slope=0.2)
39+
expected_result_5_7 = np.array(
40+
[
41+
[True, False, False, False, False, False, False],
42+
[False, True, True, True, True, False, False],
43+
[False, False, True, True, True, False, False],
44+
[False, False, True, True, True, True, False],
45+
[False, False, False, False, False, False, True],
46+
]
47+
)
48+
49+
expected_result_7_5 = np.array(
50+
[
51+
[True, False, False, False, False],
52+
[False, True, False, False, False],
53+
[False, True, True, True, False],
54+
[False, True, True, True, False],
55+
[False, True, True, True, False],
56+
[False, False, False, True, False],
57+
[False, False, False, False, True],
58+
]
59+
)
60+
61+
matrix = create_bounding_matrix(5, 7, itakura_max_slope=0.5)
62+
assert isinstance(matrix, np.ndarray)
63+
assert matrix.shape == (5, 7)
64+
assert np.array_equal(matrix, expected_result_5_7)
65+
66+
matrix = create_bounding_matrix(7, 5, itakura_max_slope=0.5)
67+
assert isinstance(matrix, np.ndarray)
68+
assert matrix.shape == (7, 5)
69+
assert np.array_equal(matrix, expected_result_7_5)

0 commit comments

Comments
 (0)