Skip to content

Commit

Permalink
style: pre-commit fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
pre-commit-ci[bot] committed Mar 21, 2024
1 parent 6538d61 commit 676e0de
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 31 deletions.
6 changes: 5 additions & 1 deletion src/dspeed/processors/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,11 @@
from .soft_pileup_corr import soft_pileup_corr, soft_pileup_corr_bl
from .svm import svm_predict
from .time_over_threshold import time_over_threshold
from .time_point_thresh import interpolated_time_point_thresh, time_point_thresh, multi_time_point_thresh
from .time_point_thresh import (
interpolated_time_point_thresh,
multi_time_point_thresh,
time_point_thresh,
)
from .transfer_function_convolver import transfer_function_convolver
from .trap_filters import asym_trap_filter, trap_filter, trap_norm, trap_pickoff
from .upsampler import interpolating_upsampler, upsampler
Expand Down
75 changes: 45 additions & 30 deletions src/dspeed/processors/time_point_thresh.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ def interpolated_time_point_thresh(
* ``a`` -- after; closest integer sample after threshold crossing
* ``r`` -- round; round to nearest integer sample to threshold crossing
* ``l`` -- linear interpolation
The following modes are meant to mirror the options
The following modes are meant to mirror the options
dspeed.upsampler
* ``f`` -- floor; interpolated values are at previous neighbor.
Equivalent to ``a``
Expand Down Expand Up @@ -179,7 +179,7 @@ def interpolated_time_point_thresh(
elif mode_in in (ord("b"), ord("c")): # return index before crossing
t_out[0] = i_cross
elif mode_in == ord("r"): # return closest index to crossing
if abs(a_threshold - w_in[i_cross]) < abs(a_threshold - w_in[i_cross+1]):
if abs(a_threshold - w_in[i_cross]) < abs(a_threshold - w_in[i_cross + 1]):
t_out[0] = i_cross
else:
t_out[0] = i_cross + 1
Expand Down Expand Up @@ -236,7 +236,7 @@ def multi_time_point_thresh(
* ``a`` -- after; closest integer sample after threshold crossing
* ``r`` -- round; round to nearest integer sample to threshold crossing
* ``l`` -- linear interpolation
The following modes are meant to mirror the options
The following modes are meant to mirror the options
dspeed.upsampler
* ``f`` -- floor; interpolated values are at previous neighbor.
Equivalent to ``a``
Expand All @@ -263,20 +263,19 @@ def multi_time_point_thresh(
"""
t_out[:] = np.nan

if (
np.isnan(w_in).any()
or np.isnan(a_threshold).any()
or np.isnan(t_start)
):
if np.isnan(w_in).any() or np.isnan(a_threshold).any() or np.isnan(t_start):
return

if t_start < 0 or t_start >= len(w_in):
return

# make polarity +/- 1
if polarity>0: polarity = 1
elif polarity<0: polarity = -1
else: raise DSPFatal("polarity cannot be 0")
if polarity > 0:
polarity = 1
elif polarity < 0:
polarity = -1
else:
raise DSPFatal("polarity cannot be 0")

sorted_idx = np.argsort(a_threshold)

Expand All @@ -291,60 +290,76 @@ def multi_time_point_thresh(

# Search for timepoints at larger values
i_tp = i_start
if i_tp<len(sorted_idx):
if i_tp < len(sorted_idx):
idx = sorted_idx[i_tp]
for i_wf in range(t_start, len(w_in)-1 if polarity>0 else -1, polarity):
if i_tp >= len(sorted_idx): break
for i_wf in range(t_start, len(w_in) - 1 if polarity > 0 else -1, polarity):
if i_tp >= len(sorted_idx):
break
while w_in[i_wf] <= a_threshold[idx] < w_in[i_wf + polarity]:
if mode_in == ord("i"): # return index closest to start of search
t_out[idx] = i_wf
elif mode_in in (ord("a"), ord("f")): # return index after crossing
t_out[idx] = i_wf if polarity<0 else i_wf + 1
t_out[idx] = i_wf if polarity < 0 else i_wf + 1
elif mode_in in (ord("b"), ord("c")): # return index before crossing
t_out[idx] = i_wf if polarity>0 else i_wf - 1
t_out[idx] = i_wf if polarity > 0 else i_wf - 1
elif mode_in == ord("r"): # round; return closest index
if a_threshold[idx] - w_in[i_wf] < w_in[i_wf+polarity] - a_threshold[sorted_idx[i_tp]]:
if (
a_threshold[idx] - w_in[i_wf]
< w_in[i_wf + polarity] - a_threshold[sorted_idx[i_tp]]
):
t_out[idx] = i_wf
else:
t_out[idx] = i_wf + polarity
elif mode_in == ord("n"): # nearest-neighbor; return half-way between samps
t_out[idx] = i_wf + 0.5*polarity
elif mode_in == ord(
"n"
): # nearest-neighbor; return half-way between samps
t_out[idx] = i_wf + 0.5 * polarity
elif mode_in == ord("l"): # linear
t_out[idx] = i_wf + (a_threshold[idx] - w_in[i_wf]) / (
w_in[i_wf + polarity] - w_in[i_wf]
)
else:
raise DSPFatal("Unrecognized interpolation mode")
i_tp += 1
if i_tp >= len(sorted_idx): break
if i_tp >= len(sorted_idx):
break
idx = sorted_idx[i_tp]

# Search for timepoints at smaller values
i_tp = i_start-1
i_tp = i_start - 1
if i_tp >= 0:
idx = sorted_idx[i_tp]
for i_wf in range(t_start-1, len(w_in)-1 if polarity<0 else -1, -polarity):
if i_tp < 0: break
while w_in[i_wf] <= a_threshold[idx] < w_in[i_wf+polarity]:
for i_wf in range(
t_start - 1, len(w_in) - 1 if polarity < 0 else -1, -polarity
):
if i_tp < 0:
break
while w_in[i_wf] <= a_threshold[idx] < w_in[i_wf + polarity]:
if mode_in == ord("i"): # return index closest to start of search
t_out[idx] = i_wf
elif mode_in in (ord("a"), ord("f")): # return index after crossing
t_out[idx] = i_wf if polarity<0 else i_wf + 1
t_out[idx] = i_wf if polarity < 0 else i_wf + 1
elif mode_in in (ord("b"), ord("c")): # return index before crossing
t_out[idx] = i_wf if polarity>0 else i_wf - 1
t_out[idx] = i_wf if polarity > 0 else i_wf - 1
elif mode_in == ord("r"): # round; return closest index
if a_threshold[idx] - w_in[i_wf] < w_in[i_wf + polarity] - a_threshold[idx]:
if (
a_threshold[idx] - w_in[i_wf]
< w_in[i_wf + polarity] - a_threshold[idx]
):
t_out[idx] = i_wf
else:
t_out[idx] = i_wf + polarity
elif mode_in == ord("n"): # nearest-neighbor; return half-way between samps
t_out[idx] = i_wf + 0.5*polarity
elif mode_in == ord(
"n"
): # nearest-neighbor; return half-way between samps
t_out[idx] = i_wf + 0.5 * polarity
elif mode_in == ord("l"): # linear
t_out[idx] = i_wf + (a_threshold[idx] - w_in[i_wf]) / (
w_in[i_wf + polarity] - w_in[i_wf]
)
else:
raise DSPFatal("Unrecognized interpolation mode")
i_tp -= 1
if i_tp < 0: break
if i_tp < 0:
break
idx = sorted_idx[i_tp]

0 comments on commit 676e0de

Please sign in to comment.