
Commit 5f88106

alamb authored and appletreeisyellow committed
Fix regression with Incorrect results when reading parquet files with different schemas and statistics (apache#8533)
* Add test for schema evolution
* Fix reading parquet statistics
* Update tests for fix
* Add comments to help explain the test
* Add another test
1 parent 50e5612 commit 5f88106
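
The root cause, in short: row-group statistics were resolved through the pruning predicate's schema, which is the table schema, while Parquet stores per-column statistics in the file's own column order. When a file written under an older schema lays its columns out differently, a predicate on one column could be evaluated against another column's min/max, pruning row groups that actually contain matching rows. A minimal sketch of the mismatch, using the arrow crate directly rather than DataFusion internals (the `min_for` helper and the literal values are illustrative only):

```rust
use arrow_schema::{DataType, Field, Schema};

// Parquet keeps row-group statistics in *file* column order. This hypothetical
// helper resolves a column name to an index in `schema` and reads the
// statistic stored at that index.
fn min_for(column: &str, schema: &Schema, mins_in_file_order: &[i32]) -> Option<i32> {
    let idx = schema.index_of(column).ok()?;
    mins_in_file_order.get(idx).copied()
}

fn main() {
    // The table schema declares (c1, c2)...
    let table_schema = Schema::new(vec![
        Field::new("c1", DataType::Int32, false),
        Field::new("c2", DataType::Int32, false),
    ]);
    // ...but this particular file was written as (c2, c1).
    let file_schema = Schema::new(vec![
        Field::new("c2", DataType::Int32, false),
        Field::new("c1", DataType::Int32, false),
    ]);
    // Statistics as stored in the file: min(c2) = -10, min(c1) = 1.
    let mins = [-10, 1];

    // Resolving "c1" through the table schema reads c2's statistic (the bug).
    assert_eq!(min_for("c1", &table_schema, &mins), Some(-10));
    // Resolving it through the file schema reads c1's statistic (the fix).
    assert_eq!(min_for("c1", &file_schema, &mins), Some(1));
}
```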

File tree

3 files changed: +256, -33 lines

datafusion/core/src/datasource/physical_plan/parquet/mod.rs

Lines changed: 6 additions & 3 deletions
@@ -468,8 +468,10 @@ impl FileOpener for ParquetOpener {
                 ParquetRecordBatchStreamBuilder::new_with_options(reader, options)
                     .await?;
 
+            let file_schema = builder.schema().clone();
+
             let (schema_mapping, adapted_projections) =
-                schema_adapter.map_schema(builder.schema())?;
+                schema_adapter.map_schema(&file_schema)?;
             // let predicate = predicate.map(|p| reassign_predicate_columns(p, builder.schema(), true)).transpose()?;
 
             let mask = ProjectionMask::roots(
@@ -481,8 +483,8 @@ impl FileOpener for ParquetOpener {
             if let Some(predicate) = pushdown_filters.then_some(predicate).flatten() {
                 let row_filter = row_filter::build_row_filter(
                     &predicate,
-                    builder.schema().as_ref(),
-                    table_schema.as_ref(),
+                    &file_schema,
+                    &table_schema,
                     builder.metadata(),
                     reorder_predicates,
                     &file_metrics,
@@ -507,6 +509,7 @@ impl FileOpener for ParquetOpener {
             let file_metadata = builder.metadata().clone();
             let predicate = pruning_predicate.as_ref().map(|p| p.as_ref());
             let mut row_groups = row_groups::prune_row_groups_by_statistics(
+                &file_schema,
                 builder.parquet_schema(),
                 file_metadata.row_groups(),
                 file_range,

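Taken together, the `ParquetOpener` changes capture the schema of the file being opened once as `file_schema` and use it consistently for schema mapping, row-filter construction, and row-group pruning, keeping it distinct from the table schema. The user-visible symptom can be exercised from the public API roughly as follows (a hedged sketch: the path is a placeholder and it assumes a directory of Parquet files whose schemas evolved, e.g. with columns in different orders):

```rust
use datafusion::error::Result;
use datafusion::prelude::{ParquetReadOptions, SessionContext};

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();
    // Placeholder path: a directory of Parquet files written at different
    // times, where one file stores (c1, c2) and another stores (c2, c1).
    ctx.register_parquet("t", "/path/to/evolved_table/", ParquetReadOptions::default())
        .await?;

    // Before this fix, row-group pruning could consult c2's min/max while
    // evaluating the predicate on c1 for files whose column order differs
    // from the table schema, silently dropping rows that should match.
    let batches = ctx
        .sql("SELECT c1, c2 FROM t WHERE c1 > 0")
        .await?
        .collect()
        .await?;
    println!("{} record batches", batches.len());
    Ok(())
}
```
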
datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs

Lines changed: 110 additions & 30 deletions
@@ -55,6 +55,7 @@ use super::ParquetFileMetrics;
 /// Note: This method currently ignores ColumnOrder
 /// <https://github.com/apache/arrow-datafusion/issues/8335>
 pub(crate) fn prune_row_groups_by_statistics(
+    arrow_schema: &Schema,
     parquet_schema: &SchemaDescriptor,
     groups: &[RowGroupMetaData],
     range: Option<FileRange>,
@@ -80,7 +81,7 @@ pub(crate) fn prune_row_groups_by_statistics(
             let pruning_stats = RowGroupPruningStatistics {
                 parquet_schema,
                 row_group_metadata: metadata,
-                arrow_schema: predicate.schema().as_ref(),
+                arrow_schema,
             };
             match predicate.prune(&pruning_stats) {
                 Ok(values) => {
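
This hunk is the heart of the fix: `RowGroupPruningStatistics` now borrows the caller-supplied `arrow_schema`, i.e. the Arrow schema of the file whose row groups are being pruned, instead of `predicate.schema()`. The predicate's schema is always the table schema, because the pruning predicate is built from it, so it cannot describe a file whose column order differs. A small sketch of that point, reusing the same test helpers that appear below (`logical2physical`, `col`, `lit`), so it assumes the `mod tests` context:

```rust
// The predicate remembers the schema it was constructed against: the table
// schema. It knows nothing about any particular file's column order.
let table_schema = Arc::new(Schema::new(vec![
    Field::new("c1", DataType::Int32, false),
    Field::new("c2", DataType::Int32, false),
]));
let expr = logical2physical(&col("c1").gt(lit(0)), &table_schema);
let pruning_predicate = PruningPredicate::try_new(expr, table_schema.clone()).unwrap();
assert_eq!(pruning_predicate.schema(), &table_schema);
```
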
@@ -416,11 +417,11 @@ mod tests {
     fn row_group_pruning_predicate_simple_expr() {
         use datafusion_expr::{col, lit};
         // int > 1 => c1_max > 1
-        let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
+        let schema =
+            Arc::new(Schema::new(vec![Field::new("c1", DataType::Int32, false)]));
         let expr = col("c1").gt(lit(15));
         let expr = logical2physical(&expr, &schema);
-        let pruning_predicate =
-            PruningPredicate::try_new(expr, Arc::new(schema)).unwrap();
+        let pruning_predicate = PruningPredicate::try_new(expr, schema.clone()).unwrap();
 
         let field = PrimitiveTypeField::new("c1", PhysicalType::INT32);
         let schema_descr = get_test_schema_descr(vec![field]);
@@ -436,6 +437,7 @@ mod tests {
         let metrics = parquet_file_metrics();
         assert_eq!(
             prune_row_groups_by_statistics(
+                &schema,
                 &schema_descr,
                 &[rgm1, rgm2],
                 None,
@@ -450,11 +452,11 @@ mod tests {
     fn row_group_pruning_predicate_missing_stats() {
         use datafusion_expr::{col, lit};
         // int > 1 => c1_max > 1
-        let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
+        let schema =
+            Arc::new(Schema::new(vec![Field::new("c1", DataType::Int32, false)]));
         let expr = col("c1").gt(lit(15));
         let expr = logical2physical(&expr, &schema);
-        let pruning_predicate =
-            PruningPredicate::try_new(expr, Arc::new(schema)).unwrap();
+        let pruning_predicate = PruningPredicate::try_new(expr, schema.clone()).unwrap();
 
         let field = PrimitiveTypeField::new("c1", PhysicalType::INT32);
         let schema_descr = get_test_schema_descr(vec![field]);
@@ -471,6 +473,7 @@ mod tests {
         // is null / undefined so the first row group can't be filtered out
         assert_eq!(
             prune_row_groups_by_statistics(
+                &schema,
                 &schema_descr,
                 &[rgm1, rgm2],
                 None,
@@ -519,6 +522,7 @@ mod tests {
         // when conditions are joined using AND
         assert_eq!(
             prune_row_groups_by_statistics(
+                &schema,
                 &schema_descr,
                 groups,
                 None,
@@ -532,12 +536,13 @@ mod tests {
         // this bypasses the entire predicate expression and no row groups are filtered out
         let expr = col("c1").gt(lit(15)).or(col("c2").rem(lit(2)).eq(lit(0)));
         let expr = logical2physical(&expr, &schema);
-        let pruning_predicate = PruningPredicate::try_new(expr, schema).unwrap();
+        let pruning_predicate = PruningPredicate::try_new(expr, schema.clone()).unwrap();
 
         // if conditions in predicate are joined with OR and an unsupported expression is used
         // this bypasses the entire predicate expression and no row groups are filtered out
         assert_eq!(
             prune_row_groups_by_statistics(
+                &schema,
                 &schema_descr,
                 groups,
                 None,
@@ -548,6 +553,64 @@ mod tests {
         );
     }
 
+    #[test]
+    fn row_group_pruning_predicate_file_schema() {
+        use datafusion_expr::{col, lit};
+        // test row group predicate when file schema is different from table schema
+        // c1 > 0
+        let table_schema = Arc::new(Schema::new(vec![
+            Field::new("c1", DataType::Int32, false),
+            Field::new("c2", DataType::Int32, false),
+        ]));
+        let expr = col("c1").gt(lit(0));
+        let expr = logical2physical(&expr, &table_schema);
+        let pruning_predicate =
+            PruningPredicate::try_new(expr, table_schema.clone()).unwrap();
+
+        // Model a file schema's column order c2 then c1, which is the opposite
+        // of the table schema
+        let file_schema = Arc::new(Schema::new(vec![
+            Field::new("c2", DataType::Int32, false),
+            Field::new("c1", DataType::Int32, false),
+        ]));
+        let schema_descr = get_test_schema_descr(vec![
+            PrimitiveTypeField::new("c2", PhysicalType::INT32),
+            PrimitiveTypeField::new("c1", PhysicalType::INT32),
+        ]);
+        // rg1 has c2 less than zero, c1 greater than zero
+        let rgm1 = get_row_group_meta_data(
+            &schema_descr,
+            vec![
+                ParquetStatistics::int32(Some(-10), Some(-1), None, 0, false), // c2
+                ParquetStatistics::int32(Some(1), Some(10), None, 0, false),   // c1
+            ],
+        );
+        // rg2 has c2 greater than zero, c1 less than zero
+        let rgm2 = get_row_group_meta_data(
+            &schema_descr,
+            vec![
+                ParquetStatistics::int32(Some(1), Some(10), None, 0, false),   // c2
+                ParquetStatistics::int32(Some(-10), Some(-1), None, 0, false), // c1
+            ],
+        );
+
+        let metrics = parquet_file_metrics();
+        let groups = &[rgm1, rgm2];
+        // the first row group should be left because c1 is greater than zero
+        // the second should be filtered out because c1 is less than zero
+        assert_eq!(
+            prune_row_groups_by_statistics(
+                &file_schema, // NB must be file schema, not table_schema
+                &schema_descr,
+                groups,
+                None,
+                Some(&pruning_predicate),
+                &metrics
+            ),
+            vec![0]
+        );
+    }
+
     fn gen_row_group_meta_data_for_pruning_predicate() -> Vec<RowGroupMetaData> {
         let schema_descr = get_test_schema_descr(vec![
             PrimitiveTypeField::new("c1", PhysicalType::INT32),
@@ -581,13 +644,14 @@ mod tests {
         let schema_descr = arrow_to_parquet_schema(&schema).unwrap();
         let expr = col("c1").gt(lit(15)).and(col("c2").is_null());
         let expr = logical2physical(&expr, &schema);
-        let pruning_predicate = PruningPredicate::try_new(expr, schema).unwrap();
+        let pruning_predicate = PruningPredicate::try_new(expr, schema.clone()).unwrap();
         let groups = gen_row_group_meta_data_for_pruning_predicate();
 
         let metrics = parquet_file_metrics();
         // First row group was filtered out because it contains no null value on "c2".
         assert_eq!(
             prune_row_groups_by_statistics(
+                &schema,
                 &schema_descr,
                 &groups,
                 None,
@@ -613,14 +677,15 @@ mod tests {
             .gt(lit(15))
             .and(col("c2").eq(lit(ScalarValue::Boolean(None))));
         let expr = logical2physical(&expr, &schema);
-        let pruning_predicate = PruningPredicate::try_new(expr, schema).unwrap();
+        let pruning_predicate = PruningPredicate::try_new(expr, schema.clone()).unwrap();
         let groups = gen_row_group_meta_data_for_pruning_predicate();
 
         let metrics = parquet_file_metrics();
         // bool = NULL always evaluates to NULL (and thus will not
         // pass predicates. Ideally these should both be false
         assert_eq!(
             prune_row_groups_by_statistics(
+                &schema,
                 &schema_descr,
                 &groups,
                 None,
@@ -639,8 +704,11 @@ mod tests {
 
         // INT32: c1 > 5, the c1 is decimal(9,2)
         // The type of scalar value is decimal(9,2), don't need to do cast
-        let schema =
-            Schema::new(vec![Field::new("c1", DataType::Decimal128(9, 2), false)]);
+        let schema = Arc::new(Schema::new(vec![Field::new(
+            "c1",
+            DataType::Decimal128(9, 2),
+            false,
+        )]));
         let field = PrimitiveTypeField::new("c1", PhysicalType::INT32)
             .with_logical_type(LogicalType::Decimal {
                 scale: 2,
@@ -651,8 +719,7 @@ mod tests {
         let schema_descr = get_test_schema_descr(vec![field]);
         let expr = col("c1").gt(lit(ScalarValue::Decimal128(Some(500), 9, 2)));
         let expr = logical2physical(&expr, &schema);
-        let pruning_predicate =
-            PruningPredicate::try_new(expr, Arc::new(schema)).unwrap();
+        let pruning_predicate = PruningPredicate::try_new(expr, schema.clone()).unwrap();
         let rgm1 = get_row_group_meta_data(
             &schema_descr,
             // [1.00, 6.00]
@@ -680,6 +747,7 @@ mod tests {
         let metrics = parquet_file_metrics();
         assert_eq!(
             prune_row_groups_by_statistics(
+                &schema,
                 &schema_descr,
                 &[rgm1, rgm2, rgm3],
                 None,
@@ -693,8 +761,11 @@ mod tests {
         // The c1 type is decimal(9,0) in the parquet file, and the type of scalar is decimal(5,2).
         // We should convert all types to the coercion type, which is decimal(11,2)
         // The decimal of arrow is decimal(5,2), the decimal of parquet is decimal(9,0)
-        let schema =
-            Schema::new(vec![Field::new("c1", DataType::Decimal128(9, 0), false)]);
+        let schema = Arc::new(Schema::new(vec![Field::new(
+            "c1",
+            DataType::Decimal128(9, 0),
+            false,
+        )]));
 
         let field = PrimitiveTypeField::new("c1", PhysicalType::INT32)
             .with_logical_type(LogicalType::Decimal {
@@ -709,8 +780,7 @@ mod tests {
             Decimal128(11, 2),
         ));
         let expr = logical2physical(&expr, &schema);
-        let pruning_predicate =
-            PruningPredicate::try_new(expr, Arc::new(schema)).unwrap();
+        let pruning_predicate = PruningPredicate::try_new(expr, schema.clone()).unwrap();
         let rgm1 = get_row_group_meta_data(
             &schema_descr,
             // [100, 600]
@@ -744,6 +814,7 @@ mod tests {
         let metrics = parquet_file_metrics();
         assert_eq!(
             prune_row_groups_by_statistics(
+                &schema,
                 &schema_descr,
                 &[rgm1, rgm2, rgm3, rgm4],
                 None,
@@ -754,8 +825,11 @@ mod tests {
         );
 
         // INT64: c1 < 5, the c1 is decimal(18,2)
-        let schema =
-            Schema::new(vec![Field::new("c1", DataType::Decimal128(18, 2), false)]);
+        let schema = Arc::new(Schema::new(vec![Field::new(
+            "c1",
+            DataType::Decimal128(18, 2),
+            false,
+        )]));
         let field = PrimitiveTypeField::new("c1", PhysicalType::INT64)
             .with_logical_type(LogicalType::Decimal {
                 scale: 2,
@@ -766,8 +840,7 @@ mod tests {
         let schema_descr = get_test_schema_descr(vec![field]);
         let expr = col("c1").lt(lit(ScalarValue::Decimal128(Some(500), 18, 2)));
         let expr = logical2physical(&expr, &schema);
-        let pruning_predicate =
-            PruningPredicate::try_new(expr, Arc::new(schema)).unwrap();
+        let pruning_predicate = PruningPredicate::try_new(expr, schema.clone()).unwrap();
         let rgm1 = get_row_group_meta_data(
             &schema_descr,
             // [6.00, 8.00]
@@ -792,6 +865,7 @@ mod tests {
         let metrics = parquet_file_metrics();
         assert_eq!(
             prune_row_groups_by_statistics(
+                &schema,
                 &schema_descr,
                 &[rgm1, rgm2, rgm3],
                 None,
@@ -803,8 +877,11 @@ mod tests {
 
         // FIXED_LENGTH_BYTE_ARRAY: c1 = decimal128(100000, 28, 3), the c1 is decimal(18,2)
         // the type of parquet is decimal(18,2)
-        let schema =
-            Schema::new(vec![Field::new("c1", DataType::Decimal128(18, 2), false)]);
+        let schema = Arc::new(Schema::new(vec![Field::new(
+            "c1",
+            DataType::Decimal128(18, 2),
+            false,
+        )]));
         let field = PrimitiveTypeField::new("c1", PhysicalType::FIXED_LEN_BYTE_ARRAY)
             .with_logical_type(LogicalType::Decimal {
                 scale: 2,
@@ -818,8 +895,7 @@ mod tests {
         let left = cast(col("c1"), DataType::Decimal128(28, 3));
         let expr = left.eq(lit(ScalarValue::Decimal128(Some(100000), 28, 3)));
         let expr = logical2physical(&expr, &schema);
-        let pruning_predicate =
-            PruningPredicate::try_new(expr, Arc::new(schema)).unwrap();
+        let pruning_predicate = PruningPredicate::try_new(expr, schema.clone()).unwrap();
         // we must use big-endian when encoding the i128 to bytes or vec[u8].
         let rgm1 = get_row_group_meta_data(
             &schema_descr,
@@ -863,6 +939,7 @@ mod tests {
         let metrics = parquet_file_metrics();
         assert_eq!(
             prune_row_groups_by_statistics(
+                &schema,
                 &schema_descr,
                 &[rgm1, rgm2, rgm3],
                 None,
@@ -874,8 +951,11 @@ mod tests {
 
         // BYTE_ARRAY: c1 = decimal128(100000, 28, 3), the c1 is decimal(18,2)
         // the type of parquet is decimal(18,2)
-        let schema =
-            Schema::new(vec![Field::new("c1", DataType::Decimal128(18, 2), false)]);
+        let schema = Arc::new(Schema::new(vec![Field::new(
+            "c1",
+            DataType::Decimal128(18, 2),
+            false,
+        )]));
         let field = PrimitiveTypeField::new("c1", PhysicalType::BYTE_ARRAY)
             .with_logical_type(LogicalType::Decimal {
                 scale: 2,
@@ -889,8 +969,7 @@ mod tests {
         let left = cast(col("c1"), DataType::Decimal128(28, 3));
         let expr = left.eq(lit(ScalarValue::Decimal128(Some(100000), 28, 3)));
         let expr = logical2physical(&expr, &schema);
-        let pruning_predicate =
-            PruningPredicate::try_new(expr, Arc::new(schema)).unwrap();
+        let pruning_predicate = PruningPredicate::try_new(expr, schema.clone()).unwrap();
         // we must use big-endian when encoding the i128 to bytes or vec[u8].
         let rgm1 = get_row_group_meta_data(
             &schema_descr,
@@ -923,6 +1002,7 @@ mod tests {
         let metrics = parquet_file_metrics();
         assert_eq!(
             prune_row_groups_by_statistics(
+                &schema,
                 &schema_descr,
                 &[rgm1, rgm2, rgm3],
                 None,
