[WIP] Add Cooperative Group variant of GPU-STUMP #266


Closed
wants to merge 11 commits into from
2 changes: 1 addition & 1 deletion environment.yml
@@ -5,7 +5,7 @@ dependencies:
- python>=3.6
- numpy>=1.15
- scipy>=1.5
- numba>=0.48
- numba>=0.53.1
- pandas>=0.20.0
- flake8>=3.7.7
- flake8-docstrings>=1.5.0
2 changes: 1 addition & 1 deletion requirements.txt
@@ -1,3 +1,3 @@
numpy>=1.15
scipy>=1.5
numba>=0.48
numba>=0.53.1
2 changes: 1 addition & 1 deletion setup.py
@@ -58,7 +58,7 @@ def get_extras_require():
"maintainer_email": "seanmylaw@gmail.com",
"license": "BSD-3",
"packages": ["stumpy"],
"install_requires": ["numpy >= 1.15", "scipy >= 1.5", "numba >= 0.48"],
"install_requires": ["numpy >= 1.15", "scipy >= 1.5", "numba >= 0.53.1"],
"ext_modules": [],
"cmdclass": {},
"tests_require": ["pytest"],
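The numba floor moves from 0.48 to 0.53.1 because Numba 0.53 is the release that added CUDA cooperative-groups support (cuda.cg.this_grid() and max_cooperative_grid_blocks), which the new kernel below relies on. A hypothetical guard, not part of this diff, that a caller could use to decide whether the cooperative path is even usable:

import numba

def _cooperative_groups_available():
    # Numba gained CUDA cooperative-groups support (cuda.cg.this_grid()) in 0.53
    major, minor = (int(x) for x in numba.__version__.split(".")[:2])
    return (major, minor) >= (0, 53)

def _resolve_cooperative(cooperative):
    # Hypothetical helper: silently fall back to per-row launches otherwise
    return bool(cooperative) and _cooperative_groups_available()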
250 changes: 224 additions & 26 deletions stumpy/gpu_aamp.py
@@ -167,6 +167,176 @@ def _compute_and_update_PI_kernel(
indices[j, 0] = i


@cuda.jit(
"(i8, i8, f8[:], f8[:], i8, f8[:], f8[:], f8[:], b1[:], b1[:],"
"f8[:], f8[:], i8, b1, i8, f8[:, :], i8[:, :])"
)
def _compute_and_update_PI_kernel_cg(
range_start,
range_stop,
T_A,
T_B,
m,
QT_even,
QT_odd,
QT_first,
T_A_subseq_isfinite,
T_B_subseq_isfinite,
T_A_subseq_squared,
T_B_subseq_squared,
k,
ignore_trivial,
excl_zone,
profile,
indices,
):
"""
A Numba CUDA kernel, launched as a cooperative grid, to update the non-normalized
(i.e., without z-normalization) matrix profile and matrix profile indices. All
sliding window rows are processed in a single kernel launch, with a grid-wide
synchronization between consecutive rows.

Parameters
----------
range_start : int
The starting sliding window row. The kernel iterates over rows
`range_start - 1` through `range_stop - 1` in a single launch

range_stop : int
The stopping sliding window row (exclusive)

T_A : ndarray
The time series or sequence for which to compute the dot product

T_B : ndarray
The time series or sequence that will be used to annotate T_A. For every
subsequence in T_A, its nearest neighbor in T_B will be recorded.

m : int
Window size

QT_even : ndarray
The input QT array (dot product between the query sequence, `Q`, and
time series, `T`) to use when `i` is even

QT_odd : ndarray
The input QT array (dot product between the query sequence, `Q`, and
time series, `T`) to use when `i` is odd

QT_first : ndarray
Dot product between the first query sequence, `Q`, and time series, `T`

T_A_subseq_isfinite : ndarray
A boolean array that indicates whether a subsequence in `T_A` contains a
`np.nan`/`np.inf` value (False)

T_B_subseq_isfinite : ndarray
A boolean array that indicates whether a subsequence in `T_B` contains a
`np.nan`/`np.inf` value (False)

T_A_subseq_squared : ndarray
The squared subsequences of `T_A`

T_B_subseq_squared : ndarray
The squared subsequences of `T_B`

k : int
The total number of sliding windows to iterate over

ignore_trivial : bool
Set to `True` if this is a self-join. Otherwise, for AB-join, set this to
`False`.

excl_zone : int
The half width for the exclusion zone relative to the current
sliding window

profile : ndarray
Matrix profile. The first column consists of the global matrix profile,
the second column consists of the left matrix profile, and the third
column consists of the right matrix profile.

indices : ndarray
The first column consists of the matrix profile indices, the second
column consists of the left matrix profile indices, and the third
column consists of the right matrix profile indices.

Returns
-------
None

Notes
-----
`arXiv:1901.05708 \
<https://arxiv.org/pdf/1901.05708.pdf>`__

See Algorithm 1

Note that we have extended this algorithm for AB-joins as well.

`DOI: 10.1109/ICDM.2016.0085 \
<https://www.cs.ucr.edu/~eamonn/STOMP_GPU_final_submission_camera_ready.pdf>`__

See Table II, Figure 5, and Figure 6
"""
start = cuda.grid(1)
stride = cuda.gridsize(1)

g = cuda.cg.this_grid()

first_row = True

for i in range(range_start - 1, range_stop):

if i % 2 == 0:
QT_out = QT_even
QT_in = QT_odd
else:
QT_out = QT_odd
QT_in = QT_even

# Only compute QT for rows after the first row
if first_row:
compute_QT = False
first_row = False
else:
compute_QT = True

for j in range(start, QT_out.shape[0], stride):
zone_start = max(0, j - excl_zone)
zone_stop = min(k, j + excl_zone)

if compute_QT:
QT_out[j] = (
QT_in[j - 1]
- T_B[i - 1] * T_A[j - 1]
+ T_B[i + m - 1] * T_A[j + m - 1]
)

QT_out[0] = QT_first[i]

if not T_B_subseq_isfinite[i] or not T_A_subseq_isfinite[j]:
D = np.inf
else:
D = T_B_subseq_squared[i] + T_A_subseq_squared[j] - 2.0 * QT_out[j]

if D < config.STUMPY_D_SQUARED_THRESHOLD:
D = 0

if ignore_trivial:
if i <= zone_stop and i >= zone_start:
D = np.inf
if D < profile[j, 1] and i < j:
profile[j, 1] = D
indices[j, 1] = i
if D < profile[j, 2] and i > j:
profile[j, 2] = D
indices[j, 2] = i

if D < profile[j, 0]:
profile[j, 0] = D
indices[j, 0] = i

g.sync()

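For readers unfamiliar with the pattern: the non-cooperative path launches one kernel per sliding window row `i`, and the gap between launches is what guarantees that row `i`'s QT values (the sliding dot products, updated as QT[i, j] = QT[i-1, j-1] - T_B[i-1]*T_A[j-1] + T_B[i+m-1]*T_A[j+m-1]) are fully written before row `i+1` reads them. The kernel above folds that host-side loop onto the device, so `g.sync()` must supply the same grid-wide barrier. A stripped-down, illustrative sketch of the pattern (names are not from STUMPY), assuming a cooperative launch:

from numba import cuda

@cuda.jit("(f8[:], i8)")
def _row_loop_sketch(acc, n_rows):
    start = cuda.grid(1)
    stride = cuda.gridsize(1)
    g = cuda.cg.this_grid()          # valid only under a cooperative launch
    for row in range(n_rows):
        for j in range(start, acc.shape[0], stride):
            acc[j] += row            # stand-in for the per-row profile update
        g.sync()                     # every block finishes row before row + 1 starts

Launching such a kernel requires capping the grid at _row_loop_sketch.definition.max_cooperative_grid_blocks(threads_per_block), which is exactly what the cooperative branch of `_gpu_aamp` does further down.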

def _gpu_aamp(
T_A_fname,
T_B_fname,
@@ -183,6 +353,7 @@ def _gpu_aamp(
ignore_trivial=True,
range_start=1,
device_id=0,
cooperative=False,
):
"""
A Numba CUDA version of AAMP for parallel computation of the non-normalized (i.e.,
@@ -272,9 +443,6 @@

See Table II, Figure 5, and Figure 6
"""
threads_per_block = config.STUMPY_THREADS_PER_BLOCK
blocks_per_grid = math.ceil(k / threads_per_block)

T_A = np.load(T_A_fname, allow_pickle=False)
T_B = np.load(T_B_fname, allow_pickle=False)
QT = np.load(QT_fname, allow_pickle=False)
@@ -285,6 +453,13 @@
T_B_subseq_squared = np.load(T_B_subseq_squared_fname, allow_pickle=False)

with cuda.gpus[device_id]:
threads_per_block = config.STUMPY_THREADS_PER_BLOCK
if cooperative:
defn = _compute_and_update_PI_kernel_cg.definition
blocks_per_grid = defn.max_cooperative_grid_blocks(threads_per_block)
else:
blocks_per_grid = math.ceil(k / threads_per_block)

device_T_A = cuda.to_device(T_A)
device_T_A_subseq_isfinite = cuda.to_device(T_A_subseq_isfinite)
device_T_A_subseq_squared = cuda.to_device(T_A_subseq_squared)
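One detail worth calling out in the sizing above: the classic path launches ceil(k / threads_per_block) blocks (one thread per distance column), while the cooperative path is capped at however many blocks can be co-resident on the device, since g.sync() is only valid when every block is running simultaneously. The kernel's grid-stride loop over `j` means either grid size still covers all k columns. A small illustrative helper, not part of this diff:

import math

def _launch_blocks(k, threads_per_block, max_coop_blocks, cooperative):
    if cooperative:
        # Capped so every block is resident at once; required for g.sync()
        return max_coop_blocks
    # One thread per distance column
    return math.ceil(k / threads_per_block)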
@@ -305,29 +480,30 @@

device_profile = cuda.to_device(profile)
device_indices = cuda.to_device(indices)
_compute_and_update_PI_kernel[blocks_per_grid, threads_per_block](
range_start - 1,
device_T_A,
device_T_B,
m,
device_QT_even,
device_QT_odd,
device_QT_first,
device_T_A_subseq_isfinite,
device_T_B_subseq_isfinite,
device_T_A_subseq_squared,
device_T_B_subseq_squared,
k,
ignore_trivial,
excl_zone,
device_profile,
device_indices,
False,
)

for i in range(range_start, range_stop):
if cooperative:
_compute_and_update_PI_kernel_cg[blocks_per_grid, threads_per_block](
range_start,
range_stop,
device_T_A,
device_T_B,
m,
device_QT_even,
device_QT_odd,
device_QT_first,
device_T_A_subseq_isfinite,
device_T_B_subseq_isfinite,
device_T_A_subseq_squared,
device_T_B_subseq_squared,
k,
ignore_trivial,
excl_zone,
device_profile,
device_indices,
)
else:
_compute_and_update_PI_kernel[blocks_per_grid, threads_per_block](
i,
range_start - 1,
device_T_A,
device_T_B,
m,
Expand All @@ -343,9 +519,30 @@ def _gpu_aamp(
excl_zone,
device_profile,
device_indices,
True,
False,
)

for i in range(range_start, range_stop):
_compute_and_update_PI_kernel[blocks_per_grid, threads_per_block](
i,
device_T_A,
device_T_B,
m,
device_QT_even,
device_QT_odd,
device_QT_first,
device_T_A_subseq_isfinite,
device_T_B_subseq_isfinite,
device_T_A_subseq_squared,
device_T_B_subseq_squared,
k,
ignore_trivial,
excl_zone,
device_profile,
device_indices,
True,
)

profile = device_profile.copy_to_host()
indices = device_indices.copy_to_host()
profile = np.sqrt(profile)
Expand All @@ -356,7 +553,7 @@ def _gpu_aamp(
return profile_fname, indices_fname


def gpu_aamp(T_A, m, T_B=None, ignore_trivial=True, device_id=0):
def gpu_aamp(T_A, m, T_B=None, ignore_trivial=True, device_id=0, cooperative=False):
"""
Compute the non-normalized (i.e., without z-normalization) matrix profile with one
or more GPU devices
Expand Down Expand Up @@ -512,6 +709,7 @@ def gpu_aamp(T_A, m, T_B=None, ignore_trivial=True, device_id=0):
ignore_trivial,
start + 1,
device_ids[idx],
cooperative,
),
)
else:
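Finally, a usage sketch for this branch (the `cooperative` flag comes from the updated `gpu_aamp` signature above; the cooperative path should produce the same matrix profile as the default path):

import numpy as np
from stumpy import gpu_aamp

if __name__ == "__main__":
    T = np.random.rand(10_000)
    m = 50
    mp_default = gpu_aamp(T, m)                  # one kernel launch per row
    mp_coop = gpu_aamp(T, m, cooperative=True)   # single cooperative launch
    np.testing.assert_allclose(
        mp_default[:, 0].astype(np.float64), mp_coop[:, 0].astype(np.float64)
    )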