Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixed a number of issues since initial code release. #5

Open
wants to merge 27 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
4484301
Removing old Google and TF1 code.
MalcolmSlaney Feb 24, 2024
96caaff
Removing bazel temp directories
MalcolmSlaney Feb 24, 2024
f04f1ac
Removing bazel work files
MalcolmSlaney Feb 24, 2024
f550768
Adding .gitignore (for bazel and other temp files)
MalcolmSlaney Feb 24, 2024
4269809
Minor cleanup of cleanups.
MalcolmSlaney Feb 24, 2024
6597e99
More cleanups.
MalcolmSlaney Feb 24, 2024
fcdb48d
Remove matplotlib.use since it doesn't seem necessary any more.
MalcolmSlaney Feb 26, 2024
aa358bf
Adding assertions to make sure LDA data is good.
MalcolmSlaney Mar 18, 2024
283b413
More checks for finite results
MalcolmSlaney Mar 18, 2024
d6d152f
Make sure CCA results are finite
MalcolmSlaney Mar 19, 2024
d2db489
Mitigate possible roundoff error.
MalcolmSlaney Mar 19, 2024
10057c7
Fix LDA roundoff error. Fix Bazel test error.
MalcolmSlaney Mar 21, 2024
1a361fd
Various fixes so all tests run under MacOSX.
MalcolmSlaney Mar 22, 2024
a36d66f
Add Bazel Test message.
MalcolmSlaney Mar 22, 2024
abcf499
Adding information about fork from Google.
MalcolmSlaney Mar 22, 2024
dc55c64
Additional assertions, looking for bad (zero) data
MalcolmSlaney Mar 22, 2024
484c24a
Error checking message
MalcolmSlaney Mar 22, 2024
3fd1e43
Several fixes for latest demo data.
MalcolmSlaney Mar 26, 2024
ae7722b
Adding first bits of code for realtime LSL interface.
MalcolmSlaney Jun 17, 2024
8b28701
Make correlation power normalization more robust.
MalcolmSlaney Jun 17, 2024
64b95fb
More local files to ignore and not push to master repository
MalcolmSlaney Jun 17, 2024
319132f
Added code to read from the streams. Many bug fixes.
MalcolmSlaney Jun 18, 2024
9924609
Adding some comments so I know how this old code works.
MalcolmSlaney Jun 25, 2024
f35ca13
Adding comments about the data buffer.
MalcolmSlaney Jun 25, 2024
dfb5078
Adding decimation to the pipeline, before adding context.
MalcolmSlaney Jun 26, 2024
6ea1bec
Fixing channel number parsing. Adding test for string parsing, end to…
MalcolmSlaney Jun 27, 2024
764bcbf
Cleaning up comments when testing from a string parameter.
MalcolmSlaney Jun 27, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Added code to read from the streams. Many bug fixes.
  • Loading branch information
MalcolmSlaney committed Jun 18, 2024
commit 319132f9bfc56efe927932eeadeafe7a6d9da5a1
160 changes: 123 additions & 37 deletions telluride_decoding/realtime.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import math
import threading
import time

from dataclasses import dataclass
from typing import List, Optional, Tuple
Expand Down Expand Up @@ -50,27 +52,45 @@ def add_data(self, new_data: np.ndarray):
self._buffer_time += new_data.shape[0]

def get_data(self, frame_time: int, frame_count: int):
  """Retrieve up to frame_count frames starting at absolute index frame_time.

  The underlying storage is a circular buffer of self._buffer_count frames;
  frame indices count monotonically from the start of the stream, so a
  request may span the wrap point and be assembled from two slices.

  Args:
    frame_time: Absolute sample index (since the stream started) of the
      first frame wanted.
    frame_count: Maximum number of frames to return.

  Returns:
    An np.ndarray of shape (n, num_channels) with n <= frame_count, or
    None when the request is empty, there is no data yet, it lies in the
    future, or it has already been overwritten in the ring buffer.
  """
  # Clamp the request to the data actually received so far.
  frame_count = min(frame_count, self._buffer_time - frame_time)
  if frame_count <= 0:
    return None
  if not self._buffer_count:
    logging.warning('get_data warning: No data yet')
    return None
  if frame_time >= self._buffer_time:
    logging.warning(f'get_data warning: Too far in the future ({frame_time})')
    return None  # Too far in the future
  if frame_time < self._buffer_time - self._buffer_count:
    logging.warning(f'get_data warning: Too far in the past ({frame_time})')
    return None  # Too far in the past

  first_start = frame_time % self._buffer_count
  if first_start >= self._buffer_index:
    # Get the piece that is forward of the buffer index (the older data
    # stored between the write index and the physical end of the buffer).
    first_end = min(self._buffer_count, first_start + frame_count)
    first_part = self._data[first_start:first_end, :]
    assert first_part.shape[0], (f'{frame_time}, {self._buffer_count},'
                                 f'{self._buffer_index}, {first_start},'
                                 f' {first_end}, {frame_count}')
    frame_count -= first_part.shape[0]
    first_start = 0
  else:
    first_part = None

  second_start = first_start
  if frame_count > 0:
    # Now get the part that is at the start of the buffer, before the index.
    frame_count = min(frame_count, self._buffer_index - second_start)
    second_part = self._data[int(second_start):int(second_start + frame_count),
                             :]
    if first_part is None:
      assert second_part.shape[0], (f'{frame_time}, {self._buffer_count}'
                                    f'{self._buffer_index}, {second_start}, '
                                    f'{frame_count}')
      return second_part
    assert second_part.shape[0]
    return np.concatenate((first_part, second_part), axis=0)
  return first_part

Expand All @@ -86,14 +106,15 @@ def __init__(self, sample_rate: float, buffer_count: Optional[int] = None,
if sample_rate <= 0:
logging.error(f'Sample rate for {self._name} TimeStream can not '
f'be {sample_rate}')
self._sample_rate = float(sample_rate)
self._start_time = 0
self._end_time = 0
buffer_count = buffer_count or int(sample_rate)
self._sample_rate = float(sample_rate) # Samples per second
self._start_time = 0 # in Seconds
self._end_time = 0 # in Seconds
buffer_count = int(buffer_count or sample_rate)
super().__init__(buffer_count, dtype=dtype)

def get_data_at_time(self, time: float, frame_count: int):
  """Return up to frame_count frames starting at wall-clock time (seconds).

  Converts the absolute timestamp into a sample index relative to the first
  sample this stream received, then delegates to the ring-buffer getter.
  """
  return super().get_data(int((time - self.start_time) * self._sample_rate),
                          frame_count)

def add_data_at_time(self, data, timestamp):
if self._start_time == 0:
Expand All @@ -108,25 +129,36 @@ def add_data_at_time(self, data, timestamp):
f'{delta_samples} samples gap.')
self.add_data(data)
self._end_time += data.shape[0]/self._sample_rate
# print(f'{self._name}: {self._start_time}, {self._end_time}, {data.shape}')

@property
def sample_rate(self):
  """Sampling rate of this stream, in samples per second (float)."""
  return self._sample_rate

@property
def start_time(self):
  """Returns the timestamp (seconds) of the *first* data received.

  Stays 0 until the first chunk arrives (set by add_data_at_time).
  """
  return self._start_time

@property
def end_time(self):
  """Returns the timestamp (seconds) just past the *last* data received.

  Advanced by add_data_at_time as each chunk arrives.
  """
  return self._end_time


def end_stream_time(time_streams: List['TimeStream']) -> float:
  """Return the latest time (seconds) for which ALL streams have good data.

  That is the minimum of the individual end times. Falsy entries (streams
  stored as 0, e.g. irregular-rate marker streams) are skipped.
  """
  return min([ts.end_time for ts in time_streams if ts])


def start_stream_time(time_streams: List['TimeStream']) -> float:
  """Return the earliest time (seconds) at which ALL streams have good data.

  That is the maximum of the individual stream start times. Returns 0 if any
  stream has not yet received data (its start_time is still 0). Falsy stream
  entries are skipped.
  """
  times = [ts.start_time for ts in time_streams if ts]
  if 0 in times:
    return 0
  return max(times)


############## Python Lab Stream Layer #################################
Expand Down Expand Up @@ -189,13 +221,52 @@ def open_stream(name: str, debug: bool = False):
return inlet

@dataclass
class BrainItem:
  """All per-stream state: the LSL inlet, its buffer, and reader thread."""
  name: str                                    # LSL stream name
  lsl: 'pylsl.StreamInlet'                     # open inlet to read from
  stream: Optional['TimeStream'] = None        # buffer; None/0 for marker streams
  thread: Optional[threading.Thread] = None    # daemon thread filling the buffer
  lock: Optional[threading.Lock] = None        # guards stream reads vs. writes


def read_stream_thread(brain_item: 'BrainItem'):
  """Thread body: read from an LSL inlet forever into its TimeStream.

  Polls the inlet with a short timeout; chunks that arrive are appended to
  brain_item.stream (if it has one) under brain_item.lock. Marker-stream
  samples are printed instead of buffered. Never returns; intended to run
  as a daemon thread.
  """
  print('Starting thread for stream', brain_item.name)
  # NOTE(review): the lock is created here, inside the thread, but consumers
  # (read_streamed_data) take brain_item.lock from the main thread -- if they
  # run before this assignment they race on a None lock. Consider creating
  # the lock before thread.start().
  brain_item.lock = threading.Lock()

  inlet = brain_item.lsl
  ts = brain_item.stream
  while True:
    timestamp, data = read_from_inlet(inlet, timeout=0.01)
    if not timestamp:
      continue
    if 'Marker' in brain_item.name:
      print(f'Marker found at {timestamp}: {data[0][0]}')
    if ts:
      with brain_item.lock:
        ts.add_data_at_time(data, timestamp)


# Presumably the LSL stream names this client subscribes to; other streams on
# the network are ignored -- TODO confirm against main()'s stream selection.
all_stream_names = ['MyAudioStream', 'actiCHamp-18110006', 'NextSense',
                    'MarkerSTR_audio']


def read_streamed_data(brain_items: Dict[str, 'BrainItem'], start_time: float,
                       duration: float):
  """Collect one time window of data from every buffered stream.

  Blocks (polling every 0.1s) until all streams have data through
  start_time + duration, then slices that window out of each stream.

  Args:
    brain_items: Dict mapping stream name to BrainItem. (The code iterates
      .values(), so this is a dict, not a list.)
    start_time: Window start, in seconds of stream time.
    duration: Window length in seconds.

  Returns:
    A list with one np.ndarray (or None) per stream that has a TimeStream,
    in dict-iteration order.
  """
  all_streams = [bi.stream for bi in brain_items.values() if bi.stream]
  # Busy-wait until every stream's end_time covers the requested window.
  while end_stream_time(all_streams) < start_time + duration:
    print('pausing..', end='')
    time.sleep(.1)

  results = []
  for bi in brain_items.values():
    stream = bi.stream
    if stream:
      frame_count = int(stream.sample_rate * duration)
      with bi.lock:
        data = stream.get_data_at_time(start_time, frame_count)
      results.append(data)
  return results


def main():
Expand All @@ -205,7 +276,8 @@ def main():
# iterate over found streams, creating specialized inlet objects that will
# handle plotting the data
for info in streams:
print(f'Type: {info.type()}, name: {info.name()}, sr={info.nominal_srate()}')
print(f'Type: {info.type()}, name: {info.name()}, '
f'sr={info.nominal_srate()}')
# if info.type() == "Markers":
# if (
# info.nominal_srate() != pylsl.IRREGULAR_RATE
Expand All @@ -227,29 +299,43 @@ def main():
inlet = open_stream(name)
info = inlet.info()
print(f'The {name} sample rate is {info.nominal_srate()}Hz')
ts = TimeStream(sample_rate=info.nominal_srate(), name=name)
all_streams[name] = BrainItems(name, inlet, ts)

for _ in range(10):
for brain_item in all_streams.values():
inlet = brain_item.lsl
ts = brain_item.stream
while True:
timestamp, data = read_from_inlet(inlet, timeout=0.01)
if not timestamp:
break
print(f'Read from {name} inlet returned', data.shape, 'at', timestamp)
endtime = ts.add_data_at_time(data, timestamp)
break
print('\n')
if info.nominal_srate() > 0:
ts = TimeStream(sample_rate=info.nominal_srate(),
buffer_count=4*info.nominal_srate(),
name=name)
else:
ts = 0
my_stream = BrainItem(name, inlet, ts)
thread = threading.Thread(target=read_stream_thread, args=[my_stream,],
daemon=True)
my_stream.thread = thread
all_streams[name] = my_stream
thread.start()

all_data_streams = [bi.stream for bi in all_streams.values() if bi.stream]
all_stream_objects = [bi.stream for bi in all_streams.values()]

start_time = 0
while start_time == 0:
start_time = start_stream_time(all_stream_objects)
time.sleep(1)

window_size = 0.1

for _ in range(300):
results = read_streamed_data(all_streams, start_time, .10)
# print(results)
print(start_time, [d.shape for d in results])
time.sleep(1)
start_time += 1

for brain_item in all_streams.values():
ts = brain_item.stream
print(f'TimeStream {brain_item.name}')
print(f' Sample rate: {ts.sample_rate}')
print(f' Total seconds recorded: {ts.end_time - ts.start_time}s')
ts = brain_item.stream
if ts:
print(f' Sample rate: {ts.sample_rate}')
print(f' Total seconds recorded: {ts.end_time - ts.start_time}s')

all_stream_objects = [bi.stream for bi in all_streams.values()]
print('Latest stream time is', end_stream_time(all_stream_objects))

if __name__ == '__main__':
Expand Down
3 changes: 2 additions & 1 deletion test/realtime_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ def test_datastream(self):
np.testing.assert_equal(ds._data,
np.array([[0, 1], [2, 3], [4, 5],
[6, 7], [0, 0], [0, 0]]))
self.assertFalse(ds.get_data(4, 2))

ds.add_data(b+8)
np.testing.assert_equal(ds._data,
Expand All @@ -23,7 +24,7 @@ def test_datastream(self):
d = ds.get_data(5, 4)
np.testing.assert_equal(d, np.asarray([[10, 11], [12, 13], [14, 15]]))

self.assertFalse(ds.get_data(8, 4))
self.assertFalse(ds.get_data(16, 4))

ground_truth = np.concatenate((b, b+8), axis=0)
for i in range(2, 10):
Expand Down