Fixed macOS compatibility (#77)
This commit fixes support for the macOS operating system. In particular, it
fixes the following bugs:
 - On macOS, the temporary directory can be a symbolic link (/var is a
symlink to /private/var, for example). This commit explicitly resolves the
real path when creating a temporary folder (see the sketch after this
message);
 - On macOS, the default version of sh is quite old and does not support all
the required command features. This commit explicitly invokes the Bash shell
on Darwin systems;
 - The CWL translator was mounting the host StreamFlow workdir into Docker
containers with a bind mount to the same path. Now the folder in the container
is always configured as `/tmp/streamflow`.
In addition, this commit sets up a macOS runner to automatically perform CI
unit tests and CWL conformance tests when PRs are submitted.
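For instance, the symlink behavior behind the first fix can be observed with a
few lines of Python (the paths in the comments are illustrative):

    import os
    import tempfile

    raw = tempfile.gettempdir()        # e.g. /var/folders/k3/xyz/T on macOS
    resolved = os.path.realpath(raw)   # e.g. /private/var/folders/k3/xyz/T
    # On macOS, /var is a symbolic link to /private/var, so naive string
    # comparisons between the two forms of the same directory fail.
    print(raw == resolved)             # False on macOS, True on most Linux systems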
GlassOfWhiskey authored Mar 3, 2023
1 parent 802fd42 commit b583419
Showing 9 changed files with 133 additions and 88 deletions.
22 changes: 18 additions & 4 deletions .github/workflows/ci-tests.yaml
@@ -24,9 +24,9 @@ jobs:
- uses: github/codeql-action/analyze@v2
cwl-conformance:
name: "CWL conformance tests"
runs-on: ubuntu-22.04
strategy:
matrix:
on: [ "ubuntu-22.04" ]
python: [ "3.8", "3.9", "3.10", "3.11" ]
version: [ "v1.0", "v1.1", "v1.2" ]
include:
@@ -39,6 +39,12 @@ jobs:
- commit: "ad6f77c648ae9f0eaa2fd53ba7032b0523e12de8"
exclude: "docker_entrypoint,modify_file_content"
version: "v1.2"
- on: "macos-12"
python: "3.11"
commit: "ad6f77c648ae9f0eaa2fd53ba7032b0523e12de8"
exclude: "docker_entrypoint,modify_file_content,step_input_default_value_noexp,step_input_default_value_overriden_noexp,nested_workflow_noexp,step_input_default_value_overriden_2nd_step_noexp,step_input_default_value_overriden_2nd_step_null_noexp"
version: "v1.2"
runs-on: ${{ matrix.on }}
steps:
- uses: actions/checkout@v3
- uses: actions/setup-python@v4
@@ -47,6 +53,8 @@ jobs:
- uses: actions/setup-node@v3
with:
node-version: "15"
- uses: douglascamata/setup-docker-macos-action@v1-alpha
if: ${{ startsWith(matrix.on, 'macos-') }}
- uses: docker/setup-qemu-action@v2
- name: "Install Streamflow"
run: |
@@ -89,10 +97,14 @@ jobs:
run: tox
unit-tests:
name: "StreamFlow unit tests"
runs-on: ubuntu-22.04
strategy:
matrix:
on: [ "ubuntu-22.04"]
python: [ "3.8", "3.9", "3.10", "3.11" ]
include:
- on: "macos-12"
python: "3.11"
runs-on: ${{ matrix.on }}
env:
TOXENV: ${{ format('py{0}-unit', matrix.python) }}
steps:
@@ -107,13 +119,15 @@
- uses: actions/setup-node@v3
with:
node-version: "15"
- uses: douglascamata/setup-docker-macos-action@v1-alpha
if: ${{ startsWith(matrix.on, 'macos-') }}
- uses: docker/setup-qemu-action@v2
- name: "Install Python Dependencies and Streamflow"
run: |
python -m pip install tox --user
python -m pip install . --user
- name: "Run StreamFlow tests via Tox"
run: tox
run: python -m tox
- name: "Upload coverage report for unit tests"
uses: actions/upload-artifact@v3
with:
@@ -133,4 +147,4 @@ jobs:
uses: codecov/codecov-action@v3
with:
fail_ci_if_error: true
token: ${{ secrets.CODECOV_TOKEN }}
token: ${{ secrets.CODECOV_TOKEN }}
2 changes: 1 addition & 1 deletion streamflow/core/deployment.py
@@ -27,7 +27,7 @@ def _init_workdir(deployment_name: str) -> str:
if deployment_name != LOCAL_LOCATION:
return posixpath.join("/tmp", "streamflow") # nosec
else:
return os.path.join(tempfile.gettempdir(), "streamflow")
return os.path.join(os.path.realpath(tempfile.gettempdir()), "streamflow")


class Location:
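In isolation, the patched `_init_workdir` helper behaves as sketched below;
`LOCAL_LOCATION` is StreamFlow's sentinel for the local deployment, and its
value here is a placeholder for illustration only:

    import os
    import posixpath
    import tempfile

    LOCAL_LOCATION = "__LOCAL__"  # placeholder value, not the real constant

    def init_workdir(deployment_name: str) -> str:
        if deployment_name != LOCAL_LOCATION:
            # Remote targets are assumed POSIX, so a fixed path is safe
            return posixpath.join("/tmp", "streamflow")
        # Local target: resolve symlinks so the workdir matches the
        # canonical /private/... form that macOS tools report
        return os.path.join(os.path.realpath(tempfile.gettempdir()), "streamflow")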
7 changes: 4 additions & 3 deletions streamflow/core/utils.py
@@ -97,9 +97,10 @@ def dict_product(**kwargs) -> MutableMapping[Any, Any]:
yield dict(zip(keys, list(instance)))


def encode_command(command: str):
return "echo {command} | base64 -d | sh".format(
command=base64.b64encode(command.encode("utf-8")).decode("utf-8")
def encode_command(command: str, shell: str = "sh"):
return "echo {command} | base64 -d | {shell}".format(
command=base64.b64encode(command.encode("utf-8")).decode("utf-8"),
shell=shell, # nosec
)


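Piping a base64 payload into the shell sidesteps quoting issues when the
command itself contains quotes or pipes. A quick usage sketch of the new
`shell` parameter (output reconstructed by hand):

    >>> from streamflow.core.utils import encode_command
    >>> encode_command("echo 'hello world'", shell="bash")
    "echo ZWNobyAnaGVsbG8gd29ybGQn | base64 -d | bash"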
148 changes: 79 additions & 69 deletions streamflow/cwl/processor.py
@@ -9,7 +9,7 @@
from schema_salad.exceptions import ValidationException

from streamflow.core.context import StreamFlowContext
from streamflow.core.deployment import Connector, LOCAL_LOCATION, Target
from streamflow.core.deployment import Connector, Target, LOCAL_LOCATION
from streamflow.core.exception import (
WorkflowDefinitionException,
WorkflowExecutionException,
@@ -151,82 +151,92 @@ async def _process_file_token(
except ValidationException as e:
raise WorkflowExecutionException(e.message) from e
# If file exists, get coordinates
if filepath and (
data_location := self.workflow.context.data_manager.get_source_location(
filepath, LOCAL_LOCATION
)
):
connector = self.workflow.context.deployment_manager.get_connector(
data_location.deployment
)
path_processor = get_path_processor(connector)
base_path = path_processor.normpath(
data_location.path[: -len(data_location.relpath)]
)
# Process file contents
token_value = await utils.update_file_token(
context=self.workflow.context,
connector=connector,
location=data_location,
token_value=token_value,
load_contents=self.load_contents,
load_listing=self.load_listing,
)
# Process secondary files
if token_value.get("secondaryFiles"):
initial_paths = [
utils.get_path_from_token(sf)
for sf in token_value["secondaryFiles"]
]
sf_map = dict(
zip(
initial_paths,
await asyncio.gather(
*(
asyncio.create_task(
utils.update_file_token(
context=self.workflow.context,
connector=connector,
location=data_location,
token_value=sf,
load_contents=self.load_contents,
load_listing=self.load_listing,
if filepath:
try:
# Privilege locations on the destination connector to ensure existence of secondaryFiles
data_locations = self.workflow.context.data_manager.get_data_locations(
path=filepath,
)
data_location = next(
loc for loc in data_locations if loc.path == filepath
)
except StopIteration:
# If such location does not exist, apply the standard heuristic to select the best one
data_location = self.workflow.context.data_manager.get_source_location(
path=filepath, dst_deployment=LOCAL_LOCATION
)
if data_location:
connector = self.workflow.context.deployment_manager.get_connector(
data_location.deployment
)
path_processor = get_path_processor(connector)
base_path = path_processor.normpath(
data_location.path[: -len(data_location.relpath)]
)
# Process file contents
token_value = await utils.update_file_token(
context=self.workflow.context,
connector=connector,
location=data_location,
token_value=token_value,
load_contents=self.load_contents,
load_listing=self.load_listing,
)
# Process secondary files
if token_value.get("secondaryFiles"):
initial_paths = [
utils.get_path_from_token(sf)
for sf in token_value["secondaryFiles"]
]
sf_map = dict(
zip(
initial_paths,
await asyncio.gather(
*(
asyncio.create_task(
utils.update_file_token(
context=self.workflow.context,
connector=connector,
location=data_location,
token_value=sf,
load_contents=self.load_contents,
load_listing=self.load_listing,
)
)
for sf in token_value["secondaryFiles"]
)
for sf in token_value["secondaryFiles"]
)
),
),
)
)
)
else:
sf_map = {}
if self.secondary_files:
sf_context = {**context, "self": token_value}
await utils.process_secondary_files(
else:
sf_map = {}
if self.secondary_files:
sf_context = {**context, "self": token_value}
await utils.process_secondary_files(
context=self.workflow.context,
secondary_files=self.secondary_files,
sf_map=sf_map,
js_context=sf_context,
full_js=self.full_js,
expression_lib=self.expression_lib,
connector=connector,
locations=[data_location],
token_value=token_value,
load_contents=self.load_contents,
load_listing=self.load_listing,
only_retrieve_from_token=self.only_propagate_secondary_files,
)
# Add all secondary files to the token
if sf_map:
token_value["secondaryFiles"] = list(sf_map.values())
# Register path
await utils.register_data(
context=self.workflow.context,
secondary_files=self.secondary_files,
sf_map=sf_map,
js_context=sf_context,
full_js=self.full_js,
expression_lib=self.expression_lib,
connector=connector,
locations=[data_location],
token_value=token_value,
load_contents=self.load_contents,
load_listing=self.load_listing,
only_retrieve_from_token=self.only_propagate_secondary_files,
base_path=base_path,
)
# Add all secondary files to the token
if sf_map:
token_value["secondaryFiles"] = list(sf_map.values())
# Register path
await utils.register_data(
context=self.workflow.context,
connector=connector,
locations=[data_location],
token_value=token_value,
base_path=base_path,
)
# Return token value
return token_value

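The new lookup boils down to a two-step strategy: prefer a registered data
location whose path matches the token exactly (so secondary files materialized
on the destination connector are found), and fall back to the generic
source-selection heuristic only when no exact match exists. A condensed sketch
of the pattern (the helper name is illustrative; the data manager methods are
the ones used in the diff):

    def pick_data_location(data_manager, filepath, dst_deployment):
        try:
            # Privilege an exact-path match among the known locations
            locations = data_manager.get_data_locations(path=filepath)
            return next(loc for loc in locations if loc.path == filepath)
        except StopIteration:
            # No exact match: apply the standard selection heuristic
            return data_manager.get_source_location(
                path=filepath, dst_deployment=dst_deployment
            )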
10 changes: 6 additions & 4 deletions streamflow/cwl/translator.py
@@ -1039,19 +1039,21 @@ def _process_docker_requirement(
"image": image_name,
"logDriver": "none",
"network": "default" if network_access else "none",
"volume": [f"{target.workdir}:{target.workdir}"],
"volume": [f"{target.workdir}:/tmp/streamflow"],
}
# Manage dockerOutputDirectory directive
if "dockerOutputDirectory" in docker_requirement:
docker_config["workdir"] = docker_requirement["dockerOutputDirectory"]
context["output_directory"] = docker_config["workdir"]
local_dir = os.path.join(tempfile.gettempdir(), "streamflow", random_name())
local_dir = os.path.join(
os.path.realpath(tempfile.gettempdir()), "streamflow", random_name()
)
os.makedirs(local_dir, exist_ok=True)
docker_config["volume"].append(f"{local_dir}:{docker_config['workdir']}")
# Build step target
deployment = DeploymentConfig(name=name, type="docker", config=docker_config)
step_target = Target(
deployment=deployment, service=image_name, workdir=target.workdir
step_target = Target( # nosec
deployment=deployment, service=image_name, workdir="/tmp/streamflow"
)
return step_target

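The effect of the volume change is easiest to see in the resulting deployment
configuration. A hypothetical config for a macOS host (the image name and host
path are illustrative):

    docker_config = {
        "image": "node:17-slim",
        "logDriver": "none",
        "network": "none",
        # Before this commit, the host workdir was mirrored at the same path
        # inside the container; now it always lands at a fixed location that
        # exists in any Linux container, regardless of the host OS.
        "volume": ["/private/var/folders/k3/xyz/T/streamflow:/tmp/streamflow"],
    }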
5 changes: 4 additions & 1 deletion streamflow/deployment/connector/base.py
@@ -249,6 +249,9 @@ def _get_run_command(
) -> str:
...

def _get_shell(self) -> str:
return "sh"

def _get_stream_reader(self, location: Location, src: str) -> StreamWrapperContext:
dirname, basename = posixpath.split(src)
return SubprocessStreamReaderWrapperContext(
@@ -369,7 +372,7 @@ async def run(
job=f"for job {job_name}" if job_name else "",
)
)
command = utils.encode_command(command)
command = utils.encode_command(command, self._get_shell())
run_command = self._get_run_command(command, location)
proc = await asyncio.create_subprocess_exec(
*shlex.split(run_command),
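`_get_shell()` acts as a small template-method hook: the base connector
returns a conservative sh, platform-aware subclasses override it, and `run()`
threads the result into `encode_command`. A minimal sketch of the pattern
(class names are illustrative, not StreamFlow's):

    from streamflow.core.utils import encode_command

    class BaseConnector:
        def _get_shell(self) -> str:
            return "sh"  # conservative POSIX default

        def wrap(self, command: str) -> str:
            # Wrap the command for the connector-specific shell
            return encode_command(command, self._get_shell())

    class DarwinConnector(BaseConnector):
        def _get_shell(self) -> str:
            return "bash"  # macOS ships an outdated /bin/sh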
17 changes: 14 additions & 3 deletions streamflow/deployment/connector/local.py
@@ -33,9 +33,17 @@ def _get_run_command(
self, command: str, location: Location, interactive: bool = False
):
if sys.platform == "win32":
return f"cmd /C '{command}'"
return f"{self._get_shell()} /C '{command}'"
else:
return f"sh -c '{command}'"
return f"{self._get_shell()} -c '{command}'"

def _get_shell(self) -> str:
if sys.platform == "win32":
return "cmd"
elif sys.platform == "darwin":
return "bash"
else:
return "sh"

async def _copy_remote_to_remote(
self,
@@ -64,7 +72,7 @@ async def _copy_remote_to_remote(
)

async def deploy(self, external: bool) -> None:
os.makedirs(os.path.join(tempfile.gettempdir(), "streamflow"), exist_ok=True)
os.makedirs(
os.path.join(os.path.realpath(tempfile.gettempdir()), "streamflow"),
exist_ok=True,
)

async def get_available_locations(
self,
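Putting the pieces together on a Darwin host: `_get_shell()` returns "bash",
`encode_command` wraps the payload, and `_get_run_command` adds the outer
invocation. A sketch of the resulting command line (the job command is
illustrative):

    from streamflow.core.utils import encode_command

    shell = "bash"  # what LocalConnector._get_shell() returns on Darwin
    command = encode_command("ls -la | wc -l", shell)
    run_command = f"{shell} -c '{command}'"
    print(run_command)
    # bash -c 'echo bHMgLWxhIHwgd2MgLWw= | base64 -d | bash'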
5 changes: 4 additions & 1 deletion streamflow/recovery/checkpoint_manager.py
@@ -22,7 +22,10 @@ class DefaultCheckpointManager(CheckpointManager):
def __init__(self, context: StreamFlowContext, checkpoint_dir: str | None = None):
super().__init__(context)
self.checkpoint_dir = checkpoint_dir or os.path.join(
tempfile.gettempdir(), "streamflow", "checkpoint", utils.random_name()
os.path.realpath(tempfile.gettempdir()),
"streamflow",
"checkpoint",
utils.random_name(),
)
self.copy_tasks: MutableSequence = []

5 changes: 3 additions & 2 deletions tests/conftest.py
@@ -1,4 +1,5 @@
import asyncio
import os
import tempfile
from asyncio.locks import Lock
from collections.abc import Iterable
@@ -42,7 +43,7 @@ def get_docker_deployment_config():
@pytest_asyncio.fixture(scope="session")
async def context() -> StreamFlowContext:
context = build_context(
tempfile.gettempdir(),
os.path.realpath(tempfile.gettempdir()),
{"database": {"type": "default", "config": {"connection": ":memory:"}}},
)
await context.deployment_manager.deploy(
@@ -52,7 +53,7 @@ async def context() -> StreamFlowContext:
config={},
external=True,
lazy=False,
workdir=tempfile.gettempdir(),
workdir=os.path.realpath(tempfile.gettempdir()),
)
)
await context.deployment_manager.deploy(get_docker_deployment_config())
