
Commit b717723

Upgrade to arrow/parquet 55, and object_store to 0.12.0 and pyo3 to 0.24.0 (#15466)
* Temp pin to datafusion main
* Update cargo lock
* update pyo3
* vendor random generation
* Update error message
* Update for extraction
* Update pin
* Upgrade object_store
* fix feature
* Update file size handling
* bash for object store API changes
* few more
* Update APIs more
* update expected message
* update error messages
* Update to apache
* Update API for nicer parquet u64s
* Fix wasm build
* Remove pin
* Fix signature
1 parent 61e8a5d commit b717723

34 files changed (+239 -180)


Cargo.lock (+84 -79)

(Generated lockfile; diff not rendered.)

Cargo.toml (+8 -8)

@@ -87,19 +87,19 @@ ahash = { version = "0.8", default-features = false, features = [
     "runtime-rng",
 ] }
 apache-avro = { version = "0.17", default-features = false }
-arrow = { version = "54.3.1", features = [
+arrow = { version = "55.0.0", features = [
     "prettyprint",
     "chrono-tz",
 ] }
-arrow-buffer = { version = "54.3.0", default-features = false }
-arrow-flight = { version = "54.3.1", features = [
+arrow-buffer = { version = "55.0.0", default-features = false }
+arrow-flight = { version = "55.0.0", features = [
     "flight-sql-experimental",
 ] }
-arrow-ipc = { version = "54.3.0", default-features = false, features = [
+arrow-ipc = { version = "55.0.0", default-features = false, features = [
     "lz4",
 ] }
-arrow-ord = { version = "54.3.0", default-features = false }
-arrow-schema = { version = "54.3.0", default-features = false }
+arrow-ord = { version = "55.0.0", default-features = false }
+arrow-schema = { version = "55.0.0", default-features = false }
 async-trait = "0.1.88"
 bigdecimal = "0.4.8"
 bytes = "1.10"

@@ -147,9 +147,9 @@ hashbrown = { version = "0.14.5", features = ["raw"] }
 indexmap = "2.8.0"
 itertools = "0.14"
 log = "^0.4"
-object_store = { version = "0.11.0", default-features = false }
+object_store = { version = "0.12.0", default-features = false }
 parking_lot = "0.12"
-parquet = { version = "54.3.1", default-features = false, features = [
+parquet = { version = "55.0.0", default-features = false, features = [
     "arrow",
     "async",
     "object_store",

datafusion-examples/examples/advanced_parquet_index.rs (+6 -3)

@@ -571,7 +571,9 @@ impl ParquetFileReaderFactory for CachedParquetFileReaderFactory {
             .to_string();

         let object_store = Arc::clone(&self.object_store);
-        let mut inner = ParquetObjectReader::new(object_store, file_meta.object_meta);
+        let mut inner =
+            ParquetObjectReader::new(object_store, file_meta.object_meta.location)
+                .with_file_size(file_meta.object_meta.size);

         if let Some(hint) = metadata_size_hint {
             inner = inner.with_footer_size_hint(hint)

@@ -599,15 +601,15 @@ struct ParquetReaderWithCache {
 impl AsyncFileReader for ParquetReaderWithCache {
     fn get_bytes(
         &mut self,
-        range: Range<usize>,
+        range: Range<u64>,
     ) -> BoxFuture<'_, datafusion::parquet::errors::Result<Bytes>> {
         println!("get_bytes: {} Reading range {:?}", self.filename, range);
         self.inner.get_bytes(range)
     }

     fn get_byte_ranges(
         &mut self,
-        ranges: Vec<Range<usize>>,
+        ranges: Vec<Range<u64>>,
     ) -> BoxFuture<'_, datafusion::parquet::errors::Result<Vec<Bytes>>> {
         println!(
             "get_byte_ranges: {} Reading ranges {:?}",

@@ -618,6 +620,7 @@ impl AsyncFileReader for ParquetReaderWithCache {

     fn get_metadata(
         &mut self,
+        _options: Option<&ArrowReaderOptions>,
     ) -> BoxFuture<'_, datafusion::parquet::errors::Result<Arc<ParquetMetaData>>> {
         println!("get_metadata: {} returning cached metadata", self.filename);
datafusion/common/Cargo.toml (+1 -1)

@@ -63,7 +63,7 @@ log = { workspace = true }
 object_store = { workspace = true, optional = true }
 parquet = { workspace = true, optional = true, default-features = true }
 paste = "1.0.15"
-pyo3 = { version = "0.23.5", optional = true }
+pyo3 = { version = "0.24.0", optional = true }
 recursive = { workspace = true, optional = true }
 sqlparser = { workspace = true }
 tokio = { workspace = true }

datafusion/core/src/datasource/file_format/arrow.rs (+3 -2)

@@ -144,6 +144,7 @@ impl FileFormat for ArrowFormat {
         for object in objects {
             let r = store.as_ref().get(&object.location).await?;
             let schema = match r.payload {
+                #[cfg(not(target_arch = "wasm32"))]
                 GetResultPayload::File(mut file, _) => {
                     let reader = FileReader::try_new(&mut file, None)?;
                     reader.schema()

@@ -442,7 +443,7 @@ mod tests {
         let object_meta = ObjectMeta {
             location,
             last_modified: DateTime::default(),
-            size: usize::MAX,
+            size: u64::MAX,
             e_tag: None,
             version: None,
         };

@@ -485,7 +486,7 @@ mod tests {
         let object_meta = ObjectMeta {
             location,
             last_modified: DateTime::default(),
-            size: usize::MAX,
+            size: u64::MAX,
             e_tag: None,
             version: None,
         };
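
In object_store 0.12 the GetResultPayload::File variant is compiled out on wasm32, so any match over the payload needs a cfg guard to keep wasm builds green — that is what the added attribute above does. A minimal sketch of the pattern, assuming a GetResult already in hand:

use object_store::{GetResult, GetResultPayload};

// Sketch: the file arm only exists on native targets; wasm32 builds see
// just the stream arm, so the guard keeps the match exhaustive everywhere.
fn payload_kind(result: GetResult) -> &'static str {
    match result.payload {
        #[cfg(not(target_arch = "wasm32"))]
        GetResultPayload::File(_file, _path) => "local file handle",
        GetResultPayload::Stream(_stream) => "byte stream",
    }
}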

datafusion/core/src/datasource/file_format/csv.rs (+9 -8)

@@ -72,7 +72,7 @@ mod tests {
     #[derive(Debug)]
     struct VariableStream {
         bytes_to_repeat: Bytes,
-        max_iterations: usize,
+        max_iterations: u64,
         iterations_detected: Arc<Mutex<usize>>,
     }

@@ -103,14 +103,15 @@ mod tests {

         async fn get(&self, location: &Path) -> object_store::Result<GetResult> {
             let bytes = self.bytes_to_repeat.clone();
-            let range = 0..bytes.len() * self.max_iterations;
+            let len = bytes.len() as u64;
+            let range = 0..len * self.max_iterations;
             let arc = self.iterations_detected.clone();
             let stream = futures::stream::repeat_with(move || {
                 let arc_inner = arc.clone();
                 *arc_inner.lock().unwrap() += 1;
                 Ok(bytes.clone())
             })
-            .take(self.max_iterations)
+            .take(self.max_iterations as usize)
             .boxed();

             Ok(GetResult {

@@ -138,7 +139,7 @@ mod tests {
         async fn get_ranges(
             &self,
             _location: &Path,
-            _ranges: &[Range<usize>],
+            _ranges: &[Range<u64>],
         ) -> object_store::Result<Vec<Bytes>> {
             unimplemented!()
         }

@@ -154,7 +155,7 @@ mod tests {
         fn list(
             &self,
             _prefix: Option<&Path>,
-        ) -> BoxStream<'_, object_store::Result<ObjectMeta>> {
+        ) -> BoxStream<'static, object_store::Result<ObjectMeta>> {
             unimplemented!()
         }

@@ -179,7 +180,7 @@ mod tests {
     }

     impl VariableStream {
-        pub fn new(bytes_to_repeat: Bytes, max_iterations: usize) -> Self {
+        pub fn new(bytes_to_repeat: Bytes, max_iterations: u64) -> Self {
             Self {
                 bytes_to_repeat,
                 max_iterations,

@@ -371,7 +372,7 @@ mod tests {
         let object_meta = ObjectMeta {
             location: Path::parse("/")?,
             last_modified: DateTime::default(),
-            size: usize::MAX,
+            size: u64::MAX,
             e_tag: None,
             version: None,
         };

@@ -429,7 +430,7 @@ mod tests {
         let object_meta = ObjectMeta {
             location: Path::parse("/")?,
             last_modified: DateTime::default(),
-            size: usize::MAX,
+            size: u64::MAX,
             e_tag: None,
             version: None,
         };
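
object_store 0.12 also tightens ObjectStore::list: it returns BoxStream<'static, _> instead of a stream borrowing &self, so the returned stream must own everything it yields. A hedged sketch of a conforming stub (a free function rather than a full trait impl; names are illustrative):

use futures::stream::{self, BoxStream, StreamExt};
use object_store::{path::Path, ObjectMeta};

// The 'static bound means the stream cannot hold `&self`; clone whatever it
// needs (here, a pre-collected Vec of metadata) into the stream up front.
fn list_stub(
    metas: Vec<ObjectMeta>,
    _prefix: Option<&Path>,
) -> BoxStream<'static, object_store::Result<ObjectMeta>> {
    stream::iter(metas.into_iter().map(Ok)).boxed()
}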

datafusion/core/src/datasource/file_format/mod.rs (+1 -1)

@@ -127,7 +127,7 @@ mod tests {
             .write_parquet(out_dir_url, DataFrameWriteOptions::new(), None)
             .await
             .expect_err("should fail because input file does not match inferred schema");
-        assert_eq!(e.strip_backtrace(), "Arrow error: Parser error: Error while parsing value d for column 0 at line 4");
+        assert_eq!(e.strip_backtrace(), "Arrow error: Parser error: Error while parsing value 'd' as type 'Int64' for column 0 at line 4. Row data: '[d,4]'");
         Ok(())
     }
 }
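
The expected strings change here (and in the csv.rs and json.rs tests below) because arrow 55's CSV parser now reports the target type and the offending row, not just the raw value. A standalone reproduction sketch; the schema and data are illustrative, and the line number in the message depends on where the bad row sits:

use std::io::Cursor;
use std::sync::Arc;

use arrow::csv::ReaderBuilder;
use arrow::datatypes::{DataType, Field, Schema};

fn main() {
    let schema = Arc::new(Schema::new(vec![
        Field::new("c1", DataType::Int64, false),
        Field::new("c2", DataType::Int64, false),
    ]));
    // 'd' cannot parse as Int64, so reading the batch fails.
    let data = "1,2\n3,4\nd,4\n";
    let mut reader = ReaderBuilder::new(schema)
        .build(Cursor::new(data))
        .unwrap();
    let err = reader.next().unwrap().unwrap_err();
    // arrow 55 includes the value, the type, and the row data, e.g.
    // "Error while parsing value 'd' as type 'Int64' for column 0 at line 3. Row data: '[d,4]'"
    println!("{err}");
}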

datafusion/core/src/datasource/file_format/parquet.rs (+3 -3)

@@ -331,7 +331,7 @@ mod tests {
         fn list(
             &self,
             _prefix: Option<&Path>,
-        ) -> BoxStream<'_, object_store::Result<ObjectMeta>> {
+        ) -> BoxStream<'static, object_store::Result<ObjectMeta>> {
             Box::pin(futures::stream::once(async {
                 Err(object_store::Error::NotImplemented)
             }))

@@ -408,7 +408,7 @@ mod tests {
         )));

         // Use the file size as the hint so we can get the full metadata from the first fetch
-        let size_hint = meta[0].size;
+        let size_hint = meta[0].size as usize;

         fetch_parquet_metadata(store.upcast().as_ref(), &meta[0], Some(size_hint))
             .await

@@ -443,7 +443,7 @@ mod tests {
         )));

         // Use a size hint larger than the file size to make sure we don't panic
-        let size_hint = meta[0].size + 100;
+        let size_hint = (meta[0].size + 100) as usize;

         fetch_parquet_metadata(store.upcast().as_ref(), &meta[0], Some(size_hint))
             .await
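
With ObjectMeta::size now u64 but the metadata size hint still a usize, call sites cast at the boundary; the commit uses plain `as` casts, as above. A small alternative sketch using a checked conversion, which avoids silent truncation on 32-bit targets (metadata_size_hint is a hypothetical helper):

use object_store::ObjectMeta;

// Derive a usize size hint from the u64 object size, returning None if it
// does not fit rather than truncating.
fn metadata_size_hint(meta: &ObjectMeta) -> Option<usize> {
    usize::try_from(meta.size).ok()
}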

datafusion/core/src/datasource/mod.rs (+1 -1)

@@ -106,7 +106,7 @@ mod tests {
         let meta = ObjectMeta {
             location,
             last_modified: metadata.modified().map(chrono::DateTime::from).unwrap(),
-            size: metadata.len() as usize,
+            size: metadata.len(),
             e_tag: None,
             version: None,
         };
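
std::fs::Metadata::len() already returns u64, so the 0.12 field type removes the cast entirely. A self-contained sketch of building an ObjectMeta from local file metadata (meta_for and its path argument are illustrative):

use chrono::{DateTime, Utc};
use object_store::{path::Path, ObjectMeta};

fn meta_for(location: &str, md: &std::fs::Metadata) -> ObjectMeta {
    ObjectMeta {
        location: Path::from(location),
        last_modified: md
            .modified()
            .map(DateTime::<Utc>::from)
            .unwrap_or_default(),
        // u64 to u64: no cast needed under object_store 0.12.
        size: md.len(),
        e_tag: None,
        version: None,
    }
}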

datafusion/core/src/datasource/physical_plan/arrow_file.rs (+11 -10)

@@ -273,6 +273,7 @@ impl FileOpener for ArrowOpener {
                 None => {
                     let r = object_store.get(file_meta.location()).await?;
                     match r.payload {
+                        #[cfg(not(target_arch = "wasm32"))]
                         GetResultPayload::File(file, _) => {
                             let arrow_reader = arrow::ipc::reader::FileReader::try_new(
                                 file, projection,

@@ -305,7 +306,7 @@ impl FileOpener for ArrowOpener {
                     )?;
                     // read footer according to footer_len
                     let get_option = GetOptions {
-                        range: Some(GetRange::Suffix(10 + footer_len)),
+                        range: Some(GetRange::Suffix(10 + (footer_len as u64))),
                         ..Default::default()
                     };
                     let get_result = object_store

@@ -332,9 +333,9 @@ impl FileOpener for ArrowOpener {
                         .iter()
                         .flatten()
                         .map(|block| {
-                            let block_len = block.bodyLength() as usize
-                                + block.metaDataLength() as usize;
-                            let block_offset = block.offset() as usize;
+                            let block_len =
+                                block.bodyLength() as u64 + block.metaDataLength() as u64;
+                            let block_offset = block.offset() as u64;
                             block_offset..block_offset + block_len
                         })
                         .collect_vec();

@@ -354,19 +355,19 @@ impl FileOpener for ArrowOpener {
                         .iter()
                         .flatten()
                         .filter(|block| {
-                            let block_offset = block.offset() as usize;
-                            block_offset >= range.start as usize
-                                && block_offset < range.end as usize
+                            let block_offset = block.offset() as u64;
+                            block_offset >= range.start as u64
+                                && block_offset < range.end as u64
                         })
                         .copied()
                         .collect_vec();

                     let recordbatch_ranges = recordbatches
                         .iter()
                         .map(|block| {
-                            let block_len = block.bodyLength() as usize
-                                + block.metaDataLength() as usize;
-                            let block_offset = block.offset() as usize;
+                            let block_len =
+                                block.bodyLength() as u64 + block.metaDataLength() as u64;
+                            let block_offset = block.offset() as u64;
                             block_offset..block_offset + block_len
                         })
                         .collect_vec();
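
Byte ranges in GetOptions are u64 in object_store 0.12, including GetRange::Suffix, which is a length counted back from the end of the object. A minimal sketch of the footer fetch shown above; the 10 extra bytes cover the Arrow IPC trailer (4-byte footer length plus the 6-byte "ARROW1" magic):

use object_store::{GetOptions, GetRange};

// Build a request for the last `footer_len + 10` bytes of an object.
// GetRange::Suffix takes a u64 in object_store 0.12 (it was usize in 0.11).
fn footer_request(footer_len: u32) -> GetOptions {
    GetOptions {
        range: Some(GetRange::Suffix(10 + footer_len as u64)),
        ..Default::default()
    }
}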

datafusion/core/src/datasource/physical_plan/csv.rs (+1 -1)

@@ -658,7 +658,7 @@ mod tests {
         )
         .await
         .expect_err("should fail because input file does not match inferred schema");
-        assert_eq!(e.strip_backtrace(), "Arrow error: Parser error: Error while parsing value d for column 0 at line 4");
+        assert_eq!(e.strip_backtrace(), "Arrow error: Parser error: Error while parsing value 'd' as type 'Int64' for column 0 at line 4. Row data: '[d,4]'");
         Ok(())
     }

datafusion/core/src/datasource/physical_plan/json.rs (+1 -1)

@@ -495,7 +495,7 @@ mod tests {
         .write_json(out_dir_url, DataFrameWriteOptions::new(), None)
         .await
         .expect_err("should fail because input file does not match inferred schema");
-        assert_eq!(e.strip_backtrace(), "Arrow error: Parser error: Error while parsing value d for column 0 at line 4");
+        assert_eq!(e.strip_backtrace(), "Arrow error: Parser error: Error while parsing value 'd' as type 'Int64' for column 0 at line 4. Row data: '[d,4]'");
         Ok(())
     }

datafusion/core/src/datasource/physical_plan/parquet.rs (+2 -2)

@@ -1786,13 +1786,13 @@ mod tests {
         path: &str,
         store: Arc<dyn ObjectStore>,
         batch: RecordBatch,
-    ) -> usize {
+    ) -> u64 {
         let mut writer =
             ArrowWriter::try_new(BytesMut::new().writer(), batch.schema(), None).unwrap();
         writer.write(&batch).unwrap();
         writer.flush().unwrap();
         let bytes = writer.into_inner().unwrap().into_inner().freeze();
-        let total_size = bytes.len();
+        let total_size = bytes.len() as u64;
         let path = Path::from(path);
         let payload = object_store::PutPayload::from_bytes(bytes);
         store
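
Condensed, the test helper's pattern under the new types: encode a RecordBatch to Parquet in memory, put it into the store, and report the size as u64. A hedged sketch with illustrative names:

use arrow::array::RecordBatch;
use bytes::{BufMut, BytesMut};
use object_store::{path::Path, ObjectStore, PutPayload};
use parquet::arrow::ArrowWriter;

async fn put_parquet(store: &dyn ObjectStore, path: &str, batch: RecordBatch) -> u64 {
    // Encode the batch into an in-memory Parquet buffer.
    let mut writer =
        ArrowWriter::try_new(BytesMut::new().writer(), batch.schema(), None).unwrap();
    writer.write(&batch).unwrap();
    let bytes = writer.into_inner().unwrap().into_inner().freeze();
    // Sizes are u64 end to end with object_store 0.12.
    let size = bytes.len() as u64;
    store
        .put(&Path::from(path), PutPayload::from_bytes(bytes))
        .await
        .unwrap();
    size
}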

datafusion/core/src/test/object_store.rs (+2 -2)

@@ -66,7 +66,7 @@ pub fn local_unpartitioned_file(path: impl AsRef<std::path::Path>) -> ObjectMeta
     ObjectMeta {
         location,
         last_modified: metadata.modified().map(chrono::DateTime::from).unwrap(),
-        size: metadata.len() as usize,
+        size: metadata.len(),
         e_tag: None,
         version: None,
     }

@@ -166,7 +166,7 @@ impl ObjectStore for BlockingObjectStore {
     fn list(
         &self,
         prefix: Option<&Path>,
-    ) -> BoxStream<'_, object_store::Result<ObjectMeta>> {
+    ) -> BoxStream<'static, object_store::Result<ObjectMeta>> {
         self.inner.list(prefix)
     }
datafusion/core/src/test_util/parquet.rs (+1 -1)

@@ -102,7 +102,7 @@ impl TestParquetFile {

         println!("Generated test dataset with {num_rows} rows");

-        let size = std::fs::metadata(&path)?.len() as usize;
+        let size = std::fs::metadata(&path)?.len();

         let mut canonical_path = path.canonicalize()?;
datafusion/core/tests/parquet/custom_reader.rs (+6 -3)

@@ -44,6 +44,7 @@ use insta::assert_snapshot;
 use object_store::memory::InMemory;
 use object_store::path::Path;
 use object_store::{ObjectMeta, ObjectStore};
+use parquet::arrow::arrow_reader::ArrowReaderOptions;
 use parquet::arrow::async_reader::AsyncFileReader;
 use parquet::arrow::ArrowWriter;
 use parquet::errors::ParquetError;

@@ -186,7 +187,7 @@ async fn store_parquet_in_memory(
             location: Path::parse(format!("file-{offset}.parquet"))
                 .expect("creating path"),
             last_modified: chrono::DateTime::from(SystemTime::now()),
-            size: buf.len(),
+            size: buf.len() as u64,
             e_tag: None,
             version: None,
         };

@@ -218,9 +219,10 @@ struct ParquetFileReader {
 impl AsyncFileReader for ParquetFileReader {
     fn get_bytes(
         &mut self,
-        range: Range<usize>,
+        range: Range<u64>,
     ) -> BoxFuture<'_, parquet::errors::Result<Bytes>> {
-        self.metrics.bytes_scanned.add(range.end - range.start);
+        let bytes_scanned = range.end - range.start;
+        self.metrics.bytes_scanned.add(bytes_scanned as usize);

         self.store
             .get_range(&self.meta.location, range)

@@ -232,6 +234,7 @@ impl AsyncFileReader for ParquetFileReader {

     fn get_metadata(
         &mut self,
+        _options: Option<&ArrowReaderOptions>,
     ) -> BoxFuture<'_, parquet::errors::Result<Arc<ParquetMetaData>>> {
         Box::pin(async move {
             let metadata = fetch_parquet_metadata(
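
Taken together, the AsyncFileReader changes in parquet 55 are: byte ranges become Range<u64>, and get_metadata receives an Option<&ArrowReaderOptions>. A minimal delegating implementation against that trait shape; LoggingReader is hypothetical, and get_byte_ranges keeps its default implementation:

use std::ops::Range;
use std::sync::Arc;

use bytes::Bytes;
use futures::future::BoxFuture;
use parquet::arrow::arrow_reader::ArrowReaderOptions;
use parquet::arrow::async_reader::AsyncFileReader;
use parquet::file::metadata::ParquetMetaData;

// Wrapper that forwards to an inner reader, showing the parquet 55 trait
// surface: u64 byte ranges and an options argument on get_metadata.
struct LoggingReader<R: AsyncFileReader> {
    inner: R,
}

impl<R: AsyncFileReader> AsyncFileReader for LoggingReader<R> {
    fn get_bytes(
        &mut self,
        range: Range<u64>,
    ) -> BoxFuture<'_, parquet::errors::Result<Bytes>> {
        println!("fetching {} bytes", range.end - range.start);
        self.inner.get_bytes(range)
    }

    fn get_metadata<'a>(
        &'a mut self,
        options: Option<&'a ArrowReaderOptions>,
    ) -> BoxFuture<'a, parquet::errors::Result<Arc<ParquetMetaData>>> {
        self.inner.get_metadata(options)
    }
}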

datafusion/core/tests/parquet/page_pruning.rs (+1 -1)

@@ -52,7 +52,7 @@ async fn get_parquet_exec(state: &SessionState, filter: Expr) -> DataSourceExec
     let meta = ObjectMeta {
         location,
         last_modified: metadata.modified().map(chrono::DateTime::from).unwrap(),
-        size: metadata.len() as usize,
+        size: metadata.len(),
         e_tag: None,
         version: None,
     };
