Skip to content

Fix docker volume staging for writable files / file literals. #533

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Aug 22, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 37 additions & 31 deletions cwltool/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
from .builder import Builder
from .docker_id import docker_vm_id
from .errors import WorkflowException
from .pathmapper import PathMapper
from .pathmapper import PathMapper, ensure_writable
from .process import (UnsupportedRequirement, empty_subtree, get_feature,
stageFiles)
from .utils import bytes2str_in_dicts
Expand Down Expand Up @@ -98,24 +98,26 @@ def deref_links(outputs): # type: (Any) -> None
for v in outputs:
deref_links(v)

def relink_initialworkdir(pathmapper, inplace_update=False):
# type: (PathMapper, bool) -> None
def relink_initialworkdir(pathmapper, host_outdir, container_outdir, inplace_update=False):
# type: (PathMapper, Text, Text, bool) -> None
for src, vol in pathmapper.items():
if not vol.staged:
continue

if vol.type in ("File", "Directory") or (inplace_update and
vol.type in ("WritableFile", "WritableDirectory")):
if os.path.islink(vol.target) or os.path.isfile(vol.target):
os.remove(vol.target)
elif os.path.isdir(vol.target):
shutil.rmtree(vol.target)
host_outdir_tgt = os.path.join(host_outdir, vol.target[len(container_outdir)+1:])
if os.path.islink(host_outdir_tgt) or os.path.isfile(host_outdir_tgt):
os.remove(host_outdir_tgt)
elif os.path.isdir(host_outdir_tgt):
shutil.rmtree(host_outdir_tgt)
if onWindows():
if vol.type in ("File", "WritableFile"):
shutil.copy(vol.resolved,vol.target)
shutil.copy(vol.resolved, host_outdir_tgt)
elif vol.type in ("Directory", "WritableDirectory"):
copytree_with_merge(vol.resolved, vol.target)
copytree_with_merge(vol.resolved, host_outdir_tgt)
else:
os.symlink(vol.resolved, vol.target)
os.symlink(vol.resolved, host_outdir_tgt)

class JobBase(object):
def __init__(self): # type: () -> None
Expand Down Expand Up @@ -160,7 +162,7 @@ def _setup(self, kwargs): # type: (Dict) -> None
make_path_mapper_kwargs = make_path_mapper_kwargs.copy()
del make_path_mapper_kwargs["basedir"]
self.generatemapper = self.make_pathmapper(cast(List[Any], self.generatefiles["listing"]),
self.outdir, basedir=self.outdir, separateDirs=False, **make_path_mapper_kwargs)
self.builder.outdir, basedir=self.outdir, separateDirs=False, **make_path_mapper_kwargs)
_logger.debug(u"[job %s] initial work dir %s", self.name,
json.dumps({p: self.generatemapper.mapper(p) for p in self.generatemapper.files()}, indent=4))

Expand Down Expand Up @@ -234,7 +236,7 @@ def _execute(self, runtime, env, rm_tmpdir=True, move_outputs="move"):
processStatus = "permanentFail"

if self.generatefiles["listing"]:
relink_initialworkdir(self.generatemapper, inplace_update=self.inplace_update)
relink_initialworkdir(self.generatemapper, self.outdir, self.builder.outdir, inplace_update=self.inplace_update)

outputs = self.collect_outputs(self.outdir)
outputs = bytes2str_in_dicts(outputs) # type: ignore
Expand Down Expand Up @@ -303,48 +305,52 @@ def run(self, pull_image=True, rm_container=True,
stageFiles(self.pathmapper, ignoreWritable=True, symLink=True)
if self.generatemapper:
stageFiles(self.generatemapper, ignoreWritable=self.inplace_update, symLink=True)
relink_initialworkdir(self.generatemapper, inplace_update=self.inplace_update)
relink_initialworkdir(self.generatemapper, self.outdir, self.builder.outdir, inplace_update=self.inplace_update)

self._execute([], env, rm_tmpdir=rm_tmpdir, move_outputs=move_outputs)


class DockerCommandLineJob(JobBase):

def add_volumes(self, pathmapper, runtime, stage_output):
# type: (PathMapper, List[Text], bool) -> None
def add_volumes(self, pathmapper, runtime):
# type: (PathMapper, List[Text]) -> None

host_outdir = self.outdir
container_outdir = self.builder.outdir
for src, vol in pathmapper.items():
if not vol.staged:
continue
if stage_output:
containertgt = container_outdir + vol.target[len(host_outdir):]
if vol.target.startswith(container_outdir+"/"):
host_outdir_tgt = os.path.join(host_outdir, vol.target[len(container_outdir)+1:])
else:
containertgt = vol.target
host_outdir_tgt = None
if vol.type in ("File", "Directory"):
if not vol.resolved.startswith("_:"):
runtime.append(u"--volume=%s:%s:ro" % (docker_windows_path_adjust(vol.resolved), docker_windows_path_adjust(containertgt)))
runtime.append(u"--volume=%s:%s:ro" % (docker_windows_path_adjust(vol.resolved), docker_windows_path_adjust(vol.target)))
elif vol.type == "WritableFile":
if self.inplace_update:
runtime.append(u"--volume=%s:%s:rw" % (docker_windows_path_adjust(vol.resolved), docker_windows_path_adjust(containertgt)))
runtime.append(u"--volume=%s:%s:rw" % (docker_windows_path_adjust(vol.resolved), docker_windows_path_adjust(vol.target)))
else:
shutil.copy(vol.resolved, vol.target)
shutil.copy(vol.resolved, host_outdir_tgt)
ensure_writable(host_outdir_tgt)
elif vol.type == "WritableDirectory":
if vol.resolved.startswith("_:"):
os.makedirs(vol.target, 0o0755)
else:
if self.inplace_update:
runtime.append(u"--volume=%s:%s:rw" % (docker_windows_path_adjust(vol.resolved), docker_windows_path_adjust(containertgt)))
runtime.append(u"--volume=%s:%s:rw" % (docker_windows_path_adjust(vol.resolved), docker_windows_path_adjust(vol.target)))
else:
shutil.copytree(vol.resolved, vol.target)
shutil.copytree(vol.resolved, host_outdir_tgt)
ensure_writable(host_outdir_tgt)
elif vol.type == "CreateFile":
createtmp = os.path.join(host_outdir, os.path.basename(vol.target))
with open(createtmp, "wb") as f:
f.write(vol.resolved.encode("utf-8"))
if not vol.target.startswith(container_outdir):
runtime.append(u"--volume=%s:%s:ro" % (docker_windows_path_adjust(createtmp), docker_windows_path_adjust(vol.target)))

if host_outdir_tgt:
with open(host_outdir_tgt, "wb") as f:
f.write(vol.resolved.encode("utf-8"))
else:
fd, createtmp = tempfile.mkstemp(dir=self.tmpdir)
with os.fdopen(fd, "wb") as f:
f.write(vol.resolved.encode("utf-8"))
runtime.append(u"--volume=%s:%s:rw" % (docker_windows_path_adjust(createtmp), docker_windows_path_adjust(vol.target)))

def run(self, pull_image=True, rm_container=True,
rm_tmpdir=True, move_outputs="move", **kwargs):
Expand Down Expand Up @@ -384,9 +390,9 @@ def run(self, pull_image=True, rm_container=True,
runtime.append(u"--volume=%s:%s:rw" % (docker_windows_path_adjust(os.path.realpath(self.outdir)), self.builder.outdir))
runtime.append(u"--volume=%s:%s:rw" % (docker_windows_path_adjust(os.path.realpath(self.tmpdir)), "/tmp"))

self.add_volumes(self.pathmapper, runtime, False)
self.add_volumes(self.pathmapper, runtime)
if self.generatemapper:
self.add_volumes(self.generatemapper, runtime, True)
self.add_volumes(self.generatemapper, runtime)

runtime.append(u"--workdir=%s" % (docker_windows_path_adjust(self.builder.outdir)))
runtime.append(u"--read-only=true")
Expand Down
19 changes: 19 additions & 0 deletions cwltool/pathmapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,25 @@ def downloadHttpFile(httpurl):
r.close()
return f.name

def ensure_writable(path):
# type: (Text) -> None
if os.path.isdir(path):
for root, dirs, files in os.walk(path):
for name in files:
j = os.path.join(root, name)
st = os.stat(j)
mode = stat.S_IMODE(st.st_mode)
os.chmod(j, mode|stat.S_IWUSR)
for name in dirs:
j = os.path.join(root, name)
st = os.stat(j)
mode = stat.S_IMODE(st.st_mode)
os.chmod(j, mode|stat.S_IWUSR)
else:
st = os.stat(path)
mode = stat.S_IMODE(st.st_mode)
os.chmod(path, mode|stat.S_IWUSR)

class PathMapper(object):
"""Mapping of files from relative path provided in the file to a tuple of
(absolute local path, absolute container path)
Expand Down
7 changes: 5 additions & 2 deletions cwltool/process.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@
from .builder import Builder
from .errors import UnsupportedRequirement, WorkflowException
from .pathmapper import (PathMapper, adjustDirObjs, get_listing,
normalizeFilesDirs, visit_class, trim_listing)
normalizeFilesDirs, visit_class, trim_listing,
ensure_writable)
from .stdfsaccess import StdFsAccess
from .utils import aslist, get_feature, copytree_with_merge, onWindows

Expand Down Expand Up @@ -230,15 +231,17 @@ def stageFiles(pm, stageFunc=None, ignoreWritable=False, symLink=True):
os.makedirs(p.target, 0o0755)
elif p.type == "WritableFile" and not ignoreWritable:
shutil.copy(p.resolved, p.target)
ensure_writable(p.target)
elif p.type == "WritableDirectory" and not ignoreWritable:
if p.resolved.startswith("_:"):
os.makedirs(p.target, 0o0755)
else:
shutil.copytree(p.resolved, p.target)
ensure_writable(p.target)
elif p.type == "CreateFile":
with open(p.target, "wb") as n:
n.write(p.resolved.encode("utf-8"))

ensure_writable(p.target)

def collectFilesAndDirs(obj, out):
# type: (Union[Dict[Text, Any], List[Dict[Text, Any]]], List[Dict[Text, Any]]) -> None
Expand Down