Conversation

@UBarney (Contributor) commented May 6, 2025

Which issue does this PR close?

part of #15873

Rationale for this change

What changes are included in this PR?

  • statistics_inner:
    • Change signature to (self, statistics: Statistics).
    • Preserve min_value/max_value from input statistics when grouping by a simple Column (see the sketch after this list).
  • Add PartitionStatisticsExec
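
To make the second bullet concrete, here is a minimal sketch, not the PR's actual code: a standalone helper (the name and exact signature are my assumptions) showing how min/max column statistics can be carried through when a group-by expression is a bare Column, using current DataFusion crate paths.

use std::sync::Arc;

use datafusion_common::{ColumnStatistics, Statistics};
use datafusion_physical_expr::expressions::Column;
use datafusion_physical_expr::PhysicalExpr;

// Hedged sketch: for each group-by expression that is a plain `Column`,
// copy the min/max of the corresponding input column into the output slot
// for that group-by expression; everything else stays unknown.
fn carry_over_group_by_min_max(
    group_by_exprs: &[(Arc<dyn PhysicalExpr>, String)],
    input: &Statistics,
    output_column_count: usize,
) -> Vec<ColumnStatistics> {
    let mut column_statistics =
        vec![ColumnStatistics::new_unknown(); output_column_count];
    for (idx, (expr, _name)) in group_by_exprs.iter().enumerate() {
        // Only a bare column reference maps 1:1 to an input column; a
        // computed expression such as `1 + id` is left unknown.
        if let Some(col) = expr.as_any().downcast_ref::<Column>() {
            if let Some(input_col) = input.column_statistics.get(col.index()) {
                column_statistics[idx].min_value = input_col.min_value.clone();
                column_statistics[idx].max_value = input_col.max_value.clone();
            }
        }
    }
    column_statistics
}

Note the two indices: col.index() selects the column in the child's statistics, while idx only positions the value in the aggregate's own output schema; this distinction comes up again in the review thread further down.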

Are these changes tested?

Yes

Are there any user-facing changes?

.num_rows
.to_inexact()
} else if *value == 0 {
// Aggregation on an empty table creates a null row.
@UBarney (Contributor, Author) commented May 6, 2025:

  • If !group_by_expr.is_empty() and input_statistics.num_rows == 0:
    • Both Partial and Final aggregation modes (agg.mode) yield 0 output rows (note the AggregateExec metrics below: output_rows=0).
> explain analyze select count(*) from generate_series(0) where value > 10 group by value;

+-------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type         | plan                                                                                                                                                                                                                                    |
+-------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| Plan with Metrics | ProjectionExec: expr=[count(Int64(1))@1 as count(*)], metrics=[output_rows=0, elapsed_compute=24ns]                                                                                                                                     |
|                   |   AggregateExec: mode=FinalPartitioned, gby=[value@0 as value], aggr=[count(Int64(1))], metrics=[output_rows=0, elapsed_compute=100.016µs, spill_count=0, spilled_bytes=0, spilled_rows=0, peak_mem_used=1536]                          |
|                   |     CoalesceBatchesExec: target_batch_size=8192, metrics=[output_rows=0, elapsed_compute=452ns]                                                                                                                                         |
|                   |       RepartitionExec: partitioning=Hash([value@0], 24), input_partitions=24, metrics=[fetch_time=10.544607ms, repartition_time=24ns, send_time=576ns]                                                                                  |
|                   |         AggregateExec: mode=Partial, gby=[value@0 as value], aggr=[count(Int64(1))], metrics=[output_rows=0, elapsed_compute=170.537µs, spill_count=0, spilled_bytes=0, spilled_rows=0, skipped_aggregation_rows=0, peak_mem_used=1536] |
|                   |           CoalesceBatchesExec: target_batch_size=8192, metrics=[output_rows=0, elapsed_compute=663ns]                                                                                                                                   |
|                   |             FilterExec: value@0 > 10, metrics=[output_rows=0, elapsed_compute=2.201314ms]                                                                                                                                               |
|                   |               RepartitionExec: partitioning=RoundRobinBatch(24), input_partitions=1, metrics=[fetch_time=3.077µs, repartition_time=1ns, send_time=1.16µs]                                                                               |
|                   |                 LazyMemoryExec: partitions=1, batch_generators=[generate_series: start=0, end=0, batch_size=8192], metrics=[]                                                                                                           |
|                   |                                                                                                                                                                                                                                         |
+-------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
1 row(s) fetched. 
Elapsed 0.004 seconds.

> select count(*) from generate_series(0) where value > 10 group by value;
+----------+
| count(*) |
+----------+
+----------+
0 row(s) fetched.
  • If group_by_expr.is_empty() and input_statistics.num_rows == 0:
    • Final aggregation mode (agg.mode == Final) yields 1 output row, but statistics_inner already returns for that case before reaching this point, so this line is never hit (see the sketch below).
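
A rough sketch of the row-count reasoning above. This is a standalone helper of my own, not the actual branch in statistics_inner; only the Precision type and its map/to_inexact methods are taken from the code under review.

use datafusion_common::stats::Precision;

// Hedged sketch of the row-count estimate discussed above; the real branch
// in `statistics_inner` may be structured differently.
fn estimate_agg_num_rows(
    input_num_rows: &Precision<usize>,
    group_by_is_empty: bool,
    grouping_set_num: usize,
) -> Precision<usize> {
    match input_num_rows {
        // Aggregating an empty input with no GROUP BY still produces one row
        // (e.g. `count(*)` returns 0 in a single row).
        Precision::Exact(0) if group_by_is_empty => Precision::Exact(1),
        // With a non-empty GROUP BY there are no groups, hence no output rows.
        Precision::Exact(0) => Precision::Exact(0),
        // Otherwise the number of groups is bounded by the input row count,
        // scaled by the number of grouping sets (CUBE/ROLLUP expand each row).
        other => other.clone().map(|x| x * grouping_set_num).to_inexact(),
    }
}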

Member commented:

Does it seem like a bug fix? If so, it's better to add a unit test to cover it.

@UBarney (Contributor, Author) commented May 22, 2025:

let agg_partial = Arc::new(AggregateExec::try_new(
    AggregateMode::Partial,
    group_by.clone(),
    aggr_expr.clone(),
    vec![None],
    empty_table.clone(),
    scan_schema.clone(),
)?);

let empty_stat = Statistics {
    num_rows: Precision::Exact(0),
    total_byte_size: Precision::Absent,
    column_statistics: vec![
        ColumnStatistics::new_unknown(),
        ColumnStatistics::new_unknown(),
        ColumnStatistics::new_unknown(),
    ],
};

assert_eq!(&empty_stat, &agg_partial.partition_statistics(Some(0))?);
assert_eq!(&empty_stat, &agg_partial.partition_statistics(Some(1))?);

validate_statistics_with_data(
    agg_partial.clone(),
    vec![ExpectedStatistics::Empty, ExpectedStatistics::Empty],
    0,
)
.await?;

@UBarney UBarney marked this pull request as ready for review May 7, 2025 03:48
@UBarney (Contributor, Author) commented May 7, 2025

FYI @xudong963

@xudong963 (Member) commented:

@UBarney Thank you, I'll review it in two days

@xudong963 xudong963 self-requested a review May 8, 2025 02:41
@github-actions github-actions bot added the core Core DataFusion crate label May 11, 2025
@UBarney (Contributor, Author) commented May 11, 2025

@xudong963 Thanks for reviewing. All comments have been addressed, PTAL

@xudong963 (Member) commented:

> @xudong963 Thanks for reviewing. All comments have been addressed, PTAL

Thank you, I'll review asap

@xudong963 xudong963 self-requested a review May 14, 2025 12:08
@UBarney UBarney requested a review from xudong963 May 22, 2025 02:47
Comment on lines +784 to +785
let grouping_set_num = self.group_by.groups.len();
child_statistics.num_rows.map(|x| x * grouping_set_num)
Member commented:

I don't get it here, could you please give a SQL example?

@UBarney (Contributor, Author) commented:

> explain analyze select t1.value, t2.value, count(*) from range(1) as t1 join range(1) as t2 using(value) group by cube(t1.value, t2.value);

+-------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type         | plan                                                                                                                                                                                                                                                                                                                                  |
+-------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| Plan with Metrics | ProjectionExec: expr=[value@0 as value, value@1 as value, count(Int64(1))@3 as count(*)], metrics=[output_rows=4, elapsed_compute=20.988µs]                                                                                                                                                                                           |
|                   |   AggregateExec: mode=FinalPartitioned, gby=[value@0 as value, value@1 as value, __grouping_id@2 as __grouping_id], aggr=[count(Int64(1))], metrics=[output_rows=4, elapsed_compute=107.217µs, spill_count=0, spilled_bytes=0, spilled_rows=0, peak_mem_used=3264]                                                                    |
|                   |     CoalesceBatchesExec: target_batch_size=8192, metrics=[output_rows=4, elapsed_compute=4.21µs]                                                                                                                                                                                                                                      |
|                   |       RepartitionExec: partitioning=Hash([value@0, value@1, __grouping_id@2], 24), input_partitions=24, metrics=[fetch_time=19.129834ms, repartition_time=7.618µs, send_time=35.082µs]                                                                                                                                                |
|                   |         AggregateExec: mode=Partial, gby=[(value@0 as value, value@1 as value), (NULL as value, value@1 as value), (value@0 as value, NULL as value), (NULL as value, NULL as value)], aggr=[count(Int64(1))], metrics=[output_rows=4, elapsed_compute=120.603µs, spill_count=0, spilled_bytes=0, spilled_rows=0, peak_mem_used=1976] |
|                   |           CoalesceBatchesExec: target_batch_size=8192, metrics=[output_rows=1, elapsed_compute=1.288µs]                                                                                                                                                                                                                               |
|                   |             HashJoinExec: mode=Partitioned, join_type=Inner, on=[(value@0, value@0)], metrics=[output_rows=1, build_input_batches=1, build_input_rows=1, input_batches=1, input_rows=1, output_batches=1, build_mem_used=1760, build_time=475.105µs, join_time=14.496µs]                                                              |
|                   |               CoalesceBatchesExec: target_batch_size=8192, metrics=[output_rows=1, elapsed_compute=1.622µs]                                                                                                                                                                                                                           |
|                   |                 RepartitionExec: partitioning=Hash([value@0], 24), input_partitions=24, metrics=[fetch_time=654.947µs, repartition_time=4.621µs, send_time=1.056µs]                                                                                                                                                                   |
|                   |                   RepartitionExec: partitioning=RoundRobinBatch(24), input_partitions=1, metrics=[fetch_time=3.096µs, repartition_time=1ns, send_time=741ns]                                                                                                                                                                          |
|                   |                     LazyMemoryExec: partitions=1, batch_generators=[range: start=0, end=1, batch_size=8192], metrics=[]                                                                                                                                                                                                               |
|                   |               CoalesceBatchesExec: target_batch_size=8192, metrics=[output_rows=1, elapsed_compute=2.197µs]                                                                                                                                                                                                                           |
|                   |                 RepartitionExec: partitioning=Hash([value@0], 24), input_partitions=24, metrics=[fetch_time=2.702565ms, repartition_time=2.726µs, send_time=1.497µs]                                                                                                                                                                  |
|                   |                   RepartitionExec: partitioning=RoundRobinBatch(24), input_partitions=1, metrics=[fetch_time=1.571µs, repartition_time=1ns, send_time=248ns]                                                                                                                                                                          |
|                   |                     LazyMemoryExec: partitions=1, batch_generators=[range: start=0, end=1, batch_size=8192], metrics=[]                                                                                                                                                                                                               |
|                   |                                                                                                                                                                                                                                                                                                                                       |
+-------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
1 row(s) fetched. 
Elapsed 0.004 seconds.

> 
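
To spell out the arithmetic behind the snippet under review (illustrative only, not code from the PR): cube(t1.value, t2.value) expands into four grouping sets, so the Partial aggregate's row estimate is the input row count multiplied by four, matching the metrics above.

fn main() {
    // cube(a, b) -> (a, b), (NULL, b), (a, NULL), (NULL, NULL)
    let grouping_set_num = 4;
    // The join above produces a single row.
    let input_rows: usize = 1;
    // Matches output_rows=4 reported by the Partial AggregateExec.
    assert_eq!(input_rows * grouping_set_num, 4);
}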

Member commented:

I think it's also a bug fix, do we have a ut?

// self.schema: [<group by exprs>, <aggregate exprs>]
let mut column_statistics = Statistics::unknown_column(&self.schema());

for (idx, (expr, _)) in self.group_by.expr.iter().enumerate() {
Member commented:

If the child of the agg node is a non-agg node, do the output columns of the child match with the group by expr?

@UBarney (Contributor, Author) commented:

Yes. It can be verified by this test case:

let aggregate_exec_partial = Arc::new(AggregateExec::try_new(
    AggregateMode::Partial,
    group_by.clone(),
    aggr_expr.clone(),
    vec![None],
    Arc::clone(&scan),
    scan_schema.clone(),
)?);

let p0_statistics = aggregate_exec_partial.partition_statistics(Some(0))?;

let expected_p0_statistics = Statistics {
    num_rows: Precision::Inexact(2),
    total_byte_size: Precision::Absent,
    column_statistics: vec![
        ColumnStatistics {
            null_count: Precision::Absent,
            max_value: Precision::Exact(ScalarValue::Int32(Some(4))),
            min_value: Precision::Exact(ScalarValue::Int32(Some(3))),
            sum_value: Precision::Absent,
            distinct_count: Precision::Absent,
        },
        ColumnStatistics::new_unknown(),
        ColumnStatistics::new_unknown(),
    ],
};

assert_eq!(&p0_statistics, &expected_p0_statistics);

Member commented:

If we adjust the order of group by exprs in the test, does it still match?

let group_by = PhysicalGroupBy::new_single(vec![
            (
                binary(
                    lit(1),
                    Operator::Plus,
                    col("id", &scan_schema)?,
                    &scan_schema,
                )?,
                "expr".to_string(),
            ),
            (col("id", &scan_schema)?, "id".to_string()),
        ]);

The idx will be 1, and it'll point to the date column in the scan.

Correct me if I missed something

@UBarney (Contributor, Author) commented May 24, 2025:

> If we adjust the order of group by exprs in the test, does it still match?

Yes.

#[tokio::test]
async fn test_agg_order() -> Result<()> {
    let scan = create_scan_exec_with_statistics(None, Some(2)).await;

    let scan_schema = scan.schema();

    let group_by = PhysicalGroupBy::new_single(vec![
        (
            binary(
                lit(1),
                Operator::Plus,
                col("id", &scan_schema)?,
                &scan_schema,
            )?,
            "expr".to_string(),
        ),
        (col("id", &scan_schema)?, "id".to_string()),
    ]);

    let aggr_expr = vec![AggregateExprBuilder::new(count_udaf(), vec![lit(1)])
        .schema(Arc::clone(&scan_schema))
        .alias(String::from("COUNT(c)"))
        .build()
        .map(Arc::new)?];

    let aggregate_exec_partial = Arc::new(AggregateExec::try_new(
        AggregateMode::Partial,
        group_by.clone(),
        aggr_expr.clone(),
        vec![None],
        Arc::clone(&scan),
        scan_schema.clone(),
    )?);

    let p0_statistics = aggregate_exec_partial.partition_statistics(Some(0))?;

    let expected_p0_statistics = Statistics {
        num_rows: Precision::Inexact(2),
        total_byte_size: Precision::Absent,
        column_statistics: vec![
            ColumnStatistics::new_unknown(),
            ColumnStatistics {
                null_count: Precision::Absent,
                max_value: Precision::Exact(ScalarValue::Int32(Some(4))),
                min_value: Precision::Exact(ScalarValue::Int32(Some(3))),
                sum_value: Precision::Absent,
                distinct_count: Precision::Absent,
            },
            ColumnStatistics::new_unknown(),
        ],
    };

    assert_eq!(&p0_statistics, &expected_p0_statistics);

    let expected_p1_statistics = Statistics {
        num_rows: Precision::Inexact(2),
        total_byte_size: Precision::Absent,
        column_statistics: vec![
            ColumnStatistics::new_unknown(),
            ColumnStatistics {
                null_count: Precision::Absent,
                max_value: Precision::Exact(ScalarValue::Int32(Some(2))),
                min_value: Precision::Exact(ScalarValue::Int32(Some(1))),
                sum_value: Precision::Absent,
                distinct_count: Precision::Absent,
            },
            ColumnStatistics::new_unknown(),
        ],
    };

    let p1_statistics = aggregate_exec_partial.partition_statistics(Some(1))?;
    assert_eq!(&p1_statistics, &expected_p1_statistics);

    validate_statistics_with_data(
        aggregate_exec_partial.clone(),
        vec![
            ExpectedStatistics::NonEmpty(3, 4, 2),
            ExpectedStatistics::NonEmpty(1, 2, 2),
        ],
        1,
    )
    .await?;

    let agg_final = Arc::new(AggregateExec::try_new(
        AggregateMode::FinalPartitioned,
        group_by.as_final(),
        aggr_expr.clone(),
        vec![None],
        aggregate_exec_partial.clone(),
        aggregate_exec_partial.schema(),
    )?);

    let p0_statistics = agg_final.partition_statistics(Some(0))?;
    assert_eq!(&p0_statistics, &expected_p0_statistics);

    let p1_statistics = agg_final.partition_statistics(Some(1))?;
    assert_eq!(&p1_statistics, &expected_p1_statistics);

    validate_statistics_with_data(
        agg_final.clone(),
        vec![
            ExpectedStatistics::NonEmpty(3, 4, 2),
            ExpectedStatistics::NonEmpty(1, 2, 2),
        ],
        1,
    )
    .await?;
    Ok(())
}

> The idx will be 1

Yes, but I use child_statistics.column_statistics[col.index()] rather than child_statistics.column_statistics[idx], so the statistics are taken from the correct input column.

@github-actions github-actions bot added the physical-plan Changes to the physical-plan crate label May 23, 2025
@xudong963 (Member) left a comment:

Thank you, I think it's close to finishing!

Also, could you take a look? @berkaysynnada

@berkaysynnada (Contributor) left a comment:

LGTM, thank you @UBarney

@berkaysynnada (Contributor) commented:

> LGTM, thank you @UBarney

As noted in the existing TODOs, there could be more special handling needed in both the group-by expressions and the aggregation logic. However, this is clearly an improvement, and others can build on it in future work. A few "good first issues" could be created from this as well.

@xudong963 (Member) left a comment:

Thank you @UBarney , after resolving the comments from @berkaysynnada , we can go ahead!

@UBarney (Contributor, Author) commented May 28, 2025:

> LGTM, thank you @UBarney

@berkaysynnada Thanks for reviewing. I have addressed your comment

@berkaysynnada berkaysynnada merged commit 7d3c7d8 into apache:main May 28, 2025
27 checks passed
@xudong963 (Member) commented:
@UBarney Thank you again!!

@alamb (Contributor) commented May 28, 2025

🎉
