Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Updates to timepoint thresh #62

Merged
merged 5 commits into from
Mar 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion src/dspeed/processors/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,11 @@
from .soft_pileup_corr import soft_pileup_corr, soft_pileup_corr_bl
from .svm import svm_predict
from .time_over_threshold import time_over_threshold
from .time_point_thresh import interpolated_time_point_thresh, time_point_thresh
from .time_point_thresh import (
interpolated_time_point_thresh,
multi_time_point_thresh,
time_point_thresh,
)
from .transfer_function_convolver import transfer_function_convolver
from .trap_filters import asym_trap_filter, trap_filter, trap_norm, trap_pickoff
from .upsampler import interpolating_upsampler, upsampler
Expand Down Expand Up @@ -143,6 +147,7 @@
"svm_predict",
"time_point_thresh",
"interpolated_time_point_thresh",
"multi_time_point_thresh",
"asym_trap_filter",
"trap_filter",
"trap_norm",
Expand Down
202 changes: 194 additions & 8 deletions src/dspeed/processors/time_point_thresh.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ def time_point_thresh(
t_out[0] = i
return
else:
for i in range(int(t_start), 1, -1):
for i in range(int(t_start), 0, -1):
if w_in[i - 1] < a_threshold <= w_in[i]:
t_out[0] = i
return
Expand Down Expand Up @@ -115,13 +115,20 @@ def interpolated_time_point_thresh(

* ``i`` -- integer `t_in`; equivalent to
:func:`~.dsp.processors.fixed_sample_pickoff`
* ``f`` -- floor; interpolated values are at previous neighbor, so
threshold crossing is at next neighbor
* ``c`` -- ceiling, interpolated values are at next neighbor, so
threshold crossing is at previous neighbor
* ``b`` -- before; closest integer sample before threshold crossing
* ``a`` -- after; closest integer sample after threshold crossing
* ``r`` -- round; round to nearest integer sample to threshold crossing
* ``l`` -- linear interpolation

The following modes are meant to mirror the options
dspeed.upsampler

* ``f`` -- floor; interpolated values are at previous neighbor.
Equivalent to ``a``
* ``c`` -- ceiling, interpolated values are at next neighbor.
Equivalent to ``b``
* ``n`` -- nearest-neighbor interpolation; threshold crossing is
half-way between samples
* ``l`` -- linear interpolation
* ``h`` -- Hermite cubic spline interpolation (*not implemented*)
* ``s`` -- natural cubic spline interpolation (*not implemented*)
t_out
Expand Down Expand Up @@ -169,10 +176,15 @@ def interpolated_time_point_thresh(

if mode_in == ord("i"): # return index before crossing
t_out[0] = i_cross
elif mode_in == ord("f"): # return index after crossing
elif mode_in in (ord("a"), ord("f")): # return index after crossing
t_out[0] = i_cross + 1
elif mode_in == ord("c"): # return index before crossing
elif mode_in in (ord("b"), ord("c")): # return index before crossing
t_out[0] = i_cross
elif mode_in == ord("r"): # return closest index to crossing
if abs(a_threshold - w_in[i_cross]) < abs(a_threshold - w_in[i_cross + 1]):
t_out[0] = i_cross
else:
t_out[0] = i_cross + 1
elif mode_in == ord("n"): # nearest-neighbor; return half-way between samps
t_out[0] = i_cross + 0.5
elif mode_in == ord("l"): # linear
Expand All @@ -181,3 +193,177 @@ def interpolated_time_point_thresh(
)
else:
raise DSPFatal("Unrecognized interpolation mode")


@guvectorize(
[
"void(float32[:], float32[:], float32, float32, char, float32[:])",
"void(float64[:], float64[:], float64, float64, char, float64[:])",
],
"(n),(m),(),(),()->(m)",
**nb_kwargs,
)
def multi_time_point_thresh(
w_in: np.ndarray,
a_threshold: np.ndarray,
t_start: int,
polarity: int,
mode_in: np.int8,
t_out: np.ndarray,
) -> None:
"""Find the time where the waveform value crosses the threshold

Search performed walking either forward or backward from the starting
index. Use interpolation to estimate a time between samples. Interpolation
mode selected with `mode_in`.

Parameters
----------
w_in
the input waveform.
a_threshold
list of threshold values.
t_start
the starting index.
polarity
is the average slope of the waveform up (>0) or down (<0) in the
search region; only sign matters, not value. Raise Exception if 0.
mode_in
Character selecting which interpolation method to use. Note this
must be passed as a ``int8``, e.g. ``ord('i')``. Options:

* ``i`` -- integer `t_in`; equivalent to
:func:`~.dsp.processors.fixed_sample_pickoff`
* ``b`` -- before; closest integer sample before threshold crossing
* ``a`` -- after; closest integer sample after threshold crossing
* ``r`` -- round; round to nearest integer sample to threshold crossing
* ``l`` -- linear interpolation

The following modes are meant to mirror the options
dspeed.upsampler

* ``f`` -- floor; interpolated values are at previous neighbor.
Equivalent to ``a``
* ``c`` -- ceiling, interpolated values are at next neighbor.
Equivalent to ``b``
* ``n`` -- nearest-neighbor interpolation; threshold crossing is
half-way between samples
* ``h`` -- Hermite cubic spline interpolation (*not implemented*)
* ``s`` -- natural cubic spline interpolation (*not implemented*)
t_out
the index where the waveform value crosses the threshold.

JSON Configuration Example
--------------------------

.. code-block :: json

"tp_0": {
"function": "time_point_thresh",
"module": "dspeed.processors",
"args": ["wf_atrap", "bl_std", "tp_start", 0, "'l'", "tp_0"],
"unit": "ns"
}
"""
t_out[:] = np.nan

if np.isnan(w_in).any() or np.isnan(a_threshold).any() or np.isnan(t_start):
return

if t_start < 0 or t_start >= len(w_in):
return

# make polarity +/- 1
if polarity > 0:
polarity = 1
elif polarity < 0:
polarity = -1
else:
raise DSPFatal("polarity cannot be 0")

sorted_idx = np.argsort(a_threshold)

# Get initial values for search
t_start = int(t_start)
a_start = w_in[t_start]
i_start = len(sorted_idx)
for i in range(len(sorted_idx)):
if a_threshold[sorted_idx[i]] >= a_start:
i_start = i
break

# Search for timepoints at larger values
i_tp = i_start
if i_tp < len(sorted_idx):
idx = sorted_idx[i_tp]
for i_wf in range(t_start, len(w_in) - 1 if polarity > 0 else -1, polarity):
if i_tp >= len(sorted_idx):
break
while w_in[i_wf] <= a_threshold[idx] < w_in[i_wf + polarity]:
if mode_in == ord("i"): # return index closest to start of search
t_out[idx] = i_wf
elif mode_in in (ord("a"), ord("f")): # return index after crossing
t_out[idx] = i_wf if polarity < 0 else i_wf + 1
elif mode_in in (ord("b"), ord("c")): # return index before crossing
t_out[idx] = i_wf if polarity > 0 else i_wf - 1
elif mode_in == ord("r"): # round; return closest index
if (
a_threshold[idx] - w_in[i_wf]
< w_in[i_wf + polarity] - a_threshold[sorted_idx[i_tp]]
):
t_out[idx] = i_wf
else:
t_out[idx] = i_wf + polarity
elif mode_in == ord(
"n"
): # nearest-neighbor; return half-way between samps
t_out[idx] = i_wf + 0.5 * polarity
elif mode_in == ord("l"): # linear
t_out[idx] = i_wf + (a_threshold[idx] - w_in[i_wf]) / (
w_in[i_wf + polarity] - w_in[i_wf]
)
else:
raise DSPFatal("Unrecognized interpolation mode")
i_tp += 1
if i_tp >= len(sorted_idx):
break
idx = sorted_idx[i_tp]

# Search for timepoints at smaller values
i_tp = i_start - 1
if i_tp >= 0:
idx = sorted_idx[i_tp]
for i_wf in range(
t_start - 1, len(w_in) - 1 if polarity < 0 else -1, -polarity
):
if i_tp < 0:
break
while w_in[i_wf] <= a_threshold[idx] < w_in[i_wf + polarity]:
if mode_in == ord("i"): # return index closest to start of search
t_out[idx] = i_wf
elif mode_in in (ord("a"), ord("f")): # return index after crossing
t_out[idx] = i_wf if polarity < 0 else i_wf + 1
elif mode_in in (ord("b"), ord("c")): # return index before crossing
t_out[idx] = i_wf if polarity > 0 else i_wf - 1
elif mode_in == ord("r"): # round; return closest index
if (
a_threshold[idx] - w_in[i_wf]
< w_in[i_wf + polarity] - a_threshold[idx]
):
t_out[idx] = i_wf
else:
t_out[idx] = i_wf + polarity
elif mode_in == ord(
"n"
): # nearest-neighbor; return half-way between samps
t_out[idx] = i_wf + 0.5 * polarity
elif mode_in == ord("l"): # linear
t_out[idx] = i_wf + (a_threshold[idx] - w_in[i_wf]) / (
w_in[i_wf + polarity] - w_in[i_wf]
)
else:
raise DSPFatal("Unrecognized interpolation mode")
i_tp -= 1
if i_tp < 0:
break
idx = sorted_idx[i_tp]