
Nm database #350

Merged · 33 commits · Aug 1, 2024
Changes shown from 23 commits

Commits (33 total)
c3e60c7
first version of database integration
SamedVossberg Jun 20, 2024
6f7d7e5
debug db commit
SamedVossberg Jun 20, 2024
5493b28
debug paths, adjust timing for lsl
SamedVossberg Jun 20, 2024
1742b93
check permission of github workflow
SamedVossberg Jun 20, 2024
39d7a1e
fixing permission
SamedVossberg Jun 20, 2024
edcac19
fixing gh workflow permission
SamedVossberg Jun 20, 2024
348f398
set permissions for file and dir
SamedVossberg Jun 20, 2024
4ba784a
checking for paths before setting permissions
SamedVossberg Jun 20, 2024
87edcc9
fixing permissions
SamedVossberg Jun 20, 2024
434168d
another try fixing gh workflow permissions
SamedVossberg Jun 20, 2024
d073663
adding time_idx to db path to resolve conflicts
SamedVossberg Jun 25, 2024
7e86324
refactor the database
SamedVossberg Jun 25, 2024
b181749
possibility to save feature_df as csv
SamedVossberg Jun 26, 2024
4964333
auto delete old example dbs
SamedVossberg Jun 26, 2024
37813d4
debug db handling
SamedVossberg Jun 26, 2024
f91cbcc
conditional create table in run method, delete permission adjustments…
SamedVossberg Jun 26, 2024
e586cfa
debug times
SamedVossberg Jun 26, 2024
d1c6ecd
changed paths for dbs and db creation
SamedVossberg Jun 26, 2024
16fa9d5
clean up nm_database
SamedVossberg Jun 27, 2024
a1bd366
resolve nm_steam file name
SamedVossberg Jun 27, 2024
e315fca
Merge branch 'main' into nm_database
SamedVossberg Jun 27, 2024
aa3250a
added typehints, added save_interval param to run
SamedVossberg Jun 28, 2024
8aac64d
add last commit to db
SamedVossberg Jun 28, 2024
f577d57
Toni review
toni-neurosc Jul 26, 2024
7a1cf33
Fix db bug
toni-neurosc Jul 26, 2024
ccfcc88
- Replace spaces with underscores for freq band
toni-neurosc Jul 29, 2024
1c181e5
Change "folder_name" parameter for "prefix" downstream of nm.Stream
toni-neurosc Jul 29, 2024
de64766
Generate unique filenames for new database files
toni-neurosc Jul 29, 2024
f101788
Format and fix Ruff warnings
toni-neurosc Jul 29, 2024
765a182
change default params
timonmerk Aug 1, 2024
d037a05
add simple test for nm_db write
timonmerk Aug 1, 2024
e058b96
add catch to create test folder
timonmerk Aug 1, 2024
c0d9ff1
simplify nm_database create out_dir
toni-neurosc Aug 1, 2024
1 change: 1 addition & 0 deletions .gitignore
@@ -48,6 +48,7 @@ coverage.xml
*.py,cover
.hypothesis/
.pytest_cache/
/test_data

# Translations
*.mo
2 changes: 1 addition & 1 deletion examples/plot_0_first_demo.py
@@ -146,7 +146,7 @@ def generate_random_walk(NUM_CHANNELS, TIME_DATA_SAMPLES):
line_noise=50,
)

features = stream.run(data)
features = stream.run(data, save_csv=True)

# %%
# Feature Analysis
1 change: 1 addition & 0 deletions examples/plot_1_example_BIDS.py
@@ -118,6 +118,7 @@
data=data,
out_path_root=PATH_OUT,
folder_name=RUN_NAME,
save_csv = True,
)

# %%
2 changes: 1 addition & 1 deletion examples/plot_3_example_sharpwave_analysis.py
@@ -310,7 +310,7 @@
verbose=True,
)

df_features = stream.run(data=data[:, :30000])
df_features = stream.run(data=data[:, :30000], save_csv=True)

# %%
# We can then plot two exemplary features, prominence and interval, and see that the movement amplitude can be clustered with those two features alone:
1 change: 1 addition & 0 deletions examples/plot_4_example_gridPointProjection.py
@@ -87,6 +87,7 @@
data=data[:, :int(sfreq*5)],
out_path_root=PATH_OUT,
folder_name=RUN_NAME,
save_csv=True,
)

# %%
107 changes: 107 additions & 0 deletions py_neuromodulation/nm_database.py
@@ -0,0 +1,107 @@
import os
import sqlite3
from pathlib import Path
import numpy as np
import pandas as pd
from py_neuromodulation.nm_types import _PathLike

class NMDatabase:
"""
Class to create a database and insert data into it.
Parameters
----------
out_path_root : _PathLike
The root path to save the database.
folder_name : str
The folder name to save the database.
csv_path : str, optional
The path to save the csv file. If not provided, it will be saved in the same folder as the database.
"""
def __init__(
self,
out_path_root: _PathLike,
folder_name: str,
csv_path: _PathLike | None = None):
self.out_path_root = out_path_root
self.folder_name = folder_name
self.db_path = Path(out_path_root, folder_name, "stream.db")
if csv_path is None:
self.csv_path = Path(out_path_root, folder_name, f"stream.csv")
else:
self.csv_path = Path(csv_path)

if os.path.exists(self.db_path):
os.remove(self.db_path)
Collaborator: I think it's better to throw an error here than straight-up deleting a previous file if it exists; this could lead to unwanted data losses.

Contributor: Very elegant solution with the incremental file names!
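A minimal sketch of how incremental, non-clobbering filenames could be generated (the helper name and logic are assumptions for illustration; the actual implementation from commit de64766 is not shown in this diff):

```python
from pathlib import Path

def get_unique_path(path: Path) -> Path:
    """Append an incrementing suffix until the path does not exist yet."""
    if not path.exists():
        return path
    idx = 1
    while True:
        candidate = path.with_stem(f"{path.stem}_{idx}")  # e.g. stream_1.db
        if not candidate.exists():
            return candidate
        idx += 1
```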


self.conn = sqlite3.connect(self.db_path, isolation_level=None)
self.cursor = self.conn.cursor()

def infer_type(self, value):
"""Infer the type of the value to create the table schema.
Parameters
----------
value : int, float, str
The value to infer the type."""

if isinstance(value, (int, float)):
return "REAL"
elif isinstance(value, str):
return "TEXT"
else:
return "BLOB"

def cast_values(self, feature_dict: dict):
"""Cast the int values of the dictionary to float.
Parameters
----------
feature_dict : dict
The dictionary to cast the values."""
for key, value in feature_dict.items():
if isinstance(value, (int, float, np.int64)):
feature_dict[key] = float(value)
return feature_dict

def create_table(self, feature_dict: dict):
"""
Create a table in the database.
Parameters
----------
feature_dict : dict
The dictionary with the feature names and values.
"""
columns_schema = ", ".join([f'"{column}" {self.infer_type(value)}' for column, value in feature_dict.items()])
self.cursor.execute(f"CREATE TABLE IF NOT EXISTS stream_table ({columns_schema})")

def insert_data(self, feature_dict: dict):
"""
Insert data into the database.
Parameters
----------
feature_dict : dict
The dictionary with the feature names and values.
"""
columns = ", ".join([f'"{column}"' for column in feature_dict.keys()])
placeholders = ", ".join(["?" for _ in feature_dict])
insert_sql = f"INSERT INTO stream_table ({columns}) VALUES ({placeholders})"
values = tuple(feature_dict.values())
self.cursor.execute(insert_sql, values)

def commit(self):
self.conn.commit()

def fetch_all(self):
""""
Fetch all the data from the database.
Returns
-------
pd.DataFrame
The data in a pandas DataFrame.
"""
return pd.read_sql_query("SELECT * FROM stream_table", self.conn)

def save_as_csv(self):
df = self.fetch_all()
df.to_csv(self.csv_path, index=False)

def close(self):
self.conn.close()
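A short usage sketch of the NMDatabase class as defined above (paths and feature values are placeholders made up for illustration; note that sqlite3 expects the output folder to exist already):

```python
from pathlib import Path
import numpy as np
from py_neuromodulation.nm_database import NMDatabase

Path("./test_data/demo_run").mkdir(parents=True, exist_ok=True)  # sqlite3 needs the folder
db = NMDatabase(out_path_root="./test_data", folder_name="demo_run")

# one row per processed batch; keys become column names in stream_table
feature_dict = {"ch0_theta_mean": 0.42, "ch0_beta_mean": np.int64(3), "time": 1000.0}
feature_dict = db.cast_values(feature_dict)  # np.int64 -> float

db.create_table(feature_dict)   # CREATE TABLE IF NOT EXISTS stream_table (...)
db.insert_data(feature_dict)
db.commit()

df = db.fetch_all()             # full table as a pandas DataFrame
db.save_as_csv()                # writes ./test_data/demo_run/stream.csv
db.close()
```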
47 changes: 38 additions & 9 deletions py_neuromodulation/nm_stream.py
@@ -5,6 +5,7 @@
import pandas as pd
from pathlib import Path
from py_neuromodulation.nm_stream_abc import NMStream
from py_neuromodulation.nm_database import NMDatabase
from py_neuromodulation.nm_types import _PathLike
from py_neuromodulation import logger

@@ -14,7 +15,7 @@

class _GenericStream(NMStream):
"""_GenericStream base class.
This class can be inherited for different types of offline streams
This class can be inhereted for different types of offline streams

Parameters
----------
@@ -84,10 +85,13 @@ def _run(
is_stream_lsl: bool = True,
stream_lsl_name: str = None,
plot_lsl: bool = False,
save_csv: bool = False,
save_interval: int = 10,
) -> pd.DataFrame:
from py_neuromodulation.nm_generator import raw_data_generator
# from py_neuromodulation.nm_database import NMDatabase

if not is_stream_lsl:
from py_neuromodulation.nm_generator import raw_data_generator
generator = raw_data_generator(
data=data,
settings=self.settings,
@@ -117,9 +121,11 @@

generator = self.lsl_stream.get_next_batch()

l_features: list[dict] = []
last_time = None

buff_cnt: int = 0
db = NMDatabase(out_path_root, folder_name)
data_acquired = False
while True:
next_item = next(generator, None)

@@ -134,23 +140,38 @@
data_batch.astype(np.float64)
Collaborator: I realized we don't need this; mne_lsl returns the data in np.float64 format, and astype actually makes a whole copy of the data, which is not good for performance.
)
if is_stream_lsl:
feature_dict["time"] = time_[-1]
if self.verbose:
if last_time is not None:
logger.debug("%.3f seconds of new data processed", time_[-1] - last_time)
Collaborator: I don't think DEBUG-level messages should depend on verbose, since DEBUG is usually disabled by default unless you set the logging level to DEBUG explicitly.

Contributor: Good catch.
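A sketch of the alternative being suggested: gate the message via the logger's level rather than the verbose flag (this assumes py_neuromodulation's logger is a standard logging.Logger):

```python
import logging
from py_neuromodulation import logger

logger.setLevel(logging.DEBUG)  # enable DEBUG output explicitly when needed
logger.debug("%.3f seconds of new data processed", 0.1)  # no verbose check required
```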

feature_dict["time"] = time_[-1] - last_time
else:
feature_dict["time"] = 0
last_time = time_[-1]
Collaborator: I think the recording of the time is not quite right here. The logger should also output the seconds processed for the first batch, and the recorded time should be either the start or the end of the batch, not its length. I have attempted a fix in my commit, check it out.

Contributor: But here the intention was to report how much data was processed in the new batch, since this can actually vary with LSL.

Contributor: Also, did you already push the commit?

Collaborator: Not yet, I'm doing a bunch of other changes. My point here is that there are two things you might want to calculate:

  • How much data you have processed, in terms of time period: for this you take the difference between the last time of the previous batch and the first of the current batch. You display it in the logger for sure, but you could also save it, but where?
  • The timestamp of each sample: since the timestamp of each single data point gets lost during feature calculation (due to resampling, averaging, and other transformations), you would default to using either the first timestamp of the batch, the last, or maybe the average.

Now the question is which of these belongs in the features_df["time"] field. Back in the office I proposed having two separate columns, even three (an additional one for batch number or processing timestamp).
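A minimal sketch of the two/three-column idea discussed here (the column names and helper are assumptions, not part of this PR):

```python
import numpy as np

def make_time_fields(time_: np.ndarray, last_time: float | None, batch_idx: int) -> dict:
    """Return the time-related columns proposed above for one batch."""
    end_time = float(time_[-1])
    duration = end_time - (last_time if last_time is not None else float(time_[0]))
    return {
        "batch_duration": duration,  # seconds of new data processed in this batch
        "time": end_time,            # timestamp of the last sample of the batch
        "batch_idx": batch_idx,      # running batch counter
    }
```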

else:
feature_dict["time"] = np.ceil(time_[-1] * 1000 +1 ).astype(int)
feature_dict["time"] = np.ceil(time_[-1] * 1000 +1 )
logger.info("Time: %.2f", feature_dict["time"]/1000)


self._add_target(feature_dict, data_batch)
buff_cnt += 1

l_features.append(feature_dict)
feature_dict = db.cast_values(feature_dict)
Collaborator: I think that casting is not necessary; we should ensure that the feature outputs are always np.float64, and even if casting is necessary, I don't think the casting logic should live inside the database class.

Contributor: Agree.
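A sketch of what moving the cast out of NMDatabase could look like (the helper name is hypothetical):

```python
import numpy as np

def to_sql_friendly(feature_dict: dict) -> dict:
    """Cast numpy scalars to plain Python floats before handing the dict to the database."""
    return {
        k: float(v) if isinstance(v, (int, float, np.integer, np.floating)) else v
        for k, v in feature_dict.items()
    }
```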

if not data_acquired:
db.create_table(feature_dict)
data_acquired = True
db.insert_data(feature_dict)
Collaborator: I moved the table creation logic into the database class.


feature_df = pd.DataFrame(l_features)
if buff_cnt >= save_interval:
db.commit()
buff_cnt = 0

self.save_after_stream(out_path_root, folder_name, feature_df)
db.commit()
feature_df = db.fetch_all()


self.save_after_stream(out_path_root, folder_name, feature_df, save_csv=save_csv)

db.close()

return feature_df

Collaborator: I'm not sure returning the dataframe makes a lot of sense anymore, since it only holds one row when save_csv = False. I know it's needed for some examples, but maybe for offline analysis or custom analysis we should provide a different interface.

Contributor: Yes, but I think it's still a nice interface to see the structure of which features were calculated for which channels.

Collaborator: The problem is that for long recordings, the moment the recording finishes it will try to load the entire database into memory, and if it's too big it will hang or crash. Luckily, thanks to the db itself, this wouldn't cause data loss, but it's kind of awkward. For now I have added a return_df parameter, True by default.

@@ -295,7 +316,9 @@ def run(
folder_name: str = "sub",
stream_lsl: bool = False,
stream_lsl_name: str = None,
save_csv: bool = False,
plot_lsl: bool = False,
save_interval: float = 1.0,
) -> pd.DataFrame:
"""Call run function for offline stream.

@@ -314,6 +337,10 @@
stream name, by default None
plot_lsl : bool, optional
plot data with mne_lsl stream_viewer
save_csv : bool, optional
save csv file, by default False
save_interval : int, optional
save interval in number of samples, by default 10

Returns
-------
@@ -343,5 +370,7 @@
folder_name,
is_stream_lsl=stream_lsl,
stream_lsl_name=stream_lsl_name,
plot_lsl=plot_lsl,
save_csv = save_csv,
plot_lsl = plot_lsl,
save_interval = save_interval,
)
4 changes: 4 additions & 0 deletions py_neuromodulation/nm_stream_abc.py
@@ -142,6 +142,7 @@ def save_after_stream(
out_path_root: _PathLike = "",
folder_name: str = "sub",
feature_arr: pd.DataFrame | None = None,
save_csv : bool = False,
) -> None:
"""Save features, settings, nm_channels and sidecar after run"""

@@ -155,6 +156,9 @@

self.save_sidecar(out_path_root, folder_name)

if not save_csv:
feature_arr = feature_arr.head()
Collaborator: I think it's better not to load the whole dataframe and subset it here; we can read just the first line from the SQLite database and pass it to this function.
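A sketch of that suggestion, assuming the NMDatabase connection is reused (fetch_one is a hypothetical helper, not an existing method):

```python
import pandas as pd

def fetch_one(db) -> pd.DataFrame:
    """Read only the first row of stream_table instead of loading the full table."""
    return pd.read_sql_query("SELECT * FROM stream_table LIMIT 1", db.conn)
```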


if feature_arr is not None:
self.save_features(out_path_root, folder_name, feature_arr)

6 changes: 3 additions & 3 deletions tests/test_all_features.py
@@ -27,7 +27,7 @@ def test_all_features_random_array():
arr = np.random.random([2, 2000])
stream = get_example_stream(arr)

df = stream.run(arr)
df = stream.run(arr, out_path_root="./test_data", folder_name="test_all_features_random_array")

assert df.shape[0] != 0 # terrible test

@@ -38,7 +38,7 @@ def test_all_features_zero_array():
stream = get_example_stream(arr)
stream.settings.features.fooof = False # Can't use fooof with zero values (log(0) undefined)

df = stream.run(arr)
df = stream.run(arr, out_path_root="./test_data", folder_name="test_all_features_zero_array")

assert df.shape[0] != 0 # terrible test

@@ -50,6 +50,6 @@ def test_all_features_NaN_array():
stream = get_example_stream(arr)
stream.settings.features.fooof = False # Can't use fooof nan values

df = stream.run(arr)
df = stream.run(arr, out_path_root="./test_data", folder_name="test_all_features_NaN_array")

assert df.shape[0] != 0 # terrible test
2 changes: 1 addition & 1 deletion tests/test_bispectra.py
@@ -55,6 +55,6 @@ def test_bispectrum():
coord_names=coord_names,
)

features = stream.run(np.expand_dims(data[3, :], axis=0))
features = stream.run(np.expand_dims(data[3, :], axis=0), out_path_root="./test_data", folder_name="test_bispectrum")

assert features["ECOG_RIGHT_1_Bispectrum_phase_mean_whole_fband_range"].sum() != 0
12 changes: 6 additions & 6 deletions tests/test_feature_sampling_rates.py
@@ -24,7 +24,7 @@ def test_different_sampling_rate_100Hz():
sfreq=1000, nm_channels=nm_channels, settings=settings, verbose=True
)

df = stream.run(arr_test)
df = stream.run(arr_test, out_path_root="./test_data", folder_name="test_different_sampling_rate_100Hz")

# check the difference between time points
#print(df["time"].iloc[:2])
@@ -44,7 +44,7 @@ def test_different_sampling_rate_10Hz():
sfreq=1000, nm_channels=nm_channels, settings=settings, verbose=True
)

df = stream.run(arr_test)
df = stream.run(arr_test, out_path_root="./test_data", folder_name="test_different_sampling_rate_10Hz")

# check the difference between time points

@@ -62,7 +62,7 @@ def test_different_sampling_rate_1Hz():
sfreq=1000, nm_channels=nm_channels, settings=settings, verbose=True
)

df = stream.run(arr_test)
df = stream.run(arr_test, out_path_root="./test_data", folder_name="test_different_sampling_rate_1Hz")

# check the difference between time points

@@ -80,7 +80,7 @@ def test_different_sampling_rate_0DOT1Hz():
sfreq=1000, nm_channels=nm_channels, settings=settings, verbose=True
)

df = stream.run(arr_test)
df = stream.run(arr_test, out_path_root="./test_data", folder_name="test_different_sampling_rate_0DOT1Hz")

# check the difference between time points

@@ -114,7 +114,7 @@ def test_different_segment_lengths():
sfreq=1000, nm_channels=nm_channels, settings=settings, verbose=True
)

df_seglength_800 = stream.run(arr_test)
df_seglength_800 = stream.run(arr_test, out_path_root="./test_data", folder_name="test_different_segment_lengths_800")

segment_length_features_ms = 1000

@@ -128,7 +128,7 @@
sfreq=1000, nm_channels=nm_channels, settings=settings, verbose=True
)

df_seglength_1000 = stream.run(arr_test)
df_seglength_1000 = stream.run(arr_test, out_path_root="./test_data", folder_name="test_different_segment_lengths_1000")
# check the difference between time points

print(df_seglength_1000.columns)
2 changes: 1 addition & 1 deletion tests/test_lsl_stream.py
@@ -25,7 +25,7 @@ def test_offline_lsl(setup_default_stream_fast_compute, setup_lsl_player, setup_

data, stream = setup_default_stream_fast_compute

features = stream.run(stream_lsl = True, plot_lsl= False, stream_lsl_name='offline_test')
features = stream.run(stream_lsl = True, plot_lsl= False, stream_lsl_name='offline_test', out_path_root="./test_data", folder_name="test_offline_lsl")
# check sfreq
assert raw.info['sfreq'] == stream.sfreq, "Expected same sampling frequency in the stream and input file"
assert player.player.info['sfreq'] == stream.sfreq, "Expected same sampling frequency in the stream and player"
2 changes: 1 addition & 1 deletion tests/test_nan_values.py
@@ -12,7 +12,7 @@ def test_stream_with_none_data():

stream = nm.Stream(fs, data)

features = stream.run(data)
features = stream.run(data, out_path_root="./test_data", folder_name="test_stream_with_none_data")

# assert if all features if name ch0 are None
assert len(