Skip to content

Commit

Permalink
dask order no longer needs dependencies copy
Browse files Browse the repository at this point in the history
  • Loading branch information
fjetter committed Sep 3, 2024
1 parent a4fae21 commit 0ef6e89
Showing 1 changed file with 6 additions and 21 deletions.
27 changes: 6 additions & 21 deletions distributed/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -4623,7 +4623,7 @@ async def add_nanny(self, comm: Comm, address: str) -> None:
def _find_lost_dependencies(
self,
dsk: dict[Key, T_runspec],
dependencies: dict[Key, set[Key]],
dependencies: DependenciesMapping,
keys: set[Key],
) -> set[Key]:
lost_keys = set()
Expand Down Expand Up @@ -4689,7 +4689,7 @@ def _create_taskstate_from_graph(
*,
start: float,
dsk: dict[Key, T_runspec],
dependencies: dict,
dependencies: DependenciesMapping,
keys: set[Key],
ordered: dict[Key, int],
client: str,
Expand Down Expand Up @@ -4838,7 +4838,7 @@ def _create_taskstate_from_graph(
def _remove_done_tasks_from_dsk(
self,
dsk: dict[Key, T_runspec],
dependencies: dict[Key, set[Key]],
dependencies: DependenciesMapping,
) -> None:
# Avoid computation that is already finished
done = set() # tasks that are already done
Expand Down Expand Up @@ -4869,7 +4869,6 @@ def _remove_done_tasks_from_dsk(
stack.append(dep)
for anc in done:
dsk.pop(anc, None)
dependencies.pop(anc, None)

@log_errors
async def update_graph(
Expand Down Expand Up @@ -4927,19 +4926,7 @@ async def update_graph(
dsk = _cull(dsk, keys)

if not internal_priority:
# Removing all non-local keys before calling order()
dsk_keys = set(
dsk
) # intersection() of sets is much faster than dict_keys
stripped_deps = {
k: v.intersection(dsk_keys)
for k, v in dependencies.items()
if k in dsk_keys
}

internal_priority = await offload(
dask.order.order, dsk=dsk, dependencies=stripped_deps
)
internal_priority = await offload(dask.order.order, dsk=dsk)
ordering_done = time()
logger.debug("Ordering done.")

Expand Down Expand Up @@ -9426,7 +9413,7 @@ def _materialize_graph(
global_annotations: dict[str, Any],
validate: bool,
keys: set[Key],
) -> tuple[dict[Key, T_runspec], dict[Key, set[Key]], dict[str, dict[Key, Any]]]:
) -> tuple[dict[Key, T_runspec], DependenciesMapping, dict[str, dict[Key, Any]]]:
dsk: dict = ensure_dict(graph)
if validate:
for k in dsk:
Expand Down Expand Up @@ -9455,9 +9442,7 @@ def _materialize_graph(
logger.debug(
"Removing aliases. Started with %i and got %i left", len(dsk2), len(dsk3)
)
# FIXME: There should be no need to fully materialize and copy this but some
# sections in the scheduler are mutating it.
dependencies = {k: set(v) for k, v in DependenciesMapping(dsk3).items()}
dependencies = DependenciesMapping(dsk3)
return dsk3, dependencies, annotations_by_type


Expand Down

0 comments on commit 0ef6e89

Please sign in to comment.