Skip to content

Globus Compute executor #689

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions cubed/runtime/create.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@ def create_executor(name: str, executor_options: Optional[dict] = None) -> Execu
from cubed.runtime.executors.dask import DaskExecutor

return DaskExecutor(**executor_options)
elif name == "globus-compute":
from cubed.runtime.executors.globus_compute import GlobusComputeExecutor

return GlobusComputeExecutor(**executor_options)
elif name == "lithops":
from cubed.runtime.executors.lithops import LithopsExecutor

Expand Down
101 changes: 101 additions & 0 deletions cubed/runtime/executors/globus_compute.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
import asyncio
from typing import Any, Callable, Optional, Sequence

from globus_compute_sdk import Client, Executor
from globus_compute_sdk.serialize import CombinedCode
from networkx import MultiDiGraph

from cubed.runtime.asyncio import async_map_dag
from cubed.runtime.backup import use_backups_default
from cubed.runtime.types import Callback, DagExecutor
from cubed.runtime.utils import asyncio_run
from cubed.spec import Spec


class GlobusComputeExecutor(DagExecutor):
"""An execution engine that uses Globus Compute."""

def __init__(self, **kwargs):
self.kwargs = kwargs

@property
def name(self) -> str:
return "globus-compute"

def execute_dag(
self,
dag: MultiDiGraph,
callbacks: Optional[Sequence[Callback]] = None,
resume: Optional[bool] = None,
spec: Optional[Spec] = None,
compute_id: Optional[str] = None,
**kwargs,
) -> None:
merged_kwargs = {**self.kwargs, **kwargs}
asyncio_run(
self._async_execute_dag(
dag,
callbacks=callbacks,
resume=resume,
spec=spec,
compute_id=compute_id,
**merged_kwargs,
)
)

async def _async_execute_dag(
self,
dag: MultiDiGraph,
callbacks: Optional[Sequence[Callback]] = None,
resume: Optional[bool] = None,
spec: Optional[Spec] = None,
compute_arrays_in_parallel: Optional[bool] = None,
**kwargs,
) -> None:
if spec is not None:
if "use_backups" not in kwargs and use_backups_default(spec):
kwargs["use_backups"] = True

endpoint_id = kwargs.pop("endpoint_id")
client = Client(code_serialization_strategy=CombinedCode())
concurrent_executor = Executor(endpoint_id=endpoint_id, client=client)
try:
create_futures_func = globus_compute_create_futures_func(
concurrent_executor,
run_func_threads,
)
await async_map_dag(
create_futures_func,
dag=dag,
callbacks=callbacks,
resume=resume,
compute_arrays_in_parallel=compute_arrays_in_parallel,
**kwargs,
)
finally:
# don't wait for any cancelled tasks
concurrent_executor.shutdown(wait=False)


def globus_compute_create_futures_func(
concurrent_executor, function: Callable[..., Any]
):
def create_futures_func(input, **kwargs):
return [
(
i,
asyncio.wrap_future(concurrent_executor.submit(function, i, **kwargs)),
)
for i in input
]

return create_futures_func


def run_func_threads(input, func=None, config=None, name=None, compute_id=None):
from cubed.runtime.utils import execute_with_stats

# TODO: can't use the execution_stats decorator since we get:
# AttributeError: 'functools.partial' object has no attribute '__name__'
result, stats = execute_with_stats(func, input, config=config)
return result, stats
8 changes: 8 additions & 0 deletions cubed/tests/utils.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import os
import platform
from typing import Iterable

Expand Down Expand Up @@ -44,6 +45,13 @@
except ImportError:
pass

try:
executor_options = dict(endpoint_id=os.getenv("ENDPOINT_ID", None))
ALL_EXECUTORS.append(create_executor("globus-compute", executor_options))
MAIN_EXECUTORS.append(create_executor("globus-compute", executor_options))
except ImportError:
pass

try:
executor_options = dict(config=LITHOPS_LOCAL_CONFIG, wait_dur_sec=0.1)
ALL_EXECUTORS.append(create_executor("lithops", executor_options))
Expand Down
2 changes: 2 additions & 0 deletions setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ ignore_missing_imports = True
ignore_missing_imports = True
[mypy-fsspec.*]
ignore_missing_imports = True
[mypy-globus_compute_sdk.*]
ignore_missing_imports = True
[mypy-icechunk.*]
ignore_missing_imports = True
[mypy-lithops.*]
Expand Down
Loading