Skip to content

Commit bf22c1d

Browse files
authored
Prepare for DF50 (apache#1231)
* ruff fmt
* Upgrade to DF50 release candidate
* Add support for passing filter and distinct in window()
* Update documentation to not use deprecated window fn
* Remove crates.io patch
* Cargo update
1 parent b7d3519 commit bf22c1d

File tree

11 files changed

+525
-471
lines changed

11 files changed

+525
-471
lines changed

Cargo.lock

Lines changed: 400 additions & 417 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -34,15 +34,15 @@ protoc = [ "datafusion-substrait/protoc" ]
3434
substrait = ["dep:datafusion-substrait"]
3535

3636
[dependencies]
37-
tokio = { version = "1.45", features = ["macros", "rt", "rt-multi-thread", "sync"] }
38-
pyo3 = { version = "0.24", features = ["extension-module", "abi3", "abi3-py39"] }
39-
pyo3-async-runtimes = { version = "0.24", features = ["tokio-runtime"]}
37+
tokio = { version = "1.47", features = ["macros", "rt", "rt-multi-thread", "sync"] }
38+
pyo3 = { version = "0.25", features = ["extension-module", "abi3", "abi3-py39"] }
39+
pyo3-async-runtimes = { version = "0.25", features = ["tokio-runtime"]}
4040
pyo3-log = "0.12.4"
41-
arrow = { version = "55.1.0", features = ["pyarrow"] }
42-
datafusion = { version = "49.0.2", features = ["avro", "unicode_expressions"] }
43-
datafusion-substrait = { version = "49.0.2", optional = true }
44-
datafusion-proto = { version = "49.0.2" }
45-
datafusion-ffi = { version = "49.0.2" }
41+
arrow = { version = "56", features = ["pyarrow"] }
42+
datafusion = { version = "50", features = ["avro", "unicode_expressions"] }
43+
datafusion-substrait = { version = "50", optional = true }
44+
datafusion-proto = { version = "50" }
45+
datafusion-ffi = { version = "50" }
4646
prost = "0.13.1" # keep in line with `datafusion-substrait`
4747
uuid = { version = "1.18", features = ["v4"] }
4848
mimalloc = { version = "0.1", optional = true, default-features = false, features = ["local_dynamic_tls"] }
@@ -54,7 +54,7 @@ log = "0.4.27"
5454

5555
[build-dependencies]
5656
prost-types = "0.13.1" # keep in line with `datafusion-substrait`
57-
pyo3-build-config = "0.24"
57+
pyo3-build-config = "0.25"
5858

5959
[lib]
6060
name = "datafusion_python"

benchmarks/max_cpu_usage.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@
5353

5454
def main(num_rows: int, partitions: int) -> None:
5555
"""Run a simple aggregation after repartitioning.
56-
56+
5757
This function demonstrates basic partitioning concepts using synthetic data.
5858
Real-world performance will depend on your specific data sources, query types,
5959
and system configuration.

docs/source/user-guide/common-operations/windows.rst

Lines changed: 24 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ We'll use the pokemon dataset (from Ritchie Vink) in the following examples.
3131
.. ipython:: python
3232
3333
from datafusion import SessionContext
34-
from datafusion import col
34+
from datafusion import col, lit
3535
from datafusion import functions as f
3636
3737
ctx = SessionContext()
@@ -120,16 +120,14 @@ two preceding rows.
120120

121121
.. ipython:: python
122122
123-
from datafusion.expr import WindowFrame
123+
from datafusion.expr import Window, WindowFrame
124124
125125
df.select(
126126
col('"Name"'),
127127
col('"Speed"'),
128-
f.window("avg",
129-
[col('"Speed"')],
130-
order_by=[col('"Speed"')],
131-
window_frame=WindowFrame("rows", 2, 0)
132-
).alias("Previous Speed")
128+
f.avg(col('"Speed"'))
129+
.over(Window(window_frame=WindowFrame("rows", 2, 0), order_by=[col('"Speed"')]))
130+
.alias("Previous Speed"),
133131
)
134132
135133
Null Treatment
@@ -151,21 +149,27 @@ it's ``Type 2`` column that are null.
151149
152150
from datafusion.common import NullTreatment
153151
154-
df.filter(col('"Type 1"') == lit("Bug")).select(
152+
df.filter(col('"Type 1"') == lit("Bug")).select(
155153
'"Name"',
156154
'"Type 2"',
157-
f.window("last_value", [col('"Type 2"')])
158-
.window_frame(WindowFrame("rows", None, 0))
159-
.order_by(col('"Speed"'))
160-
.null_treatment(NullTreatment.IGNORE_NULLS)
161-
.build()
162-
.alias("last_wo_null"),
163-
f.window("last_value", [col('"Type 2"')])
164-
.window_frame(WindowFrame("rows", None, 0))
165-
.order_by(col('"Speed"'))
166-
.null_treatment(NullTreatment.RESPECT_NULLS)
167-
.build()
168-
.alias("last_with_null")
155+
f.last_value(col('"Type 2"'))
156+
.over(
157+
Window(
158+
window_frame=WindowFrame("rows", None, 0),
159+
order_by=[col('"Speed"')],
160+
null_treatment=NullTreatment.IGNORE_NULLS,
161+
)
162+
)
163+
.alias("last_wo_null"),
164+
f.last_value(col('"Type 2"'))
165+
.over(
166+
Window(
167+
window_frame=WindowFrame("rows", None, 0),
168+
order_by=[col('"Speed"')],
169+
null_treatment=NullTreatment.RESPECT_NULLS,
170+
)
171+
)
172+
.alias("last_with_null"),
169173
)
170174
171175
Aggregate Functions

python/datafusion/functions.py

Lines changed: 39 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,14 @@
3232
WindowFrame,
3333
expr_list_to_raw_expr_list,
3434
sort_list_to_raw_sort_list,
35+
sort_or_default,
3536
)
3637

38+
try:
39+
from warnings import deprecated # Python 3.13+
40+
except ImportError:
41+
from typing_extensions import deprecated # Python 3.12
42+
3743
if TYPE_CHECKING:
3844
from datafusion.context import SessionContext
3945

@@ -426,12 +432,15 @@ def when(when: Expr, then: Expr) -> CaseBuilder:
426432
return CaseBuilder(f.when(when.expr, then.expr))
427433

428434

435+
@deprecated("Prefer to call Expr.over() instead")
429436
def window(
430437
name: str,
431438
args: list[Expr],
432439
partition_by: list[Expr] | Expr | None = None,
433440
order_by: list[SortKey] | SortKey | None = None,
434441
window_frame: WindowFrame | None = None,
442+
filter: Expr | None = None,
443+
distinct: bool = False,
435444
ctx: SessionContext | None = None,
436445
) -> Expr:
437446
"""Creates a new Window function expression.
@@ -451,7 +460,19 @@ def window(
451460
order_by_raw = sort_list_to_raw_sort_list(order_by)
452461
window_frame = window_frame.window_frame if window_frame is not None else None
453462
ctx = ctx.ctx if ctx is not None else None
454-
return Expr(f.window(name, args, partition_by_raw, order_by_raw, window_frame, ctx))
463+
filter_raw = filter.expr if filter is not None else None
464+
return Expr(
465+
f.window(
466+
name,
467+
args,
468+
partition_by=partition_by_raw,
469+
order_by=order_by_raw,
470+
window_frame=window_frame,
471+
ctx=ctx,
472+
filter=filter_raw,
473+
distinct=distinct,
474+
)
475+
)
455476

456477

457478
# scalar functions
@@ -1664,7 +1685,7 @@ def approx_median(expression: Expr, filter: Optional[Expr] = None) -> Expr:
16641685

16651686

16661687
def approx_percentile_cont(
1667-
expression: Expr,
1688+
sort_expression: Expr | SortExpr,
16681689
percentile: float,
16691690
num_centroids: Optional[int] = None,
16701691
filter: Optional[Expr] = None,
@@ -1685,21 +1706,26 @@ def approx_percentile_cont(
16851706
the options ``order_by``, ``null_treatment``, and ``distinct``.
16861707
16871708
Args:
1688-
expression: Values for which to find the approximate percentile
1709+
sort_expression: Values for which to find the approximate percentile
16891710
percentile: This must be between 0.0 and 1.0, inclusive
16901711
num_centroids: Max bin size for the t-digest algorithm
16911712
filter: If provided, only compute against rows for which the filter is True
16921713
"""
1714+
sort_expr_raw = sort_or_default(sort_expression)
16931715
filter_raw = filter.expr if filter is not None else None
16941716
return Expr(
16951717
f.approx_percentile_cont(
1696-
expression.expr, percentile, num_centroids=num_centroids, filter=filter_raw
1718+
sort_expr_raw, percentile, num_centroids=num_centroids, filter=filter_raw
16971719
)
16981720
)
16991721

17001722

17011723
def approx_percentile_cont_with_weight(
1702-
expression: Expr, weight: Expr, percentile: float, filter: Optional[Expr] = None
1724+
sort_expression: Expr | SortExpr,
1725+
weight: Expr,
1726+
percentile: float,
1727+
num_centroids: Optional[int] = None,
1728+
filter: Optional[Expr] = None,
17031729
) -> Expr:
17041730
"""Returns the value of the weighted approximate percentile.
17051731
@@ -1710,16 +1736,22 @@ def approx_percentile_cont_with_weight(
17101736
the options ``order_by``, ``null_treatment``, and ``distinct``.
17111737
17121738
Args:
1713-
expression: Values for which to find the approximate percentile
1739+
sort_expression: Values for which to find the approximate percentile
17141740
weight: Relative weight for each of the values in ``expression``
17151741
percentile: This must be between 0.0 and 1.0, inclusive
1742+
num_centroids: Max bin size for the t-digest algorithm
17161743
filter: If provided, only compute against rows for which the filter is True
17171744
17181745
"""
1746+
sort_expr_raw = sort_or_default(sort_expression)
17191747
filter_raw = filter.expr if filter is not None else None
17201748
return Expr(
17211749
f.approx_percentile_cont_with_weight(
1722-
expression.expr, weight.expr, percentile, filter=filter_raw
1750+
sort_expr_raw,
1751+
weight.expr,
1752+
percentile,
1753+
num_centroids=num_centroids,
1754+
filter=filter_raw,
17231755
)
17241756
)
17251757

python/tests/test_aggregation.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -130,11 +130,27 @@ def test_aggregation_stats(df, agg_expr, calc_expected):
130130
(f.median(column("b"), filter=column("a") != 2), pa.array([5]), False),
131131
(f.approx_median(column("b"), filter=column("a") != 2), pa.array([5]), False),
132132
(f.approx_percentile_cont(column("b"), 0.5), pa.array([4]), False),
133+
(
134+
f.approx_percentile_cont(
135+
column("b").sort(ascending=True, nulls_first=False),
136+
0.5,
137+
num_centroids=2,
138+
),
139+
pa.array([4]),
140+
False,
141+
),
133142
(
134143
f.approx_percentile_cont_with_weight(column("b"), lit(0.6), 0.5),
135144
pa.array([6], type=pa.float64()),
136145
False,
137146
),
147+
(
148+
f.approx_percentile_cont_with_weight(
149+
column("b").sort(ascending=False, nulls_first=False), lit(0.6), 0.5
150+
),
151+
pa.array([6], type=pa.float64()),
152+
False,
153+
),
138154
(
139155
f.approx_percentile_cont_with_weight(
140156
column("b"), lit(0.6), 0.5, filter=column("a") != lit(3)

src/common/data_type.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -215,6 +215,16 @@ impl DataTypeMap {
215215
DataType::Dictionary(_, _) => {
216216
Err(PyNotImplementedError::new_err(format!("{arrow_type:?}")))
217217
}
218+
DataType::Decimal32(precision, scale) => Ok(DataTypeMap::new(
219+
DataType::Decimal32(*precision, *scale),
220+
PythonType::Float,
221+
SqlType::DECIMAL,
222+
)),
223+
DataType::Decimal64(precision, scale) => Ok(DataTypeMap::new(
224+
DataType::Decimal64(*precision, *scale),
225+
PythonType::Float,
226+
SqlType::DECIMAL,
227+
)),
218228
DataType::Decimal128(precision, scale) => Ok(DataTypeMap::new(
219229
DataType::Decimal128(*precision, *scale),
220230
PythonType::Float,
@@ -549,6 +559,8 @@ impl DataTypeMap {
549559
DataType::Struct(_) => "Struct",
550560
DataType::Union(_, _) => "Union",
551561
DataType::Dictionary(_, _) => "Dictionary",
562+
DataType::Decimal32(_, _) => "Decimal32",
563+
DataType::Decimal64(_, _) => "Decimal64",
552564
DataType::Decimal128(_, _) => "Decimal128",
553565
DataType::Decimal256(_, _) => "Decimal256",
554566
DataType::Map(_, _) => "Map",

src/dataframe.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -276,7 +276,6 @@ impl PyParquetColumnOptions {
276276
statistics_enabled,
277277
bloom_filter_fpp,
278278
bloom_filter_ndv,
279-
..Default::default()
280279
},
281280
}
282281
}

src/expr/sort_expr.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ use std::fmt::{self, Display, Formatter};
2323
#[pyclass(name = "SortExpr", module = "datafusion.expr", subclass)]
2424
#[derive(Clone)]
2525
pub struct PySortExpr {
26-
sort: SortExpr,
26+
pub(crate) sort: SortExpr,
2727
}
2828

2929
impl From<PySortExpr> for SortExpr {

src/functions.rs

Lines changed: 19 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -319,21 +319,25 @@ fn find_window_fn(
319319
}
320320

321321
/// Creates a new Window function expression
322+
#[allow(clippy::too_many_arguments)]
322323
#[pyfunction]
323-
#[pyo3(signature = (name, args, partition_by=None, order_by=None, window_frame=None, ctx=None))]
324+
#[pyo3(signature = (name, args, partition_by=None, order_by=None, window_frame=None, filter=None, distinct=false, ctx=None))]
324325
fn window(
325326
name: &str,
326327
args: Vec<PyExpr>,
327328
partition_by: Option<Vec<PyExpr>>,
328329
order_by: Option<Vec<PySortExpr>>,
329330
window_frame: Option<PyWindowFrame>,
331+
filter: Option<PyExpr>,
332+
distinct: bool,
330333
ctx: Option<PySessionContext>,
331334
) -> PyResult<PyExpr> {
332335
let fun = find_window_fn(name, ctx)?;
333336

334337
let window_frame = window_frame
335338
.map(|w| w.into())
336339
.unwrap_or(WindowFrame::new(order_by.as_ref().map(|v| !v.is_empty())));
340+
let filter = filter.map(|f| f.expr.into());
337341

338342
Ok(PyExpr {
339343
expr: datafusion::logical_expr::Expr::WindowFunction(Box::new(WindowFunction {
@@ -351,6 +355,8 @@ fn window(
351355
.map(|x| x.into())
352356
.collect::<Vec<_>>(),
353357
window_frame,
358+
filter,
359+
distinct,
354360
null_treatment: None,
355361
},
356362
})),
@@ -649,36 +655,36 @@ aggregate_function!(approx_median);
649655
// aggregate_function!(grouping);
650656

651657
#[pyfunction]
652-
#[pyo3(signature = (expression, percentile, num_centroids=None, filter=None))]
658+
#[pyo3(signature = (sort_expression, percentile, num_centroids=None, filter=None))]
653659
pub fn approx_percentile_cont(
654-
expression: PyExpr,
660+
sort_expression: PySortExpr,
655661
percentile: f64,
656662
num_centroids: Option<i64>, // enforces optional arguments at the end, currently
657663
filter: Option<PyExpr>,
658664
) -> PyDataFusionResult<PyExpr> {
659-
let args = if let Some(num_centroids) = num_centroids {
660-
vec![expression.expr, lit(percentile), lit(num_centroids)]
661-
} else {
662-
vec![expression.expr, lit(percentile)]
663-
};
664-
let udaf = functions_aggregate::approx_percentile_cont::approx_percentile_cont_udaf();
665-
let agg_fn = udaf.call(args);
665+
let agg_fn = functions_aggregate::expr_fn::approx_percentile_cont(
666+
sort_expression.sort,
667+
lit(percentile),
668+
num_centroids.map(lit),
669+
);
666670

667671
add_builder_fns_to_aggregate(agg_fn, None, filter, None, None)
668672
}
669673

670674
#[pyfunction]
671-
#[pyo3(signature = (expression, weight, percentile, filter=None))]
675+
#[pyo3(signature = (sort_expression, weight, percentile, num_centroids=None, filter=None))]
672676
pub fn approx_percentile_cont_with_weight(
673-
expression: PyExpr,
677+
sort_expression: PySortExpr,
674678
weight: PyExpr,
675679
percentile: f64,
680+
num_centroids: Option<i64>,
676681
filter: Option<PyExpr>,
677682
) -> PyDataFusionResult<PyExpr> {
678683
let agg_fn = functions_aggregate::expr_fn::approx_percentile_cont_with_weight(
679-
expression.expr,
684+
sort_expression.sort,
680685
weight.expr,
681686
lit(percentile),
687+
num_centroids.map(lit),
682688
);
683689

684690
add_builder_fns_to_aggregate(agg_fn, None, filter, None, None)

0 commit comments

Comments (0)