Description
Dear developers!
I am using SpikeInterface 0.99.1 on Windows 10 (64 GB RAM), and I run into problems when trying to work through the tutorial. BrokenProcessPool seems to occur everywhere; in fact, I suspect it happens whenever I use the parallel job_kwargs, whether in .save(), spykingcircus2, or tridesclous2.
Several weeks ago I did not hit BrokenProcessPool when following the same tutorial, so I really need your help.
① In [Official_Tutorial_SI_0.99_Nov23], Chapter 3 "Saving and loading SpikeInterface objects":
```python
if (base_folder / "preprocessed_compressed.zarr").is_dir():
    recording_saved = si.read_zarr(base_folder / "preprocessed_compressed.zarr")
else:
    import numcodecs
    compressor = numcodecs.Blosc(cname="zstd", clevel=9, shuffle=numcodecs.Blosc.BITSHUFFLE)
    recording_saved = recording_sub.save(format="zarr", folder=base_folder / "preprocessed_compressed.zarr",
                                         compressor=compressor,
                                         **job_kwargs)
```
Running this cell fails with:
```
BrokenProcessPool                         Traceback (most recent call last)
Cell In[57], line 6
      4 import numcodecs
      5 compressor = numcodecs.Blosc(cname="zstd", clevel=9, shuffle=numcodecs.Blosc.BITSHUFFLE)
----> 6 recording_saved = recording_sub.save(format="zarr", folder=base_folder / "preprocessed_compressed.zarr",
      7                                      compressor=compressor,
      8                                      **job_kwargs)

File ~\anaconda3\Lib\site-packages\spikeinterface\core\base.py:777, in BaseExtractor.save(self, **kwargs)
    775     loaded_extractor = self.save_to_memory(**kwargs)
    776 elif format == "zarr":
--> 777     loaded_extractor = self.save_to_zarr(**kwargs)
    778 else:
    779     loaded_extractor = self.save_to_folder(**kwargs)

File ~\anaconda3\Lib\site-packages\spikeinterface\core\base.py:964, in BaseExtractor.save_to_zarr(self, name, folder, storage_options, channel_chunk_size, verbose, zarr_path, **save_kwargs)
    962 save_kwargs["storage_options"] = storage_options
    963 save_kwargs["channel_chunk_size"] = channel_chunk_size
--> 964 cached = self._save(verbose=verbose, **save_kwargs)
    966 # save properties
    967 prop_group = zarr_root.create_group("properties")

File ~\anaconda3\Lib\site-packages\spikeinterface\core\baserecording.py:514, in BaseRecording._save(self, format, **save_kwargs)
    508     zarr_kwargs["compressor"] = compressor = get_default_zarr_compressor()
    509     print(
    510         f"Using default zarr compressor: {compressor}. To use a different compressor, use the "
    511         f"'compressor' argument"
    512     )
--> 514 write_traces_to_zarr(self, **zarr_kwargs, **job_kwargs)
    516 # save probe
    517 if self.get_property("contact_vector") is not None:

File ~\anaconda3\Lib\site-packages\spikeinterface\core\core_tools.py:709, in write_traces_to_zarr(recording, zarr_root, zarr_path, storage_options, dataset_paths, channel_chunk_size, dtype, compressor, filters, verbose, auto_cast_uint, **job_kwargs)
    705 init_args = (recording, zarr_path, storage_options, dataset_paths, dtype, cast_unsigned)
    706 executor = ChunkRecordingExecutor(
    707     recording, func, init_func, init_args, verbose=verbose, job_name="write_zarr_recording", **job_kwargs
    708 )
--> 709 executor.run()

File ~\anaconda3\Lib\site-packages\spikeinterface\core\job_tools.py:399, in ChunkRecordingExecutor.run(self)
    396 if self.progress_bar:
    397     results = tqdm(results, desc=self.job_name, total=len(all_chunks))
--> 399 for res in results:
    400     if self.handle_returns:
    401         returns.append(res)

File ~\anaconda3\Lib\site-packages\tqdm\notebook.py:254, in tqdm_notebook.__iter__(self)
    252 try:
    253     it = super(tqdm_notebook, self).__iter__()
--> 254     for obj in it:
    255         # return super(tqdm...) will not catch exception
    256         yield obj
    257 # NB: except ... [as ...] breaks IPython async KeyboardInterrupt

File ~\anaconda3\Lib\site-packages\tqdm\std.py:1178, in tqdm.__iter__(self)
   1175 time = self._time
   1177 try:
-> 1178     for obj in iterable:
   1179         yield obj
   1180         # Update and possibly print the progressbar.
   1181         # Note: does not call self.update(1) for speed optimisation.

File ~\anaconda3\Lib\concurrent\futures\process.py:606, in _chain_from_iterable_of_lists(iterable)
    600 def _chain_from_iterable_of_lists(iterable):
    601     """
    602     Specialized implementation of itertools.chain.from_iterable.
    603     Each item in iterable should be a list. This function is
    604     careful not to keep references to yielded objects.
    605     """
--> 606     for element in iterable:
    607         element.reverse()
    608         while element:

File ~\anaconda3\Lib\concurrent\futures\_base.py:619, in Executor.map.<locals>.result_iterator()
    616 while fs:
    617     # Careful not to keep a reference to the popped future
    618     if timeout is None:
--> 619         yield _result_or_cancel(fs.pop())
    620     else:
    621         yield _result_or_cancel(fs.pop(), end_time - time.monotonic())

File ~\anaconda3\Lib\concurrent\futures\_base.py:317, in _result_or_cancel(failed resolving arguments)
    315 try:
    316     try:
--> 317         return fut.result(timeout)
    318     finally:
    319         fut.cancel()

File ~\anaconda3\Lib\concurrent\futures\_base.py:456, in Future.result(self, timeout)
    454     raise CancelledError()
    455 elif self._state == FINISHED:
--> 456     return self.__get_result()
    457 else:
    458     raise TimeoutError()

File ~\anaconda3\Lib\concurrent\futures\_base.py:401, in Future.__get_result(self)
    399 if self._exception:
    400     try:
--> 401         raise self._exception
    402     finally:
    403         # Break a reference cycle with the exception in self._exception
    404         self = None

BrokenProcessPool: A process in the process pool was terminated abruptly while the future was running or pending.
```
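Reading the trace, the chunks are dispatched through a plain concurrent.futures.ProcessPoolExecutor, so I wondered whether this is the usual Windows multiprocessing pitfall: Windows starts workers with the "spawn" method, and any code that submits jobs from a script has to sit under an `if __name__ == "__main__":` guard, otherwise the re-imported children crash and the pool is reported as broken. A minimal standard-library sketch of the pattern I mean (nothing SpikeInterface-specific, and I am not sure this is the actual cause inside Jupyter):

```python
# Stand-alone illustration of the Windows "spawn" requirement.
# Without the __main__ guard, every spawned worker re-imports this module,
# re-runs the submission code, and the pool dies with BrokenProcessPool.
from concurrent.futures import ProcessPoolExecutor

def square(x):
    return x * x

if __name__ == "__main__":          # required on Windows ("spawn" start method)
    with ProcessPoolExecutor(max_workers=4) as pool:
        print(list(pool.map(square, range(8))))
```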
② In [Official_Tutorial_SI_0.99_Nov23], Chapter 4 "Spike sorting", running the internal sorter 'spykingcircus2':
When I tried to use the installed sorters 'spykingcircus2' and 'tridesclous2', the BrokenProcessPool error still occurred.
I kept the settings from the tutorial:
```python
n_cpus = os.cpu_count()
n_jobs = n_cpus - 4
job_kwargs = dict(n_jobs=n_jobs, chunk_duration="1s", progress_bar=True)
```
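Since n_cpus - 4 works out to n_jobs = 16 here (matching the n_jobs = 16 in the log below), I am also considering scaling the pool down to see whether the worker count matters. A sketch of the variants I have in mind (the *_serial / *_small names are just mine):

```python
import os

n_cpus = os.cpu_count()  # 20 on this machine, so n_jobs = 16 below

# Hypothetical test variants to isolate the failure:
job_kwargs_serial = dict(n_jobs=1, chunk_duration="1s", progress_bar=True)  # no process pool at all
job_kwargs_small  = dict(n_jobs=4, chunk_duration="1s", progress_bar=True)  # small pool
job_kwargs        = dict(n_jobs=n_cpus - 4, chunk_duration="1s", progress_bar=True)  # tutorial setting
```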
When I run the code below:
```python
sorting_SC2 = si.run_sorter('spykingcircus2', recording_saved,
                            output_folder=base_folder / 'results_SC2',
                            verbose=True, job_kwargs=job_kwargs)
```
it returns:
```
detect peaks using locally_exclusive with n_jobs = 16 and chunk_size = 30000
detect peaks using locally_exclusive:   0%  0/300 [00:01

----> 1 sorting_SC2 = si.run_sorter('spykingcircus2', recording_saved,
      2                             output_folder=base_folder / 'results_SC2',
      3                             verbose=True, job_kwargs=job_kwargs)

File ~\anaconda3\Lib\site-packages\spikeinterface\sorters\runsorter.py:147, in run_sorter(sorter_name, recording, output_folder, remove_existing_folder, delete_output_folder, verbose, raise_error, docker_image, singularity_image, delete_container_files, with_output, **sorter_params)
    140     container_image = singularity_image
    141     return run_sorter_container(
    142         container_image=container_image,
    143         mode=mode,
    144         **common_kwargs,
    145     )
--> 147 return run_sorter_local(**common_kwargs)

File ~\anaconda3\Lib\site-packages\spikeinterface\sorters\runsorter.py:173, in run_sorter_local(sorter_name, recording, output_folder, remove_existing_folder, delete_output_folder, verbose, raise_error, with_output, **sorter_params)
    171 SorterClass.set_params_to_folder(recording, output_folder, sorter_params, verbose)
    172 SorterClass.setup_recording(recording, output_folder, verbose=verbose)
--> 173 SorterClass.run_from_folder(output_folder, raise_error, verbose)
    174 if with_output:
    175     sorting = SorterClass.get_result_from_folder(output_folder)

File ~\anaconda3\Lib\site-packages\spikeinterface\sorters\basesorter.py:289, in BaseSorter.run_from_folder(cls, output_folder, raise_error, verbose)
    286     print(f"{sorter_name} run time {run_time:0.2f}s")
    288 if has_error and raise_error:
--> 289     raise SpikeSortingError(
    290         f"Spike sorting error trace:\n{log['error_trace']}\n"
    291         f"Spike sorting failed. You can inspect the runtime trace in {output_folder}/spikeinterface_log.json."
    292     )
    294 return run_time

SpikeSortingError: Spike sorting error trace:
Traceback (most recent call last):
  File "C:\Users\Melody\anaconda3\Lib\site-packages\spikeinterface\sorters\basesorter.py", line 254, in run_from_folder
    SorterClass._run_from_folder(sorter_output_folder, sorter_params, verbose)
  File "C:\Users\Melody\anaconda3\Lib\site-packages\spikeinterface\sorters\internal\spyking_circus2.py", line 80, in _run_from_folder
    peaks = detect_peaks(recording_f, method="locally_exclusive", **detection_params)
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\Melody\anaconda3\Lib\site-packages\spikeinterface\sortingcomponents\peak_detection.py", line 117, in detect_peaks
    outs = run_node_pipeline(
           ^^^^^^^^^^^^^^^^^^
  File "C:\Users\Melody\anaconda3\Lib\site-packages\spikeinterface\core\node_pipeline.py", line 469, in run_node_pipeline
    processor.run()
  File "C:\Users\Melody\anaconda3\Lib\site-packages\spikeinterface\core\job_tools.py", line 399, in run
    for res in results:
  File "C:\Users\Melody\anaconda3\Lib\site-packages\tqdm\notebook.py", line 254, in __iter__
    for obj in it:
  File "C:\Users\Melody\anaconda3\Lib\site-packages\tqdm\std.py", line 1178, in __iter__
    for obj in iterable:
  File "C:\Users\Melody\anaconda3\Lib\concurrent\futures\process.py", line 606, in _chain_from_iterable_of_lists
    for element in iterable:
  File "C:\Users\Melody\anaconda3\Lib\concurrent\futures\_base.py", line 619, in result_iterator
    yield _result_or_cancel(fs.pop())
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\Melody\anaconda3\Lib\concurrent\futures\_base.py", line 317, in _result_or_cancel
    return fut.result(timeout)
           ^^^^^^^^^^^^^^^^^^^
  File "C:\Users\Melody\anaconda3\Lib\concurrent\futures\_base.py", line 456, in result
    return self.__get_result()
           ^^^^^^^^^^^^^^^^^^^
  File "C:\Users\Melody\anaconda3\Lib\concurrent\futures\_base.py", line 401, in __get_result
    raise self._exception
concurrent.futures.process.BrokenProcessPool: A process in the process pool was terminated abruptly while the future was running or pending.

Spike sorting failed. You can inspect the runtime trace in C:\Users\Melody\Desktop\test1\results_SC2/spikeinterface_log.json.
```
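If it helps to narrow this down, I can rerun the same call with the process pool disabled; a sketch using the n_jobs=1 variant from above (results_SC2_serial is just a hypothetical output folder name):

```python
# Same sorter, but job_kwargs with n_jobs=1 so detect_peaks runs in the
# main process instead of going through ProcessPoolExecutor.
sorting_SC2 = si.run_sorter('spykingcircus2', recording_saved,
                            output_folder=base_folder / 'results_SC2_serial',
                            verbose=True,
                            job_kwargs=dict(n_jobs=1, chunk_duration="1s", progress_bar=True))
```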
③ Then I tried the internal sorter 'tridesclous2'.
Everything else stayed the same; I only changed my input to:
```python
sorting_TC2 = si.run_sorter('tridesclous2', recording_saved,
                            output_folder=base_folder / 'results_TC22',
                            verbose=True, job_kwargs=job_kwargs)
```
Then I knew something must be wrong; I got these results:
```
detect peaks using locally_exclusive:   0%  0/300 [00:01

----> 1 sorting_TC2 = si.run_sorter('tridesclous2', recording_saved,
      2                             output_folder=base_folder / 'results_TC22',
      3                             verbose=True, job_kwargs=job_kwargs)

File ~\anaconda3\Lib\site-packages\spikeinterface\sorters\runsorter.py:147, in run_sorter(sorter_name, recording, output_folder, remove_existing_folder, delete_output_folder, verbose, raise_error, docker_image, singularity_image, delete_container_files, with_output, **sorter_params)
    140     container_image = singularity_image
    141     return run_sorter_container(
    142         container_image=container_image,
    143         mode=mode,
    144         **common_kwargs,
    145     )
--> 147 return run_sorter_local(**common_kwargs)

File ~\anaconda3\Lib\site-packages\spikeinterface\sorters\runsorter.py:173, in run_sorter_local(sorter_name, recording, output_folder, remove_existing_folder, delete_output_folder, verbose, raise_error, with_output, **sorter_params)
    171 SorterClass.set_params_to_folder(recording, output_folder, sorter_params, verbose)
    172 SorterClass.setup_recording(recording, output_folder, verbose=verbose)
--> 173 SorterClass.run_from_folder(output_folder, raise_error, verbose)
    174 if with_output:
    175     sorting = SorterClass.get_result_from_folder(output_folder)

File ~\anaconda3\Lib\site-packages\spikeinterface\sorters\basesorter.py:289, in BaseSorter.run_from_folder(cls, output_folder, raise_error, verbose)
    286     print(f"{sorter_name} run time {run_time:0.2f}s")
    288 if has_error and raise_error:
--> 289     raise SpikeSortingError(
    290         f"Spike sorting error trace:\n{log['error_trace']}\n"
    291         f"Spike sorting failed. You can inspect the runtime trace in {output_folder}/spikeinterface_log.json."
    292     )
    294 return run_time

SpikeSortingError: Spike sorting error trace:
Traceback (most recent call last):
  File "C:\Users\Melody\anaconda3\Lib\site-packages\spikeinterface\sorters\basesorter.py", line 254, in run_from_folder
    SorterClass._run_from_folder(sorter_output_folder, sorter_params, verbose)
  File "C:\Users\Melody\anaconda3\Lib\site-packages\spikeinterface\sorters\internal\tridesclous2.py", line 102, in _run_from_folder
    all_peaks = detect_peaks(recording, method="locally_exclusive", **detection_params, **job_kwargs)
                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\Melody\anaconda3\Lib\site-packages\spikeinterface\sortingcomponents\peak_detection.py", line 117, in detect_peaks
    outs = run_node_pipeline(
           ^^^^^^^^^^^^^^^^^^
  File "C:\Users\Melody\anaconda3\Lib\site-packages\spikeinterface\core\node_pipeline.py", line 469, in run_node_pipeline
    processor.run()
  File "C:\Users\Melody\anaconda3\Lib\site-packages\spikeinterface\core\job_tools.py", line 399, in run
    for res in results:
  File "C:\Users\Melody\anaconda3\Lib\site-packages\tqdm\notebook.py", line 254, in __iter__
    for obj in it:
  File "C:\Users\Melody\anaconda3\Lib\site-packages\tqdm\std.py", line 1178, in __iter__
    for obj in iterable:
  File "C:\Users\Melody\anaconda3\Lib\concurrent\futures\process.py", line 606, in _chain_from_iterable_of_lists
    for element in iterable:
  File "C:\Users\Melody\anaconda3\Lib\concurrent\futures\_base.py", line 619, in result_iterator
    yield _result_or_cancel(fs.pop())
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\Melody\anaconda3\Lib\concurrent\futures\_base.py", line 317, in _result_or_cancel
    return fut.result(timeout)
           ^^^^^^^^^^^^^^^^^^^
  File "C:\Users\Melody\anaconda3\Lib\concurrent\futures\_base.py", line 456, in result
    return self.__get_result()
           ^^^^^^^^^^^^^^^^^^^
  File "C:\Users\Melody\anaconda3\Lib\concurrent\futures\_base.py", line 401, in __get_result
    raise self._exception
concurrent.futures.process.BrokenProcessPool: A process in the process pool was terminated abruptly while the future was running or pending.

Spike sorting failed. You can inspect the runtime trace in C:\Users\Melody\Desktop\test1\results_TC22/spikeinterface_log.json.
```
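For reference, this is how I am reading the spikeinterface_log.json mentioned in the error message (plain json; the "error_trace" key is the one quoted inside the SpikeSortingError above):

```python
import json
from pathlib import Path

# Path copied from the error message above.
log_path = Path(r"C:\Users\Melody\Desktop\test1\results_TC22") / "spikeinterface_log.json"
with open(log_path, encoding="utf8") as f:
    log = json.load(f)

print(log["error_trace"])  # the same BrokenProcessPool traceback as above
```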