-
Notifications
You must be signed in to change notification settings - Fork 38
implement data compression using mtscomp
for openephys and spikeglx for neuropixels data
#89
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -4,6 +4,7 @@ | |
import numpy as np | ||
import inspect | ||
import importlib | ||
import gc | ||
from decimal import Decimal | ||
|
||
from element_interface.utils import find_root_directory, find_full_path, dict_to_uuid | ||
|
@@ -273,6 +274,10 @@ def make(self, key): | |
self.EphysFile.insert([{**key, | ||
'file_path': fp.relative_to(root_dir).as_posix()} | ||
for fp in probe_data.recording_info['recording_files']]) | ||
Comment on lines
274
to
276
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The generator is even more efficient that then the list self.EphysFile.insert({**key,
'file_path': fp.relative_to(root_dir).as_posix()}
for fp in probe_data.recording_info['recording_files']) |
||
# explicitly garbage collect "dataset" | ||
# as these may have large memory footprint and may not be cleared fast enough | ||
del probe_data, dataset | ||
gc.collect() | ||
else: | ||
raise NotImplementedError(f'Processing ephys files from' | ||
f' acquisition software of type {acq_software} is' | ||
|
@@ -862,10 +867,17 @@ def get_spikeglx_meta_filepath(ephys_recording_key): | |
def get_openephys_probe_data(ephys_recording_key): | ||
inserted_probe_serial_number = (ProbeInsertion * probe.Probe | ||
& ephys_recording_key).fetch1('probe') | ||
sess_dir = find_full_path(get_ephys_root_data_dir(), | ||
session_dir = find_full_path(get_ephys_root_data_dir(), | ||
get_session_directory(ephys_recording_key)) | ||
loaded_oe = openephys.OpenEphys(sess_dir) | ||
return loaded_oe.probes[inserted_probe_serial_number] | ||
loaded_oe = openephys.OpenEphys(session_dir) | ||
probe_data = loaded_oe.probes[inserted_probe_serial_number] | ||
|
||
# explicitly garbage collect "loaded_oe" | ||
# as these may have large memory footprint and may not be cleared fast enough | ||
del loaded_oe | ||
gc.collect() | ||
|
||
return probe_data | ||
|
||
|
||
def get_neuropixels_channel2electrode_map(ephys_recording_key, acq_software): | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -4,6 +4,7 @@ | |
import numpy as np | ||
import inspect | ||
import importlib | ||
import gc | ||
from decimal import Decimal | ||
|
||
from element_interface.utils import find_root_directory, find_full_path, dict_to_uuid | ||
|
@@ -325,6 +326,10 @@ def make(self, key): | |
self.EphysFile.insert([{**key, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. also, replace list comprehension with generator. |
||
'file_path': fp.relative_to(root_dir).as_posix()} | ||
for fp in probe_data.recording_info['recording_files']]) | ||
# explicitly garbage collect "dataset" | ||
# as these may have large memory footprint and may not be cleared fast enough | ||
del probe_data, dataset | ||
gc.collect() | ||
else: | ||
raise NotImplementedError(f'Processing ephys files from' | ||
f' acquisition software of type {acq_software} is' | ||
|
@@ -877,7 +882,14 @@ def get_openephys_probe_data(ephys_recording_key): | |
session_dir = find_full_path(get_ephys_root_data_dir(), | ||
get_session_directory(ephys_recording_key)) | ||
loaded_oe = openephys.OpenEphys(session_dir) | ||
return loaded_oe.probes[inserted_probe_serial_number] | ||
probe_data = loaded_oe.probes[inserted_probe_serial_number] | ||
|
||
# explicitly garbage collect "loaded_oe" | ||
# as these may have large memory footprint and may not be cleared fast enough | ||
del loaded_oe | ||
gc.collect() | ||
|
||
return probe_data | ||
|
||
|
||
def get_neuropixels_channel2electrode_map(ephys_recording_key, acq_software): | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,8 +1,10 @@ | ||
from datetime import datetime | ||
import numpy as np | ||
import pathlib | ||
import logging | ||
from .utils import convert_to_number | ||
|
||
logger = logging.getLogger(__name__) | ||
|
||
AP_GAIN = 80 # For NP 2.0 probes; APGain = 80 for all AP (LF is computed from AP) | ||
|
||
|
@@ -159,6 +161,69 @@ def validate_file(self, file_type='ap'): | |
if file_size != meta.meta['fileSizeBytes']: | ||
raise IOError(f'File size error! {file_path} may be corrupted or in transfer?') | ||
|
||
def compress(self): | ||
from mtscomp import compress as mts_compress | ||
|
||
ap_file = self.root_dir / (self.root_name + '.ap.bin') | ||
lfp_file = self.root_dir / (self.root_name + '.lf.bin') | ||
|
||
meta_mapping = {'ap': self.apmeta, 'lfp': self.lfmeta} | ||
|
||
compressed_files = [] | ||
for bin_fp, band_type in zip([ap_file, lfp_file], ['ap', 'lfp']): | ||
if not bin_fp.exists(): | ||
raise FileNotFoundError(f'Compression error - "{bin_fp}" does not exist') | ||
cbin_fp = bin_fp.parent / f'{bin_fp.stem}.cbin' | ||
ch_fp = bin_fp.parent / f'{bin_fp.stem}.ch' | ||
|
||
if cbin_fp.exists(): | ||
assert ch_fp.exists() | ||
logger.info(f'Compressed file exists ({cbin_fp}), skipping...') | ||
continue | ||
|
||
try: | ||
mts_compress(bin_fp, cbin_fp, ch_fp, | ||
sample_rate=meta_mapping[band_type]['sample_rate'], | ||
n_channels=meta_mapping[band_type]['num_channels'], | ||
dtype=np.memmap(bin_fp).dtype) | ||
except Exception as e: | ||
cbin_fp.unlink(missing_ok=True) | ||
ch_fp.unlink(missing_ok=True) | ||
raise e | ||
else: | ||
compressed_files.append((cbin_fp, ch_fp)) | ||
|
||
return compressed_files | ||
|
||
def decompress(self): | ||
from mtscomp import decompress as mts_decompress | ||
|
||
ap_file = self.root_dir / (self.root_name + '.ap.bin') | ||
lfp_file = self.root_dir / (self.root_name + '.lf.bin') | ||
|
||
decompressed_files = [] | ||
for bin_fp, band_type in zip([ap_file, lfp_file], ['ap', 'lfp']): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. for bin_fp, band_type in ((ap_file, 'ap'), (lfp_file, 'lfp')): |
||
if bin_fp.exists(): | ||
logger.info(f'Decompressed file exists ({bin_fp}), skipping...') | ||
continue | ||
|
||
cbin_fp = bin_fp.parent / f'{bin_fp.stem}.cbin' | ||
ch_fp = bin_fp.parent / f'{bin_fp.stem}.ch' | ||
|
||
if not cbin_fp.exists(): | ||
raise FileNotFoundError(f'Decompression error - "{cbin_fp}" does not exist') | ||
|
||
try: | ||
decomp_arr = mts_decompress(cbin_fp, ch_fp) | ||
decomp_arr.tofile(bin_fp) | ||
except Exception as e: | ||
bin_fp.unlink(missing_ok=True) | ||
raise e | ||
else: | ||
decompressed_files.append(bin_fp) | ||
|
||
return decompressed_files | ||
|
||
|
||
class SpikeGLXMeta: | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
has been this shown to improve performance compared to normal garbage collection?