Skip to content

Commit

Permalink
Improved StreamFlow provenance support
Browse files Browse the repository at this point in the history
This commit introduces two new options to the `streamflow prov` command:

- The `--add-file` option allows user to add additional files and
  folders to the archive, potentially defining custom properties;

- The `--add-property` option allows users to add/update properties in
  the provenance manifest, if present.

Plus, this commit corrects the way `docker` CWL configuration is
propagated to inner steps.
  • Loading branch information
GlassOfWhiskey committed Apr 25, 2023
1 parent e10b9f7 commit 2f4a2c8
Show file tree
Hide file tree
Showing 8 changed files with 358 additions and 66 deletions.
6 changes: 4 additions & 2 deletions streamflow/config/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,9 +99,11 @@ def get(
current_node = current_node["children"][part]
return current_node.get(name)

def propagate(self, path: PurePosixPath, name: str) -> Any | None:
def propagate(
self, path: PurePosixPath, name: str, default: Any | None = None
) -> Any | None:
current_node = self.filesystem
value = None
value = default
for part in path.parts:
if part not in current_node["children"]:
return value
Expand Down
9 changes: 7 additions & 2 deletions streamflow/core/provenance.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from __future__ import annotations

from abc import abstractmethod
from typing import MutableSequence, TYPE_CHECKING
from typing import MutableMapping, MutableSequence, TYPE_CHECKING

from streamflow.core.persistence import DatabaseLoadingContext

Expand All @@ -23,6 +23,11 @@ def __init__(

@abstractmethod
async def create_archive(
self, outdir: str, filename: str | None, config: str | None
self,
outdir: str,
filename: str | None,
config: str | None,
additional_files: MutableSequence[MutableMapping[str, str]] | None,
additional_properties: MutableSequence[MutableMapping[str, str]] | None,
):
...
2 changes: 1 addition & 1 deletion streamflow/cwl/translator.py
Original file line number Diff line number Diff line change
Expand Up @@ -1515,7 +1515,7 @@ def _translate_command_line_tool(
targets.append(
_process_docker_requirement(
config_dir=os.path.dirname(self.context.config["path"]),
config=self.workflow_config.get(
config=self.workflow_config.propagate(
path=PurePosixPath(name_prefix),
name="docker",
default=CWLDockerTranslatorConfig(
Expand Down
4 changes: 4 additions & 0 deletions streamflow/deployment/connector/kubernetes.py
Original file line number Diff line number Diff line change
Expand Up @@ -346,6 +346,10 @@ async def _copy_remote_to_remote(
for writer in writers
)
)
# Close writers
await asyncio.gather(
*(asyncio.create_task(writer.close()) for writer in writers)
)

async def _get_container(self, location: Location) -> tuple[str, V1Container]:
pod_name, container_name = location.name.split(":")
Expand Down
6 changes: 3 additions & 3 deletions streamflow/deployment/deployment_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -196,9 +196,9 @@ async def undeploy(self, deployment_name: str):
logger.info(f"COMPLETED Undeployment of {deployment_name}")
self.events_map[deployment_name].set()
# Remove the current environment from all the other dependency graphs
for name, deps in {
k: v for k, v in self.dependency_graph.items() if k != deployment_name
}.items():
for name, deps in (
(k, v) for k, v in self.dependency_graph.items() if k != deployment_name
):
deps.discard(deployment_name)
# If there are no more dependencies, undeploy the environment
if len(deps) == 0:
Expand Down
2 changes: 2 additions & 0 deletions streamflow/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,8 @@ async def _async_prov(args: argparse.Namespace):
outdir=args.outdir,
filename=args.name,
config=args.file if os.path.exists(args.file) else None,
additional_files=args.add_file,
additional_properties=args.add_property,
)
finally:
await context.close()
Expand Down
42 changes: 41 additions & 1 deletion streamflow/parser.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,31 @@
import argparse
import os
import re


UNESCAPED_COMMA = re.compile(r"(?<!\\),")
UNESCAPED_EQUAL = re.compile(r"(?<!\\)=")


class _KeyValueAction(argparse.Action):
def __call__(self, _parser, namespace, values, option_string=None):
items = getattr(namespace, self.dest) or []
items.append(
{
k: v
for k, v in (
UNESCAPED_EQUAL.split(val, maxsplit=1)
for val in UNESCAPED_COMMA.split(values)
)
}
)
setattr(namespace, self.dest, items)


parser = argparse.ArgumentParser(description="StreamFlow Command Line")
subparsers = parser.add_subparsers(dest="context")


# streamflow list
list_parser = subparsers.add_parser("list", help="List the executed workflows")
list_parser.add_argument(
Expand All @@ -16,7 +38,6 @@
list_parser.add_argument(
"name",
metavar="NAME",
nargs="?",
type=str,
help="List all executions for the given workflow",
)
Expand All @@ -25,9 +46,28 @@
prov_parser = subparsers.add_parser(
"prov", help="Generate a provenance archive for an executed workflow"
)
prov_parser.register("action", "key_value", _KeyValueAction)
prov_parser.add_argument(
"workflow", metavar="WORKFLOW", type=str, help="Name of the workflow to process"
)
prov_parser.add_argument(
"--add-file",
action="key_value",
help="Add an external file to the provenance archive. File properties are specified as comma-separated "
"key-value pairs (key1=value1,key2=value2). A `src` property (mandatory) specifies where the file is "
"located. A `dst` property (default: /) contains a POSIX path specifying where the file should be placed "
"in the archive file system (the root folder '/' corresponds to the root of the archive). Additional "
'properties can be specified as strings or JSON objects (e.g., about={\\"@id\\":\\"./\\"}) and their meaning '
"depends on the selected provenance type.",
)
prov_parser.add_argument(
"--add-property",
action="key_value",
help="Add a property to the archive manifest (if present). Properties are specified as comma-separated "
"key-value pairs (key1=value1,key2=value2) and can be specified as strings or JSON objects (e.g., "
'\\./.license={\\"@id\\":\\"LICENSE\\"}). The way in which property keys map to manifest objects '
"depends on the selected provenance type.",
)
prov_parser.add_argument(
"--all",
"-a",
Expand Down
Loading

0 comments on commit 2f4a2c8

Please sign in to comment.