Skip to content

Commit 5e1ea80

Browse files
committed
Update for new APIs, update other dependencies
1 parent 5e184ee commit 5e1ea80

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

56 files changed

+606
-697
lines changed

Cargo.lock

Lines changed: 42 additions & 170 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -166,7 +166,7 @@ parquet = { version = "56.2.0", default-features = false, features = [
166166
pbjson = { version = "0.8.0" }
167167
pbjson-types = "0.8"
168168
# Should match arrow-flight's version of prost.
169-
prost = "0.13.1"
169+
prost = "0.14.1"
170170
rand = "0.9"
171171
recursive = "0.1.1"
172172
regex = "1.11"

datafusion-cli/src/main.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -575,9 +575,9 @@ mod tests {
575575
+-----------------------------------+-----------------+---------------------+------+------------------+
576576
| filename | file_size_bytes | metadata_size_bytes | hits | extra |
577577
+-----------------------------------+-----------------+---------------------+------+------------------+
578-
| alltypes_plain.parquet | 1851 | 10181 | 2 | page_index=false |
579-
| alltypes_tiny_pages.parquet | 454233 | 881418 | 2 | page_index=true |
580-
| lz4_raw_compressed_larger.parquet | 380836 | 2939 | 2 | page_index=false |
578+
| alltypes_plain.parquet | 1851 | 10309 | 2 | page_index=false |
579+
| alltypes_tiny_pages.parquet | 454233 | 881546 | 2 | page_index=true |
580+
| lz4_raw_compressed_larger.parquet | 380836 | 2971 | 2 | page_index=false |
581581
+-----------------------------------+-----------------+---------------------+------+------------------+
582582
");
583583

@@ -606,9 +606,9 @@ mod tests {
606606
+-----------------------------------+-----------------+---------------------+------+------------------+
607607
| filename | file_size_bytes | metadata_size_bytes | hits | extra |
608608
+-----------------------------------+-----------------+---------------------+------+------------------+
609-
| alltypes_plain.parquet | 1851 | 10181 | 5 | page_index=false |
610-
| alltypes_tiny_pages.parquet | 454233 | 881418 | 2 | page_index=true |
611-
| lz4_raw_compressed_larger.parquet | 380836 | 2939 | 3 | page_index=false |
609+
| alltypes_plain.parquet | 1851 | 10309 | 5 | page_index=false |
610+
| alltypes_tiny_pages.parquet | 454233 | 881546 | 2 | page_index=true |
611+
| lz4_raw_compressed_larger.parquet | 380836 | 2971 | 3 | page_index=false |
612612
+-----------------------------------+-----------------+---------------------+------+------------------+
613613
");
614614

datafusion-examples/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ serde_json = { workspace = true }
8181
tempfile = { workspace = true }
8282
test-utils = { path = "../test-utils" }
8383
tokio = { workspace = true, features = ["rt-multi-thread", "parking_lot"] }
84-
tonic = "0.13.1"
84+
tonic = "0.14"
8585
tracing = { version = "0.1" }
8686
tracing-subscriber = { version = "0.3" }
8787
url = { workspace = true }

datafusion-examples/examples/flight/flight_client.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
use std::collections::HashMap;
1919
use std::sync::Arc;
20+
use tonic::transport::Endpoint;
2021

2122
use datafusion::arrow::datatypes::Schema;
2223

@@ -34,7 +35,9 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
3435
let testdata = datafusion::test_util::parquet_test_data();
3536

3637
// Create Flight client
37-
let mut client = FlightServiceClient::connect("http://localhost:50051").await?;
38+
let endpoint = Endpoint::new("http://localhost:50051")?;
39+
let channel = endpoint.connect().await?;
40+
let mut client = FlightServiceClient::new(channel);
3841

3942
// Call get_schema to get the schema of a Parquet file
4043
let request = tonic::Request::new(FlightDescriptor {

datafusion-examples/examples/flight/flight_server.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18-
use arrow::ipc::writer::{DictionaryTracker, IpcDataGenerator};
18+
use arrow::ipc::writer::{CompressionContext, DictionaryTracker, IpcDataGenerator};
1919
use std::sync::Arc;
2020

2121
use arrow_flight::{PollInfo, SchemaAsIpc};
@@ -106,6 +106,7 @@ impl FlightService for FlightServiceImpl {
106106

107107
// add an initial FlightData message that sends schema
108108
let options = arrow::ipc::writer::IpcWriteOptions::default();
109+
let mut compression_context = CompressionContext::default();
109110
let schema_flight_data = SchemaAsIpc::new(&schema, &options);
110111

111112
let mut flights = vec![FlightData::from(schema_flight_data)];
@@ -115,7 +116,7 @@ impl FlightService for FlightServiceImpl {
115116

116117
for batch in &results {
117118
let (flight_dictionaries, flight_batch) = encoder
118-
.encoded_batch(batch, &mut tracker, &options)
119+
.encode(batch, &mut tracker, &options, &mut compression_context)
119120
.map_err(|e: ArrowError| Status::internal(e.to_string()))?;
120121

121122
flights.extend(flight_dictionaries.into_iter().map(Into::into));

datafusion/common/src/dfschema.rs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1417,9 +1417,7 @@ mod tests {
14171417
fn from_qualified_schema_into_arrow_schema() -> Result<()> {
14181418
let schema = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
14191419
let arrow_schema = schema.as_arrow();
1420-
let expected = "Field { name: \"c0\", data_type: Boolean, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, \
1421-
Field { name: \"c1\", data_type: Boolean, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }";
1422-
assert_eq!(expected, arrow_schema.to_string());
1420+
insta::assert_snapshot!(arrow_schema.to_string(), @r#"Field { "c0": nullable Boolean }, Field { "c1": nullable Boolean }"#);
14231421
Ok(())
14241422
}
14251423

datafusion/common/src/pyarrow.rs

Lines changed: 12 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ use arrow::pyarrow::{FromPyArrow, ToPyArrow};
2222
use pyo3::exceptions::PyException;
2323
use pyo3::prelude::PyErr;
2424
use pyo3::types::{PyAnyMethods, PyList};
25-
use pyo3::{Bound, FromPyObject, IntoPyObject, PyAny, PyObject, PyResult, Python};
25+
use pyo3::{Bound, FromPyObject, IntoPyObject, PyAny, PyResult, Python};
2626

2727
use crate::{DataFusionError, ScalarValue};
2828

@@ -52,11 +52,11 @@ impl FromPyArrow for ScalarValue {
5252
}
5353

5454
impl ToPyArrow for ScalarValue {
55-
fn to_pyarrow(&self, py: Python) -> PyResult<PyObject> {
55+
fn to_pyarrow<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
5656
let array = self.to_array()?;
5757
// convert to pyarrow array using C data interface
5858
let pyarray = array.to_data().to_pyarrow(py)?;
59-
let pyscalar = pyarray.call_method1(py, "__getitem__", (0,))?;
59+
let pyscalar = pyarray.call_method1("__getitem__", (0,))?;
6060

6161
Ok(pyscalar)
6262
}
@@ -79,23 +79,22 @@ impl<'source> IntoPyObject<'source> for ScalarValue {
7979
let array = self.to_array()?;
8080
// convert to pyarrow array using C data interface
8181
let pyarray = array.to_data().to_pyarrow(py)?;
82-
let pyarray_bound = pyarray.bind(py);
83-
pyarray_bound.call_method1("__getitem__", (0,))
82+
pyarray.call_method1("__getitem__", (0,))
8483
}
8584
}
8685

8786
#[cfg(test)]
8887
mod tests {
8988
use pyo3::ffi::c_str;
90-
use pyo3::prepare_freethreaded_python;
9189
use pyo3::py_run;
9290
use pyo3::types::PyDict;
91+
use pyo3::Python;
9392

9493
use super::*;
9594

9695
fn init_python() {
97-
prepare_freethreaded_python();
98-
Python::with_gil(|py| {
96+
Python::initialize();
97+
Python::attach(|py| {
9998
if py.run(c_str!("import pyarrow"), None, None).is_err() {
10099
let locals = PyDict::new(py);
101100
py.run(
@@ -135,12 +134,11 @@ mod tests {
135134
ScalarValue::Date32(Some(1234)),
136135
];
137136

138-
Python::with_gil(|py| {
137+
Python::attach(|py| {
139138
for scalar in example_scalars.iter() {
140-
let result = ScalarValue::from_pyarrow_bound(
141-
scalar.to_pyarrow(py).unwrap().bind(py),
142-
)
143-
.unwrap();
139+
let result =
140+
ScalarValue::from_pyarrow_bound(&scalar.to_pyarrow(py).unwrap())
141+
.unwrap();
144142
assert_eq!(scalar, &result);
145143
}
146144
});
@@ -150,7 +148,7 @@ mod tests {
150148
fn test_py_scalar() -> PyResult<()> {
151149
init_python();
152150

153-
Python::with_gil(|py| -> PyResult<()> {
151+
Python::attach(|py| -> PyResult<()> {
154152
let scalar_float = ScalarValue::Float64(Some(12.34));
155153
let py_float = scalar_float
156154
.into_pyobject(py)?

datafusion/core/src/physical_planner.rs

Lines changed: 12 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -2484,9 +2484,9 @@ mod tests {
24842484
// verify that the plan correctly casts u8 to i64
24852485
// the cast from u8 to i64 for literal will be simplified, and get lit(int64(5))
24862486
// the cast here is implicit so has CastOptions with safe=true
2487-
let expected = r#"BinaryExpr { left: Column { name: "c7", index: 2 }, op: Lt, right: Literal { value: Int64(5), field: Field { name: "lit", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} } }, fail_on_overflow: false }"#;
2487+
let expected = r#"BinaryExpr { left: Column { name: "c7", index: 2 }, op: Lt, right: Literal { value: Int64(5), field: Field { name: "lit", data_type: Int64 } }, fail_on_overflow: false"#;
24882488

2489-
assert!(format!("{exec_plan:?}").contains(expected));
2489+
assert_contains!(format!("{exec_plan:?}"), expected);
24902490
Ok(())
24912491
}
24922492

@@ -2510,9 +2510,7 @@ mod tests {
25102510
&session_state,
25112511
);
25122512

2513-
let expected = r#"Ok(PhysicalGroupBy { expr: [(Column { name: "c1", index: 0 }, "c1"), (Column { name: "c2", index: 1 }, "c2"), (Column { name: "c3", index: 2 }, "c3")], null_expr: [(Literal { value: Utf8(NULL), field: Field { name: "lit", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} } }, "c1"), (Literal { value: Int64(NULL), field: Field { name: "lit", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} } }, "c2"), (Literal { value: Int64(NULL), field: Field { name: "lit", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} } }, "c3")], groups: [[false, false, false], [true, false, false], [false, true, false], [false, false, true], [true, true, false], [true, false, true], [false, true, true], [true, true, true]] })"#;
2514-
2515-
assert_eq!(format!("{cube:?}"), expected);
2513+
insta::assert_snapshot!(format!("{cube:?}"), @r#"Ok(PhysicalGroupBy { expr: [(Column { name: "c1", index: 0 }, "c1"), (Column { name: "c2", index: 1 }, "c2"), (Column { name: "c3", index: 2 }, "c3")], null_expr: [(Literal { value: Utf8(NULL), field: Field { name: "lit", data_type: Utf8, nullable: true } }, "c1"), (Literal { value: Int64(NULL), field: Field { name: "lit", data_type: Int64, nullable: true } }, "c2"), (Literal { value: Int64(NULL), field: Field { name: "lit", data_type: Int64, nullable: true } }, "c3")], groups: [[false, false, false], [true, false, false], [false, true, false], [false, false, true], [true, true, false], [true, false, true], [false, true, true], [true, true, true]] })"#);
25162514

25172515
Ok(())
25182516
}
@@ -2537,9 +2535,7 @@ mod tests {
25372535
&session_state,
25382536
);
25392537

2540-
let expected = r#"Ok(PhysicalGroupBy { expr: [(Column { name: "c1", index: 0 }, "c1"), (Column { name: "c2", index: 1 }, "c2"), (Column { name: "c3", index: 2 }, "c3")], null_expr: [(Literal { value: Utf8(NULL), field: Field { name: "lit", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} } }, "c1"), (Literal { value: Int64(NULL), field: Field { name: "lit", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} } }, "c2"), (Literal { value: Int64(NULL), field: Field { name: "lit", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} } }, "c3")], groups: [[true, true, true], [false, true, true], [false, false, true], [false, false, false]] })"#;
2541-
2542-
assert_eq!(format!("{rollup:?}"), expected);
2538+
insta::assert_snapshot!(format!("{rollup:?}"), @r#"Ok(PhysicalGroupBy { expr: [(Column { name: "c1", index: 0 }, "c1"), (Column { name: "c2", index: 1 }, "c2"), (Column { name: "c3", index: 2 }, "c3")], null_expr: [(Literal { value: Utf8(NULL), field: Field { name: "lit", data_type: Utf8, nullable: true } }, "c1"), (Literal { value: Int64(NULL), field: Field { name: "lit", data_type: Int64, nullable: true } }, "c2"), (Literal { value: Int64(NULL), field: Field { name: "lit", data_type: Int64, nullable: true } }, "c3")], groups: [[true, true, true], [false, true, true], [false, false, true], [false, false, false]] })"#);
25432539

25442540
Ok(())
25452541
}
@@ -2677,35 +2673,13 @@ mod tests {
26772673
let logical_plan = LogicalPlan::Extension(Extension {
26782674
node: Arc::new(NoOpExtensionNode::default()),
26792675
});
2680-
let plan = planner
2676+
let e = planner
26812677
.create_physical_plan(&logical_plan, &session_state)
2682-
.await;
2678+
.await
2679+
.expect_err("planning error")
2680+
.strip_backtrace();
26832681

2684-
let expected_error: &str = "Error during planning: \
2685-
Extension planner for NoOp created an ExecutionPlan with mismatched schema. \
2686-
LogicalPlan schema: \
2687-
DFSchema { inner: Schema { fields: \
2688-
[Field { name: \"a\", \
2689-
data_type: Int32, \
2690-
nullable: false, \
2691-
dict_id: 0, \
2692-
dict_is_ordered: false, metadata: {} }], \
2693-
metadata: {} }, field_qualifiers: [None], \
2694-
functional_dependencies: FunctionalDependencies { deps: [] } }, \
2695-
ExecutionPlan schema: Schema { fields: \
2696-
[Field { name: \"b\", \
2697-
data_type: Int32, \
2698-
nullable: false, \
2699-
dict_id: 0, \
2700-
dict_is_ordered: false, metadata: {} }], \
2701-
metadata: {} }";
2702-
match plan {
2703-
Ok(_) => panic!("Expected planning failure"),
2704-
Err(e) => assert!(
2705-
e.to_string().contains(expected_error),
2706-
"Error '{e}' did not contain expected error '{expected_error}'"
2707-
),
2708-
}
2682+
insta::assert_snapshot!(e, @r#"Error during planning: Extension planner for NoOp created an ExecutionPlan with mismatched schema. LogicalPlan schema: DFSchema { inner: Schema { fields: [Field { name: "a", data_type: Int32 }], metadata: {} }, field_qualifiers: [None], functional_dependencies: FunctionalDependencies { deps: [] } }, ExecutionPlan schema: Schema { fields: [Field { name: "b", data_type: Int32 }], metadata: {} }"#);
27092683
}
27102684

27112685
#[tokio::test]
@@ -2721,10 +2695,9 @@ mod tests {
27212695
let execution_plan = plan(&logical_plan).await?;
27222696
// verify that the plan correctly adds cast from Int64(1) to Utf8, and the const will be evaluated.
27232697

2724-
let expected = "expr: [ProjectionExpr { expr: BinaryExpr { left: BinaryExpr { left: Column { name: \"c1\", index: 0 }, op: Eq, right: Literal { value: Utf8(\"a\"), field: Field { name: \"lit\", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} } }, fail_on_overflow: false }, op: Or, right: BinaryExpr { left: Column { name: \"c1\", index: 0 }, op: Eq, right: Literal { value: Utf8(\"1\"), field: Field { name: \"lit\", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} } }, fail_on_overflow: false }, fail_on_overflow: false }";
2698+
let expected = r#"expr: [ProjectionExpr { expr: BinaryExpr { left: BinaryExpr { left: Column { name: "c1", index: 0 }, op: Eq, right: Literal { value: Utf8("a"), field: Field { name: "lit", data_type: Utf8 } }, fail_on_overflow: false"#;
27252699

2726-
let actual = format!("{execution_plan:?}");
2727-
assert!(actual.contains(expected), "{}", actual);
2700+
assert_contains!(format!("{execution_plan:?}"), expected);
27282701

27292702
Ok(())
27302703
}
@@ -2744,7 +2717,7 @@ mod tests {
27442717

27452718
assert_contains!(
27462719
&e,
2747-
r#"Error during planning: Can not find compatible types to compare Boolean with [Struct(foo Boolean), Utf8]"#
2720+
r#"Error during planning: Can not find compatible types to compare Boolean with [Struct("foo": Boolean), Utf8]"#
27482721
);
27492722

27502723
Ok(())

datafusion/core/tests/dataframe/dataframe_functions.rs

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -282,16 +282,16 @@ async fn test_fn_arrow_typeof() -> Result<()> {
282282

283283
assert_snapshot!(
284284
batches_to_string(&batches),
285-
@r#"
286-
+------------------------------------------------------------------------------------------------------------------+
287-
| arrow_typeof(test.l) |
288-
+------------------------------------------------------------------------------------------------------------------+
289-
| List(Field { name: "item", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }) |
290-
| List(Field { name: "item", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }) |
291-
| List(Field { name: "item", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }) |
292-
| List(Field { name: "item", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }) |
293-
+------------------------------------------------------------------------------------------------------------------+
294-
"#);
285+
@r"
286+
+----------------------+
287+
| arrow_typeof(test.l) |
288+
+----------------------+
289+
| List(nullable Int32) |
290+
| List(nullable Int32) |
291+
| List(nullable Int32) |
292+
| List(nullable Int32) |
293+
+----------------------+
294+
");
295295

296296
Ok(())
297297
}

0 commit comments

Comments
 (0)