Skip to content

Commit 57795da

Browse files
abhiaagarwal and rtyler
authored
chore: bump to datafusion 39, arrow 52, pyo3 0.21 (#2581)
# Description Updates the arrow and datafusion dependencies to 52 and 39(-rc1) respectively. This is necessary for updating pyo3. While most changes were trivial, some required big rewrites. Namely, the logic for the Updates operation had to be rewritten (and simplified) to accommodate some new sanity checks inside datafusion: (apache/datafusion#10088). Depends on delta-kernel having its arrow and object-store version bumped as well. This PR doesn't include any major changes for pyo3, I'll open a separate PR depending on this PR. # Related Issue(s) <!--- For example: - closes #106 ---> # Documentation <!--- Share links to useful documentation ---> --------- Co-authored-by: R. Tyler Croy <rtyler@brokenco.de>
1 parent 0a44a0d commit 57795da

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

43 files changed

+368
-463
lines changed

Cargo.toml

+22-22
Original file line numberDiff line numberDiff line change
@@ -26,33 +26,33 @@ debug = true
2626
debug = "line-tables-only"
2727

2828
[workspace.dependencies]
29-
delta_kernel = { version = "0.1" }
29+
delta_kernel = { version = "0.1.1" }
3030
# delta_kernel = { path = "../delta-kernel-rs/kernel" }
3131

3232
# arrow
33-
arrow = { version = "51" }
34-
arrow-arith = { version = "51" }
35-
arrow-array = { version = "51", features = ["chrono-tz"] }
36-
arrow-buffer = { version = "51" }
37-
arrow-cast = { version = "51" }
38-
arrow-ipc = { version = "51" }
39-
arrow-json = { version = "51" }
40-
arrow-ord = { version = "51" }
41-
arrow-row = { version = "51" }
42-
arrow-schema = { version = "51" }
43-
arrow-select = { version = "51" }
44-
object_store = { version = "0.9" }
45-
parquet = { version = "51" }
33+
arrow = { version = "52" }
34+
arrow-arith = { version = "52" }
35+
arrow-array = { version = "52", features = ["chrono-tz"] }
36+
arrow-buffer = { version = "52" }
37+
arrow-cast = { version = "52" }
38+
arrow-ipc = { version = "52" }
39+
arrow-json = { version = "52" }
40+
arrow-ord = { version = "52" }
41+
arrow-row = { version = "52" }
42+
arrow-schema = { version = "52" }
43+
arrow-select = { version = "52" }
44+
object_store = { version = "0.10.1" }
45+
parquet = { version = "52" }
4646

4747
# datafusion
48-
datafusion = { version = "37.1" }
49-
datafusion-expr = { version = "37.1" }
50-
datafusion-common = { version = "37.1" }
51-
datafusion-proto = { version = "37.1" }
52-
datafusion-sql = { version = "37.1" }
53-
datafusion-physical-expr = { version = "37.1" }
54-
datafusion-functions = { version = "37.1" }
55-
datafusion-functions-array = { version = "37.1" }
48+
datafusion = { version = "39" }
49+
datafusion-expr = { version = "39" }
50+
datafusion-common = { version = "39" }
51+
datafusion-proto = { version = "39" }
52+
datafusion-sql = { version = "39" }
53+
datafusion-physical-expr = { version = "39" }
54+
datafusion-functions = { version = "39" }
55+
datafusion-functions-array = { version = "39" }
5656

5757
# serde
5858
serde = { version = "1.0.194", features = ["derive"] }

crates/aws/src/lib.rs

+3-3
Original file line numberDiff line numberDiff line change
@@ -189,15 +189,15 @@ impl DynamoDbLockClient {
189189
if dynamodb_override_endpoint exists/AWS_ENDPOINT_URL_DYNAMODB is specified by user
190190
use dynamodb_override_endpoint to create dynamodb client
191191
*/
192-
let dynamodb_sdk_config = match dynamodb_override_endpoint {
192+
193+
match dynamodb_override_endpoint {
193194
Some(dynamodb_endpoint_url) => sdk_config
194195
.to_owned()
195196
.to_builder()
196197
.endpoint_url(dynamodb_endpoint_url)
197198
.build(),
198199
None => sdk_config.to_owned(),
199-
};
200-
dynamodb_sdk_config
200+
}
201201
}
202202

203203
/// Create the lock table where DynamoDb stores the commit information for all delta tables.

crates/aws/src/storage.rs

+10-13
Original file line numberDiff line numberDiff line change
@@ -5,22 +5,22 @@ use aws_config::provider_config::ProviderConfig;
55
use aws_config::{Region, SdkConfig};
66
use bytes::Bytes;
77
use deltalake_core::storage::object_store::{
8-
aws::AmazonS3ConfigKey, parse_url_opts, GetOptions, GetResult, ListResult, MultipartId,
9-
ObjectMeta, ObjectStore, PutOptions, PutResult, Result as ObjectStoreResult,
8+
aws::AmazonS3ConfigKey, parse_url_opts, GetOptions, GetResult, ListResult, ObjectMeta,
9+
ObjectStore, PutOptions, PutResult, Result as ObjectStoreResult,
1010
};
1111
use deltalake_core::storage::{
1212
limit_store_handler, str_is_truthy, ObjectStoreFactory, ObjectStoreRef, StorageOptions,
1313
};
1414
use deltalake_core::{DeltaResult, ObjectStoreError, Path};
1515
use futures::stream::BoxStream;
1616
use futures::Future;
17+
use object_store::{MultipartUpload, PutMultipartOpts, PutPayload};
1718
use std::collections::HashMap;
1819
use std::fmt::Debug;
1920
use std::ops::Range;
2021
use std::str::FromStr;
2122
use std::sync::Arc;
2223
use std::time::Duration;
23-
use tokio::io::AsyncWrite;
2424
use url::Url;
2525

2626
use crate::errors::DynamoDbConfigError;
@@ -334,14 +334,14 @@ impl std::fmt::Debug for S3StorageBackend {
334334

335335
#[async_trait::async_trait]
336336
impl ObjectStore for S3StorageBackend {
337-
async fn put(&self, location: &Path, bytes: Bytes) -> ObjectStoreResult<PutResult> {
337+
async fn put(&self, location: &Path, bytes: PutPayload) -> ObjectStoreResult<PutResult> {
338338
self.inner.put(location, bytes).await
339339
}
340340

341341
async fn put_opts(
342342
&self,
343343
location: &Path,
344-
bytes: Bytes,
344+
bytes: PutPayload,
345345
options: PutOptions,
346346
) -> ObjectStoreResult<PutResult> {
347347
self.inner.put_opts(location, bytes, options).await
@@ -402,19 +402,16 @@ impl ObjectStore for S3StorageBackend {
402402
}
403403
}
404404

405-
async fn put_multipart(
406-
&self,
407-
location: &Path,
408-
) -> ObjectStoreResult<(MultipartId, Box<dyn AsyncWrite + Unpin + Send>)> {
405+
async fn put_multipart(&self, location: &Path) -> ObjectStoreResult<Box<dyn MultipartUpload>> {
409406
self.inner.put_multipart(location).await
410407
}
411408

412-
async fn abort_multipart(
409+
async fn put_multipart_opts(
413410
&self,
414411
location: &Path,
415-
multipart_id: &MultipartId,
416-
) -> ObjectStoreResult<()> {
417-
self.inner.abort_multipart(location, multipart_id).await
412+
options: PutMultipartOpts,
413+
) -> ObjectStoreResult<Box<dyn MultipartUpload>> {
414+
self.inner.put_multipart_opts(location, options).await
418415
}
419416
}
420417

crates/aws/tests/common.rs

+3-3
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ impl S3Integration {
8787
"dynamodb",
8888
"create-table",
8989
"--table-name",
90-
&table_name,
90+
table_name,
9191
"--provisioned-throughput",
9292
"ReadCapacityUnits=1,WriteCapacityUnits=1",
9393
"--attribute-definitions",
@@ -112,7 +112,7 @@ impl S3Integration {
112112
}
113113

114114
fn wait_for_table(table_name: &str) -> std::io::Result<()> {
115-
let args = ["dynamodb", "describe-table", "--table-name", &table_name];
115+
let args = ["dynamodb", "describe-table", "--table-name", table_name];
116116
loop {
117117
let output = Command::new("aws")
118118
.args(args)
@@ -145,7 +145,7 @@ impl S3Integration {
145145

146146
fn delete_dynamodb_table(table_name: &str) -> std::io::Result<ExitStatus> {
147147
let mut child = Command::new("aws")
148-
.args(["dynamodb", "delete-table", "--table-name", &table_name])
148+
.args(["dynamodb", "delete-table", "--table-name", table_name])
149149
.stdout(Stdio::null())
150150
.spawn()
151151
.expect("aws command is installed");

crates/aws/tests/repair_s3_rename_test.rs

+10-12
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ use deltalake_core::storage::object_store::{
99
use deltalake_core::{DeltaTableBuilder, ObjectStore, Path};
1010
use deltalake_test::utils::IntegrationContext;
1111
use futures::stream::BoxStream;
12+
use object_store::{MultipartUpload, PutMultipartOpts, PutPayload};
1213
use serial_test::serial;
1314
use std::ops::Range;
1415
use std::sync::{Arc, Mutex};
@@ -60,8 +61,8 @@ async fn run_repair_test_case(path: &str, pause_copy: bool) -> Result<(), Object
6061
};
6162
let (s3_2, _) = create_s3_backend(&context, "w2", None, None);
6263

63-
s3_1.put(&src1, Bytes::from("test1")).await.unwrap();
64-
s3_2.put(&src2, Bytes::from("test2")).await.unwrap();
64+
s3_1.put(&src1, Bytes::from("test1").into()).await.unwrap();
65+
s3_2.put(&src2, Bytes::from("test2").into()).await.unwrap();
6566

6667
let rename1 = rename(s3_1, &src1, &dst1);
6768
// to ensure that first one is started actually first
@@ -166,14 +167,14 @@ impl ObjectStore for DelayedObjectStore {
166167
self.delete(from).await
167168
}
168169

169-
async fn put(&self, location: &Path, bytes: Bytes) -> ObjectStoreResult<PutResult> {
170+
async fn put(&self, location: &Path, bytes: PutPayload) -> ObjectStoreResult<PutResult> {
170171
self.inner.put(location, bytes).await
171172
}
172173

173174
async fn put_opts(
174175
&self,
175176
location: &Path,
176-
bytes: Bytes,
177+
bytes: PutPayload,
177178
options: PutOptions,
178179
) -> ObjectStoreResult<PutResult> {
179180
self.inner.put_opts(location, bytes, options).await
@@ -227,19 +228,16 @@ impl ObjectStore for DelayedObjectStore {
227228
self.inner.rename_if_not_exists(from, to).await
228229
}
229230

230-
async fn put_multipart(
231-
&self,
232-
location: &Path,
233-
) -> ObjectStoreResult<(MultipartId, Box<dyn AsyncWrite + Unpin + Send>)> {
231+
async fn put_multipart(&self, location: &Path) -> ObjectStoreResult<Box<dyn MultipartUpload>> {
234232
self.inner.put_multipart(location).await
235233
}
236234

237-
async fn abort_multipart(
235+
async fn put_multipart_opts(
238236
&self,
239237
location: &Path,
240-
multipart_id: &MultipartId,
241-
) -> ObjectStoreResult<()> {
242-
self.inner.abort_multipart(location, multipart_id).await
238+
options: PutMultipartOpts,
239+
) -> ObjectStoreResult<Box<dyn MultipartUpload>> {
240+
self.inner.put_multipart_opts(location, options).await
243241
}
244242
}
245243

crates/azure/tests/integration.rs

+4-1
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,10 @@ async fn read_write_test_onelake(context: &IntegrationContext, path: &Path) -> T
7575

7676
let expected = Bytes::from_static(b"test world from delta-rs on friday");
7777

78-
delta_store.put(path, expected.clone()).await.unwrap();
78+
delta_store
79+
.put(path, expected.clone().into())
80+
.await
81+
.unwrap();
7982
let fetched = delta_store.get(path).await.unwrap().bytes().await.unwrap();
8083
assert_eq!(expected, fetched);
8184

crates/benchmarks/src/bin/merge.rs

+2-1
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,10 @@ use arrow::datatypes::Schema as ArrowSchema;
77
use arrow_array::{RecordBatch, StringArray, UInt32Array};
88
use chrono::Duration;
99
use clap::{command, Args, Parser, Subcommand};
10+
use datafusion::functions::expr_fn::random;
1011
use datafusion::{datasource::MemTable, prelude::DataFrame};
1112
use datafusion_common::DataFusionError;
12-
use datafusion_expr::{cast, col, lit, random};
13+
use datafusion_expr::{cast, col, lit};
1314
use deltalake_core::protocol::SaveMode;
1415
use deltalake_core::{
1516
arrow::{

crates/core/src/delta_datafusion/cdf/scan.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ impl ExecutionPlan for DeltaCdfScan {
3838
self.plan.properties()
3939
}
4040

41-
fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
41+
fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
4242
vec![]
4343
}
4444

crates/core/src/delta_datafusion/cdf/scan_utils.rs

+1
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,7 @@ pub fn create_partition_values<F: FileAction>(
8383
partition_values: new_part_values.clone(),
8484
extensions: None,
8585
range: None,
86+
statistics: None,
8687
};
8788

8889
file_groups.entry(new_part_values).or_default().push(part);

0 commit comments

Comments
 (0)