
Commit 0b6678b

chore: Use materialized data for filter pushdown tests (#16123)
* chore: Use pre-created data for filter pushdown tests
1 parent 37c266a commit 0b6678b
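
The gist of the change: instead of generating access-log data at test time with test_utils::AccessLogGenerator, the filter pushdown tests now read pre-created ("materialized") parquet files back into RecordBatches and rewrite them with whatever WriterProperties each test needs. A minimal sketch of that pattern, factored into a helper for illustration (the helper name and the factoring are not part of the commit):

use datafusion::error::Result;
use datafusion::prelude::{ParquetReadOptions, SessionContext};
use datafusion::test_util::parquet::TestParquetFile;
use parquet::file::properties::WriterProperties;
use std::path::PathBuf;

// Illustrative helper (not in the commit): read a checked-in parquet file
// into RecordBatches, then rewrite it with test-specific writer properties
// so the test still controls the physical layout (row groups, pages).
async fn materialized_test_file(
    source: &str,
    dest: PathBuf,
    props: WriterProperties,
) -> Result<TestParquetFile> {
    let ctx = SessionContext::new();
    let batches = ctx
        .read_parquet(source.to_string(), ParquetReadOptions::default())
        .await?
        .collect()
        .await?;
    TestParquetFile::try_new(dest, props, batches)
}

This way the input data no longer has to be generated on every run, while each test still chooses its own row group and page layout at rewrite time.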

5 files changed: +40 -38 lines changed

datafusion/core/src/test_util/parquet.rs

Lines changed: 5 additions & 6 deletions
@@ -83,23 +83,22 @@ impl TestParquetFile {
         props: WriterProperties,
         batches: impl IntoIterator<Item = RecordBatch>,
     ) -> Result<Self> {
-        let file = File::create(&path).unwrap();
+        let file = File::create(&path)?;
 
         let mut batches = batches.into_iter();
         let first_batch = batches.next().expect("need at least one record batch");
         let schema = first_batch.schema();
 
-        let mut writer =
-            ArrowWriter::try_new(file, Arc::clone(&schema), Some(props)).unwrap();
+        let mut writer = ArrowWriter::try_new(file, Arc::clone(&schema), Some(props))?;
 
-        writer.write(&first_batch).unwrap();
+        writer.write(&first_batch)?;
         let mut num_rows = first_batch.num_rows();
 
         for batch in batches {
-            writer.write(&batch).unwrap();
+            writer.write(&batch)?;
            num_rows += batch.num_rows();
         }
-        writer.close().unwrap();
+        writer.close()?;
 
         println!("Generated test dataset with {num_rows} rows");
 
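
This file's change is mechanical: try_new already returns Result<Self>, so the panicking .unwrap() calls on file creation and on the ArrowWriter can become ? and surface errors to the caller instead of aborting the test process. A standalone illustration of the difference, using only the standard library (the names here are illustrative):

use std::fs::File;
use std::io::{self, Write};

// With `.unwrap()`, any I/O failure panics inside the helper; with `?`,
// the same failure is returned as an Err the caller can handle or report.
fn write_marker(path: &str) -> io::Result<()> {
    let mut file = File::create(path)?; // was: File::create(path).unwrap()
    file.write_all(b"ok")?;
    Ok(())
}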

datafusion/core/tests/data/filter_pushdown/single_file.gz.parquet

Binary file not shown.

datafusion/core/tests/data/filter_pushdown/single_file_small_pages.gz.parquet

Binary file not shown.

datafusion/core/tests/fuzz_cases/join_fuzz.rs

Lines changed: 2 additions & 4 deletions
@@ -746,11 +746,9 @@ impl JoinFuzzTestCase {
                 path.to_str().unwrap(),
                 datafusion::prelude::ParquetReadOptions::default(),
             )
-            .await
-            .unwrap()
+            .await?
             .collect()
-            .await
-            .unwrap();
+            .await?;
 
             batches.append(&mut batch);
         }
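
The same conversion applied in an async context: both read_parquet and collect are fallible awaited steps, so .await followed by .unwrap() becomes .await?. A self-contained sketch of the chain, assuming a DataFusion SessionContext (the function name is illustrative):

use datafusion::arrow::record_batch::RecordBatch;
use datafusion::error::Result;
use datafusion::prelude::{ParquetReadOptions, SessionContext};

// Propagate errors from both awaited steps with `?` so the enclosing
// test can return Result instead of panicking mid-loop.
async fn read_all(ctx: &SessionContext, path: &str) -> Result<Vec<RecordBatch>> {
    ctx.read_parquet(path, ParquetReadOptions::default())
        .await? // Err if the path is missing or is not readable parquet
        .collect()
        .await // already Result<Vec<RecordBatch>>; the caller applies `?`
}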

datafusion/core/tests/parquet/filter_pushdown.rs

Lines changed: 33 additions & 28 deletions
@@ -32,50 +32,41 @@ use arrow::compute::concat_batches;
 use arrow::record_batch::RecordBatch;
 use datafusion::physical_plan::collect;
 use datafusion::physical_plan::metrics::MetricsSet;
-use datafusion::prelude::{col, lit, lit_timestamp_nano, Expr, SessionContext};
+use datafusion::prelude::{
+    col, lit, lit_timestamp_nano, Expr, ParquetReadOptions, SessionContext,
+};
 use datafusion::test_util::parquet::{ParquetScanOptions, TestParquetFile};
-use datafusion_common::instant::Instant;
 use datafusion_expr::utils::{conjunction, disjunction, split_conjunction};
 
 use itertools::Itertools;
 use parquet::file::properties::WriterProperties;
 use tempfile::TempDir;
-use test_utils::AccessLogGenerator;
 
 /// how many rows of generated data to write to our parquet file (arbitrary)
 const NUM_ROWS: usize = 4096;
 
-fn generate_file(tempdir: &TempDir, props: WriterProperties) -> TestParquetFile {
-    // Tune down the generator for smaller files
-    let generator = AccessLogGenerator::new()
-        .with_row_limit(NUM_ROWS)
-        .with_pods_per_host(1..4)
-        .with_containers_per_pod(1..2)
-        .with_entries_per_container(128..256);
-
-    let file = tempdir.path().join("data.parquet");
-
-    let start = Instant::now();
-    println!("Writing test data to {file:?}");
-    let test_parquet_file = TestParquetFile::try_new(file, props, generator).unwrap();
-    println!(
-        "Completed generating test data in {:?}",
-        Instant::now() - start
-    );
-    test_parquet_file
-}
-
 #[tokio::test]
 async fn single_file() {
     // Only create the parquet file once as it is fairly large
-
     let tempdir = TempDir::new_in(Path::new(".")).unwrap();
     // Set row group size smaller so can test with fewer rows
     let props = WriterProperties::builder()
         .set_max_row_group_size(1024)
         .build();
-    let test_parquet_file = generate_file(&tempdir, props);
-
+    let ctx: SessionContext = SessionContext::new();
+    let batches = ctx
+        .read_parquet(
+            "tests/data/filter_pushdown/single_file.gz.parquet".to_string(),
+            ParquetReadOptions::default(),
+        )
+        .await
+        .unwrap()
+        .collect()
+        .await
+        .unwrap();
+    let test_parquet_file =
+        TestParquetFile::try_new(tempdir.path().join("data.parquet"), props, batches)
+            .unwrap();
     let case = TestCase::new(&test_parquet_file)
         .with_name("selective")
         // request_method = 'GET'
@@ -227,13 +218,27 @@ async fn single_file() {
 async fn single_file_small_data_pages() {
     let tempdir = TempDir::new_in(Path::new(".")).unwrap();
 
-    // Set low row count limit to improve page filtering
+    // Set a low row count limit to improve page filtering
     let props = WriterProperties::builder()
         .set_max_row_group_size(2048)
         .set_data_page_row_count_limit(512)
         .set_write_batch_size(512)
         .build();
-    let test_parquet_file = generate_file(&tempdir, props);
+
+    let ctx: SessionContext = SessionContext::new();
+    let batches = ctx
+        .read_parquet(
+            "tests/data/filter_pushdown/single_file_small_pages.gz.parquet".to_string(),
+            ParquetReadOptions::default(),
+        )
+        .await
+        .unwrap()
+        .collect()
+        .await
+        .unwrap();
+    let test_parquet_file =
+        TestParquetFile::try_new(tempdir.path().join("data.parquet"), props, batches)
+            .unwrap();
 
     // The statistics on the 'pod' column are as follows:
     //
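
After this change the two tests share the same read-and-rewrite setup and differ only in the WriterProperties applied when rewriting, i.e. in the physical layout being tested. Side by side, with values copied from the diff above (the variable names are illustrative):

use parquet::file::properties::WriterProperties;

fn main() {
    // single_file: a smaller row group size so row-group pruning can be
    // exercised with only NUM_ROWS (4096) rows.
    let single_file_props = WriterProperties::builder()
        .set_max_row_group_size(1024)
        .build();

    // single_file_small_data_pages: smaller row groups plus a data page
    // row count limit, giving page-index filtering many small pages to prune.
    let small_pages_props = WriterProperties::builder()
        .set_max_row_group_size(2048)
        .set_data_page_row_count_limit(512)
        .set_write_batch_size(512)
        .build();

    let _ = (single_file_props, small_pages_props);
}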
