Description
I am trying to split a simple workflow consisting of 3 sequential steps (bias correction, registration and resampling) over multiple T1w images in a BIDS dataset. The code is provided below and consists of 3 ANTs tasks with a custom reader task attached upfront.
The workflow fetches the files fine and processes the bias correction step in parallel, but then fails with a rather generic `AttributeError: 'Input' object has no attribute 'files_hash'`. The code listing and full traceback are available below.
It is worth noting that the workflow runs fine if I provide a reader fetching a single file (`read_one_bidsfile`) and remove the split and combine steps. It also works if I provide the list of files manually to the workflow and don't use a reader task.
Code:
@task
@annotate({"return": {"T1w": list[str]}})
def read_bids_dataset(dataset_path: PathLike):
    """Read all T1w files from a BIDS dataset.

    Recursively searches ``dataset_path`` for files matching
    ``*_T1w.nii.gz`` and returns them in sorted order. An empty list is
    returned when nothing matches.

    The matches are converted to ``str`` so that the returned value agrees
    with the ``list[str]`` type declared for the ``T1w`` output.
    """
    matches = sorted(Path(dataset_path).rglob("*_T1w.nii.gz"))
    return [str(match) for match in matches]
@task
@annotate({"return": {"T1w": str}})
def read_one_bidsfile(dataset_path: PathLike):
    """Read one T1w file from a BIDS dataset.

    Recursively searches ``dataset_path`` for files matching
    ``*_T1w.nii.gz`` and returns the first match produced by the search.
    The matches are not sorted, so which file comes first is whatever
    ``Path.rglob`` yields first.

    The match is converted to ``str`` so that the returned value agrees
    with the ``str`` type declared for the ``T1w`` output.

    The underlying ``next`` call raises ``StopIteration`` when no file
    matches the pattern.
    """
    first_match = next(Path(dataset_path).rglob("*_T1w.nii.gz"))
    return str(first_match)
def t1_linear(name: str = "t1_linear", **kwargs) -> Workflow:
    """Build a workflow chaining bias correction, registration and resampling.

    The workflow declares three inputs (``dataset_path``, ``mni_template``
    and ``ref_template``) and exposes two outputs (``output_image`` and
    ``output_transform``). A reader task collects the T1w files from
    ``dataset_path``; the bias correction task is split over that list of
    files, and the workflow is combined over the same field.

    Parameters
    ----------
    name : str
        Name passed to the ``Workflow`` constructor. It is an explicit
        parameter because the function previously referenced ``name``
        without defining it.
    **kwargs
        Extra keyword arguments forwarded unchanged to ``Workflow``.

    Returns
    -------
    Workflow
        The assembled, not yet executed, workflow.
    """
    from pydra.tasks import ants

    workflow = Workflow(
        name=name,
        input_spec=["dataset_path", "mni_template", "ref_template"],
        **kwargs,
    )

    # Single-file variant of the reader, to be used together with removing
    # the split/combine calls below:
    # workflow.add(read_one_bidsfile(name="reader", dataset_path=workflow.lzin.dataset_path))
    workflow.add(
        read_bids_dataset(name="reader", dataset_path=workflow.lzin.dataset_path)
    )

    workflow.add(
        ants.N4BiasFieldCorrection(
            name="bias_correction",
            input_image=workflow.reader.lzout.T1w,
        )
    )
    # Run one bias correction per file returned by the reader.
    workflow.bias_correction.split("input_image")

    workflow.add(
        ants.RegistrationSyNQuick(
            name="registration",
            dimensionality=3,
            transform_type="a",
            moving_image=workflow.bias_correction.lzout.output_image,
            fixed_image=workflow.lzin.mni_template,
        )
    )

    workflow.add(
        ants.ApplyTransforms(
            name="transform",
            dimensionality=3,
            moving_image=workflow.registration.lzout.warped_moving_image,
            fixed_image=workflow.lzin.ref_template,
            output_datatype="short",
        )
    )

    # Combine over the field that bias_correction was split on.
    workflow.combine("bias_correction.input_image")

    workflow.set_output(
        {
            "output_image": workflow.transform.lzout.output_image,
            "output_transform": workflow.registration.lzout.affine_transform,
        }
    )

    return workflow
Traceback:
Traceback (most recent call last):
File "/Users/ghislain.vaillant/Projects/pydra-ants/test.py", line 238, in <module>
sub(wf)
File "/Users/ghislain.vaillant/Projects/pydra-ants/.hatch/pydra-ants/lib/python3.11/site-packages/pydra/engine/submitter.py", line 42, in __call__
self.loop.run_until_complete(self.submit_from_call(runnable, rerun))
File "/Users/ghislain.vaillant/.asdf/installs/python/3.11.3/lib/python3.11/asyncio/base_events.py", line 653, in run_until_complete
return future.result()
^^^^^^^^^^^^^^^
File "/Users/ghislain.vaillant/Projects/pydra-ants/.hatch/pydra-ants/lib/python3.11/site-packages/pydra/engine/submitter.py", line 68, in submit_from_call
await runnable._run(self, rerun=rerun)
File "/Users/ghislain.vaillant/Projects/pydra-ants/.hatch/pydra-ants/lib/python3.11/site-packages/pydra/engine/core.py", line 1124, in _run
await self._run_task(submitter, rerun=rerun)
File "/Users/ghislain.vaillant/Projects/pydra-ants/.hatch/pydra-ants/lib/python3.11/site-packages/pydra/engine/core.py", line 1152, in _run_task
await submitter.expand_workflow(self, rerun=rerun)
File "/Users/ghislain.vaillant/Projects/pydra-ants/.hatch/pydra-ants/lib/python3.11/site-packages/pydra/engine/submitter.py", line 195, in expand_workflow
tasks, follow_err = get_runnable_tasks(graph_copy)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/ghislain.vaillant/Projects/pydra-ants/.hatch/pydra-ants/lib/python3.11/site-packages/pydra/engine/submitter.py", line 236, in get_runnable_tasks
_is_runnable = is_runnable(graph, tsk)
^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/ghislain.vaillant/Projects/pydra-ants/.hatch/pydra-ants/lib/python3.11/site-packages/pydra/engine/submitter.py", line 266, in is_runnable
is_done = pred.done
^^^^^^^^^
File "/Users/ghislain.vaillant/Projects/pydra-ants/.hatch/pydra-ants/lib/python3.11/site-packages/pydra/engine/core.py", line 229, in __getattr__
return self.__getattribute__(name)
^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/ghislain.vaillant/Projects/pydra-ants/.hatch/pydra-ants/lib/python3.11/site-packages/pydra/engine/core.py", line 667, in done
_result = self.result()
^^^^^^^^^^^^^
File "/Users/ghislain.vaillant/Projects/pydra-ants/.hatch/pydra-ants/lib/python3.11/site-packages/pydra/engine/core.py", line 746, in result
checksum = self.checksum_states(state_index=ind)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/ghislain.vaillant/Projects/pydra-ants/.hatch/pydra-ants/lib/python3.11/site-packages/pydra/engine/core.py", line 285, in checksum_states
setattr(inputs_copy, key.split(".")[1], val)
File "/Users/ghislain.vaillant/Projects/pydra-ants/.hatch/pydra-ants/lib/python3.11/site-packages/pydra/engine/specs.py", line 105, in __setattr__
self.files_hash[name] = {}
^^^^^^^^^^^^^^^
AttributeError: 'Input' object has no attribute 'files_hash'
Task was destroyed but it is pending!
task: <Task pending name='Task-4' coro=<ConcurrentFuturesWorker.exec_as_coro() running at /Users/ghislain.vaillant/Projects/pydra-ants/.hatch/pydra-ants/lib/python3.11/site-packages/pydra/engine/workers.py:175> wait_for=<Future pending cb=[_chain_future.<locals>._call_check_cancel() at /Users/ghislain.vaillant/.asdf/installs/python/3.11.3/lib/python3.11/asyncio/futures.py:387, Task.task_wakeup()]>>
Task was destroyed but it is pending!
task: <Task pending name='Task-5' coro=<ConcurrentFuturesWorker.exec_as_coro() running at /Users/ghislain.vaillant/Projects/pydra-ants/.hatch/pydra-ants/lib/python3.11/site-packages/pydra/engine/workers.py:175> wait_for=<Future pending cb=[_chain_future.<locals>._call_check_cancel() at /Users/ghislain.vaillant/.asdf/installs/python/3.11.3/lib/python3.11/asyncio/futures.py:387, Task.task_wakeup()]>>
Task was destroyed but it is pending!
task: <Task pending name='Task-6' coro=<ConcurrentFuturesWorker.exec_as_coro() done, defined at /Users/ghislain.vaillant/Projects/pydra-ants/.hatch/pydra-ants/lib/python3.11/site-packages/pydra/engine/workers.py:169> wait_for=<Future pending cb=[_chain_future.<locals>._call_check_cancel() at /Users/ghislain.vaillant/.asdf/installs/python/3.11.3/lib/python3.11/asyncio/futures.py:387, Task.task_wakeup()]>>