Creation of CI artifacts for cudf-polars wheels #16680

Merged: 48 commits, Sep 17, 2024.

Commits (the file changes shown below are from a single commit, 7d0c7ad):
7742b8b  Avoid GPU initialisation during import (wence-, Jul 26, 2024)
ef0b49f  Require polars >= 1.3 (wence-, Jul 29, 2024)
e9fd96d  Adapt to IR changes (wence-, Jul 25, 2024)
9d69621  Use new IR versioning to report if we don't support an IR version (wence-, Jul 26, 2024)
918a40e  Use new GPUEngine config object to set things up (wence-, Jul 22, 2024)
f8f2d0d  Plausibly provide useful error message if driver is too old (wence-, Jul 26, 2024)
bcedb6b  Update overview docs (wence-, Jul 24, 2024)
6f2d406  Support right join (wence-, Jul 29, 2024)
f3bbd3f  Test that invalid GPUEngine config raises (wence-, Jul 29, 2024)
1d4c30c  More coverage for GPUEngine config (wence-, Jul 29, 2024)
abcf22b  Versioned handling of PythonScan translation (wence-, Jul 30, 2024)
62a5dbd  Merge pull request #16347 from wence-/wence/fea/polars-engine-config (wence-, Aug 2, 2024)
7d0c7ad  Adapt to IR changes in polars 1.4 (#16494) (lithomas1, Aug 5, 2024)
5de29b3  Implement polars string Replace and ReplaceMany (#16039) (lithomas1, Aug 6, 2024)
7f6b00f  Use a key column rather than a placeholder for count agg (wence-, Aug 19, 2024)
822e7d0  Backport: Remove cuDF dependency from pylibcudf column from_device te… (lithomas1, Aug 20, 2024)
152111b  Implement scan-based whole-frame aggregations for cudf-polars (#16509) (lithomas1, Aug 20, 2024)
13a1493  Merge pull request #16599 from wence/fix/remove-placeholder-column (wence-, Aug 21, 2024)
7cf3289  Implement order preserving groupby in cudf-polars (#16555) (lithomas1, Aug 22, 2024)
f6c938f  Fix integer overflow in indexalator pointer logic (davidwendt, Aug 22, 2024)
4ded370  Use std::ptrdiff_t (davidwendt, Aug 23, 2024)
edabb67  Correctly export empty column names in DataFrame.to_polars (#16596) (wence-, Aug 27, 2024)
a4c35e9  Forward-merge 24.08 (wence-, Aug 27, 2024)
0a95b2c  Add more `cudf-polars` unaryops (#16579) (brandon-b-miller, Aug 27, 2024)
cc892fc  Merge pull request #16667 from wence-/wence/merge-2408 (wence-, Aug 27, 2024)
41a3a95  Add `pylibcudf`/`cudf-polars` string `strip` (#16504) (brandon-b-miller, Aug 27, 2024)
0bf68d4  `cudf-polars`/`pylibcudf` string -> date parsing (#16306) (brandon-b-miller, Aug 28, 2024)
40d33cb  Support quantile in cudf_polars (#16093) (lithomas1, Aug 29, 2024)
95da2c5  Implement handlers for first/last in groupby (#16688) (wence-, Aug 30, 2024)
434afab  Ensure IR validation always checks for empty columns (wence-, Aug 30, 2024)
385ae98  Need to check for nulls in nested dtypes (wence-, Aug 30, 2024)
1cf1146  Add test reading nested Null column (wence-, Aug 30, 2024)
de445a3  Move creation of regex program to initialisation (wence-, Aug 30, 2024)
f39713e  Merge pull request #16703 from wence-/wence/fea/polars-reject-invalid… (wence-, Aug 30, 2024)
ad364c6  Include failing node in error message (wence-, Aug 30, 2024)
d158b22  Merge pull request #16702 from wence-/wence/fea/polars-no-empty-columns (wence-, Sep 2, 2024)
b550645  Partially reject dynamic groupby (#16720) (wence-, Sep 3, 2024)
eb2a23e  Implement Kleene logic handling for Any/All and bitwise Or/And (#16476) (wence-, Sep 4, 2024)
ebc3bbe  Some fixes for unary functions (#16719) (wence-, Sep 4, 2024)
5d262df  Implement unpivot in cudf-polars (#16689) (wence-, Sep 4, 2024)
c76e90b  Small scan-handler fixes (#16721) (wence-, Sep 4, 2024)
ccb8061  Implement cudf-polars datetime extraction methods (#16500) (lithomas1, Sep 5, 2024)
feb2e63  Polars 1.7 will change a minor thing in the IR, adapt to that (#16755) (wence-, Sep 6, 2024)
6d2e455  Run polars test suite (defaulting to GPU) in CI (#16710) (wence-, Sep 6, 2024)
1b5cb1a  skip test_groupby_literal_in_agg if polars>=1.7.1 (brandon-b-miller, Sep 16, 2024)
b6a110e  API Doc for Polars GPU Engine (#16753) (singhmanas1, Sep 16, 2024)
3b7ffb8  test in polars 1.7.0 environment (brandon-b-miller, Sep 16, 2024)
9428154  Revert "skip test_groupby_literal_in_agg if polars>=1.7.1" (brandon-b-miller, Sep 16, 2024)
Adapt to IR changes in polars 1.4 (#16494)
## Description

Adapts to the IR changes in polars 1.4 and handles `n_rows`/`skip_rows` a little more correctly. Polars 1.4 pushes slice offsets down to the reader, so `file_options.n_rows` is now an optional `(skip_rows, n_rows)` tuple rather than an optional integer, and the `Scan` IR node now carries both fields explicitly.
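To make the shape change concrete, here is a minimal sketch of the version handling that `translate.py` gains below. This is illustrative only: `normalise_slice` and its arguments are hypothetical names, not code from this PR.

```python
def normalise_slice(n_rows, ir_version):
    """Return (skip_rows, n_rows), where n_rows == -1 means read all rows."""
    if n_rows is None:
        return 0, -1  # no slice pushdown: read everything
    if ir_version >= (1, 0):
        # polars >= 1.4: n_rows is already a (skip_rows, n_rows) tuple
        skip_rows, n_rows = n_rows
        return skip_rows, n_rows
    # polars 1.3: n_rows is a plain int; skipping was never pushed down
    return 0, n_rows


# e.g. normalise_slice((2, 10), (1, 0)) == (2, 10)
#      normalise_slice(5, (0, 1)) == (0, 5)
```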

## Checklist
- [ ] I am familiar with the [Contributing
Guidelines](https://github.com/rapidsai/cudf/blob/HEAD/CONTRIBUTING.md).
- [ ] New or existing tests cover these changes.
- [ ] The documentation is up to date with these changes.

---------

Co-authored-by: Lawrence Mitchell <lmitchell@nvidia.com>
lithomas1 and wence- authored Aug 5, 2024
commit 7d0c7ad54528b96c36d0fdbdc00e644322b9b0fd
46 changes: 29 additions & 17 deletions python/cudf_polars/cudf_polars/dsl/ir.py
```diff
@@ -190,15 +190,14 @@ class Scan(IR):
     """Cloud-related authentication options, currently ignored."""
     paths: list[str]
     """List of paths to read from."""
-    file_options: Any
-    """Options for reading the file.
-
-    Attributes are:
-    - ``with_columns: list[str]`` of projected columns to return.
-    - ``n_rows: int``: Number of rows to read.
-    - ``row_index: tuple[name, offset] | None``: Add an integer index
-      column with given name.
-    """
+    with_columns: list[str]
+    """Projected columns to return."""
+    skip_rows: int
+    """Rows to skip at the start when reading."""
+    n_rows: int
+    """Number of rows to read after skipping."""
+    row_index: tuple[str, int] | None
+    """If not None add an integer index column of the given name."""
     predicate: expr.NamedExpr | None
     """Mask to apply to the read dataframe."""
@@ -208,8 +207,16 @@ def __post_init__(self) -> None:
             # This line is unhittable ATM since IPC/Anonymous scan raise
             # on the polars side
             raise NotImplementedError(f"Unhandled scan type: {self.typ}")
-        if self.typ == "ndjson" and self.file_options.n_rows is not None:
-            raise NotImplementedError("row limit in scan")
+        if self.typ == "ndjson" and (self.n_rows != -1 or self.skip_rows != 0):
+            raise NotImplementedError("row limit in scan for json reader")
+        if self.skip_rows < 0:
+            # TODO: polars has this implemented for parquet,
+            # maybe we can do this too?
+            raise NotImplementedError("slice pushdown for negative slices")
+        if self.typ == "csv" and self.skip_rows != 0:  # pragma: no cover
+            # This comes from slice pushdown, but that
+            # optimization doesn't happen right now
+            raise NotImplementedError("skipping rows in CSV reader")
         if self.cloud_options is not None and any(
             self.cloud_options.get(k) is not None for k in ("aws", "azure", "gcp")
         ):
@@ -246,10 +253,9 @@ def __post_init__(self) -> None:
 
     def evaluate(self, *, cache: MutableMapping[int, DataFrame]) -> DataFrame:
         """Evaluate and return a dataframe."""
-        options = self.file_options
-        with_columns = options.with_columns
-        row_index = options.row_index
-        nrows = self.file_options.n_rows if self.file_options.n_rows is not None else -1
+        with_columns = self.with_columns
+        row_index = self.row_index
+        n_rows = self.n_rows
         if self.typ == "csv":
             parse_options = self.reader_options["parse_options"]
             sep = chr(parse_options["separator"])
@@ -283,6 +289,7 @@ def evaluate(self, *, cache: MutableMapping[int, DataFrame]) -> DataFrame:
 
             # polars skips blank lines at the beginning of the file
             pieces = []
+            read_partial = n_rows != -1
             for p in self.paths:
                 skiprows = self.reader_options["skip_rows"]
                 path = Path(p)
@@ -304,9 +311,13 @@ def evaluate(self, *, cache: MutableMapping[int, DataFrame]) -> DataFrame:
                     comment=comment,
                     decimal=decimal,
                     dtypes=self.schema,
-                    nrows=nrows,
+                    nrows=n_rows,
                 )
                 pieces.append(tbl_w_meta)
+                if read_partial:
+                    n_rows -= tbl_w_meta.tbl.num_rows()
+                    if n_rows <= 0:
+                        break
             tables, colnames = zip(
                 *(
                     (piece.tbl, piece.column_names(include_children=False))
@@ -321,7 +332,8 @@ def evaluate(self, *, cache: MutableMapping[int, DataFrame]) -> DataFrame:
             tbl_w_meta = plc.io.parquet.read_parquet(
                 plc.io.SourceInfo(self.paths),
                 columns=with_columns,
-                num_rows=nrows,
+                num_rows=n_rows,
+                skip_rows=self.skip_rows,
             )
             df = DataFrame.from_table(
                 tbl_w_meta.tbl,
```
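The multi-file CSV branch above has to spread a single row budget across several files. Here is a distilled sketch of that budgeting logic, assuming `read_one(path, limit)` stands in for the pylibcudf CSV read call and returns a table-like object exposing `num_rows()`; neither name is from the PR itself.

```python
def read_limited(paths, n_rows, read_one):
    """Read pieces from each path until the n_rows budget is spent (-1 = no limit)."""
    pieces = []
    read_partial = n_rows != -1
    for path in paths:
        piece = read_one(path, n_rows)  # each read honours the remaining budget
        pieces.append(piece)
        if read_partial:
            n_rows -= piece.num_rows()  # shrink the budget by what was read
            if n_rows <= 0:
                break  # budget exhausted: skip the remaining files entirely
    return pieces
```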
25 changes: 22 additions & 3 deletions python/cudf_polars/cudf_polars/dsl/translate.py
```diff
@@ -76,7 +76,7 @@ def _translate_ir(
 def _(
     node: pl_ir.PythonScan, visitor: NodeTraverser, schema: dict[str, plc.DataType]
 ) -> ir.IR:
-    if visitor.version()[0] == 1:  # pragma: no cover
+    if visitor.version()[0] == 1:
         # https://github.com/pola-rs/polars/pull/17939
         # Versioning can be dropped once polars 1.4 is lowest
         # supported version.
@@ -87,7 +87,7 @@ def _(
             if predicate is not None
             else None
         )
-    else:
+    else:  # pragma: no cover; CI tests 1.4
         # version == 0
         options = node.options
         predicate = (
@@ -108,13 +108,32 @@ def _(
         cloud_options = None
     else:
         reader_options, cloud_options = map(json.loads, options)
+    file_options = node.file_options
+    with_columns = file_options.with_columns
+    n_rows = file_options.n_rows
+    if n_rows is None:
+        n_rows = -1  # All rows
+        skip_rows = 0  # Don't skip
+    else:
+        if visitor.version() >= (1, 0):
+            # Polars 1.4 n_rows property is (skip, nrows)
+            skip_rows, n_rows = n_rows
+        else:  # pragma: no cover; CI tests 1.4
+            # Polars 1.3 n_rows property is integer, skip rows was
+            # always zero because it was not pushed down to reader.
+            skip_rows = 0
+
+    row_index = file_options.row_index
     return ir.Scan(
         schema,
         typ,
         reader_options,
         cloud_options,
         node.paths,
-        node.file_options,
+        with_columns,
+        skip_rows,
+        n_rows,
+        row_index,
         translate_named_expr(visitor, n=node.predicate)
         if node.predicate is not None
         else None,
```
52 changes: 46 additions & 6 deletions python/cudf_polars/tests/test_scan.py
```diff
@@ -57,6 +57,22 @@ def mask(request):
     return request.param
 
 
+@pytest.fixture(
+    params=[
+        None,
+        (1, 1),
+    ],
+    ids=[
+        "no-slice",
+        "slice-second",
+    ],
+)
+def slice(request):
+    # For use in testing that we handle
+    # polars slice pushdown correctly
+    return request.param
+
+
 def make_source(df, path, format):
     """
     Writes the passed polars df to a file of
@@ -78,7 +94,9 @@ def make_source(df, path, format):
         ("parquet", pl.scan_parquet),
     ],
 )
-def test_scan(tmp_path, df, format, scan_fn, row_index, n_rows, columns, mask, request):
+def test_scan(
+    tmp_path, df, format, scan_fn, row_index, n_rows, columns, mask, slice, request
+):
     name, offset = row_index
     make_source(df, tmp_path / "file", format)
     request.applymarker(
@@ -93,13 +111,25 @@ def test_scan(
         row_index_offset=offset,
         n_rows=n_rows,
     )
+    if slice is not None:
+        q = q.slice(*slice)
     if mask is not None:
         q = q.filter(mask)
     if columns is not None:
         q = q.select(*columns)
     assert_gpu_result_equal(q)
 
 
+def test_negative_slice_pushdown_raises(tmp_path):
+    df = pl.DataFrame({"a": [1, 2, 3]})
+
+    df.write_parquet(tmp_path / "df.parquet")
+    q = pl.scan_parquet(tmp_path / "df.parquet")
+    # Take the last row
+    q = q.slice(-1, 1)
+    assert_ir_translation_raises(q, NotImplementedError)
+
+
 def test_scan_unsupported_raises(tmp_path):
     df = pl.DataFrame({"a": [1, 2, 3]})
 
@@ -154,15 +184,25 @@ def test_scan_csv_column_renames_projection_schema(tmp_path):
         ("test*.csv", False),
     ],
 )
-def test_scan_csv_multi(tmp_path, filename, glob):
+@pytest.mark.parametrize(
+    "nrows_skiprows",
+    [
+        (None, 0),
+        (1, 1),
+        (3, 0),
+        (4, 2),
+    ],
+)
+def test_scan_csv_multi(tmp_path, filename, glob, nrows_skiprows):
+    n_rows, skiprows = nrows_skiprows
     with (tmp_path / "test1.csv").open("w") as f:
-        f.write("""foo,bar,baz\n1,2\n3,4,5""")
+        f.write("""foo,bar,baz\n1,2,3\n3,4,5""")
     with (tmp_path / "test2.csv").open("w") as f:
-        f.write("""foo,bar,baz\n1,2\n3,4,5""")
+        f.write("""foo,bar,baz\n1,2,3\n3,4,5""")
     with (tmp_path / "test*.csv").open("w") as f:
-        f.write("""foo,bar,baz\n1,2\n3,4,5""")
+        f.write("""foo,bar,baz\n1,2,3\n3,4,5""")
     os.chdir(tmp_path)
-    q = pl.scan_csv(filename, glob=glob)
+    q = pl.scan_csv(filename, glob=glob, n_rows=n_rows, skip_rows=skiprows)
 
     assert_gpu_result_equal(q)
```
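For orientation, the behaviour these tests pin down looks roughly like this from the user's side. This is a sketch, not test code from the PR; it assumes cudf-polars is installed and that `pl.GPUEngine` (wired up by earlier commits in this PR series) accepts `raise_on_fail`.

```python
import polars as pl

pl.DataFrame({"a": [1, 2, 3]}).write_parquet("df.parquet")

# Positive slices are pushed down to the reader as skip_rows/num_rows.
q = pl.scan_parquet("df.parquet").slice(1, 1)  # skip one row, take one
print(q.collect(engine=pl.GPUEngine(raise_on_fail=True)))

# A negative slice (take the last row) fails IR translation; without
# raise_on_fail=True, polars falls back to its CPU engine instead.
q_neg = pl.scan_parquet("df.parquet").slice(-1, 1)
print(q_neg.collect(engine=pl.GPUEngine()))  # still correct, via CPU fallback
```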