Skip to content

Commit 819913a

Browse files
committed
Add basic test
1 parent c8069a5 commit 819913a

File tree

2 files changed

+37
-4
lines changed

2 files changed

+37
-4
lines changed

parquet/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ brotli = "3.3"
5757
flate2 = "1.0"
5858
lz4 = "1.23"
5959
serde_json = { version = "1.0", features = ["preserve_order"] }
60-
arrow = { path = "../arrow", version = "8.0.0", default-features = false, features = ["test_utils"] }
60+
arrow = { path = "../arrow", version = "8.0.0", default-features = false, features = ["test_utils", "prettyprint"] }
6161

6262
[features]
6363
default = ["arrow", "snap", "brotli", "flate2", "lz4", "zstd", "base64"]

parquet/src/arrow/async_reader.rs

Lines changed: 36 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -427,24 +427,57 @@ impl PageIterator for ColumnChunkIterator {
427427

428428
#[cfg(test)]
429429
mod tests {
430+
use arrow::util::pretty::pretty_format_batches;
430431
use futures::TryStreamExt;
431432
use tokio::fs::File;
432433

433434
use super::*;
434435

436+
fn assert_batches_eq(batches: &[RecordBatch], expected_lines: &[&str]) {
437+
let formatted = pretty_format_batches(batches).unwrap().to_string();
438+
let actual_lines: Vec<_> = formatted.trim().lines().collect();
439+
assert_eq!(
440+
&actual_lines, expected_lines,
441+
"\n\nexpected:\n\n{:#?}\nactual:\n\n{:#?}\n\n",
442+
expected_lines, actual_lines
443+
);
444+
}
445+
435446
#[tokio::test]
436447
async fn test_parquet_stream() {
437448
let testdata = arrow::util::test_util::parquet_test_data();
438-
let path = format!("{}/nested_structs.rust.parquet", testdata);
449+
let path = format!("{}/alltypes_plain.parquet", testdata);
439450
let file = File::open(path).await.unwrap();
440451

441-
let stream = ParquetRecordBatchStreamBuilder::new(file)
452+
let builder = ParquetRecordBatchStreamBuilder::new(file)
442453
.await
443454
.unwrap()
455+
.with_projection(vec![1, 2, 6])
456+
.with_batch_size(3);
457+
458+
let stream = builder
444459
.build()
445460
.unwrap();
446461

447462
let results = stream.try_collect::<Vec<_>>().await.unwrap();
448-
println!("{:?}", results);
463+
assert_eq!(results.len(), 3);
464+
465+
assert_batches_eq(
466+
&results,
467+
&[
468+
"+----------+-------------+-----------+",
469+
"| bool_col | tinyint_col | float_col |",
470+
"+----------+-------------+-----------+",
471+
"| true | 0 | 0 |",
472+
"| false | 1 | 1.1 |",
473+
"| true | 0 | 0 |",
474+
"| false | 1 | 1.1 |",
475+
"| true | 0 | 0 |",
476+
"| false | 1 | 1.1 |",
477+
"| true | 0 | 0 |",
478+
"| false | 1 | 1.1 |",
479+
"+----------+-------------+-----------+",
480+
],
481+
);
449482
}
450483
}

0 commit comments

Comments
 (0)