Skip to content

Commit ff301c8

Browse files
authored
Add restriction for enabling limit pruning (#21)
1 parent 19bbdff commit ff301c8

File tree

6 files changed

+55
-22
lines changed

6 files changed

+55
-22
lines changed

datafusion/core/src/datasource/listing/table.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1253,6 +1253,7 @@ impl TableProvider for ListingTable {
12531253
.with_output_ordering(output_ordering)
12541254
.with_table_partition_cols(table_partition_cols)
12551255
.with_expr_adapter(self.expr_adapter_factory.clone())
1256+
.with_limit_pruning(limit.is_some())
12561257
.build(),
12571258
)
12581259
.await

datafusion/core/tests/parquet/mod.rs

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -150,11 +150,18 @@ impl TestOutput {
150150
self.metric_value("row_groups_matched_statistics")
151151
}
152152

153+
/*
153154
/// The number of row_groups fully matched by statistics
154155
fn row_groups_fully_matched_statistics(&self) -> Option<usize> {
155156
self.metric_value("row_groups_fully_matched_statistics")
156157
}
157158
159+
/// The number of row groups pruned by limit pruning
160+
fn limit_pruned_row_groups(&self) -> Option<usize> {
161+
self.metric_value("limit_pruned_row_groups")
162+
}
163+
*/
164+
158165
/// The number of row_groups pruned by statistics
159166
fn row_groups_pruned_statistics(&self) -> Option<usize> {
160167
self.metric_value("row_groups_pruned_statistics")
@@ -183,11 +190,6 @@ impl TestOutput {
183190
self.metric_value("page_index_rows_pruned")
184191
}
185192

186-
/// The number of row groups pruned by limit pruning
187-
fn limit_pruned_row_groups(&self) -> Option<usize> {
188-
self.metric_value("limit_pruned_row_groups")
189-
}
190-
191193
fn description(&self) -> String {
192194
format!(
193195
"Input:\n{}\nQuery:\n{}\nOutput:\n{}\nMetrics:\n{}",
@@ -204,7 +206,8 @@ impl ContextWithParquet {
204206
Self::with_config(scenario, unit, SessionConfig::new(), None, None).await
205207
}
206208

207-
/// Set custom schema and batches for the test
209+
// Set custom schema and batches for the test
210+
/*
208211
pub async fn with_custom_data(
209212
scenario: Scenario,
210213
unit: Unit,
@@ -220,6 +223,7 @@ impl ContextWithParquet {
220223
)
221224
.await
222225
}
226+
*/
223227

224228
async fn with_config(
225229
scenario: Scenario,

datafusion/core/tests/parquet/row_group_pruning.rs

Lines changed: 16 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,8 @@
1818
//! This file contains an end to end test of parquet pruning. It writes
1919
//! data into a parquet file and then verifies row groups are pruned as
2020
//! expected.
21-
use std::sync::Arc;
22-
23-
use arrow::array::{ArrayRef, Int32Array, RecordBatch};
24-
use arrow_schema::{DataType, Field, Schema};
2521
use datafusion::prelude::SessionConfig;
26-
use datafusion_common::{DataFusionError, ScalarValue};
22+
use datafusion_common::ScalarValue;
2723
use itertools::Itertools;
2824

2925
use crate::parquet::Unit::RowGroup;
@@ -34,12 +30,12 @@ struct RowGroupPruningTest {
3430
query: String,
3531
expected_errors: Option<usize>,
3632
expected_row_group_matched_by_statistics: Option<usize>,
37-
expected_row_group_fully_matched_by_statistics: Option<usize>,
33+
// expected_row_group_fully_matched_by_statistics: Option<usize>,
3834
expected_row_group_pruned_by_statistics: Option<usize>,
3935
expected_files_pruned_by_statistics: Option<usize>,
4036
expected_row_group_matched_by_bloom_filter: Option<usize>,
4137
expected_row_group_pruned_by_bloom_filter: Option<usize>,
42-
expected_limit_pruned_row_groups: Option<usize>,
38+
// expected_limit_pruned_row_groups: Option<usize>,
4339
expected_rows: usize,
4440
}
4541
impl RowGroupPruningTest {
@@ -51,11 +47,11 @@ impl RowGroupPruningTest {
5147
expected_errors: None,
5248
expected_row_group_matched_by_statistics: None,
5349
expected_row_group_pruned_by_statistics: None,
54-
expected_row_group_fully_matched_by_statistics: None,
50+
// expected_row_group_fully_matched_by_statistics: None,
5551
expected_files_pruned_by_statistics: None,
5652
expected_row_group_matched_by_bloom_filter: None,
5753
expected_row_group_pruned_by_bloom_filter: None,
58-
expected_limit_pruned_row_groups: None,
54+
// expected_limit_pruned_row_groups: None,
5955
expected_rows: 0,
6056
}
6157
}
@@ -85,6 +81,7 @@ impl RowGroupPruningTest {
8581
}
8682

8783
// Set the expected fully matched row groups by statistics
84+
/*
8885
fn with_fully_matched_by_stats(
8986
mut self,
9087
fully_matched_by_stats: Option<usize>,
@@ -93,6 +90,12 @@ impl RowGroupPruningTest {
9390
self
9491
}
9592
93+
fn with_limit_pruned_row_groups(mut self, pruned_by_limit: Option<usize>) -> Self {
94+
self.expected_limit_pruned_row_groups = pruned_by_limit;
95+
self
96+
}
97+
*/
98+
9699
// Set the expected pruned row groups by statistics
97100
fn with_pruned_by_stats(mut self, pruned_by_stats: Option<usize>) -> Self {
98101
self.expected_row_group_pruned_by_statistics = pruned_by_stats;
@@ -116,11 +119,6 @@ impl RowGroupPruningTest {
116119
self
117120
}
118121

119-
fn with_limit_pruned_row_groups(mut self, pruned_by_limit: Option<usize>) -> Self {
120-
self.expected_limit_pruned_row_groups = pruned_by_limit;
121-
self
122-
}
123-
124122
/// Set the number of expected rows from the output of this test
125123
fn with_expected_rows(mut self, rows: usize) -> Self {
126124
self.expected_rows = rows;
@@ -177,6 +175,7 @@ impl RowGroupPruningTest {
177175
}
178176

179177
// Execute the test with the current configuration
178+
/*
180179
async fn test_row_group_prune_with_custom_data(
181180
self,
182181
schema: Arc<Schema>,
@@ -233,6 +232,7 @@ impl RowGroupPruningTest {
233232
output.description(),
234233
);
235234
}
235+
*/
236236
}
237237

238238
#[tokio::test]
@@ -1721,6 +1721,7 @@ async fn test_bloom_filter_decimal_dict() {
17211721
.await;
17221722
}
17231723

1724+
/*
17241725
// Helper function to create a batch with a single Int32 column.
17251726
fn make_i32_batch(
17261727
name: &str,
@@ -1958,3 +1959,4 @@ async fn test_limit_pruning_exceeds_fully_matched() -> datafusion_common::error:
19581959
19591960
Ok(())
19601961
}
1962+
*/

datafusion/datasource-parquet/src/opener.rs

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,8 @@ pub(super) struct ParquetOpener {
9797
pub enable_row_group_stats_pruning: bool,
9898
/// Coerce INT96 timestamps to specific TimeUnit
9999
pub coerce_int96: Option<TimeUnit>,
100+
/// Should limit pruning be applied
101+
pub enable_limit_pruning: bool,
100102
/// Optional parquet FileDecryptionProperties
101103
#[cfg(feature = "parquet_encryption")]
102104
pub file_decryption_properties: Option<Arc<FileDecryptionProperties>>,
@@ -144,6 +146,7 @@ impl FileOpener for ParquetOpener {
144146
let enable_bloom_filter = self.enable_bloom_filter;
145147
let enable_row_group_stats_pruning = self.enable_row_group_stats_pruning;
146148
let limit = self.limit;
149+
let enable_limit_pruning = self.enable_limit_pruning;
147150

148151
let predicate_creation_errors = MetricBuilder::new(&self.metrics)
149152
.global_counter("num_predicate_creation_errors");
@@ -377,8 +380,10 @@ impl FileOpener for ParquetOpener {
377380
}
378381

379382
// Prune by limit
380-
if let Some(limit) = limit {
381-
row_groups.prune_by_limit(limit, rg_metadata, &file_metrics);
383+
if enable_limit_pruning {
384+
if let Some(limit) = limit {
385+
row_groups.prune_by_limit(limit, rg_metadata, &file_metrics);
386+
}
382387
}
383388

384389
let mut access_plan = row_groups.build();
@@ -826,6 +831,7 @@ mod test {
826831
reorder_filters: false,
827832
enable_page_index: false,
828833
enable_bloom_filter: false,
834+
enable_limit_pruning: false,
829835
schema_adapter_factory: Arc::new(DefaultSchemaAdapterFactory),
830836
enable_row_group_stats_pruning: true,
831837
coerce_int96: None,
@@ -914,6 +920,7 @@ mod test {
914920
reorder_filters: false,
915921
enable_page_index: false,
916922
enable_bloom_filter: false,
923+
enable_limit_pruning: false,
917924
schema_adapter_factory: Arc::new(DefaultSchemaAdapterFactory),
918925
enable_row_group_stats_pruning: true,
919926
coerce_int96: None,
@@ -1018,6 +1025,7 @@ mod test {
10181025
reorder_filters: false,
10191026
enable_page_index: false,
10201027
enable_bloom_filter: false,
1028+
enable_limit_pruning: false,
10211029
schema_adapter_factory: Arc::new(DefaultSchemaAdapterFactory),
10221030
enable_row_group_stats_pruning: true,
10231031
coerce_int96: None,
@@ -1132,6 +1140,7 @@ mod test {
11321140
reorder_filters: true,
11331141
enable_page_index: false,
11341142
enable_bloom_filter: false,
1143+
enable_limit_pruning: false,
11351144
schema_adapter_factory: Arc::new(DefaultSchemaAdapterFactory),
11361145
enable_row_group_stats_pruning: false, // note that this is false!
11371146
coerce_int96: None,
@@ -1247,6 +1256,7 @@ mod test {
12471256
reorder_filters: false,
12481257
enable_page_index: false,
12491258
enable_bloom_filter: false,
1259+
enable_limit_pruning: false,
12501260
schema_adapter_factory: Arc::new(DefaultSchemaAdapterFactory),
12511261
enable_row_group_stats_pruning: true,
12521262
coerce_int96: None,
@@ -1429,6 +1439,7 @@ mod test {
14291439
reorder_filters: false,
14301440
enable_page_index: false,
14311441
enable_bloom_filter: false,
1442+
enable_limit_pruning: false,
14321443
schema_adapter_factory: Arc::new(CustomSchemaAdapterFactory),
14331444
enable_row_group_stats_pruning: false,
14341445
coerce_int96: None,

datafusion/datasource-parquet/src/source.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -576,6 +576,7 @@ impl FileSource for ParquetSource {
576576
enable_page_index: self.enable_page_index(),
577577
enable_bloom_filter: self.bloom_filter_on_read(),
578578
enable_row_group_stats_pruning: self.table_parquet_options.global.pruning,
579+
enable_limit_pruning: base_config.limit_pruning,
579580
schema_adapter_factory,
580581
coerce_int96,
581582
#[cfg(feature = "parquet_encryption")]

datafusion/datasource/src/file_scan_config.rs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -196,6 +196,8 @@ pub struct FileScanConfig {
196196
/// Expression adapter used to adapt filters and projections that are pushed down into the scan
197197
/// from the logical schema to the physical schema of the file.
198198
pub expr_adapter_factory: Option<Arc<dyn PhysicalExprAdapterFactory>>,
199+
/// If there is a limit pushed down at the logical plan level, we can enable limit_pruning
200+
pub limit_pruning: bool,
199201
}
200202

201203
/// A builder for [`FileScanConfig`]'s.
@@ -275,6 +277,8 @@ pub struct FileScanConfigBuilder {
275277
new_lines_in_values: Option<bool>,
276278
batch_size: Option<usize>,
277279
expr_adapter_factory: Option<Arc<dyn PhysicalExprAdapterFactory>>,
280+
/// If there is a limit pushed down at the logical plan level, we can enable limit_pruning
281+
limit_pruning: bool,
278282
}
279283

280284
impl FileScanConfigBuilder {
@@ -304,6 +308,7 @@ impl FileScanConfigBuilder {
304308
constraints: None,
305309
batch_size: None,
306310
expr_adapter_factory: None,
311+
limit_pruning: false,
307312
}
308313
}
309314

@@ -426,6 +431,12 @@ impl FileScanConfigBuilder {
426431
self
427432
}
428433

434+
/// Enable or disable limit pruning.
435+
pub fn with_limit_pruning(mut self, limit_pruning: bool) -> Self {
436+
self.limit_pruning = limit_pruning;
437+
self
438+
}
439+
429440
/// Build the final [`FileScanConfig`] with all the configured settings.
430441
///
431442
/// This method takes ownership of the builder and returns the constructed `FileScanConfig`.
@@ -446,6 +457,7 @@ impl FileScanConfigBuilder {
446457
new_lines_in_values,
447458
batch_size,
448459
expr_adapter_factory: expr_adapter,
460+
limit_pruning,
449461
} = self;
450462

451463
let constraints = constraints.unwrap_or_default();
@@ -473,6 +485,7 @@ impl FileScanConfigBuilder {
473485
new_lines_in_values,
474486
batch_size,
475487
expr_adapter_factory: expr_adapter,
488+
limit_pruning,
476489
}
477490
}
478491
}
@@ -494,6 +507,7 @@ impl From<FileScanConfig> for FileScanConfigBuilder {
494507
constraints: Some(config.constraints),
495508
batch_size: config.batch_size,
496509
expr_adapter_factory: config.expr_adapter_factory,
510+
limit_pruning: config.limit_pruning,
497511
}
498512
}
499513
}

0 commit comments

Comments (0)