Skip to content

Enhance _cwt.py by introducing a configurable hop size parameter #804

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 14 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 39 additions & 8 deletions pywt/_cwt.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@ def next_fast_len(n):
return 2**ceil(np.log2(n))


def cwt(data, scales, wavelet, sampling_period=1., method='conv', axis=-1):
def cwt(data, scales, wavelet, hop_size=1, sampling_period=1., method='conv', axis=-1):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hop_size is new so should be documented. Including an example may be useful too. As well as the reference to the paper.

"""
cwt(data, scales, wavelet)
cwt(data, scales, wavelet, hop_size)

One dimensional Continuous Wavelet Transform.

Expand All @@ -41,6 +41,14 @@ def cwt(data, scales, wavelet, sampling_period=1., method='conv', axis=-1):
``sampling_period`` is given in seconds.
wavelet : Wavelet object or name
Wavelet to use
hop_size : int
Specifies the down-sampling factor applied on temporal axis during the transform.
The output is sampled every hop_size samples, rather than at every consecutive sample.
For example:
A signal of length 1024 yields 1024 output samples when hop_size=1;
512 output samples when hop_size=2;
256 output samples when hop_size=4.
hop_size must be a positive integer (≥1).
sampling_period : float
Sampling period for the frequencies output (optional).
The values computed for ``coefs`` are independent of the choice of
Expand Down Expand Up @@ -73,7 +81,7 @@ def cwt(data, scales, wavelet, sampling_period=1., method='conv', axis=-1):

Notes
-----
Size of coefficients arrays depends on the length of the input array and
Size of coefficients arrays depends on the length of the input array, the given hop_size and
the length of given scales.

Examples
Expand All @@ -83,7 +91,7 @@ def cwt(data, scales, wavelet, sampling_period=1., method='conv', axis=-1):
>>> import matplotlib.pyplot as plt
>>> x = np.arange(512)
>>> y = np.sin(2*np.pi*x/32)
>>> coef, freqs=pywt.cwt(y,np.arange(1,129),'gaus1')
>>> coef, freqs=pywt.cwt(y,np.arange(1,129),1, 'gaus1')
>>> plt.matshow(coef)
>>> plt.show()

Expand All @@ -93,7 +101,7 @@ def cwt(data, scales, wavelet, sampling_period=1., method='conv', axis=-1):
>>> t = np.linspace(-1, 1, 200, endpoint=False)
>>> sig = np.cos(2 * np.pi * 7 * t) + np.real(np.exp(-7*(t-0.4)**2)*np.exp(1j*2*np.pi*2*(t-0.4)))
>>> widths = np.arange(1, 31)
>>> cwtmatr, freqs = pywt.cwt(sig, widths, 'mexh')
>>> cwtmatr, freqs = pywt.cwt(sig, widths,2, 'mexh')
>>> plt.imshow(cwtmatr, extent=[-1, 1, 1, 31], cmap='PRGn', aspect='auto',
... vmax=abs(cwtmatr).max(), vmin=-abs(cwtmatr).max())
>>> plt.show()
Expand All @@ -112,9 +120,19 @@ def cwt(data, scales, wavelet, sampling_period=1., method='conv', axis=-1):

if not np.isscalar(axis):
raise AxisError("axis must be a scalar.")
# Ensure hop_size is a positive integer
if not isinstance(hop_size, int) or hop_size <= 0:
raise ValueError(f"Invalid hop_size: {hop_size}. It must be a positive integer.")

dt_out = dt_cplx if wavelet.complex_cwt else dt
out = np.empty((np.size(scales),) + data.shape, dtype=dt_out)

# out length of transform when applying down sampling
# hop_size is only applied for 1D data
if data.ndim == 1:
downsampled_length = int(len(data) // hop_size)
data_sampled = np.empty((data.shape[0], downsampled_length))
out = np.empty((np.size(scales), downsampled_length), dtype=dt_out)

precision = 10
int_psi, x = integrate_wavelet(wavelet, precision=precision)
int_psi = np.conj(int_psi) if wavelet.complex_cwt else int_psi
Expand All @@ -137,6 +155,8 @@ def cwt(data, scales, wavelet, sampling_period=1., method='conv', axis=-1):
# reshape to (n_batch, data.shape[-1])
data_shape_pre = data.shape
data = data.reshape((-1, data.shape[-1]))
if data.ndim == 1:
data_sampled_shape_pre = data_sampled.shape

for i, scale in enumerate(scales):
step = x[1] - x[0]
Expand Down Expand Up @@ -173,11 +193,18 @@ def cwt(data, scales, wavelet, sampling_period=1., method='conv', axis=-1):
conv = np.fft.ifft(fft_wav * fft_data, axis=-1)
conv = conv[..., :data.shape[-1] + int_psi_scale.size - 1]

coef = - np.sqrt(scale) * np.diff(conv, axis=-1)
coef_temp = - np.sqrt(scale) * np.diff(conv, axis=-1)

# Apply time downsampling
if data.ndim == 1:
coef = coef_temp[::int(hop_size)] # Selecting every `hop_size`-th sample
if data.ndim > 1:
coef = coef_temp
if out.dtype.kind != 'c':
coef = coef.real

# transform axis is always -1 due to the data reshape above
d = (coef.shape[-1] - data.shape[-1]) / 2.
d = (coef.shape[-1] - data_sampled.shape[-1]) / 2.
if d > 0:
coef = coef[..., floor(d):-ceil(d)]
elif d < 0:
Expand All @@ -187,6 +214,10 @@ def cwt(data, scales, wavelet, sampling_period=1., method='conv', axis=-1):
# restore original data shape and axis position
coef = coef.reshape(data_shape_pre)
coef = coef.swapaxes(axis, -1)
# if data.ndim == 1:
# # restore original data shape and axis position
# coef = coef.reshape(data_sampled_shape_pre)
# coef = coef.swapaxes(axis, -1)
out[i, ...] = coef

frequencies = scale2frequency(wavelet, scales, precision)
Expand Down
10 changes: 6 additions & 4 deletions pywt/tests/test_cwt_wavelets.py
Original file line number Diff line number Diff line change
Expand Up @@ -379,17 +379,18 @@ def test_cwt_complex(dtype, tol, method):
dt = time[1] - time[0]
wavelet = 'cmor1.5-1.0'
scales = np.arange(1, 32)
hop_size = 1

# real-valued tranfsorm as a reference
[cfs, f] = pywt.cwt(sst, scales, wavelet, dt, method=method)
[cfs, f] = pywt.cwt(sst, scales, wavelet, hop_size, dt, method=method)

# verify same precision
assert_equal(cfs.real.dtype, sst.dtype)

# complex-valued transform equals sum of the transforms of the real
# and imaginary components
sst_complex = sst + 1j*sst
[cfs_complex, f] = pywt.cwt(sst_complex, scales, wavelet, dt,
[cfs_complex, f] = pywt.cwt(sst_complex, scales, wavelet, hop_size, dt,
method=method)
assert_allclose(cfs + 1j*cfs, cfs_complex, atol=tol, rtol=tol)
# verify dtype is preserved
Expand All @@ -407,12 +408,13 @@ def test_cwt_batch(axis, method):
dt = time[1] - time[0]
wavelet = 'cmor1.5-1.0'
scales = np.arange(1, 32)
hop_size = 1

# non-batch transform as reference
[cfs1, f] = pywt.cwt(sst1, scales, wavelet, dt, method=method, axis=axis)
[cfs1, f] = pywt.cwt(sst1, scales, wavelet, hop_size, dt, method=method, axis=axis)

shape_in = sst.shape
[cfs, f] = pywt.cwt(sst, scales, wavelet, dt, method=method, axis=axis)
[cfs, f] = pywt.cwt(sst, scales, wavelet, hop_size, dt, method=method, axis=axis)

# shape of input is not modified
assert_equal(shape_in, sst.shape)
Expand Down