diff --git a/.github/workflows/ci-tests.yaml b/.github/workflows/ci-tests.yaml
index e40d65bed..8e42d0058 100644
--- a/.github/workflows/ci-tests.yaml
+++ b/.github/workflows/ci-tests.yaml
@@ -24,9 +24,9 @@ jobs:
       - uses: github/codeql-action/analyze@v2
   cwl-conformance:
     name: "CWL conformance tests"
-    runs-on: ubuntu-22.04
     strategy:
       matrix:
+        on: [ "ubuntu-22.04" ]
         python: [ "3.8", "3.9", "3.10", "3.11" ]
         version: [ "v1.0", "v1.1", "v1.2" ]
         include:
@@ -39,6 +39,10 @@ jobs:
           - commit: "ad6f77c648ae9f0eaa2fd53ba7032b0523e12de8"
             exclude: "docker_entrypoint,modify_file_content"
             version: "v1.2"
+          - on: "macos-12"
+            python: "3.11"
+            version: "v1.2"
+    runs-on: ${{ matrix.on }}
     steps:
       - uses: actions/checkout@v3
       - uses: actions/setup-python@v4
@@ -47,6 +51,8 @@ jobs:
       - uses: actions/setup-node@v3
         with:
           node-version: "15"
+      - uses: douglascamata/setup-docker-macos-action@v1-alpha
+        if: ${{ startsWith(matrix.on, 'macos-') }}
       - uses: docker/setup-qemu-action@v2
       - name: "Install Streamflow"
         run: |
@@ -89,10 +95,14 @@ jobs:
         run: tox
   unit-tests:
     name: "StreamFlow unit tests"
-    runs-on: ubuntu-22.04
     strategy:
       matrix:
+        on: [ "ubuntu-22.04" ]
         python: [ "3.8", "3.9", "3.10", "3.11" ]
+        include:
+          - on: "macos-12"
+            python: "3.11"
+    runs-on: ${{ matrix.on }}
     env:
       TOXENV: ${{ format('py{0}-unit', matrix.python) }}
     steps:
@@ -107,13 +117,15 @@ jobs:
       - uses: actions/setup-node@v3
         with:
           node-version: "15"
+      - uses: douglascamata/setup-docker-macos-action@v1-alpha
+        if: ${{ startsWith(matrix.on, 'macos-') }}
       - uses: docker/setup-qemu-action@v2
       - name: "Install Python Dependencies and Streamflow"
         run: |
           python -m pip install tox --user
           python -m pip install . --user
       - name: "Run StreamFlow tests via Tox"
-        run: tox
+        run: python -m tox
       - name: "Upload coverage report for unit tests"
         uses: actions/upload-artifact@v3
         with:
@@ -133,4 +145,4 @@ jobs:
         uses: codecov/codecov-action@v3
         with:
           fail_ci_if_error: true
-          token: ${{ secrets.CODECOV_TOKEN }}
\ No newline at end of file
+          token: ${{ secrets.CODECOV_TOKEN }}
diff --git a/streamflow/core/deployment.py b/streamflow/core/deployment.py
index ec5ef69c6..165fe35ac 100644
--- a/streamflow/core/deployment.py
+++ b/streamflow/core/deployment.py
@@ -27,7 +27,7 @@ def _init_workdir(deployment_name: str) -> str:
     if deployment_name != LOCAL_LOCATION:
         return posixpath.join("/tmp", "streamflow")  # nosec
     else:
-        return os.path.join(tempfile.gettempdir(), "streamflow")
+        return os.path.join(os.path.realpath(tempfile.gettempdir()), "streamflow")
 
 
 class Location:
diff --git a/streamflow/core/utils.py b/streamflow/core/utils.py
index 5e5373523..9c1f4e6ad 100644
--- a/streamflow/core/utils.py
+++ b/streamflow/core/utils.py
@@ -97,9 +97,10 @@ def dict_product(**kwargs) -> MutableMapping[Any, Any]:
         yield dict(zip(keys, list(instance)))
 
 
-def encode_command(command: str):
-    return "echo {command} | base64 -d | sh".format(
-        command=base64.b64encode(command.encode("utf-8")).decode("utf-8")
+def encode_command(command: str, shell: str = "sh"):
+    return "echo {command} | base64 -d | {shell}".format(
+        command=base64.b64encode(command.encode("utf-8")).decode("utf-8"),
+        shell=shell,  # nosec
     )
 
 
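The `encode_command` change above makes the target shell injectable while keeping the base64 transport trick. A minimal self-contained sketch of the round trip, assuming a POSIX system where `sh`, `bash`, and a `base64` accepting `-d` are available (the sample command is illustrative):

```python
import base64
import subprocess


def encode_command(command: str, shell: str = "sh"):
    # Base64-encode the command so quoting, newlines, and special characters
    # survive transport, then decode it on the far side and pipe it into the
    # requested shell.
    return "echo {command} | base64 -d | {shell}".format(
        command=base64.b64encode(command.encode("utf-8")).decode("utf-8"),
        shell=shell,
    )


# The wrapped string is itself a plain shell command, so any runner can use it.
wrapped = encode_command("echo 'hello world'", shell="bash")
print(subprocess.run(["sh", "-c", wrapped], capture_output=True, text=True).stdout)
# -> hello world
```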
diff --git a/streamflow/cwl/processor.py b/streamflow/cwl/processor.py
index a4c8f806c..18c9fda1e 100644
--- a/streamflow/cwl/processor.py
+++ b/streamflow/cwl/processor.py
@@ -9,7 +9,7 @@
 from schema_salad.exceptions import ValidationException
 
 from streamflow.core.context import StreamFlowContext
-from streamflow.core.deployment import Connector, LOCAL_LOCATION, Target
+from streamflow.core.deployment import Connector, Target, LOCAL_LOCATION
 from streamflow.core.exception import (
     WorkflowDefinitionException,
     WorkflowExecutionException,
@@ -151,82 +151,92 @@ async def _process_file_token(
         except ValidationException as e:
             raise WorkflowExecutionException(e.message) from e
         # If file exists, get coordinates
-        if filepath and (
-            data_location := self.workflow.context.data_manager.get_source_location(
-                filepath, LOCAL_LOCATION
-            )
-        ):
-            connector = self.workflow.context.deployment_manager.get_connector(
-                data_location.deployment
-            )
-            path_processor = get_path_processor(connector)
-            base_path = path_processor.normpath(
-                data_location.path[: -len(data_location.relpath)]
-            )
-            # Process file contents
-            token_value = await utils.update_file_token(
-                context=self.workflow.context,
-                connector=connector,
-                location=data_location,
-                token_value=token_value,
-                load_contents=self.load_contents,
-                load_listing=self.load_listing,
-            )
-            # Process secondary files
-            if token_value.get("secondaryFiles"):
-                initial_paths = [
-                    utils.get_path_from_token(sf)
-                    for sf in token_value["secondaryFiles"]
-                ]
-                sf_map = dict(
-                    zip(
-                        initial_paths,
-                        await asyncio.gather(
-                            *(
-                                asyncio.create_task(
-                                    utils.update_file_token(
-                                        context=self.workflow.context,
-                                        connector=connector,
-                                        location=data_location,
-                                        token_value=sf,
-                                        load_contents=self.load_contents,
-                                        load_listing=self.load_listing,
-                                    )
-                                )
-                                for sf in token_value["secondaryFiles"]
-                            )
-                        ),
-                    )
-                )
-            else:
-                sf_map = {}
-            if self.secondary_files:
-                sf_context = {**context, "self": token_value}
-                await utils.process_secondary_files(
-                    context=self.workflow.context,
-                    secondary_files=self.secondary_files,
-                    sf_map=sf_map,
-                    js_context=sf_context,
-                    full_js=self.full_js,
-                    expression_lib=self.expression_lib,
-                    connector=connector,
-                    locations=[data_location],
-                    token_value=token_value,
-                    load_contents=self.load_contents,
-                    load_listing=self.load_listing,
-                    only_retrieve_from_token=self.only_propagate_secondary_files,
-                )
-            # Add all secondary files to the token
-            if sf_map:
-                token_value["secondaryFiles"] = list(sf_map.values())
-            # Register path
-            await utils.register_data(
-                context=self.workflow.context,
-                connector=connector,
-                locations=[data_location],
-                token_value=token_value,
-                base_path=base_path,
-            )
+        if filepath:
+            try:
+                # Privilege locations on the destination connector to ensure existence of secondaryFiles
+                data_locations = self.workflow.context.data_manager.get_data_locations(
+                    path=filepath,
+                )
+                data_location = next(
+                    loc for loc in data_locations if loc.path == filepath
+                )
+            except StopIteration:
+                # If such a location does not exist, apply the standard heuristic to select the best one
+                data_location = self.workflow.context.data_manager.get_source_location(
+                    path=filepath, dst_deployment=LOCAL_LOCATION
+                )
+            if data_location:
+                connector = self.workflow.context.deployment_manager.get_connector(
+                    data_location.deployment
+                )
+                path_processor = get_path_processor(connector)
+                base_path = path_processor.normpath(
+                    data_location.path[: -len(data_location.relpath)]
+                )
+                # Process file contents
+                token_value = await utils.update_file_token(
+                    context=self.workflow.context,
+                    connector=connector,
+                    location=data_location,
+                    token_value=token_value,
+                    load_contents=self.load_contents,
+                    load_listing=self.load_listing,
+                )
+                # Process secondary files
+                if token_value.get("secondaryFiles"):
+                    initial_paths = [
+                        utils.get_path_from_token(sf)
+                        for sf in token_value["secondaryFiles"]
+                    ]
+                    sf_map = dict(
+                        zip(
+                            initial_paths,
+                            await asyncio.gather(
+                                *(
+                                    asyncio.create_task(
+                                        utils.update_file_token(
+                                            context=self.workflow.context,
+                                            connector=connector,
+                                            location=data_location,
+                                            token_value=sf,
+                                            load_contents=self.load_contents,
+                                            load_listing=self.load_listing,
+                                        )
+                                    )
+                                    for sf in token_value["secondaryFiles"]
+                                )
+                            ),
+                        )
+                    )
+                else:
+                    sf_map = {}
+                if self.secondary_files:
+                    sf_context = {**context, "self": token_value}
+                    await utils.process_secondary_files(
+                        context=self.workflow.context,
+                        secondary_files=self.secondary_files,
+                        sf_map=sf_map,
+                        js_context=sf_context,
+                        full_js=self.full_js,
+                        expression_lib=self.expression_lib,
+                        connector=connector,
+                        locations=[data_location],
+                        token_value=token_value,
+                        load_contents=self.load_contents,
+                        load_listing=self.load_listing,
+                        only_retrieve_from_token=self.only_propagate_secondary_files,
+                    )
+                # Add all secondary files to the token
+                if sf_map:
+                    token_value["secondaryFiles"] = list(sf_map.values())
+                # Register path
+                await utils.register_data(
+                    context=self.workflow.context,
+                    connector=connector,
+                    locations=[data_location],
+                    token_value=token_value,
+                    base_path=base_path,
+                )
         # Return token value
         return token_value
 
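The processor hunk above replaces a single `get_source_location` call with a two-step lookup: data locations registered at exactly the requested path win, and the old heuristic is only a fallback. A minimal sketch of that selection pattern, with `data_manager` and the `"__LOCAL__"` deployment name as hypothetical stand-ins for StreamFlow's actual objects:

```python
def pick_location(data_manager, filepath: str):
    try:
        # Prefer a location registered at exactly this path, so that
        # secondaryFiles resolved relative to it are guaranteed to exist there.
        return next(
            loc
            for loc in data_manager.get_data_locations(path=filepath)
            if loc.path == filepath
        )
    except StopIteration:
        # No exact match: fall back to the standard source-selection heuristic.
        return data_manager.get_source_location(
            path=filepath, dst_deployment="__LOCAL__"
        )
```

The `try/except` works because `next()` raises `StopIteration` when the generator yields no matching location, which is exactly the case the fallback is meant to cover.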
diff --git a/streamflow/cwl/translator.py b/streamflow/cwl/translator.py
index 436b22612..00253f74c 100644
--- a/streamflow/cwl/translator.py
+++ b/streamflow/cwl/translator.py
@@ -1039,19 +1039,21 @@ def _process_docker_requirement(
         "image": image_name,
         "logDriver": "none",
         "network": "default" if network_access else "none",
-        "volume": [f"{target.workdir}:{target.workdir}"],
+        "volume": [f"{target.workdir}:/tmp/streamflow"],
     }
     # Manage dockerOutputDirectory directive
     if "dockerOutputDirectory" in docker_requirement:
         docker_config["workdir"] = docker_requirement["dockerOutputDirectory"]
         context["output_directory"] = docker_config["workdir"]
-        local_dir = os.path.join(tempfile.gettempdir(), "streamflow", random_name())
+        local_dir = os.path.join(
+            os.path.realpath(tempfile.gettempdir()), "streamflow", random_name()
+        )
         os.makedirs(local_dir, exist_ok=True)
         docker_config["volume"].append(f"{local_dir}:{docker_config['workdir']}")
     # Build step target
     deployment = DeploymentConfig(name=name, type="docker", config=docker_config)
-    step_target = Target(
-        deployment=deployment, service=image_name, workdir=target.workdir
+    step_target = Target(  # nosec
+        deployment=deployment, service=image_name, workdir="/tmp/streamflow"
     )
     return step_target
 
diff --git a/streamflow/deployment/connector/base.py b/streamflow/deployment/connector/base.py
index a77e552e2..ac10f4fe9 100644
--- a/streamflow/deployment/connector/base.py
+++ b/streamflow/deployment/connector/base.py
@@ -249,6 +249,9 @@ def _get_run_command(
     ) -> str:
         ...
 
+    def _get_shell(self) -> str:
+        return "sh"
+
     def _get_stream_reader(self, location: Location, src: str) -> StreamWrapperContext:
         dirname, basename = posixpath.split(src)
         return SubprocessStreamReaderWrapperContext(
@@ -369,7 +372,7 @@ async def run(
                     job=f"for job {job_name}" if job_name else "",
                 )
             )
-        command = utils.encode_command(command)
+        command = utils.encode_command(command, self._get_shell())
         run_command = self._get_run_command(command, location)
         proc = await asyncio.create_subprocess_exec(
             *shlex.split(run_command),
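The `_get_shell` hook added to the base connector is what feeds the new `encode_command` parameter: `run()` asks the connector which shell should interpret the decoded command. A sketch of the pattern with hypothetical classes (not StreamFlow's real `BaseConnector`):

```python
import base64


class SketchConnector:
    # Mirrors the default added to BaseConnector above.
    def _get_shell(self) -> str:
        return "sh"

    def wrap(self, command: str) -> str:
        # Same shape as utils.encode_command(command, self._get_shell()).
        payload = base64.b64encode(command.encode("utf-8")).decode("utf-8")
        return f"echo {payload} | base64 -d | {self._get_shell()}"


class BashSketchConnector(SketchConnector):
    # Subclasses only override the hook; the encoding logic stays shared.
    def _get_shell(self) -> str:
        return "bash"
```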
diff --git a/streamflow/deployment/connector/local.py b/streamflow/deployment/connector/local.py
index 796e71838..1f2626c9f 100644
--- a/streamflow/deployment/connector/local.py
+++ b/streamflow/deployment/connector/local.py
@@ -33,9 +33,17 @@ def _get_run_command(
         self, command: str, location: Location, interactive: bool = False
     ):
         if sys.platform == "win32":
-            return f"cmd /C '{command}'"
+            return f"{self._get_shell()} /C '{command}'"
         else:
-            return f"sh -c '{command}'"
+            return f"{self._get_shell()} -c '{command}'"
+
+    def _get_shell(self) -> str:
+        if sys.platform == "win32":
+            return "cmd"
+        elif sys.platform == "darwin":
+            return "bash"
+        else:
+            return "sh"
 
     async def _copy_remote_to_remote(
         self,
@@ -64,7 +72,10 @@ async def _copy_remote_to_remote(
         )
 
     async def deploy(self, external: bool) -> None:
-        os.makedirs(os.path.join(tempfile.gettempdir(), "streamflow"), exist_ok=True)
+        os.makedirs(
+            os.path.join(os.path.realpath(tempfile.gettempdir()), "streamflow"),
+            exist_ok=True,
+        )
 
     async def get_available_locations(
         self,
diff --git a/streamflow/recovery/checkpoint_manager.py b/streamflow/recovery/checkpoint_manager.py
index 66d132bf4..85785364d 100644
--- a/streamflow/recovery/checkpoint_manager.py
+++ b/streamflow/recovery/checkpoint_manager.py
@@ -22,7 +22,10 @@ class DefaultCheckpointManager(CheckpointManager):
     def __init__(self, context: StreamFlowContext, checkpoint_dir: str | None = None):
         super().__init__(context)
         self.checkpoint_dir = checkpoint_dir or os.path.join(
-            tempfile.gettempdir(), "streamflow", "checkpoint", utils.random_name()
+            os.path.realpath(tempfile.gettempdir()),
+            "streamflow",
+            "checkpoint",
+            utils.random_name(),
         )
         self.copy_tasks: MutableSequence = []
 
diff --git a/tests/conftest.py b/tests/conftest.py
index 5b74c96ea..b51adac46 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -1,4 +1,5 @@
 import asyncio
+import os
 import tempfile
 from asyncio.locks import Lock
 from collections.abc import Iterable
@@ -42,7 +43,7 @@ def get_docker_deployment_config():
 @pytest_asyncio.fixture(scope="session")
 async def context() -> StreamFlowContext:
     context = build_context(
-        tempfile.gettempdir(),
+        os.path.realpath(tempfile.gettempdir()),
         {"database": {"type": "default", "config": {"connection": ":memory:"}}},
     )
     await context.deployment_manager.deploy(
@@ -52,7 +53,7 @@ async def context() -> StreamFlowContext:
             config={},
             external=True,
             lazy=False,
-            workdir=tempfile.gettempdir(),
+            workdir=os.path.realpath(tempfile.gettempdir()),
         )
     )
     await context.deployment_manager.deploy(get_docker_deployment_config())
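All of the `os.path.realpath(tempfile.gettempdir())` changes in this patch address the same macOS quirk: `/tmp` and `/var` are symlinks into `/private`, so the literal temporary directory and its fully resolved form are different strings, and components that record different forms of the same path stop agreeing with each other. A small illustration (the macOS paths in the comments are examples, not guaranteed values):

```python
import os
import tempfile

tmp = tempfile.gettempdir()        # e.g. "/var/folders/xy/..." on macOS
resolved = os.path.realpath(tmp)   # e.g. "/private/var/folders/xy/..."

# Resolving once up front gives every component the same canonical workdir.
workdir = os.path.join(resolved, "streamflow")
os.makedirs(workdir, exist_ok=True)
```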