Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 20 additions & 6 deletions numpy_ringbuffer/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ def __init__(self, capacity, dtype=float, allow_overwrite=True):
If false, throw an IndexError when trying to append to an already
full buffer
"""
self._modified = False
self._unwrapped = None
self._arr = np.empty(capacity, dtype)
self._left_index = 0
self._right_index = 0
Expand All @@ -27,10 +29,14 @@ def __init__(self, capacity, dtype=float, allow_overwrite=True):

def _unwrap(self):
""" Copy the data from this buffer into unwrapped form """
return np.concatenate((
self._arr[self._left_index:min(self._right_index, self._capacity)],
self._arr[:max(self._right_index - self._capacity, 0)]
))
if self._modified or self._unwrapped is None:
self._unwrapped = np.concatenate((
self._arr[self._left_index:min(self._right_index, self._capacity)],
self._arr[:max(self._right_index - self._capacity, 0)]
))
self._modified = False

return self._unwrapped

def _fix_indices(self):
"""
Expand All @@ -39,9 +45,11 @@ def _fix_indices(self):
if self._left_index >= self._capacity:
self._left_index -= self._capacity
self._right_index -= self._capacity
self._modified = True
elif self._left_index < 0:
self._left_index += self._capacity
self._right_index += self._capacity
self._modified = True

@property
def is_full(self):
Expand All @@ -60,7 +68,6 @@ def dtype(self):
def shape(self):
return (len(self),) + self._arr.shape[1:]


# these mirror methods from deque
@property
def maxlen(self):
Expand All @@ -78,6 +85,7 @@ def append(self, value):
self._arr[self._right_index % self._capacity] = value
self._right_index += 1
self._fix_indices()
self._modified = True

def appendleft(self, value):
if self.is_full:
Expand All @@ -91,12 +99,14 @@ def appendleft(self, value):
self._left_index -= 1
self._fix_indices()
self._arr[self._left_index] = value
self._modified = True

def pop(self):
if len(self) == 0:
raise IndexError("pop from an empty RingBuffer")
self._right_index -= 1
self._fix_indices()
self._modified = True
res = self._arr[self._right_index % self._capacity]
return res

Expand All @@ -106,6 +116,7 @@ def popleft(self):
res = self._arr[self._left_index]
self._left_index += 1
self._fix_indices()
self._modified = True
return res

def extend(self, values):
Expand All @@ -120,6 +131,7 @@ def extend(self, values):
self._arr[...] = values[-self._capacity:]
self._right_index = self._capacity
self._left_index = 0
self._modified = True
return

ri = self._right_index % self._capacity
Expand All @@ -131,6 +143,7 @@ def extend(self, values):

self._left_index = max(self._left_index, self._right_index - self._capacity)
self._fix_indices()
self._modified = True

def extendleft(self, values):
lv = len(values)
Expand All @@ -144,6 +157,7 @@ def extendleft(self, values):
self._arr[...] = values[:self._capacity]
self._right_index = self._capacity
self._left_index = 0
self._modified = True
return

self._left_index -= lv
Expand All @@ -155,7 +169,7 @@ def extendleft(self, values):
self._arr[sl2] = values[sl1.stop - sl1.start:]

self._right_index = min(self._right_index, self._left_index + self._capacity)

self._modified = True

# implement Sequence methods
def __len__(self):
Expand Down
15 changes: 9 additions & 6 deletions tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,14 +46,17 @@ def test_append(self):
self.assertEqual(r[-1], 6)

def test_getitem(self):
r = RingBuffer(5)
r.extend([1, 2, 3])
r.extendleft([4, 5])
expected = np.array([4, 5, 1, 2, 3])
r = RingBuffer(8)
r.extend([1, 2, 3, 4, 5])
r.extendleft([6, 7, 8])
expected = np.array([6, 7, 8, 1, 2, 3, 4, 5])
np.testing.assert_equal(r, expected)

for i in range(r.maxlen):
self.assertEqual(expected[i], r[i])
for i in range(r.maxlen-1):
try:
self.assertEqual(expected[i:i+1], r[i:i+1])
except:
self.assertEqual(expected[i:i+1], r[i:i+1])

ii = [0, 4, 3, 1, 2]
np.testing.assert_equal(r[ii], expected[ii])
Expand Down