Refactored Spike Contrast method #1

Merged: 34 commits, Sep 15, 2020
Commits (34):
1405966 · faster mean_firing_rate for a typical use case (#331) · dizcza, Jun 22, 2020
91c03c4 · Timescale function option to return nan if there are few spikes (<=2)… · morales-gregorio, Jun 30, 2020
dea2ec4 · Build documentation in Travis (#335) · dizcza, Jul 27, 2020
e55a6bf · Removed deprecation warnings from 0.7.0 (#306) · dizcza, Jul 27, 2020
0779731 · don't use nan · dizcza, Aug 1, 2020
97482bc · don't pad with zeros · dizcza, Aug 1, 2020
4b20fce · bin_shrink_factor parameter · dizcza, Aug 1, 2020
89dcdae · precompute edges for _binning_half_overlap · dizcza, Aug 1, 2020
352e8fb · deal with spiketrains of length 1 · dizcza, Aug 4, 2020
7548c0d · return trace optionally · dizcza, Aug 4, 2020
9007517 · ASSET optimized probability_matrix_analytical and looping in _jsf_uni… · dizcza, Aug 5, 2020
9e67193 · Naming convention (binsize -> bin_size, etc.) (#316) · dizcza, Aug 7, 2020
fe7b86d · Release v0.8.0 (#340) · dizcza, Aug 7, 2020
3abf36f · Merge branch 'master' into pr/spike_contrast · dizcza, Aug 19, 2020
c41e06b · all quantities · dizcza, Aug 19, 2020
277c8de · python2 issues · dizcza, Aug 19, 2020
d56246c · python2 issues again · dizcza, Aug 19, 2020
63b1f4c · Update acknowledgments.rst · mdenker, Aug 28, 2020
4bd2703 · added __all__ in elephant modules (#342) · dizcza, Aug 31, 2020
82ffa0d · Added a warning in fanofactor function when the input spiketrains var… · dizcza, Aug 31, 2020
9529f14 · Merge commit 'refs/pull/338/head' of github.com:NeuralEnsemble/elepha… · dizcza, Sep 1, 2020
a8aec14 · fixed wrong default min_bin units · dizcza, Sep 1, 2020
ff1c8a8 · naming · dizcza, Sep 1, 2020
0dc27b3 · download & unzip API · dizcza, Sep 1, 2020
af3f7fd · Feature/inhomogeneous gamma (#339) · pbouss, Sep 1, 2020
e7448ea · Added information on citing Elephant to documentation, fixed bib entr… · mdenker, Sep 7, 2020
9d869b2 · Three surrogate methods: Trial-shifting, Bin Shuffling, ISI dithering… · pbouss, Sep 7, 2020
6f342aa · SPADE: New way to count patterns for multiple testing (#347) · pbouss, Sep 7, 2020
f932dea · spike synchrony doc; take the first 5 networks to run the test · dizcza, Sep 7, 2020
ec1919a · renamed test module · dizcza, Sep 7, 2020
352f419 · group spike train correlation, dissimilarity, and synchrony · dizcza, Sep 7, 2020
ed94d54 · tutorials: changed wget to curl for platform compatibility (#350) · pbouss, Sep 9, 2020
48fc9b0 · Time-domain pairwise Granger causality (#332) · rjurkus, Sep 10, 2020
f54e254 · Merge branch 'master' into pr/spike_contrast · dizcza, Sep 11, 2020
Changes from 1 commit — 0779731f306a3c1207716d6d78f27242bf75ad04: "don't use nan" (committed by dizcza, Aug 1, 2020)
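Before the diff, a plain-text summary of the quantity this module computes, read directly off the main loop in the code below (no content beyond the code itself). For each candidate bin size b:

    theta_k(b)  = spikes in half-overlapping bin k, summed over all spike trains
    n_k(b)      = number of spike trains with at least one spike in bin k
    activeST(b) = (sum_k n_k*theta_k / sum_k theta_k - 1) / (n_spiketrains - 1)
    contrast(b) = sum_k |theta_(k+1) - theta_k| / (2 * n_spikes_total)

The reported synchrony is the maximum of contrast(b) * activeST(b) as the bin size shrinks from duration / 2 by a factor of 0.9 per iteration down to bin_min.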
elephant/spike_contrast.py · 200 changes: 71 additions & 129 deletions
@@ -25,106 +25,61 @@
 import numpy as np


-def get_theta_and_n_per_bin(spike_trains, t_start, t_stop, bin_size):
+def _get_theta_and_n_per_bin(spiketrains, t_start, t_stop, bin_size):
     """
-    Calculates theta (amount of spikes per bin) and n (amount of active spike trains per bin) of one spike train.
-
-    Parameters
-    ----------
-    spike_trains : np.ndarray
-        A spike train.
-    t_start : int
-        The beginning of the spike train.
-    t_stop : int
-        The end of the spike train.
-    bin_size : int
-        The time precision used to discretize the spiketrains (binning).
-
-    Returns
-    -------
-    theta : int
-        Contains the amount of spikes per bin
-    n : int
-        Contains the amount of active spike trains per bin.
-
+    Calculates theta (amount of spikes per bin) and n (amount of active spike
+    trains per bin) of one spike train.
     """
-    # Smoothing
-    bin_step = bin_size / 2
-    # Creating vector with t_start as start and t_end as end with step size bin_step
-    edges = np.arange(t_start, t_stop+bin_step, bin_step)
-    # Amount of spike trains
-    tmp = spike_trains.shape
-    amount_of_spikes = tmp[1]
-    histogram = np.zeros((len(edges) - 2, amount_of_spikes))
-    # Calculate histogram for every spike train
-    for i in range(0, amount_of_spikes):  # for all spike trains
-        spike_train_i = spike_trains[:, i]
-        # only use non-nan values
-        spike_train_i = spike_train_i[~np.isnan(spike_train_i)]
-        histogram[:, i] = binning_half_overlap(spike_train_i, t_start, t_stop, bin_size)
-    # Calculate parameter over all spike trains
+    histogram = np.vstack([
+        _binning_half_overlap(st[st.nonzero()], t_start=t_start, t_stop=t_stop,
+                              bin_size=bin_size)
+        for st in spiketrains
+    ])
     # Amount of spikes per bin
-    theta = np.sum(histogram, 1)
-    # Create matrix with a 1 for every non 0 element
-    mask = histogram != 0
-    # Amount of active spike trains
-    n = np.sum(mask, 1)
+    theta = histogram.sum(axis=0)
+    # Amount of active spike trains per bin
+    n = np.count_nonzero(histogram, axis=0)

     return theta, n

-def binning_half_overlap(spike_train_i, t_start, t_stop, bin_size):
+def _binning_half_overlap(spiketrain, t_start, t_stop, bin_size):
     """
-    Referring to [1] overlapping the bins creates a better result.
-
-    Parameters
-    ----------
-    spike_train_i : np.array
-        Contains all spikes of one spiketrain.
-    t_start : int
-        The beginning of the spike train.
-    t_stop : int
-        The end of the spike train.
-    bin_size : int
-        The time precision used to discretize the spike trains (binning).
-
-    Returns
-    -------
-    histogram : np.ndarray
-        Contains the histogram of one spike train.
+    Referring to [1] overlapping the bins creates a better result.
     """
     bin_step = bin_size / 2
-    edges = np.arange(t_start, t_stop+bin_step, bin_step)
-    histogram, bin_edges = np.histogram(spike_train_i, edges)
-    histogram[0:len(histogram) - 1] = histogram[0:len(histogram) - 1] + histogram[1:len(histogram)]
-    return histogram[0:len(histogram) - 1]
+    edges = np.arange(t_start, t_stop + bin_step, bin_step)
+    histogram, bin_edges = np.histogram(spiketrain, edges)
+    histogram = histogram[:-1] + histogram[1:]
+    return histogram


-def spike_contrast(spike_trains_elephant, t_start, t_stop, max_bin_min=0.01):
+def spike_contrast(spiketrains, t_start, t_stop, min_bin=0.01):
     """
-    Calculates the synchrony of several spike trains. The spikes trains do not have to have the same length, the
-    algorithm takes care of that.
-
-    Parameters
-    ----------
-    spike_trains_elephant : neo.SpikeTrain or np.ndarray
-        Contains all the spike trains.
-    t_start : float
-        The beginning of the spike train.
-    t_stop : float
-        The end of the spike train.
-    max_bin_min : float
-        The variable bin_min gets calculated by the algorithm, but if the calculated
-        value is smaller than max_bin_min it takes max_bin_min to not get smaller than it.
-        Default: 0.01
-
-    Returns
-    -------
-    synchrony : float
-        Returns the synchrony of the input spike trains.
-
-    Examples
-    --------
+    Calculates the synchrony of several spike trains. The spike trains do not
+    have to have the same length; the algorithm takes care of that.
+
+    Parameters
+    ----------
+    spiketrains : list of neo.SpikeTrain or list of np.ndarray
+        Contains all the spike trains.
+    t_start : float
+        The beginning of the spike train.
+    t_stop : float
+        The end of the spike train.
+    min_bin : float
+        The variable bin_min gets calculated by the algorithm, but if the
+        calculated value is smaller than min_bin, min_bin is used instead.
+        Default: 0.01
+
+    Returns
+    -------
+    synchrony : float
+        Returns the synchrony of the input spike trains.
+
+    Examples
+    --------
     >>> import quantities as pq
     >>> import elephant.spike_train_generation
     >>> import elephant.spike_contrast
@@ -133,67 +88,54 @@ def spike_contrast(spike_trains_elephant, t_start, t_stop, max_bin_min=0.01):
     >>> spikes_train_2 = homogeneous_poisson_process(50*pq.Hz, t_start=0*pq.ms,
     ...                  t_stop=1000*pq.ms, refractory_period = 3*pq.ms)
     >>> spike_trains = np.array([spike_train_1, spike_train_2])
-    >>> print(spike_contrast(spike_trains, 0, 10000))
+    >>> print(spike_contrast(spiketrains_padded, 0, 10000))

     """
-    # Pad all spike trains with zeros, so every array has the same length
-    zero_array = np.zeros((1, spike_trains_elephant.shape[0]))
-    for i in range(0, spike_trains_elephant.shape[0]):
-        length_spike_train = spike_trains_elephant[i].shape
-        zero_array[0][i] = length_spike_train[0]
+    n_spiketrains = len(spiketrains)
+    n_spikes_total = sum(map(len, spiketrains))

-    # Detect the longest array
-    biggest_array_count = int(np.max(zero_array))
-    # Great new array where every spike train has the same length
-    spike_trains_zeros = np.zeros((spike_trains_elephant.shape[0], biggest_array_count))
-    for i in range(0, spike_trains_elephant.shape[0]):
-        spike_trains_zeros[i] = np.pad(spike_trains_elephant[i], (0, biggest_array_count -
-                                       len(spike_trains_elephant[i])))
-    # Get the Dimension of the spike train.
-    tmp = spike_trains_zeros.shape
+    biggest_array_count = max(map(len, spiketrains))
+    # pad the spiketrains with NaN
+    spiketrains_padded = np.vstack([
+        np.pad(st, pad_width=(0, biggest_array_count - len(st)),
+               constant_values=np.nan)
+        for st in spiketrains
+    ])
-    # Get the transposed matrix for the algorithm
-    spike_trains = np.zeros((tmp[1], tmp[0]))
-    for i in range(tmp[0]):
-        for y in range(tmp[1]):
-            spike_trains[y][i] = spike_trains_zeros[i][y]
-    time = t_stop - t_start
-    # Set zeros to NaN (zero-padding)
-    spike_trains = np.where(spike_trains == 0, np.nan, spike_trains)
-    mask = np.isnan(spike_trains)
-    # Make a masked array
-    spike_trains_ma = np.ma.MaskedArray(spike_trains, mask)

-    tmp = spike_trains.shape
-    amount_of_spikes = int(tmp[1])
+    duration = t_stop - t_start

     # parameter
     bin_shrink_factor = 0.9  # bin size decreases by factor 0.9 for each iteration
-    bin_max = time / 2
-    isi = np.diff(spike_trains_ma, axis=0)
-    isi_min = np.min(isi)
-    bin_min = np.max([isi_min / 2, max_bin_min])
+    bin_max = duration / 2
+    isi_min = min(min(np.diff(st)) for st in spiketrains)
+    bin_min = max(isi_min / 2, min_bin)

     # initialization
-    num_iterations = np.ceil(np.log(bin_min / bin_max) / np.log(bin_shrink_factor))
+    num_iterations = np.ceil(
+        np.log(bin_min / bin_max) / np.log(bin_shrink_factor))
     num_iterations = int(num_iterations)
-    active_st = np.zeros((num_iterations, 1))
-    contrast = np.zeros((num_iterations, 1))
-    synchrony_curve = np.zeros((num_iterations, 1))
+    active_st = np.zeros(num_iterations)
+    contrast = np.zeros(num_iterations)
+    synchrony_curve = np.zeros(num_iterations)

-    num_all_spikes = spike_trains_ma.count()
     bin_size = bin_max
-    # for 0, 1, 2, ... num_iterations
-    for i in range(0, num_iterations):
+    for iter_id in range(num_iterations):
         # Set the new boundaries for the time
         time_start = -isi_min
-        time_end = time + isi_min
+        time_end = duration + isi_min
         # Calculate Theta and n
-        theta_k, n_k = get_theta_and_n_per_bin(spike_trains, time_start, time_end, bin_size)
+        theta_k, n_k = _get_theta_and_n_per_bin(spiketrains_padded,
+                                                t_start=time_start,
+                                                t_stop=time_end,
+                                                bin_size=bin_size)

         # calculate synchrony_curve = contrast * active_st
-        active_st[i] = ((np.sum(n_k * theta_k)) / (np.sum(theta_k)) - 1) / (amount_of_spikes - 1)
-        contrast[i] = (np.sum(np.abs(np.diff(theta_k))) / (num_all_spikes * 2))
+        active_st[iter_id] = ((np.sum(n_k * theta_k)) / (np.sum(theta_k)) - 1) / (
+                n_spiketrains - 1)
+        contrast[iter_id] = (np.sum(np.abs(np.diff(theta_k))) / (n_spikes_total * 2))
         # Contrast: sum(|derivation|) / (2*#Spikes)
-        synchrony_curve[i] = contrast[i] * active_st[i]  # synchrony_curve
+        synchrony_curve[iter_id] = contrast[iter_id] * active_st[iter_id]  # synchrony_curve
         # New bin size
         bin_size *= bin_shrink_factor
     # Sync value is maximum of cost function C
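As an aside (not part of the commit): a minimal, self-contained sketch of the two helpers above. The function body is copied from the '+' side of this diff with the leading underscore dropped; the inputs and expected outputs mirror the unit tests below.

import numpy as np


def binning_half_overlap(spiketrain, t_start, t_stop, bin_size):
    # Edges are spaced at half the bin width; summing each count with its
    # right neighbour turns them into full-width bins overlapping by 50%
    # (the smoothing referred to as [1] in the docstring above).
    bin_step = bin_size / 2
    edges = np.arange(t_start, t_stop + bin_step, bin_step)
    histogram, _ = np.histogram(spiketrain, edges)
    return histogram[:-1] + histogram[1:]


# Same input as test_binning_half_overlap: bins [0, 5), [2.5, 7.5) and
# [5, 10] hold 3, 1 and 1 spikes, respectively.
print(binning_half_overlap(np.array([1, 2, 3, 9]),
                           t_start=0, t_stop=10, bin_size=5))  # [3 1 1]

# theta and n as in _get_theta_and_n_per_bin, on the data of
# test_get_theta_and_n_per_bin: rows are spike trains; the trailing 0 is
# padding and is filtered out by st[st.nonzero()].
spiketrains = np.array([[1., 2., 3., 9.],
                        [1., 2., 3., 9.],
                        [1., 2., 2.5, 0.]])
histogram = np.vstack([
    binning_half_overlap(st[st.nonzero()], t_start=0, t_stop=10, bin_size=5)
    for st in spiketrains
])
theta = histogram.sum(axis=0)            # spikes per bin        -> [9 3 2]
n = np.count_nonzero(histogram, axis=0)  # active trains per bin -> [3 3 2]
print(theta, n)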
elephant/test/test_spike_contrast.py · 101 changes: 66 additions & 35 deletions

@@ -1,61 +1,92 @@
 import unittest
-import elephant.spike_contrast as spc
-import elephant.spike_train_generation as stgen

 import numpy as np
+from numpy.testing import assert_array_equal
 from quantities import Hz, ms

+import elephant.spike_contrast as spc
+import elephant.spike_train_generation as stgen

-class TestUM(unittest.TestCase):
-
-    def setUp(self):
-        pass
+class TestUM(unittest.TestCase):

     def test_spike_contrast(self):
+        np.random.seed(24)  # to make the results reproducible
-        spike_train_1 = stgen.homogeneous_poisson_process(rate=10*Hz, t_start=0.*ms, t_stop=10000.*ms, as_array=True)
-        spike_train_2 = stgen.homogeneous_poisson_process(rate=10*Hz, t_start=0.*ms, t_stop=10000.*ms, as_array=True)
-        spike_train_3 = stgen.homogeneous_poisson_process(rate=10*Hz, t_start=0.*ms, t_stop=10000.*ms, as_array=True)
-        spike_train_4 = stgen.homogeneous_poisson_process(rate=10*Hz, t_start=0.*ms, t_stop=10000.*ms, as_array=True)
-        spike_train_5 = stgen.homogeneous_poisson_process(rate=1*Hz, t_start=0.*ms, t_stop=10000.*ms, as_array=True)
-        spike_train_6 = stgen.homogeneous_poisson_process(rate=1*Hz, t_start=0.*ms, t_stop=10000.*ms, as_array=True)
-        spike_trains = np.array(
-            [spike_train_1, spike_train_2, spike_train_3, spike_train_4, spike_train_5, spike_train_6])
-        self.assertEqual((spc.spike_contrast(spike_trains, 0, 10000)), 0.2098687702924583)
+        spike_train_1 = stgen.homogeneous_poisson_process(rate=10 * Hz,
+                                                          t_start=0. * ms,
+                                                          t_stop=10000. * ms,
+                                                          as_array=True)
+        spike_train_2 = stgen.homogeneous_poisson_process(rate=10 * Hz,
+                                                          t_start=0. * ms,
+                                                          t_stop=10000. * ms,
+                                                          as_array=True)
+        spike_train_3 = stgen.homogeneous_poisson_process(rate=10 * Hz,
+                                                          t_start=0. * ms,
+                                                          t_stop=10000. * ms,
+                                                          as_array=True)
+        spike_train_4 = stgen.homogeneous_poisson_process(rate=10 * Hz,
+                                                          t_start=0. * ms,
+                                                          t_stop=10000. * ms,
+                                                          as_array=True)
+        spike_train_5 = stgen.homogeneous_poisson_process(rate=1 * Hz,
+                                                          t_start=0. * ms,
+                                                          t_stop=10000. * ms,
+                                                          as_array=True)
+        spike_train_6 = stgen.homogeneous_poisson_process(rate=1 * Hz,
+                                                          t_start=0. * ms,
+                                                          t_stop=10000. * ms,
+                                                          as_array=True)
+        spike_trains = [spike_train_1, spike_train_2, spike_train_3,
+                        spike_train_4, spike_train_5, spike_train_6]
+        synchrony = spc.spike_contrast(spike_trains, t_start=0, t_stop=10000)
+        self.assertEqual(synchrony, 0.2098687702924583)

     def test_spike_contrast_1(self):
-        spike_train = stgen.homogeneous_poisson_process(rate=10*Hz, t_start=0.*ms, t_stop=10000.*ms, as_array=True)
+        np.random.seed(21)
+        spike_train = stgen.homogeneous_poisson_process(rate=10 * Hz,
+                                                        t_start=0. * ms,
+                                                        t_stop=10000. * ms,
+                                                        as_array=True)
         spike_trains = np.array([spike_train, spike_train])
-        self.assertEqual((spc.spike_contrast(spike_trains, 0, 10000)), 1.0)
+        synchrony = spc.spike_contrast(spike_trains, t_start=0, t_stop=10000)
+        self.assertEqual(synchrony, 1.0)

     def test_spike_contrast_2(self):
-        spike_train_1 = stgen.homogeneous_poisson_process(rate=10*Hz, t_start=0.*ms, t_stop=10000.*ms, as_array=True)
-        spike_train_2 = stgen.homogeneous_poisson_process(rate=10*Hz, t_start=0.*ms, t_stop=10000.*ms, as_array=True)
-        spike_train_3 = stgen.homogeneous_poisson_process(rate=10*Hz, t_start=0.*ms, t_stop=10000.*ms, as_array=True)
+        np.random.seed(19)
+        spike_train_1 = stgen.homogeneous_poisson_process(rate=10 * Hz,
+                                                          t_start=0. * ms,
+                                                          t_stop=10000. * ms,
+                                                          as_array=True)
+        spike_train_2 = stgen.homogeneous_poisson_process(rate=10 * Hz,
+                                                          t_start=0. * ms,
+                                                          t_stop=10000. * ms,
+                                                          as_array=True)
+        spike_train_3 = stgen.homogeneous_poisson_process(rate=10 * Hz,
+                                                          t_start=0. * ms,
+                                                          t_stop=10000. * ms,
+                                                          as_array=True)

         spike_trains = np.array([spike_train_1, spike_train_2, spike_train_3])
-        self.assertEqual((spc.spike_contrast(spike_trains, 0, 20000)), 0.5)
+        synchrony = spc.spike_contrast(spike_trains, t_start=0, t_stop=20000)
+        self.assertEqual(synchrony, 0.5)

     def test_get_theta_and_n_per_bin(self):
         spike_trains = np.array([[1, 1, 1],
                                  [2, 2, 2],
                                  [3, 3, 2.5],
-                                 [9, 9, 0]])
-        spike_trains = np.where(spike_trains == 0, np.nan, spike_trains)
-        bin_size = 5
-        t_start = 0
-        t_stop = 10
-        theta, n = spc.get_theta_and_n_per_bin(spike_trains, t_start, t_stop, bin_size)
-        expected_theta = np.array([9, 3, 2])
-        expected_n = np.array([3, 3, 2])
-        self.assertTrue((theta == expected_theta).all())
-        self.assertTrue((n == expected_n).all())
+                                 [9, 9, 0]]).T
+        theta, n = spc._get_theta_and_n_per_bin(spike_trains,
+                                                t_start=0,
+                                                t_stop=10,
+                                                bin_size=5)
+        assert_array_equal(theta, [9, 3, 2])
+        assert_array_equal(n, [3, 3, 2])

     def test_binning_half_overlap(self):
-        spike_train_i = np.array([1, 2, 3, 9])
-        t_start = 0
-        t_stop = 10
-        bin_size = 5
-        self.assertTrue(([3, 1, 1] == spc.binning_half_overlap(spike_train_i, t_start, t_stop, bin_size)).all())
+        spiketrain = np.array([1, 2, 3, 9])
+        histogram = spc._binning_half_overlap(spiketrain,
+                                              t_start=0, t_stop=10, bin_size=5)
+        assert_array_equal(histogram, [3, 1, 1])


 if __name__ == '__main__':
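Finally, a hedged end-to-end usage sketch mirroring the first unit test above. It assumes the API at this commit: the function lives in elephant.spike_contrast and takes plain spike-time arrays plus explicit t_start/t_stop. A later commit in this PR (352f419, "group spike train correlation, dissimilarity, and synchrony") regroups the synchrony measures, so the import path differs in released Elephant.

import numpy as np
import quantities as pq
from elephant.spike_train_generation import homogeneous_poisson_process
from elephant.spike_contrast import spike_contrast  # path as of this commit

np.random.seed(24)  # same seed as test_spike_contrast, for reproducibility
# Four fast (10 Hz) and two slow (1 Hz) Poisson spike trains over 10 seconds
spiketrains = [
    homogeneous_poisson_process(rate=rate * pq.Hz, t_start=0. * pq.ms,
                                t_stop=10000. * pq.ms, as_array=True)
    for rate in (10, 10, 10, 10, 1, 1)
]
synchrony = spike_contrast(spiketrains, t_start=0, t_stop=10000)
print(synchrony)  # the unit test expects 0.2098687702924583 with this seed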