Skip to content

Commit 7f95d56

Browse files
authored
Merge branch 'master' into pre-commit-ci-update-config
2 parents 3c4ec22 + 95f94d5 commit 7f95d56

File tree

15 files changed

+958
-1501
lines changed

15 files changed

+958
-1501
lines changed

.github/workflows/testpydra.yml

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ jobs:
3131
- run: pip install --upgrade build twine
3232
- run: python -m build
3333
- run: twine check dist/*
34-
- uses: actions/upload-artifact@v3
34+
- uses: actions/upload-artifact@v4
3535
with:
3636
name: dist
3737
path: dist/
@@ -40,7 +40,7 @@ jobs:
4040
git clean -fxd
4141
mkdir archive
4242
git archive -o archive/pydra.zip HEAD
43-
- uses: actions/upload-artifact@v3
43+
- uses: actions/upload-artifact@v4
4444
with:
4545
name: archive
4646
path: archive/
@@ -68,13 +68,13 @@ jobs:
6868

6969
steps:
7070
- name: Fetch sdist/wheel
71-
uses: actions/download-artifact@v3
71+
uses: actions/download-artifact@v4
7272
if: matrix.install == 'sdist' || matrix.install == 'wheel'
7373
with:
7474
name: dist
7575
path: dist/
7676
- name: Fetch git archive
77-
uses: actions/download-artifact@v3
77+
uses: actions/download-artifact@v4
7878
if: matrix.install == 'archive'
7979
with:
8080
name: archive

pydra/__init__.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,14 +16,13 @@
1616
import attr
1717

1818
from . import mark
19-
from .engine import AuditFlag, DockerTask, ShellCommandTask, Submitter, Workflow, specs
19+
from .engine import AuditFlag, ShellCommandTask, Submitter, Workflow, specs
2020

2121
__all__ = (
2222
"Submitter",
2323
"Workflow",
2424
"AuditFlag",
2525
"ShellCommandTask",
26-
"DockerTask",
2726
"specs",
2827
"mark",
2928
)

pydra/engine/__init__.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,11 @@
22

33
from .submitter import Submitter
44
from .core import Workflow
5-
from .task import AuditFlag, ShellCommandTask, DockerTask
5+
from .task import AuditFlag, ShellCommandTask
66
from . import specs
77

88
__all__ = [
99
"AuditFlag",
10-
"DockerTask",
1110
"ShellCommandTask",
1211
"Submitter",
1312
"Workflow",

pydra/engine/core.py

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -430,7 +430,13 @@ def cont_dim(self, cont_dim):
430430
self._cont_dim = cont_dim
431431

432432
def __call__(
433-
self, submitter=None, plugin=None, plugin_kwargs=None, rerun=False, **kwargs
433+
self,
434+
submitter=None,
435+
plugin=None,
436+
plugin_kwargs=None,
437+
rerun=False,
438+
environment=None,
439+
**kwargs,
434440
):
435441
"""Make tasks callable themselves."""
436442
from .submitter import Submitter
@@ -450,9 +456,9 @@ def __call__(
450456
if submitter:
451457
with submitter as sub:
452458
self.inputs = attr.evolve(self.inputs, **kwargs)
453-
res = sub(self)
459+
res = sub(self, environment=environment)
454460
else: # tasks without state could be run without a submitter
455-
res = self._run(rerun=rerun, **kwargs)
461+
res = self._run(rerun=rerun, environment=environment, **kwargs)
456462
return res
457463

458464
def _modify_inputs(self):
@@ -502,7 +508,7 @@ def _populate_filesystem(self, checksum, output_dir):
502508
shutil.rmtree(output_dir)
503509
output_dir.mkdir(parents=False, exist_ok=self.can_resume)
504510

505-
def _run(self, rerun=False, **kwargs):
511+
def _run(self, rerun=False, environment=None, **kwargs):
506512
self.inputs = attr.evolve(self.inputs, **kwargs)
507513
self.inputs.check_fields_input_spec()
508514

@@ -519,6 +525,7 @@ def _run(self, rerun=False, **kwargs):
519525
return result
520526
cwd = os.getcwd()
521527
self._populate_filesystem(checksum, output_dir)
528+
os.chdir(output_dir)
522529
orig_inputs = self._modify_inputs()
523530
result = Result(output=None, runtime=None, errored=False)
524531
self.hooks.pre_run_task(self)
@@ -527,7 +534,7 @@ def _run(self, rerun=False, **kwargs):
527534
self.audit.audit_task(task=self)
528535
try:
529536
self.audit.monitor()
530-
self._run_task()
537+
self._run_task(environment=environment)
531538
result.output = self._collect_outputs(output_dir=output_dir)
532539
except Exception:
533540
etype, eval, etr = sys.exc_info()
@@ -539,7 +546,6 @@ def _run(self, rerun=False, **kwargs):
539546
self.hooks.post_run_task(self, result)
540547
self.audit.finalize_audit(result)
541548
save(output_dir, result=result, task=self)
542-
self.output_ = None
543549
# removing the additional file with the chcksum
544550
(self.cache_dir / f"{self.uid}_info.json").unlink()
545551
# # function etc. shouldn't change anyway, so removing
@@ -552,15 +558,14 @@ def _run(self, rerun=False, **kwargs):
552558
return result
553559

554560
def _collect_outputs(self, output_dir):
555-
run_output = self.output_
556561
output_klass = make_klass(self.output_spec)
557562
output = output_klass(
558563
**{f.name: attr.NOTHING for f in attr.fields(output_klass)}
559564
)
560565
other_output = output.collect_additional_outputs(
561-
self.inputs, output_dir, run_output
566+
self.inputs, output_dir, self.output_
562567
)
563-
return attr.evolve(output, **run_output, **other_output)
568+
return attr.evolve(output, **self.output_, **other_output)
564569

565570
def split(
566571
self,

pydra/engine/environments.py

Lines changed: 157 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,157 @@
1+
from .helpers import execute
2+
3+
from pathlib import Path
4+
5+
6+
class Environment:
7+
"""
8+
Base class for environments that are used to execute tasks.
9+
Right now it is asssumed that the environment, including container images,
10+
are available and are not removed at the end
11+
TODO: add setup and teardown methods
12+
"""
13+
14+
def setup(self):
15+
pass
16+
17+
def execute(self, task):
18+
"""
19+
Execute the task in the environment.
20+
21+
Parameters
22+
----------
23+
task : TaskBase
24+
the task to execute
25+
26+
Returns
27+
-------
28+
output
29+
Output of the task.
30+
"""
31+
raise NotImplementedError
32+
33+
def teardown(self):
34+
pass
35+
36+
37+
class Native(Environment):
38+
"""
39+
Native environment, i.e. the tasks are executed in the current python environment.
40+
"""
41+
42+
def execute(self, task):
43+
keys = ["return_code", "stdout", "stderr"]
44+
values = execute(task.command_args(), strip=task.strip)
45+
output = dict(zip(keys, values))
46+
if output["return_code"]:
47+
msg = f"Error running '{task.name}' task with {task.command_args()}:"
48+
if output["stderr"]:
49+
msg += "\n\nstderr:\n" + output["stderr"]
50+
if output["stdout"]:
51+
msg += "\n\nstdout:\n" + output["stdout"]
52+
raise RuntimeError(msg)
53+
return output
54+
55+
56+
class Container(Environment):
57+
"""
58+
Base class for container environments used by Docker and Singularity.
59+
60+
Parameters
61+
----------
62+
image : str
63+
Name of the container image
64+
tag : str
65+
Tag of the container image
66+
root : str
67+
Base path for mounting host directories into the container
68+
xargs : Union[str, List[str]]
69+
Extra arguments to be passed to the container
70+
"""
71+
72+
def __init__(self, image, tag="latest", root="/mnt/pydra", xargs=None):
73+
self.image = image
74+
self.tag = tag
75+
if xargs is None:
76+
xargs = []
77+
elif isinstance(xargs, str):
78+
xargs = xargs.split()
79+
self.xargs = xargs
80+
self.root = root
81+
82+
def bind(self, loc, mode="ro"):
83+
loc_abs = Path(loc).absolute()
84+
return f"{loc_abs}:{self.root}{loc_abs}:{mode}"
85+
86+
87+
class Docker(Container):
88+
"""Docker environment."""
89+
90+
def execute(self, task):
91+
docker_img = f"{self.image}:{self.tag}"
92+
# mounting all input locations
93+
mounts = task.get_bindings(root=self.root)
94+
95+
docker_args = [
96+
"docker",
97+
"run",
98+
"-v",
99+
self.bind(task.cache_dir, "rw"),
100+
*self.xargs,
101+
]
102+
docker_args.extend(
103+
" ".join(
104+
[f"-v {key}:{val[0]}:{val[1]}" for (key, val) in mounts.items()]
105+
).split()
106+
)
107+
docker_args.extend(["-w", f"{self.root}{task.output_dir}"])
108+
keys = ["return_code", "stdout", "stderr"]
109+
110+
values = execute(
111+
docker_args + [docker_img] + task.command_args(root=self.root),
112+
strip=task.strip,
113+
)
114+
output = dict(zip(keys, values))
115+
if output["return_code"]:
116+
if output["stderr"]:
117+
raise RuntimeError(output["stderr"])
118+
else:
119+
raise RuntimeError(output["stdout"])
120+
return output
121+
122+
123+
class Singularity(Container):
124+
"""Singularity environment."""
125+
126+
def execute(self, task):
127+
singularity_img = f"{self.image}:{self.tag}"
128+
# mounting all input locations
129+
mounts = task.get_bindings(root=self.root)
130+
131+
# todo adding xargsy etc
132+
singularity_args = [
133+
"singularity",
134+
"exec",
135+
"-B",
136+
self.bind(task.cache_dir, "rw"),
137+
*self.xargs,
138+
]
139+
singularity_args.extend(
140+
" ".join(
141+
[f"-B {key}:{val[0]}:{val[1]}" for (key, val) in mounts.items()]
142+
).split()
143+
)
144+
singularity_args.extend(["--pwd", f"{self.root}{task.output_dir}"])
145+
keys = ["return_code", "stdout", "stderr"]
146+
147+
values = execute(
148+
singularity_args + [singularity_img] + task.command_args(root=self.root),
149+
strip=task.strip,
150+
)
151+
output = dict(zip(keys, values))
152+
if output["return_code"]:
153+
if output["stderr"]:
154+
raise RuntimeError(output["stderr"])
155+
else:
156+
raise RuntimeError(output["stdout"])
157+
return output

pydra/engine/specs.py

Lines changed: 0 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -677,37 +677,6 @@ def _check_requires(self, fld, inputs):
677677
return False
678678

679679

680-
@attr.s(auto_attribs=True, kw_only=True)
681-
class ContainerSpec(ShellSpec):
682-
"""Refine the generic command-line specification to container execution."""
683-
684-
image: ty.Union[File, str] = attr.ib(
685-
metadata={"help_string": "image", "mandatory": True}
686-
)
687-
"""The image to be containerized."""
688-
container: ty.Union[File, str, None] = attr.ib(
689-
metadata={"help_string": "container"}
690-
)
691-
"""The container."""
692-
container_xargs: ty.Optional[ty.List[str]] = attr.ib(
693-
default=None, metadata={"help_string": "todo"}
694-
)
695-
696-
697-
@attr.s(auto_attribs=True, kw_only=True)
698-
class DockerSpec(ContainerSpec):
699-
"""Particularize container specifications to the Docker engine."""
700-
701-
container: str = attr.ib("docker", metadata={"help_string": "container"})
702-
703-
704-
@attr.s(auto_attribs=True, kw_only=True)
705-
class SingularitySpec(ContainerSpec):
706-
"""Particularize container specifications to Singularity."""
707-
708-
container: str = attr.ib("singularity", metadata={"help_string": "container type"})
709-
710-
711680
@attr.s
712681
class LazyInterface:
713682
_task: "core.TaskBase" = attr.ib()

pydra/engine/submitter.py

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -36,14 +36,16 @@ def __init__(self, plugin="cf", **kwargs):
3636
raise NotImplementedError(f"No worker for {self.plugin}")
3737
self.worker.loop = self.loop
3838

39-
def __call__(self, runnable, cache_locations=None, rerun=False):
39+
def __call__(self, runnable, cache_locations=None, rerun=False, environment=None):
4040
"""Submitter run function."""
4141
if cache_locations is not None:
4242
runnable.cache_locations = cache_locations
43-
self.loop.run_until_complete(self.submit_from_call(runnable, rerun))
43+
self.loop.run_until_complete(
44+
self.submit_from_call(runnable, rerun, environment)
45+
)
4446
return runnable.result()
4547

46-
async def submit_from_call(self, runnable, rerun):
48+
async def submit_from_call(self, runnable, rerun, environment):
4749
"""
4850
This coroutine should only be called once per Submitter call,
4951
and serves as the bridge between sync/async lands.
@@ -57,7 +59,7 @@ async def submit_from_call(self, runnable, rerun):
5759
Once Python 3.10 is the minimum, this should probably be refactored into using
5860
structural pattern matching.
5961
"""
60-
if is_workflow(runnable):
62+
if is_workflow(runnable): # TODO: env to wf
6163
# connect and calculate the checksum of the graph before running
6264
runnable._connect_and_propagate_to_tasks(override_task_caches=True)
6365
# 0
@@ -75,10 +77,11 @@ async def submit_from_call(self, runnable, rerun):
7577
# 2
7678
if runnable.state is None:
7779
# run_el should always return a coroutine
78-
await self.worker.run_el(runnable, rerun=rerun)
80+
print("in SUBM", environment)
81+
await self.worker.run_el(runnable, rerun=rerun, environment=environment)
7982
# 3
8083
else:
81-
await self.expand_runnable(runnable, wait=True, rerun=rerun)
84+
await self.expand_runnable(runnable, wait=True, rerun=rerun) # TODO
8285
return True
8386

8487
async def expand_runnable(self, runnable, wait=False, rerun=False):

0 commit comments

Comments
 (0)