Skip to content

ENH: Numerically stable Cython functions roll_cov and roll_corr #8326

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 3 commits into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
302 changes: 253 additions & 49 deletions pandas/algos.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -1087,7 +1087,7 @@ def ewmcov(ndarray[double_t] input_x, ndarray[double_t] input_y,
sum_wt = 1.
sum_wt2 = 1.
old_wt = 1.

for i from 1 <= i < N:
cur_x = input_x[i]
cur_y = input_y[i]
Expand Down Expand Up @@ -1117,7 +1117,7 @@ def ewmcov(ndarray[double_t] input_x, ndarray[double_t] input_y,
elif is_observation:
mean_x = cur_x
mean_y = cur_y

if nobs >= minp:
if not bias:
numerator = sum_wt * sum_wt
Expand Down Expand Up @@ -1252,56 +1252,265 @@ def nancorr_spearman(ndarray[float64_t, ndim=2] mat, Py_ssize_t minp=1):

return result


#----------------------------------------------------------------------
# Rolling variance
# Rolling correlation

def roll_var(ndarray[double_t] input, int win, int minp, int ddof=1):
def roll_corr(ndarray[double_t] x, ndarray[double_t] y, int win, int minp,
int ddof=1):
"""
Numerically stable implementation using Welford's method.
Numerically stable implementation using a Welford-like method, and
detection of exactly matching sequences and exactly zero denominators.
"""
cdef double val, prev, mean_x = 0, ssqdm_x = 0, nobs = 0, delta
cdef Py_ssize_t i
cdef Py_ssize_t N = len(input)
cdef double val_x, val_y, prev_x, prev_y, rep_x = NaN, rep_y = NaN
cdef double mean_x = 0, mean_y = 0, ssqdm_x = 0, ssqdm_y = 0
cdef double sproddm_xy = 0, delta_x, delta_y
cdef Py_ssize_t nobs = 0, nrep_x = 0, nrep_y = 0, ndup = 0, i, N = len(x)
cdef bint val_not_nan, prev_not_nan

cdef ndarray[double_t] output = np.empty(N, dtype=float)

minp = _check_minp(win, minp, N)

# Check for windows larger than array, addresses #7297
win = min(win, N)
for i from 0 <= i < N:
val_x = x[i]
val_y = y[i]
val_not_nan = val_x == val_x and val_y == val_y
if i < win:
prev_x = prev_y = NaN
prev_not_nan = 0
else:
prev_x = x[i - win]
prev_y = y[i - win]
prev_not_nan = prev_x == prev_x and prev_y == prev_y

# First, count the number of observations and consecutive repeats
if prev_not_nan:
# prev is not NaN, removing an observation...
if nobs == nrep_x:
# ...and removing a repeat from x
nrep_x -= 1
if nrep_x == 0:
rep_x = NaN
if nobs == nrep_y:
# ...and removing a repeat from y
nrep_y -= 1
if nrep_y == 0:
rep_y = NaN
if nobs == ndup:
# ...and removing a duplicate
ndup -= 1
nobs -= 1
if val_not_nan:
# val is not NaN, adding an observation
if val_x == rep_x:
# ...and adding a repeat to x
nrep_x += 1
else:
# ...and resetting x repeats
nrep_x = 1
rep_x = val_x
if val_y == rep_y:
# ...and adding a repeat to y
nrep_y += 1
else:
# ...and resetting y repeats
nrep_y = 1
rep_y = val_y
if val_x == val_y:
# ...and adding a duplicate
ndup += 1
else:
# ...and resetting duplicates
ndup = 0
nobs += 1

# Over the first window, observations can only be added, never removed
for i from 0 <= i < win:
val = input[i]
# Then, compute the new mean, sums of squared differences from the
# mean and sum of product of differences from the mean
if nobs == nrep_x and nobs == nrep_y:
if nobs > 0:
mean_x = rep_x
mean_y = rep_y
else:
mean_x = mean_y = 0
ssqdm_x = ssqdm_y = sproddm_xy = 0
elif val_not_nan:
# Adding one observation...
if prev_not_nan:
# ...and removing another
delta_x = val_x - prev_x
prev_x -= mean_x
if nobs == nrep_x:
mean_x = rep_x
val_x = 0
ssqdm_x = 0
else:
mean_x += delta_x / nobs
val_x -= mean_x
ssqdm_x += (val_x + prev_x) * delta_x
delta_y = val_y - prev_y
prev_y -= mean_y
if nobs == nrep_y:
mean_y = rep_y
val_y = 0
ssqdm_y = 0
else:
mean_y += delta_y / nobs
val_y -= mean_y
ssqdm_y += (val_y + prev_y) * delta_y
sproddm_xy += (delta_x * (val_y + prev_y) +
delta_y * (val_x + prev_x)) * 0.5
else:
# ...and not removing any
if nobs == nrep_x:
delta_x = 0
mean_x = rep_x
ssqdm_x = 0
else:
delta_x = val_x - mean_x
mean_x += delta_x / nobs
ssqdm_x += delta_x * (val_x - mean_x)
if nobs == nrep_y:
delta_y = 0
mean_y = rep_y
ssqdm_y = 0
else:
delta_y = val_y - mean_y
mean_y += delta_y / nobs
ssqdm_y += delta_y * (val_y - mean_y)
sproddm_xy += ((val_x - mean_x) * delta_y +
(val_y - mean_y) * delta_x) * 0.5
elif prev_not_nan:
# Adding no new observation, but removing one
if nobs == nrep_x:
delta_x = 0
mean_x = rep_x
ssqdm_x = 0
else:
delta_x = prev_x - mean_x
mean_x -= delta_x / nobs
ssqdm_x -= delta_x * (prev_x - mean_x)
if nobs == nrep_y:
delta_y = 0
mean_y = rep_y
ssqdm_y = 0
else:
delta_y = prev_y - mean_y
mean_y -= delta_y / nobs
ssqdm_y -= delta_y * (prev_y - mean_y)
sproddm_xy -= ((prev_x - mean_x) * delta_y +
(prev_y - mean_y) * delta_x) * 0.5
# Correlation is unchanged if no observation is added or removed

# Finally, compute and write the rolling correlation to output
if nobs < minp or nobs < ddof or ssqdm_x <= 0 or ssqdm_y <= 0:
output[i] = NaN
elif nobs == ndup:
output[i] = 1.0
else:
output[i] = sproddm_xy / sqrt(ssqdm_x * ssqdm_y)

# Not NaN
if val == val:
nobs += 1
delta = (val - mean_x)
mean_x += delta / nobs
ssqdm_x += delta * (val - mean_x)

if (nobs >= minp) and (nobs > ddof):
#pathological case
if nobs == 1:
val = 0
return output


#----------------------------------------------------------------------
# Rolling covariance

def roll_cov(ndarray[double_t] x, ndarray[double_t] y, int win, int minp,
int ddof=1):
"""
Numerically stable implementation using a Welford-like method.
"""
cdef double val_x, val_y, prev_x, prev_y, delta_x, delta_y
cdef double mean_x = 0, mean_y = 0, sproddm_xy = 0
cdef Py_ssize_t nobs = 0, i, N = len(x)
cdef bint val_not_nan, prev_not_nan

cdef ndarray[double_t] output = np.empty(N, dtype=float)

minp = _check_minp(win, minp, N)

for i from 0 <= i < N:
val_x = x[i]
val_y = y[i]
val_not_nan = val_x == val_x and val_y == val_y
if i < win:
prev_x = prev_y = NaN
prev_not_nan = 0
else:
prev_x = x[i - win]
prev_y = y[i - win]
prev_not_nan = prev_x == prev_x and prev_y == prev_y

if val_not_nan:
# Adding one observation...
if prev_not_nan:
# ...and removing another
delta_x = val_x - prev_x
prev_x -= mean_x
mean_x += delta_x / nobs
val_x -= mean_x
delta_y = val_y - prev_y
prev_y -= mean_y
mean_y += delta_y / nobs
val_y -= mean_y
sproddm_xy += (delta_x * (val_y + prev_y) +
delta_y * (val_x + prev_x)) * 0.5
else:
# ...and not removing any
nobs += 1
delta_x = val_x - mean_x
mean_x += delta_x / nobs
delta_y = val_y - mean_y
mean_y += delta_y / nobs
sproddm_xy += ((val_x - mean_x) * delta_y +
(val_y - mean_y) * delta_x) * 0.5
elif prev_not_nan:
# Adding no new observation, but removing one
nobs -= 1
if nobs:
delta_x = prev_x - mean_x
mean_x -= delta_x / nobs
delta_y = prev_y - mean_y
mean_y -= delta_y / nobs
sproddm_xy -= ((prev_x - mean_x) * delta_y +
(prev_y - mean_y) * delta_x) * 0.5
else:
val = ssqdm_x / (nobs - ddof)
if val < 0:
val = 0
mean_x = mean_y = sproddm_xy = 0
# Covariance is unchanged if no observation is added or removed

# Finally, compute and write the rolling covariance to output
if nobs >= minp and nobs > ddof:
output[i] = sproddm_xy / (nobs - ddof)
else:
val = NaN
output[i] = NaN

return output


#----------------------------------------------------------------------
# Rolling variance

def roll_var(ndarray[double_t] input, int win, int minp, int ddof=1):
"""
Numerically stable implementation using Welford's method.
"""
cdef double val, prev, mean_x = 0, ssqdm_x = 0, nobs = 0, delta
cdef Py_ssize_t i
cdef Py_ssize_t N = len(input)

output[i] = val
cdef ndarray[double_t] output = np.empty(N, dtype=float)

# After the first window, observations can both be added and removed
for i from win <= i < N:
minp = _check_minp(win, minp, N)

for i from 0 <= i < N:
val = input[i]
prev = input[i - win]
prev = NaN if i < win else input[i - win]

if val == val:
# Adding one observation...
if prev == prev:
# Adding one observation and removing another one
# ...and removing another
delta = val - prev
prev -= mean_x
mean_x += delta / nobs
Expand All @@ -1310,33 +1519,28 @@ def roll_var(ndarray[double_t] input, int win, int minp, int ddof=1):
else:
# Adding one observation and not removing any
nobs += 1
delta = (val - mean_x)
delta = val - mean_x
mean_x += delta / nobs
ssqdm_x += delta * (val - mean_x)
elif prev == prev:
# Adding no new observation, but removing one
nobs -= 1
if nobs:
delta = (prev - mean_x)
mean_x -= delta / nobs
delta = prev - mean_x
mean_x -= delta / nobs
ssqdm_x -= delta * (prev - mean_x)
else:
mean_x = 0
ssqdm_x = 0
mean_x = ssqdm_x = 0
# Variance is unchanged if no observation is added or removed

if (nobs >= minp) and (nobs > ddof):
#pathological case
if nobs == 1:
val = 0
else:
val = ssqdm_x / (nobs - ddof)
if val < 0:
val = 0
else:
val = NaN
# Sum of squared differences from the mean must be non-negative
ssqdm_x = 0 if ssqdm_x < 0 else ssqdm_x

output[i] = val
# Finally, compute and write the rolling variance to output
if nobs >= minp and nobs > ddof:
output[i] = ssqdm_x / (nobs - ddof)
else:
output[i] = NaN

return output

Expand Down