
Some Problems about BrokenProcessPool in SpikeTutorials 0.99 #2310

Closed
@holawa

Description


Dear developers!
I am using SpikeInterface 0.99.1 on Windows 10; my computer has 64 GB of RAM. I have run into problems while working through the tutorial: BrokenProcessPool seems to occur everywhere. I suspect it happens whenever I use the job_kwargs variable, whether in .save(), spykingcircus2, or tridesclous2.
Several weeks ago, when I first followed the tutorial, I did not hit BrokenProcessPool. I really need your help.

① In the [Official_Tutorial_SI_0.99_Nov23], Chapter 3 "Saving and loading SpikeInterface objects"


if (base_folder / "preprocessed_compressed.zarr").is_dir():
    recording_saved = si.read_zarr(base_folder / "preprocessed_compressed.zarr")
else:
    import numcodecs
    compressor = numcodecs.Blosc(cname="zstd", clevel=9, shuffle=numcodecs.Blosc.BITSHUFFLE)
    recording_saved = recording_sub.save(format="zarr", folder=base_folder / "preprocessed_compressed.zarr",
                                         compressor=compressor,
                                         **job_kwargs)
When I run the code block above, it raises the following BrokenProcessPool error:
Click to expand!

BrokenProcessPool                         Traceback (most recent call last)
Cell In[57], line 6
      4 import numcodecs
      5 compressor = numcodecs.Blosc(cname="zstd", clevel=9, shuffle=numcodecs.Blosc.BITSHUFFLE)
----> 6 recording_saved = recording_sub.save(format="zarr", folder=base_folder / "preprocessed_compressed.zarr",
      7                                      compressor=compressor,
      8                                      **job_kwargs)

File ~\anaconda3\Lib\site-packages\spikeinterface\core\base.py:777, in BaseExtractor.save(self, **kwargs)
775 loaded_extractor = self.save_to_memory(**kwargs)
776 elif format == "zarr":
--> 777 loaded_extractor = self.save_to_zarr(**kwargs)
778 else:
779 loaded_extractor = self.save_to_folder(**kwargs)

File ~\anaconda3\Lib\site-packages\spikeinterface\core\base.py:964, in BaseExtractor.save_to_zarr(self, name, folder, storage_options, channel_chunk_size, verbose, zarr_path, **save_kwargs)
962 save_kwargs["storage_options"] = storage_options
963 save_kwargs["channel_chunk_size"] = channel_chunk_size
--> 964 cached = self._save(verbose=verbose, **save_kwargs)
966 # save properties
967 prop_group = zarr_root.create_group("properties")

File ~\anaconda3\Lib\site-packages\spikeinterface\core\baserecording.py:514, in BaseRecording._save(self, format, **save_kwargs)
508 zarr_kwargs["compressor"] = compressor = get_default_zarr_compressor()
509 print(
510 f"Using default zarr compressor: {compressor}. To use a different compressor, use the "
511 f"'compressor' argument"
512 )
--> 514 write_traces_to_zarr(self, **zarr_kwargs, **job_kwargs)
516 # save probe
517 if self.get_property("contact_vector") is not None:

File ~\anaconda3\Lib\site-packages\spikeinterface\core\core_tools.py:709, in write_traces_to_zarr(recording, zarr_root, zarr_path, storage_options, dataset_paths, channel_chunk_size, dtype, compressor, filters, verbose, auto_cast_uint, **job_kwargs)
705 init_args = (recording, zarr_path, storage_options, dataset_paths, dtype, cast_unsigned)
706 executor = ChunkRecordingExecutor(
707 recording, func, init_func, init_args, verbose=verbose, job_name="write_zarr_recording", **job_kwargs
708 )
--> 709 executor.run()

File ~\anaconda3\Lib\site-packages\spikeinterface\core\job_tools.py:399, in ChunkRecordingExecutor.run(self)
396 if self.progress_bar:
397 results = tqdm(results, desc=self.job_name, total=len(all_chunks))
--> 399 for res in results:
400 if self.handle_returns:
401 returns.append(res)

File ~\anaconda3\Lib\site-packages\tqdm\notebook.py:254, in tqdm_notebook.__iter__(self)
252 try:
253 it = super(tqdm_notebook, self).__iter__()
--> 254 for obj in it:
255 # return super(tqdm...) will not catch exception
256 yield obj
257 # NB: except ... [ as ...] breaks IPython async KeyboardInterrupt

File ~\anaconda3\Lib\site-packages\tqdm\std.py:1178, in tqdm.__iter__(self)
1175 time = self._time
1177 try:
-> 1178 for obj in iterable:
1179 yield obj
1180 # Update and possibly print the progressbar.
1181 # Note: does not call self.update(1) for speed optimisation.

File ~\anaconda3\Lib\concurrent\futures\process.py:606, in _chain_from_iterable_of_lists(iterable)
600 def _chain_from_iterable_of_lists(iterable):
601 """
602 Specialized implementation of itertools.chain.from_iterable.
603 Each item in iterable should be a list. This function is
604 careful not to keep references to yielded objects.
605 """
--> 606 for element in iterable:
607 element.reverse()
608 while element:

File ~\anaconda3\Lib\concurrent\futures\_base.py:619, in Executor.map.<locals>.result_iterator()
616 while fs:
617 # Careful not to keep a reference to the popped future
618 if timeout is None:
--> 619 yield _result_or_cancel(fs.pop())
620 else:
621 yield _result_or_cancel(fs.pop(), end_time - time.monotonic())

File ~\anaconda3\Lib\concurrent\futures\_base.py:317, in _result_or_cancel(failed resolving arguments)
315 try:
316 try:
--> 317 return fut.result(timeout)
318 finally:
319 fut.cancel()

File ~\anaconda3\Lib\concurrent\futures\_base.py:456, in Future.result(self, timeout)
454 raise CancelledError()
455 elif self._state == FINISHED:
--> 456 return self.__get_result()
457 else:
458 raise TimeoutError()

File ~\anaconda3\Lib\concurrent\futures\_base.py:401, in Future.__get_result(self)
399 if self._exception:
400 try:
--> 401 raise self._exception
402 finally:
403 # Break a reference cycle with the exception in self._exception
404 self = None

BrokenProcessPool: A process in the process pool was terminated abruptly while the future was running or pending.
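
The only workaround I can think of for this cell is to disable multiprocessing entirely. Is something like the sketch below (my own guess, not from the tutorial) the intended fallback?

# My guess at a single-process fallback: replace **job_kwargs with n_jobs=1
# so that no process pool is created at all. I don't know whether this is
# the recommended way around BrokenProcessPool; that is part of my question.
import numcodecs
compressor = numcodecs.Blosc(cname="zstd", clevel=9, shuffle=numcodecs.Blosc.BITSHUFFLE)
recording_saved = recording_sub.save(format="zarr",
                                     folder=base_folder / "preprocessed_compressed.zarr",
                                     compressor=compressor,
                                     n_jobs=1, progress_bar=True)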

② In the [Official_Tutorial_SI_0.99_Nov23], Chapter 4 "Spike sorting": running the internal sorter 'spykingcircus2'

Then, when I tried the installed sorters 'spykingcircus2' and 'tridesclous2', the BrokenProcessPool still occurred.
I kept the tutorial's settings:


import os

n_cpus = os.cpu_count()
n_jobs = n_cpus - 4
job_kwargs = dict(n_jobs=n_jobs, chunk_duration="1s", progress_bar=True)
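
(Since the log below reports n_jobs = 16, os.cpu_count() is 20 on my machine.) Would a more conservative setting be worth trying, for example (just my guess, not from the tutorial)?

# My guess: far fewer workers, in case spawning 16 processes at once is
# what breaks the pool on Windows. The keys are the same standard job_kwargs.
job_kwargs = dict(n_jobs=4, chunk_duration="1s", progress_bar=True)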

When I run the code below


sorting_SC2 = si.run_sorter('spykingcircus2', recording_saved, 
                            output_folder=base_folder / 'results_SC2',
                            verbose=True, job_kwargs=job_kwargs)

It returns

Click to expand!

detect peaks using locally_exclusive with n_jobs = 16 and chunk_size = 30000
detect peaks using locally_exclusive: 0%
0/300 [00:01<?, ?it/s]
----> 1 sorting_SC2 = si.run_sorter('spykingcircus2', recording_saved, 
      2                             output_folder=base_folder / 'results_SC2',
      3                             verbose=True, job_kwargs=job_kwargs)

File ~\anaconda3\Lib\site-packages\spikeinterface\sorters\runsorter.py:147, in run_sorter(sorter_name, recording, output_folder, remove_existing_folder, delete_output_folder, verbose, raise_error, docker_image, singularity_image, delete_container_files, with_output, **sorter_params)
140 container_image = singularity_image
141 return run_sorter_container(
142 container_image=container_image,
143 mode=mode,
144 **common_kwargs,
145 )
--> 147 return run_sorter_local(**common_kwargs)

File ~\anaconda3\Lib\site-packages\spikeinterface\sorters\runsorter.py:173, in run_sorter_local(sorter_name, recording, output_folder, remove_existing_folder, delete_output_folder, verbose, raise_error, with_output, **sorter_params)
171 SorterClass.set_params_to_folder(recording, output_folder, sorter_params, verbose)
172 SorterClass.setup_recording(recording, output_folder, verbose=verbose)
--> 173 SorterClass.run_from_folder(output_folder, raise_error, verbose)
174 if with_output:
175 sorting = SorterClass.get_result_from_folder(output_folder)

File ~\anaconda3\Lib\site-packages\spikeinterface\sorters\basesorter.py:289, in BaseSorter.run_from_folder(cls, output_folder, raise_error, verbose)
286 print(f"{sorter_name} run time {run_time:0.2f}s")
288 if has_error and raise_error:
--> 289 raise SpikeSortingError(
290 f"Spike sorting error trace:\n{log['error_trace']}\n"
291 f"Spike sorting failed. You can inspect the runtime trace in {output_folder}/spikeinterface_log.json."
292 )
294 return run_time

SpikeSortingError: Spike sorting error trace:
Traceback (most recent call last):
File "C:\Users\Melody\anaconda3\Lib\site-packages\spikeinterface\sorters\basesorter.py", line 254, in run_from_folder
SorterClass._run_from_folder(sorter_output_folder, sorter_params, verbose)
File "C:\Users\Melody\anaconda3\Lib\site-packages\spikeinterface\sorters\internal\spyking_circus2.py", line 80, in _run_from_folder
peaks = detect_peaks(recording_f, method="locally_exclusive", **detection_params)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\Melody\anaconda3\Lib\site-packages\spikeinterface\sortingcomponents\peak_detection.py", line 117, in detect_peaks
outs = run_node_pipeline(
^^^^^^^^^^^^^^^^^^
File "C:\Users\Melody\anaconda3\Lib\site-packages\spikeinterface\core\node_pipeline.py", line 469, in run_node_pipeline
processor.run()
File "C:\Users\Melody\anaconda3\Lib\site-packages\spikeinterface\core\job_tools.py", line 399, in run
for res in results:
File "C:\Users\Melody\anaconda3\Lib\site-packages\tqdm\notebook.py", line 254, in iter
for obj in it:
File "C:\Users\Melody\anaconda3\Lib\site-packages\tqdm\std.py", line 1178, in iter
for obj in iterable:
File "C:\Users\Melody\anaconda3\Lib\concurrent\futures\process.py", line 606, in _chain_from_iterable_of_lists
for element in iterable:
File "C:\Users\Melody\anaconda3\Lib\concurrent\futures_base.py", line 619, in result_iterator
yield _result_or_cancel(fs.pop())
^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\Melody\anaconda3\Lib\concurrent\futures_base.py", line 317, in _result_or_cancel
return fut.result(timeout)
^^^^^^^^^^^^^^^^^^^
File "C:\Users\Melody\anaconda3\Lib\concurrent\futures_base.py", line 456, in result
return self.__get_result()
^^^^^^^^^^^^^^^^^^^
File "C:\Users\Melody\anaconda3\Lib\concurrent\futures_base.py", line 401, in __get_result
raise self._exception
concurrent.futures.process.BrokenProcessPool: A process in the process pool was terminated abruptly while the future was running or pending.

Spike sorting failed. You can inspect the runtime trace in C:\Users\Melody\Desktop\test1\results_SC2/spikeinterface_log.json.

③ Then I tried the internal sorter 'tridesclous2'

Everything else stayed the same; I only changed my input to:


sorting_TC2 = si.run_sorter('tridesclous2', recording_saved, 
                            output_folder=base_folder / 'results_TC22',
                            verbose=True, job_kwargs=job_kwargs)

That was when I knew something must be wrong; I got these results:

Click to expand!

detect peaks using locally_exclusive: 0%
0/300 [00:01<?, ?it/s]
----> 1 sorting_TC2 = si.run_sorter('tridesclous2', recording_saved, 
      2                             output_folder=base_folder / 'results_TC22',
      3                             verbose=True, job_kwargs=job_kwargs)

File ~\anaconda3\Lib\site-packages\spikeinterface\sorters\runsorter.py:147, in run_sorter(sorter_name, recording, output_folder, remove_existing_folder, delete_output_folder, verbose, raise_error, docker_image, singularity_image, delete_container_files, with_output, **sorter_params)
140 container_image = singularity_image
141 return run_sorter_container(
142 container_image=container_image,
143 mode=mode,
144 **common_kwargs,
145 )
--> 147 return run_sorter_local(**common_kwargs)

File ~\anaconda3\Lib\site-packages\spikeinterface\sorters\runsorter.py:173, in run_sorter_local(sorter_name, recording, output_folder, remove_existing_folder, delete_output_folder, verbose, raise_error, with_output, **sorter_params)
171 SorterClass.set_params_to_folder(recording, output_folder, sorter_params, verbose)
172 SorterClass.setup_recording(recording, output_folder, verbose=verbose)
--> 173 SorterClass.run_from_folder(output_folder, raise_error, verbose)
174 if with_output:
175 sorting = SorterClass.get_result_from_folder(output_folder)

File ~\anaconda3\Lib\site-packages\spikeinterface\sorters\basesorter.py:289, in BaseSorter.run_from_folder(cls, output_folder, raise_error, verbose)
286 print(f"{sorter_name} run time {run_time:0.2f}s")
288 if has_error and raise_error:
--> 289 raise SpikeSortingError(
290 f"Spike sorting error trace:\n{log['error_trace']}\n"
291 f"Spike sorting failed. You can inspect the runtime trace in {output_folder}/spikeinterface_log.json."
292 )
294 return run_time

SpikeSortingError: Spike sorting error trace:
Traceback (most recent call last):
File "C:\Users\Melody\anaconda3\Lib\site-packages\spikeinterface\sorters\basesorter.py", line 254, in run_from_folder
SorterClass._run_from_folder(sorter_output_folder, sorter_params, verbose)
File "C:\Users\Melody\anaconda3\Lib\site-packages\spikeinterface\sorters\internal\tridesclous2.py", line 102, in _run_from_folder
all_peaks = detect_peaks(recording, method="locally_exclusive", **detection_params, **job_kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\Melody\anaconda3\Lib\site-packages\spikeinterface\sortingcomponents\peak_detection.py", line 117, in detect_peaks
outs = run_node_pipeline(
^^^^^^^^^^^^^^^^^^
File "C:\Users\Melody\anaconda3\Lib\site-packages\spikeinterface\core\node_pipeline.py", line 469, in run_node_pipeline
processor.run()
File "C:\Users\Melody\anaconda3\Lib\site-packages\spikeinterface\core\job_tools.py", line 399, in run
for res in results:
File "C:\Users\Melody\anaconda3\Lib\site-packages\tqdm\notebook.py", line 254, in iter
for obj in it:
File "C:\Users\Melody\anaconda3\Lib\site-packages\tqdm\std.py", line 1178, in iter
for obj in iterable:
File "C:\Users\Melody\anaconda3\Lib\concurrent\futures\process.py", line 606, in _chain_from_iterable_of_lists
for element in iterable:
File "C:\Users\Melody\anaconda3\Lib\concurrent\futures_base.py", line 619, in result_iterator
yield _result_or_cancel(fs.pop())
^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\Melody\anaconda3\Lib\concurrent\futures_base.py", line 317, in _result_or_cancel
return fut.result(timeout)
^^^^^^^^^^^^^^^^^^^
File "C:\Users\Melody\anaconda3\Lib\concurrent\futures_base.py", line 456, in result
return self.__get_result()
^^^^^^^^^^^^^^^^^^^
File "C:\Users\Melody\anaconda3\Lib\concurrent\futures_base.py", line 401, in __get_result
raise self._exception
concurrent.futures.process.BrokenProcessPool: A process in the process pool was terminated abruptly while the future was running or pending.

Spike sorting failed. You can inspect the runtime trace in C:\Users\Melody\Desktop\test1\results_TC22/spikeinterface_log.json.

If I run sorters in Docker, such as kilosort2, mountainsort4, tridesclous, ironclust, etc., they all work fine, except that when I run a (maybe) huge dataset in the spyking-circus image (the tutorial data is OK, with no error), I hit the same problem as #895. However, I am sorry to say I am just a new learner, and I haven't managed to solve the problem by setting parameters, simply because I don't know where to set them or which function to use.
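
For example, would either of these be the right place to set it? (Both are my guesses from reading the docs; I have not confirmed they are the recommended API.)

# Guess 1: set conservative parallel settings once, globally, so every
# .save() and sorter call inherits them. set_global_job_kwargs is my
# assumption about the right knob, based on spikeinterface.core.
from spikeinterface.core import set_global_job_kwargs
set_global_job_kwargs(n_jobs=1, progress_bar=True)

# Guess 2: pass job_kwargs per call, as above, but with n_jobs=1.
# ('results_SC2_single' is just a hypothetical output folder name.)
sorting_SC2 = si.run_sorter('spykingcircus2', recording_saved,
                            output_folder=base_folder / 'results_SC2_single',
                            verbose=True, job_kwargs=dict(n_jobs=1))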
