implement AggregateExec.partition_statistics
#15954
        .num_rows
        .to_inexact()
} else if *value == 0 {
    // Aggregation on an empty table creates a null row.
- If `!group_by_expr.is_empty()` and `input_statistics.num_rows == 0`:
  - Both `Partial` and `Final` aggregation modes (`agg.mode`) yield 0 output rows. (Note the AggregateExec metric: `[output_rows=0]`)
> explain analyze select count(*) from generate_series(0) where value > 10 group by value;
+-------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+-------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| Plan with Metrics | ProjectionExec: expr=[count(Int64(1))@1 as count(*)], metrics=[output_rows=0, elapsed_compute=24ns] |
| | AggregateExec: mode=FinalPartitioned, gby=[value@0 as value], aggr=[count(Int64(1))], metrics=[output_rows=0, elapsed_compute=100.016µs, spill_count=0, spilled_bytes=0, spilled_rows=0, peak_mem_used=1536] |
| | CoalesceBatchesExec: target_batch_size=8192, metrics=[output_rows=0, elapsed_compute=452ns] |
| | RepartitionExec: partitioning=Hash([value@0], 24), input_partitions=24, metrics=[fetch_time=10.544607ms, repartition_time=24ns, send_time=576ns] |
| | AggregateExec: mode=Partial, gby=[value@0 as value], aggr=[count(Int64(1))], metrics=[output_rows=0, elapsed_compute=170.537µs, spill_count=0, spilled_bytes=0, spilled_rows=0, skipped_aggregation_rows=0, peak_mem_used=1536] |
| | CoalesceBatchesExec: target_batch_size=8192, metrics=[output_rows=0, elapsed_compute=663ns] |
| | FilterExec: value@0 > 10, metrics=[output_rows=0, elapsed_compute=2.201314ms] |
| | RepartitionExec: partitioning=RoundRobinBatch(24), input_partitions=1, metrics=[fetch_time=3.077µs, repartition_time=1ns, send_time=1.16µs] |
| | LazyMemoryExec: partitions=1, batch_generators=[generate_series: start=0, end=0, batch_size=8192], metrics=[] |
| | |
+-------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
1 row(s) fetched.
Elapsed 0.004 seconds.
> select count(*) from generate_series(0) where value > 10 group by value;
+----------+
| count(*) |
+----------+
+----------+
0 row(s) fetched.
- If `group_by_expr.is_empty()` and `input_statistics.num_rows == 0`:
  - `Final` aggregation mode (`agg.mode == Final`) yields 1 output row. But that case has already returned earlier, so it never reaches this line.
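The two cases above can be summarized in a small sketch. This is not the actual `AggregateExec` code: `RowCount`, `Mode`, and `estimate_output_rows` are hypothetical stand-ins for DataFusion's `Precision<usize>`, `AggregateMode`, and the row-count logic, and any case this thread does not discuss simply falls back to `Absent`.

```rust
/// Hypothetical, simplified stand-in for DataFusion's `Precision<usize>`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum RowCount {
    Exact(usize),
    Inexact(usize),
    Absent,
}

/// Hypothetical, simplified stand-in for `AggregateMode`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Mode {
    Partial,
    Final,
}

/// Estimate the number of output rows of an aggregation (illustrative only).
fn estimate_output_rows(mode: Mode, has_group_by: bool, input_rows: RowCount) -> RowCount {
    match (has_group_by, mode, input_rows) {
        // Final aggregation without GROUP BY emits one row, even on empty input.
        (false, Mode::Final, _) => RowCount::Exact(1),
        // With GROUP BY and an empty input there are no groups, so no rows.
        (true, _, RowCount::Exact(0)) => RowCount::Exact(0),
        // With GROUP BY, the input row count is only an upper bound.
        (true, _, RowCount::Exact(n)) | (true, _, RowCount::Inexact(n)) => RowCount::Inexact(n),
        // Cases not covered in this discussion: report nothing.
        _ => RowCount::Absent,
    }
}
```

The `Exact(0)` arm is the behavior shown by the `output_rows=0` metrics in the plan above; the first arm is the early-return case.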
Is this a bug fix? If so, it would be better to add a unit test to cover it.
I think the current unit test already covers it:
+ `group_by_expr.is_empty()` and `input_statistics.num_rows == 0`
~~+ `!group_by_expr.is_empty()` and `input_statistics.num_rows == 0`~~
datafusion/datafusion/core/tests/physical_optimizer/partition_statistics.rs
Lines 636 to 662 in aa62950
    let agg_partial = Arc::new(AggregateExec::try_new(
        AggregateMode::Partial,
        group_by.clone(),
        aggr_expr.clone(),
        vec![None],
        empty_table.clone(),
        scan_schema.clone(),
    )?);
    let empty_stat = Statistics {
        num_rows: Precision::Exact(0),
        total_byte_size: Precision::Absent,
        column_statistics: vec![
            ColumnStatistics::new_unknown(),
            ColumnStatistics::new_unknown(),
            ColumnStatistics::new_unknown(),
        ],
    };
    assert_eq!(&empty_stat, &agg_partial.partition_statistics(Some(0))?);
    assert_eq!(&empty_stat, &agg_partial.partition_statistics(Some(1))?);
    validate_statistics_with_data(
        agg_partial.clone(),
        vec![ExpectedStatistics::Empty, ExpectedStatistics::Empty],
        0,
    )
    .await?;

FYI @xudong963

@UBarney Thank you, I'll review it in two days

@xudong963 Thanks for reviewing. All comments have been addressed, PTAL

Thank you, I'll review asap
datafusion/core/tests/physical_optimizer/partition_statistics.rs
    let grouping_set_num = self.group_by.groups.len();
    child_statistics.num_rows.map(|x| x * grouping_set_num)
I don't get it here, could you please give a SQL example?
> explain analyze select t1.value, t2.value, count(*) from range(1) as t1 join range(1) as t2 using(value) group by cube(t1.value, t2.value);
+-------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+-------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| Plan with Metrics | ProjectionExec: expr=[value@0 as value, value@1 as value, count(Int64(1))@3 as count(*)], metrics=[output_rows=4, elapsed_compute=20.988µs] |
| | AggregateExec: mode=FinalPartitioned, gby=[value@0 as value, value@1 as value, __grouping_id@2 as __grouping_id], aggr=[count(Int64(1))], metrics=[output_rows=4, elapsed_compute=107.217µs, spill_count=0, spilled_bytes=0, spilled_rows=0, peak_mem_used=3264] |
| | CoalesceBatchesExec: target_batch_size=8192, metrics=[output_rows=4, elapsed_compute=4.21µs] |
| | RepartitionExec: partitioning=Hash([value@0, value@1, __grouping_id@2], 24), input_partitions=24, metrics=[fetch_time=19.129834ms, repartition_time=7.618µs, send_time=35.082µs] |
| | AggregateExec: mode=Partial, gby=[(value@0 as value, value@1 as value), (NULL as value, value@1 as value), (value@0 as value, NULL as value), (NULL as value, NULL as value)], aggr=[count(Int64(1))], metrics=[output_rows=4, elapsed_compute=120.603µs, spill_count=0, spilled_bytes=0, spilled_rows=0, peak_mem_used=1976] |
| | CoalesceBatchesExec: target_batch_size=8192, metrics=[output_rows=1, elapsed_compute=1.288µs] |
| | HashJoinExec: mode=Partitioned, join_type=Inner, on=[(value@0, value@0)], metrics=[output_rows=1, build_input_batches=1, build_input_rows=1, input_batches=1, input_rows=1, output_batches=1, build_mem_used=1760, build_time=475.105µs, join_time=14.496µs] |
| | CoalesceBatchesExec: target_batch_size=8192, metrics=[output_rows=1, elapsed_compute=1.622µs] |
| | RepartitionExec: partitioning=Hash([value@0], 24), input_partitions=24, metrics=[fetch_time=654.947µs, repartition_time=4.621µs, send_time=1.056µs] |
| | RepartitionExec: partitioning=RoundRobinBatch(24), input_partitions=1, metrics=[fetch_time=3.096µs, repartition_time=1ns, send_time=741ns] |
| | LazyMemoryExec: partitions=1, batch_generators=[range: start=0, end=1, batch_size=8192], metrics=[] |
| | CoalesceBatchesExec: target_batch_size=8192, metrics=[output_rows=1, elapsed_compute=2.197µs] |
| | RepartitionExec: partitioning=Hash([value@0], 24), input_partitions=24, metrics=[fetch_time=2.702565ms, repartition_time=2.726µs, send_time=1.497µs] |
| | RepartitionExec: partitioning=RoundRobinBatch(24), input_partitions=1, metrics=[fetch_time=1.571µs, repartition_time=1ns, send_time=248ns] |
| | LazyMemoryExec: partitions=1, batch_generators=[range: start=0, end=1, batch_size=8192], metrics=[] |
| | |
+-------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
1 row(s) fetched.
Elapsed 0.004 seconds.
>
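In the plan above, the join produces 1 row and `cube(t1.value, t2.value)` expands to 4 grouping sets, so the partial aggregation emits up to 1 × 4 = 4 rows. A minimal sketch of that bound follows; `max_output_rows` is a hypothetical helper that uses `Option<usize>` in place of DataFusion's `Precision<usize>`, and the saturating multiplication is an assumption made here to avoid overflow, not something taken from the PR.

```rust
/// Upper bound on the rows a grouping-set aggregation can emit (illustrative only).
/// Each input row can contribute at most one output row per grouping set.
fn max_output_rows(input_rows: Option<usize>, grouping_set_num: usize) -> Option<usize> {
    input_rows.map(|rows| rows.saturating_mul(grouping_set_num))
}

/// Number of grouping sets produced by `CUBE` over `num_columns` columns: 2^n.
fn cube_grouping_sets(num_columns: u32) -> usize {
    1usize << num_columns
}
```

For the query above, `max_output_rows(Some(1), cube_grouping_sets(2))` gives `Some(4)`, which matches `output_rows=4` on the `Partial` AggregateExec.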
I think it's also a bug fix. Do we have a unit test for it?
    // self.schema: [<group by exprs>, <aggregate exprs>]
    let mut column_statistics = Statistics::unknown_column(&self.schema());

    for (idx, (expr, _)) in self.group_by.expr.iter().enumerate() {
If the child of the agg node is a non-agg node, do the output columns of the child match with the group by expr?
Yes. It can be verified by this test case:
datafusion/datafusion/core/tests/physical_optimizer/partition_statistics.rs
Lines 549 to 576 in 0c7760a
    let aggregate_exec_partial = Arc::new(AggregateExec::try_new(
        AggregateMode::Partial,
        group_by.clone(),
        aggr_expr.clone(),
        vec![None],
        Arc::clone(&scan),
        scan_schema.clone(),
    )?);
    let p0_statistics = aggregate_exec_partial.partition_statistics(Some(0))?;
    let expected_p0_statistics = Statistics {
        num_rows: Precision::Inexact(2),
        total_byte_size: Precision::Absent,
        column_statistics: vec![
            ColumnStatistics {
                null_count: Precision::Absent,
                max_value: Precision::Exact(ScalarValue::Int32(Some(4))),
                min_value: Precision::Exact(ScalarValue::Int32(Some(3))),
                sum_value: Precision::Absent,
                distinct_count: Precision::Absent,
            },
            ColumnStatistics::new_unknown(),
            ColumnStatistics::new_unknown(),
        ],
    };
    assert_eq!(&p0_statistics, &expected_p0_statistics);
If we adjust the order of group by exprs in the test, does it still match?
    let group_by = PhysicalGroupBy::new_single(vec![
        (
            binary(
                lit(1),
                Operator::Plus,
                col("id", &scan_schema)?,
                &scan_schema,
            )?,
            "expr".to_string(),
        ),
        (col("id", &scan_schema)?, "id".to_string()),
    ]);
The idx will be 1, and it'll point to the date column in the scan.
Correct me if I missed something
> If we adjust the order of group by exprs in the test, does it still match?
Yes.
#[tokio::test]
async fn test_agg_order() -> Result<()> {
    let scan = create_scan_exec_with_statistics(None, Some(2)).await;
    let scan_schema = scan.schema();
    // Group by `1 + id` first and the plain `id` column second.
    let group_by = PhysicalGroupBy::new_single(vec![
        (
            binary(
                lit(1),
                Operator::Plus,
                col("id", &scan_schema)?,
                &scan_schema,
            )?,
            "expr".to_string(),
        ),
        (col("id", &scan_schema)?, "id".to_string()),
    ]);
    let aggr_expr = vec![AggregateExprBuilder::new(count_udaf(), vec![lit(1)])
        .schema(Arc::clone(&scan_schema))
        .alias(String::from("COUNT(c)"))
        .build()
        .map(Arc::new)?];
    let aggregate_exec_partial = Arc::new(AggregateExec::try_new(
        AggregateMode::Partial,
        group_by.clone(),
        aggr_expr.clone(),
        vec![None],
        Arc::clone(&scan),
        scan_schema.clone(),
    )?);
    let p0_statistics = aggregate_exec_partial.partition_statistics(Some(0))?;
    // The min/max of `id` now appear in the second output column.
    let expected_p0_statistics = Statistics {
        num_rows: Precision::Inexact(2),
        total_byte_size: Precision::Absent,
        column_statistics: vec![
            ColumnStatistics::new_unknown(),
            ColumnStatistics {
                null_count: Precision::Absent,
                max_value: Precision::Exact(ScalarValue::Int32(Some(4))),
                min_value: Precision::Exact(ScalarValue::Int32(Some(3))),
                sum_value: Precision::Absent,
                distinct_count: Precision::Absent,
            },
            ColumnStatistics::new_unknown(),
        ],
    };
    assert_eq!(&p0_statistics, &expected_p0_statistics);
    let expected_p1_statistics = Statistics {
        num_rows: Precision::Inexact(2),
        total_byte_size: Precision::Absent,
        column_statistics: vec![
            ColumnStatistics::new_unknown(),
            ColumnStatistics {
                null_count: Precision::Absent,
                max_value: Precision::Exact(ScalarValue::Int32(Some(2))),
                min_value: Precision::Exact(ScalarValue::Int32(Some(1))),
                sum_value: Precision::Absent,
                distinct_count: Precision::Absent,
            },
            ColumnStatistics::new_unknown(),
        ],
    };
    let p1_statistics = aggregate_exec_partial.partition_statistics(Some(1))?;
    assert_eq!(&p1_statistics, &expected_p1_statistics);
    validate_statistics_with_data(
        aggregate_exec_partial.clone(),
        vec![
            ExpectedStatistics::NonEmpty(3, 4, 2),
            ExpectedStatistics::NonEmpty(1, 2, 2),
        ],
        1,
    )
    .await?;
    let agg_final = Arc::new(AggregateExec::try_new(
        AggregateMode::FinalPartitioned,
        group_by.as_final(),
        aggr_expr.clone(),
        vec![None],
        aggregate_exec_partial.clone(),
        aggregate_exec_partial.schema(),
    )?);
    let p0_statistics = agg_final.partition_statistics(Some(0))?;
    assert_eq!(&p0_statistics, &expected_p0_statistics);
    let p1_statistics = agg_final.partition_statistics(Some(1))?;
    assert_eq!(&p1_statistics, &expected_p1_statistics);
    validate_statistics_with_data(
        agg_final.clone(),
        vec![
            ExpectedStatistics::NonEmpty(3, 4, 2),
            ExpectedStatistics::NonEmpty(1, 2, 2),
        ],
        1,
    )
    .await?;
    Ok(())
}
> The idx will be 1
Yes, but I use `child_statistics.column_statistics[col.index()]` rather than `child_statistics.column_statistics[idx]`.
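To illustrate the difference, here is a self-contained sketch, not the PR's code. `ColStats`, `GroupExpr`, and `group_by_column_stats` are hypothetical simplifications of `ColumnStatistics`, the physical group-by expressions, and the statistics logic. The output slot is chosen by the position in the group-by list (`idx`), while the child statistics are looked up by the column's own index in the child schema.

```rust
/// Hypothetical, simplified column statistics: only min and max.
#[derive(Debug, Clone, PartialEq, Default)]
struct ColStats {
    min: Option<i32>,
    max: Option<i32>,
}

/// Hypothetical group-by expression: a plain column reference or anything else.
enum GroupExpr {
    /// `index` is the column's position in the child (input) schema.
    Column { index: usize },
    Other,
}

/// Build output statistics laid out as [<group by exprs>, <aggregate exprs>].
fn group_by_column_stats(
    group_by: &[GroupExpr],
    child: &[ColStats],
    num_aggr: usize,
) -> Vec<ColStats> {
    // Start with every output column unknown.
    let mut out = vec![ColStats::default(); group_by.len() + num_aggr];
    for (idx, expr) in group_by.iter().enumerate() {
        // Only a simple column can reuse the child's min/max.
        if let GroupExpr::Column { index } = expr {
            // Look up by the column's index in the child, not by `idx`.
            if let Some(stats) = child.get(*index) {
                out[idx].min = stats.min;
                out[idx].max = stats.max;
            }
        }
    }
    out
}
```

With group-by expressions `[1 + id, id]`, where `id` is column 0 of the child, the second output column receives the min/max of `id` and the first stays unknown, which is the layout `test_agg_order` expects.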
Thank you, I think it's close to finishing!
Also, could you take a look? @berkaysynnada
berkaysynnada
left a comment
LGTM, thank you @UBarney
datafusion/core/tests/physical_optimizer/partition_statistics.rs
As noted in the existing TODOs, there could be more special handling needed in both the group-by expressions and the aggregation logic. However, this is clearly an improvement, and others can build on it in future work. A few "good first issues" could be created from this as well.
xudong963
left a comment
Thank you @UBarney, after resolving the comments from @berkaysynnada, we can go ahead!

@berkaysynnada Thanks for reviewing. I have addressed your comment

@UBarney Thank you again!!

🎉
Which issue does this PR close?
part of #15873
Rationale for this change
What changes are included in this PR?
- `statistics_inner`: `(self, statistics: Statistics)`
- `min_value`/`max_value` from input statistics when grouping by a simple `Column`.
- `PartitionStatisticsExec`
Are these changes tested?
Yes
Are there any user-facing changes?