Single-partition Dask executor for cuDF-Polars #17262

Status: Draft. Wants to merge 46 commits into base branch-24.12 (changes shown from 37 of the 46 commits).

Commits (46):
- a590076 cleanup (rjzamora, Nov 6, 2024)
- 7f1bec7 rename to parallel (rjzamora, Nov 7, 2024)
- 023e085 Merge branch 'branch-24.12' into cudf-polars-dask-simple (rjzamora, Nov 7, 2024)
- e7a2fce Merge branch 'branch-24.12' into cudf-polars-dask-simple (rjzamora, Nov 7, 2024)
- 69a3374 simplify solution (rjzamora, Nov 7, 2024)
- 6aa3694 Merge branch 'cudf-polars-dask-simple' of github.com:rjzamora/cudf in… (rjzamora, Nov 7, 2024)
- ea22a9a Merge branch 'branch-24.12' into cudf-polars-dask-simple (rjzamora, Nov 7, 2024)
- 915a779 deeper dive (rjzamora, Nov 8, 2024)
- bd9d783 improve simple agg reduction (rjzamora, Nov 8, 2024)
- 7363d91 cleanup fundamental bugs (rjzamora, Nov 10, 2024)
- 58ee5f4 move PartitionInfo (rjzamora, Nov 10, 2024)
- ecc51ef add Literal (rjzamora, Nov 10, 2024)
- 75eae0c Merge branch 'branch-24.12' into cudf-polars-dask-simple (rjzamora, Nov 12, 2024)
- fb2d6bf add lower_ir_graph (rjzamora, Nov 12, 2024)
- c17564c Merge remote-tracking branch 'upstream/branch-24.12' into cudf-polars… (rjzamora, Nov 12, 2024)
- 6e66998 strip out most exploratory logic (rjzamora, Nov 12, 2024)
- c41723d Merge branch 'branch-24.12' into cudf-polars-dask-simple (rjzamora, Nov 12, 2024)
- d774f38 Merge branch 'branch-24.12' into cudf-polars-dask-simple (rjzamora, Nov 13, 2024)
- 6886f8d Add basic Dask evaluate test (pentschev, Nov 13, 2024)
- 29b2d7b Replace environment variable with new `"executor"` config (pentschev, Nov 13, 2024)
- 3a68a6d Add kwarg to specify executor in `assert_gpu_result_equal` (pentschev, Nov 13, 2024)
- 8079ac0 Add couple of Dask executor tests (pentschev, Nov 13, 2024)
- 6f7ccee Merge remote-tracking branch 'upstream/branch-24.12' into cudf-polars… (pentschev, Nov 13, 2024)
- af4c5f5 Merge remote-tracking branch 'rjzamora/cudf-polars-dask-simple' into … (pentschev, Nov 13, 2024)
- 8aed94f Improve `count` code (pentschev, Nov 13, 2024)
- aadaf10 Pass `executor` to `GPUEngine` in `assert_gpu_result_equal` (pentschev, Nov 13, 2024)
- c3a6907 Merge remote-tracking branch 'upstream/branch-24.12' into cudf-polars… (pentschev, Nov 14, 2024)
- 4f67819 Merge branch 'branch-24.12' into cudf-polars-dask-simple (rjzamora, Nov 14, 2024)
- c8ca09e Clarify intent renaming executor to "dask-experimental" (pentschev, Nov 14, 2024)
- 3fd51bb move PartitionInfo out of ir module (rjzamora, Nov 14, 2024)
- bf182e4 Merge remote-tracking branch 'rjzamora/cudf-polars-dask-simple' into … (pentschev, Nov 14, 2024)
- 453e274 skip coverage on sanity-check errors (rjzamora, Nov 14, 2024)
- 2b74f28 Add `--executor` to pytest (pentschev, Nov 14, 2024)
- 6d3cd55 Merge remote-tracking branch 'rjzamora/cudf-polars-dask-simple' into … (pentschev, Nov 14, 2024)
- 2398a2e Enable dask-experimental tests in CI, remove duplicates (pentschev, Nov 14, 2024)
- 9aa479a Fix wrong protocol name in deserialization test (pentschev, Nov 14, 2024)
- 64ea98e Merge remote-tracking branch 'upstream/branch-24.12' into cudf-polars… (pentschev, Nov 14, 2024)
- 22678a5 Remove `executor` kwarg from `assert_gpu_result_equal` (pentschev, Nov 14, 2024)
- 41441ca Merge remote-tracking branch 'upstream/branch-24.12' into cudf-polars… (rjzamora, Nov 15, 2024)
- efadb78 Reintroduce `executor` kwarg in `assert_gpu_result_equal` (pentschev, Nov 15, 2024)
- 9b78d8f Add basic tests for all executors to ensure 100% coverage (pentschev, Nov 15, 2024)
- c54c217 Merge remote-tracking branch 'rjzamora/cudf-polars-dask-simple' into … (pentschev, Nov 15, 2024)
- 70da7a9 Merge remote-tracking branch 'upstream/branch-24.12' into cudf-polars… (pentschev, Nov 15, 2024)
- 3aeb1e4 Fix `executor` in `assert_gpu_result_equal` (pentschev, Nov 18, 2024)
- 485a161 Merge remote-tracking branch 'upstream/branch-24.12' into cudf-polars… (pentschev, Nov 18, 2024)
- eb41100 Merge remote-tracking branch 'upstream/branch-24.12' into cudf-polars… (rjzamora, Nov 19, 2024)
4 changes: 4 additions & 0 deletions ci/run_cudf_polars_pytests.sh
@@ -8,4 +8,8 @@ set -euo pipefail
# Support invoking run_cudf_polars_pytests.sh outside the script directory
cd "$(dirname "$(realpath "${BASH_SOURCE[0]}")")"/../python/cudf_polars/

# Test the default "cudf" executor
python -m pytest --cache-clear "$@" tests

# Test the "dask-experimental" executor
python -m pytest --cache-clear "$@" tests --executor dask-experimental
21 changes: 18 additions & 3 deletions python/cudf_polars/cudf_polars/callback.py
@@ -135,6 +135,7 @@ def _callback(
*,
device: int | None,
memory_resource: int | None,
executor: str | None,
) -> pl.DataFrame:
assert with_columns is None
assert pyarrow_predicate is None
@@ -145,7 +146,14 @@
set_device(device),
set_memory_resource(memory_resource),
):
return ir.evaluate(cache={}).to_polars()
if executor is None or executor == "cudf":
return ir.evaluate(cache={}).to_polars()
elif executor == "dask-experimental":
from cudf_polars.experimental.parallel import evaluate_dask

return evaluate_dask(ir).to_polars()
else:
raise ValueError(f"Unknown executor '{executor}'")


def execute_with_cudf(nt: NodeTraverser, *, config: GPUEngine) -> None:
@@ -174,7 +182,8 @@ def execute_with_cudf(nt: NodeTraverser, *, config: GPUEngine) -> None:
device = config.device
memory_resource = config.memory_resource
raise_on_fail = config.config.get("raise_on_fail", False)
if unsupported := (config.config.keys() - {"raise_on_fail"}):
executor = config.config.get("executor", None)
if unsupported := (config.config.keys() - {"raise_on_fail", "executor"}):
raise ValueError(
f"Engine configuration contains unsupported settings {unsupported}"
)
@@ -200,5 +209,11 @@ def execute_with_cudf(nt: NodeTraverser, *, config: GPUEngine) -> None:
raise exception
else:
nt.set_udf(
partial(_callback, ir, device=device, memory_resource=memory_resource)
partial(
_callback,
ir,
device=device,
memory_resource=memory_resource,
executor=executor,
)
)
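For context, a minimal sketch of how the new option is expected to surface to users, inferred from the config plumbing above (the `GPUEngine` keyword-passing is an assumption based on how `raise_on_fail` is already handled, not documented API):

    import polars as pl

    q = pl.LazyFrame({"a": [1, 2, 3]}).select(pl.col("a") * 2)

    # Default single-GPU executor (equivalent to executor=None or "cudf"):
    q.collect(engine=pl.GPUEngine(raise_on_fail=True))

    # Experimental Dask executor added by this PR:
    q.collect(engine=pl.GPUEngine(raise_on_fail=True, executor="dask-experimental"))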
@@ -69,7 +69,7 @@ def __init__(
*by: Expr,
) -> None:
self.dtype = dtype
self.options = options
self.options = (options[0], tuple(options[1]), tuple(options[2]))
self.children = (column, *by)

def do_evaluate(
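The list-to-tuple conversion above matters for the experimental executor: task-graph keys are derived from `hash(node)`, so everything folded into a node's hashable representation, including `options`, must itself be hashable. A minimal sketch of the failure mode being avoided (the option values here are invented for illustration):

    # Options as received may contain lists, which are unhashable:
    options = (True, [False, True], [False, True])
    try:
        hash(options)
    except TypeError as err:
        print(err)  # unhashable type: 'list'

    # Converting the list members to tuples makes the whole thing hashable:
    normalized = (options[0], tuple(options[1]), tuple(options[2]))
    print(hash(normalized))  # works; safe to use inside a task-graph key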
26 changes: 18 additions & 8 deletions python/cudf_polars/cudf_polars/dsl/ir.py
@@ -1554,13 +1554,20 @@ def __init__(self, schema: Schema, name: str, options: Any, df: IR):
self.options = (
tuple(indices),
tuple(pivotees),
(variable_name, schema[variable_name]),
(value_name, schema[value_name]),
variable_name,
value_name,
)
self._non_child_args = (name, self.options)
self._non_child_args = (schema, name, self.options)

def get_hashable(self) -> Hashable:
Review comment (Member, PR author):

Suggested change:
-    def get_hashable(self) -> Hashable:
+    def get_hashable(self) -> Hashable:  # pragma: no cover; Needed by experimental

Pretty sure this is lowering test coverage.

Reply (Member): I introduced basic testing for all executors, independent of the --executor pytest argument, to ensure 100% coverage always.

Reply (Member): See 9b78d8f.
"""Hashable representation of the node."""
schema_hash = tuple(self.schema.items())
return (type(self), schema_hash, self.name, str(self.options), *self.children)

@classmethod
def do_evaluate(cls, name: str, options: Any, df: DataFrame) -> DataFrame:
def do_evaluate(
cls, schema: Schema, name: str, options: Any, df: DataFrame
) -> DataFrame:
"""Evaluate and return a dataframe."""
if name == "rechunk":
# No-op in our data model
Expand All @@ -1582,8 +1589,8 @@ def do_evaluate(cls, name: str, options: Any, df: DataFrame) -> DataFrame:
(
indices,
pivotees,
(variable_name, variable_dtype),
(value_name, value_dtype),
variable_name,
value_name,
) = options
npiv = len(pivotees)
index_columns = [
Expand All @@ -1600,15 +1607,18 @@ def do_evaluate(cls, name: str, options: Any, df: DataFrame) -> DataFrame:
plc.interop.from_arrow(
pa.array(
pivotees,
type=plc.interop.to_arrow(variable_dtype),
type=plc.interop.to_arrow(schema[variable_name]),
),
)
]
),
df.num_rows,
).columns()
value_column = plc.concatenate.concatenate(
[df.column_map[pivotee].astype(value_dtype).obj for pivotee in pivotees]
[
df.column_map[pivotee].astype(schema[value_name]).obj
for pivotee in pivotees
]
)
return DataFrame(
[
Expand Down
152 changes: 152 additions & 0 deletions python/cudf_polars/cudf_polars/experimental/parallel.py
@@ -0,0 +1,152 @@
# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES.
# SPDX-License-Identifier: Apache-2.0
"""Partitioned LogicalPlan nodes."""

from __future__ import annotations

from functools import singledispatch
from typing import TYPE_CHECKING, Any

from cudf_polars.dsl.expr import NamedExpr
from cudf_polars.dsl.traversal import reuse_if_unchanged, traversal

if TYPE_CHECKING:
from collections.abc import MutableMapping

from cudf_polars.containers import DataFrame
from cudf_polars.dsl.ir import IR
from cudf_polars.dsl.nodebase import Node


class PartitionInfo:
"""
Partitioning information.

This class only tracks the partition count (for now).
"""

__slots__ = ("count",)

def __init__(self, count: int):
self.count = count


# The hash of an IR object must always map to a
# unique PartitionInfo object, and we can cache
# this mapping until evaluation is complete.
_IR_PARTS_CACHE: MutableMapping[int, PartitionInfo] = {}


def _clear_parts_info_cache() -> None:
"""Clear cached partitioning information."""
_IR_PARTS_CACHE.clear()


def get_key_name(node: Node | NamedExpr) -> str:
"""Generate the key name for a Node."""
if isinstance(node, NamedExpr):
return f"named-{get_key_name(node.value)}" # pragma: no cover
return f"{type(node).__name__.lower()}-{hash(node)}"


@singledispatch
def lower_ir_node(ir: IR, rec) -> IR:
"""Rewrite an IR node with proper partitioning."""
# Return same node by default
return reuse_if_unchanged(ir, rec)


def lower_ir_graph(ir: IR) -> IR:
"""Rewrite an IR graph with proper partitioning."""
from cudf_polars.dsl.traversal import CachingVisitor

mapper = CachingVisitor(lower_ir_node)
return mapper(ir)


def _default_ir_parts_info(ir: IR) -> PartitionInfo:
# Single-partition default behavior.
# This is used by `_ir_parts_info` for all unregistered IR sub-types.
count = max((ir_parts_info(child).count for child in ir.children), default=1)
if count > 1:
raise NotImplementedError(
f"Class {type(ir)} does not support multiple partitions."
) # pragma: no cover
return PartitionInfo(count=count)


@singledispatch
def _ir_parts_info(ir: IR) -> PartitionInfo:
"""IR partitioning-info dispatch."""
return _default_ir_parts_info(ir)


def ir_parts_info(ir: IR) -> PartitionInfo:
"""Return the partitioning info for an IR node."""
key = hash(ir)
try:
return _IR_PARTS_CACHE[key]
except KeyError:
_IR_PARTS_CACHE[key] = _ir_parts_info(ir)
return _IR_PARTS_CACHE[key]


def _default_ir_tasks(ir: IR) -> MutableMapping[Any, Any]:
# Single-partition default behavior.
# This is used by `generate_ir_tasks` for all unregistered IR sub-types.
if ir_parts_info(ir).count > 1:
raise NotImplementedError(
f"Failed to generate multiple output tasks for {ir}."
) # pragma: no cover

child_names = []
for child in ir.children:
child_names.append(get_key_name(child))
if ir_parts_info(child).count > 1:
raise NotImplementedError(
f"Failed to generate tasks for {ir} with child {child}."
) # pragma: no cover

key_name = get_key_name(ir)
return {
(key_name, 0): (
ir.do_evaluate,
*ir._non_child_args,
*((child_name, 0) for child_name in child_names),
)
}


@singledispatch
def generate_ir_tasks(ir: IR) -> MutableMapping[Any, Any]:
"""
Generate tasks for an IR node.

An IR node only needs to generate the graph for
the current IR logic (not including child IRs).
"""
return _default_ir_tasks(ir)


def task_graph(_ir: IR) -> tuple[MutableMapping[str, Any], str]:
"""Construct a Dask-compatible task graph."""
ir: IR = lower_ir_graph(_ir)

graph = {
k: v
for layer in [generate_ir_tasks(n) for n in traversal(ir)]
for k, v in layer.items()
}
key_name = get_key_name(ir)
graph[key_name] = (key_name, 0)

_clear_parts_info_cache()
return graph, key_name


def evaluate_dask(ir: IR) -> DataFrame:
"""Evaluate an IR graph with Dask."""
from dask import get

graph, key = task_graph(ir)
return get(graph, key)
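To make the graph shape concrete, here is a self-contained toy version of what `task_graph` produces and how `evaluate_dask` consumes it. The key names and callables are invented for illustration; real keys come from `get_key_name` and real tasks call `ir.do_evaluate`:

    from dask import get

    # Each IR node contributes one `(name, partition_index)` task; child
    # results are referenced by their keys. `task_graph` also adds a
    # plain-string alias for the root so the result can be fetched by name.
    graph = {
        ("scan-123abc", 0): (lambda: [1, 2, 3],),         # leaf node, partition 0
        ("select-456def", 0): (sum, ("scan-123abc", 0)),  # child key becomes an argument
        "select-456def": ("select-456def", 0),            # root alias
    }
    print(get(graph, "select-456def"))  # -> 6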
10 changes: 9 additions & 1 deletion python/cudf_polars/cudf_polars/testing/asserts.py
@@ -20,6 +20,11 @@
__all__: list[str] = ["assert_gpu_result_equal", "assert_ir_translation_raises"]


# Will be overridden by `conftest.py` with the value from the `--executor`
# command-line argument
Executor = None


def assert_gpu_result_equal(
lazydf: pl.LazyFrame,
*,
@@ -33,6 +38,7 @@ def assert_gpu_result_equal(
rtol: float = 1e-05,
atol: float = 1e-08,
categorical_as_str: bool = False,
executor: str | None = None,
) -> None:
"""
Assert that collection of a lazyframe on GPU produces correct results.
@@ -68,6 +74,8 @@ def assert_gpu_result_equal(
Absolute tolerance for float comparisons
categorical_as_str
Decat categoricals to strings before comparing
executor
The executor configuration to pass to `GPUEngine`

Raises
------
@@ -81,7 +89,7 @@
)

expect = lazydf.collect(**final_polars_collect_kwargs)
engine = GPUEngine(raise_on_fail=True)
engine = GPUEngine(raise_on_fail=True, executor=Executor)
Review comment (Member, PR author): Should this be something like `executor=executor or Executor`? Right now, it seems like the `executor` kwarg is always ignored.

Reply (Member): This was a leftover from a previous change; I intended to remove the `executor` kwarg. I've done that now in 22678a5, but we may still want to change this depending on how the discussion in #17262 (comment) goes.

got = lazydf.collect(**final_cudf_collect_kwargs, engine=engine)
assert_frame_equal(
expect,
16 changes: 16 additions & 0 deletions python/cudf_polars/tests/conftest.py
@@ -8,3 +8,19 @@
@pytest.fixture(params=[False, True], ids=["no_nulls", "nulls"], scope="session")
def with_nulls(request):
return request.param


def pytest_addoption(parser):
parser.addoption(
"--executor",
action="store",
default="cudf",
choices=("cudf", "dask-experimental"),
help="Executor to use for GPUEngine.",
)


def pytest_configure(config):
import cudf_polars.testing.asserts

cudf_polars.testing.asserts.Executor = config.getoption("--executor")
24 changes: 24 additions & 0 deletions python/cudf_polars/tests/experimental/test_parallel.py
Review comment (Member, PR author): I think it's a good idea to keep parallel tests here.

With that said, I wonder if it makes sense to somehow run the entire test suite with executor="dask" when dask is installed? (Not sure how this would work, but all tests should technically work with a single partition.)

Reply (Member): Yes, apparently tests do work; I just copied a couple for some initial testing, since I didn't want to duplicate everything. We do have a few options if we want to test everything:

  1. Explicitly parametrize all tests with executor: [None, "dask"] (None and "cudf" both mean the "default" executor);
  2. Add some sort of fixture to automatically parametrize tests with both executors (a sketch follows this thread);
  3. Add a pytest argument to control the behavior of option 2, so that we can enable Dask tests explicitly for now and turn them on by default later;
  4. Others?

Reply (Member, PR author): > Add some sort of fixture to automatically parametrize tests with both executors;

We probably don't need to test everything for this specific PR. However, I think it may make sense to go in this direction pretty soon. We will probably want to make sure that single-partition execution continues working for the entire test suite as multi-partition support is added.

Reply (Member): @wence- @rjzamora I have made the changes we discussed earlier today in 2b74f28. It adds a new --executor pytest command-line argument that defaults to "cudf" (the default executor) but lets us rerun the test suite with --executor dask-experimental (I've also renamed the executor from "dask" to "dask-experimental" in c8ca09e, as discussed). The caveat is that, to be as unintrusive as possible in the API, I had to add an Executor variable to cudf_polars.testing.asserts, which pytest_configure in conftest.py overrides on pytest entry. The advantage of this approach is that we don't have to force the user to always pass the executor to assert_gpu_result_equal via its API (preventing mistakes like forgetting to pass it); the obvious downside is that modifying the cudf_polars.testing.asserts.Executor module variable always feels like a bit of a hacky solution.

I'm happy to change this to whatever way you feel suits best; if you can think of a better solution, please let me know too.
@@ -0,0 +1,24 @@
# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES.
# SPDX-License-Identifier: Apache-2.0

from __future__ import annotations

import polars as pl
from polars.testing import assert_frame_equal

from cudf_polars import Translator
from cudf_polars.experimental.parallel import evaluate_dask


def test_evaluate_dask():
df = pl.LazyFrame({"a": [1, 2, 3], "b": [3, 4, 5], "c": [5, 6, 7], "d": [7, 9, 8]})

q = df.select(pl.col("a") - (pl.col("b") + pl.col("c") * 2), pl.col("d")).sort("d")

qir = Translator(q._ldf.visit()).translate_ir()

expected = qir.evaluate(cache={}).to_polars()

got = evaluate_dask(qir).to_polars()

assert_frame_equal(expected, got)