
Conversation

Contributor

@alamb alamb commented Sep 17, 2021

Which issue does this PR close?

Finally 🎉 closes #866 (following the same model as #1004).

There are still some operators, such as the Parquet, CSV, Avro, and Json sources, that are not instrumented, but I don't have time to devote to instrumenting them now; #1019 tracks that work.

Rationale for this change

We want a basic understanding of where a plan's time is spent and in which operators. See #866 for more details.

What changes are included in this PR?

  1. Instrument WindowAggExec and UnionExec using the BaselineMetrics API from #909 (Add BaselineMetrics, Timestamp metrics, add for CoalescePartitionsExec, rename output_time -> elapsed_compute); see the sketch after this list
  2. Tweak instrumentation for CoalescePartitionsExec so it reports elapsed_compute
  3. Tests for the above
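
To give a sense of the pattern, here is a minimal sketch of how an operator's execute() is instrumented. This is not the exact diff: the ObservedStream wrapper comes from this PR, but the surrounding ExecutionPlan boilerplate and field names are assumptions for illustration.

    // Sketch of an instrumented execute(partition), assuming
    // `self.metrics: ExecutionPlanMetricsSet` and the BaselineMetrics API from #909
    let baseline_metrics = BaselineMetrics::new(&self.metrics, partition);

    // Time the synchronous setup work as elapsed_compute
    let timer = baseline_metrics.elapsed_compute().timer();
    let stream = self.input.execute(partition).await?;
    drop(timer); // stop the timer (and release its borrow) before moving baseline_metrics

    // Wrap the child stream so output_rows and elapsed_compute are recorded
    // as batches are produced
    Ok(Box::pin(ObservedStream::new(stream, baseline_metrics)))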

Are there any user-facing changes?

More fields in EXPLAIN ANALYZE are now filled out

Example of what the EXPLAIN ANALYZE output looks like (dense, but packed with good info). I find it quite cool that DataFusion can even plan and execute such queries.

running query: EXPLAIN ANALYZE SELECT count(*) as cnt FROM (SELECT count(*), c1 FROM aggregate_test_100 WHERE c13 != 'C2GT5KVyOPZpgKVl110TyZO0NcJ434' GROUP BY c1 ORDER BY c1 ) UNION ALL SELECT 1 as cnt UNION ALL SELECT lead(c1, 1) OVER () as cnt FROM (select 1 as c1) LIMIT 2
Query Output:

+-------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type         | plan                                                                                                                                                                                                                                            |
+-------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| Plan with Metrics | GlobalLimitExec: limit=2, metrics=[output_rows=2, elapsed_compute=3.023µs]                                                                                                                                                                      |
|                   |   CoalescePartitionsExec, metrics=[output_rows=3, elapsed_compute=50.216µs]                                                                                                                                                                     |
|                   |     LocalLimitExec: limit=2, metrics=[output_rows=3, elapsed_compute=699ns]                                                                                                                                                                     |
|                   |       UnionExec, metrics=[output_rows=3, elapsed_compute=198.269µs]                                                                                                                                                                             |
|                   |         RepartitionExec: partitioning=RoundRobinBatch(3), metrics=[repart_time{inputPartition=0}=NOT RECORDED, fetch_time{inputPartition=0}=11.908387ms, send_time{inputPartition=0}=3.816µs]                                                   |
|                   |           GlobalLimitExec: limit=2, metrics=[output_rows=1, elapsed_compute=157ns]                                                                                                                                                              |
|                   |             ProjectionExec: expr=[COUNT(UInt8(1))@0 as cnt], metrics=[output_rows=1, elapsed_compute=4.125µs]                                                                                                                                   |
|                   |               HashAggregateExec: mode=Final, gby=[], aggr=[COUNT(UInt8(1))], metrics=[output_rows=1, elapsed_compute=66.501µs]                                                                                                                  |
|                   |                 CoalescePartitionsExec, metrics=[output_rows=3, elapsed_compute=11.969µs]                                                                                                                                                       |
|                   |                   HashAggregateExec: mode=Partial, gby=[], aggr=[COUNT(UInt8(1))], metrics=[output_rows=3, elapsed_compute=89.888µs]                                                                                                            |
|                   |                     RepartitionExec: partitioning=RoundRobinBatch(3), metrics=[fetch_time{inputPartition=0}=11.049667ms, send_time{inputPartition=0}=3.965µs, repart_time{inputPartition=0}=NOT RECORDED]                                       |
|                   |                       SortExec: [c1@1 ASC], metrics=[output_rows=5, elapsed_compute=196.635µs]                                                                                                                                                  |
|                   |                         CoalescePartitionsExec, metrics=[output_rows=5, elapsed_compute=11.127µs]                                                                                                                                               |
|                   |                           ProjectionExec: expr=[COUNT(UInt8(1))@1 as COUNT(UInt8(1)), c1@0 as c1], metrics=[output_rows=5, elapsed_compute=17.878µs]                                                                                            |
|                   |                             HashAggregateExec: mode=FinalPartitioned, gby=[c1@0 as c1], aggr=[COUNT(UInt8(1))], metrics=[output_rows=5, elapsed_compute=308.373µs]                                                                              |
|                   |                               CoalesceBatchesExec: target_batch_size=4096, metrics=[output_rows=5, elapsed_compute=77.054µs]                                                                                                                    |
|                   |                                 RepartitionExec: partitioning=Hash([Column { name: "c1", index: 0 }], 3), metrics=[send_time{inputPartition=0}=NOT RECORDED, repart_time{inputPartition=0}=200.022µs, fetch_time{inputPartition=0}=28.377811ms] |
|                   |                                   HashAggregateExec: mode=Partial, gby=[c1@0 as c1], aggr=[COUNT(UInt8(1))], metrics=[output_rows=5, elapsed_compute=585.625µs]                                                                                 |
|                   |                                     CoalesceBatchesExec: target_batch_size=4096, metrics=[output_rows=99, elapsed_compute=268.21µs]                                                                                                             |
|                   |                                       FilterExec: c13@1 != C2GT5KVyOPZpgKVl110TyZO0NcJ434, metrics=[output_rows=99, elapsed_compute=228.181µs]                                                                                                  |
|                   |                                         RepartitionExec: partitioning=RoundRobinBatch(3), metrics=[repart_time{inputPartition=0}=NOT RECORDED, fetch_time{inputPartition=0}=6.765885ms, send_time{inputPartition=0}=3.554µs]                    |
|                   |                                           CsvExec: source=Path(/Users/alamb/Software/arrow/testing/data/csv/aggregate_test_100.csv: [/Users/alamb/Software/arrow/testing/data/csv/aggregate_test_100.csv]), has_header=true, metrics=[]         |
|                   |         RepartitionExec: partitioning=RoundRobinBatch(3), metrics=[fetch_time{inputPartition=0}=131.806µs, send_time{inputPartition=0}=11.372µs, repart_time{inputPartition=0}=NOT RECORDED]                                                    |
|                   |           GlobalLimitExec: limit=2, metrics=[output_rows=1, elapsed_compute=211ns]                                                                                                                                                              |
|                   |             ProjectionExec: expr=[1 as cnt], metrics=[output_rows=1, elapsed_compute=60.248µs]                                                                                                                                                  |
|                   |               EmptyExec: produce_one_row=true, metrics=[]                                                                                                                                                                                       |
|                   |         RepartitionExec: partitioning=RoundRobinBatch(3), metrics=[repart_time{inputPartition=0}=NOT RECORDED, fetch_time{inputPartition=0}=10.627386ms, send_time{inputPartition=0}=2.322µs]                                                   |
|                   |           GlobalLimitExec: limit=2, metrics=[output_rows=1, elapsed_compute=287ns]                                                                                                                                                              |
|                   |             CoalescePartitionsExec, metrics=[output_rows=1, elapsed_compute=11.487µs]                                                                                                                                                           |
|                   |               ProjectionExec: expr=[LEAD(c1,Int64(1))@0 as cnt], metrics=[output_rows=1, elapsed_compute=5.57µs]                                                                                                                                |
|                   |                 RepartitionExec: partitioning=RoundRobinBatch(3), metrics=[repart_time{inputPartition=0}=NOT RECORDED, send_time{inputPartition=0}=4.463µs, fetch_time{inputPartition=0}=618.203µs]                                             |
|                   |                   WindowAggExec: wdw=[LEAD(c1,Int64(1)): Ok(Field { name: "LEAD(c1,Int64(1))", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: None })], metrics=[output_rows=1, elapsed_compute=140.931µs]     |
|                   |                     ProjectionExec: expr=[1 as c1], metrics=[output_rows=1, elapsed_compute=9.968µs]                                                                                                                                            |
|                   |                       EmptyExec: produce_one_row=true, metrics=[]                                                                                                                                                                               |
+-------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+

);
assert_metrics!(
    &formatted,
    "LocalLimitExec: limit=3",
Contributor Author

By making the query more complicated, it has also introduced a LocalLimitExec for testing 🎉
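
For illustration, this is roughly how that assertion might read in full, assuming the test formats the EXPLAIN ANALYZE output with arrow's pretty_format_batches; the expected metrics substring is an assumption, not copied from the PR:

    // Hypothetical completion of the assert_metrics! call shown above
    let formatted = arrow::util::pretty::pretty_format_batches(&results).unwrap();
    assert_metrics!(
        &formatted,
        "LocalLimitExec: limit=3",
        "metrics=[output_rows=3, elapsed_compute="
    );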

 if partition < input.output_partitioning().partition_count() {
-    return input.execute(partition).await;
+    let stream = input.execute(partition).await?;
+    drop(timer);
Contributor

Is this drop needed if the next line contains a return?

Contributor Author

@alamb alamb Sep 17, 2021

The drop is needed to satisfy the borrow checker: let timer = baseline_metrics.elapsed_compute().timer(); has borrowed from baseline_metrics, and Rust won't allow baseline_metrics to be given to ObservedStream::new while it is borrowed.

An alternative is to clone the elapsed_compute time:

        let elapsed_compute = baseline_metrics.elapsed_compute().clone();
        let timer = elapsed_compute.timer();

which is done in several other parts of this PR. 🤔

Perhaps that would be better, to keep the code consistent.
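
For reference, a sketch of what that clone-based form looks like in context; the surrounding execute() code here is an assumption for illustration, not quoted from the commit:

    // Clone the underlying Time counter so the timer borrows the clone,
    // leaving baseline_metrics free to be moved into ObservedStream::new
    let elapsed_compute = baseline_metrics.elapsed_compute().clone();
    let timer = elapsed_compute.timer();
    let stream = input.execute(partition).await?;
    drop(timer); // stops the clock here; no longer required by the borrow checker
    Ok(Box::pin(ObservedStream::new(stream, baseline_metrics)))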

Contributor Author

in c0b656c

Contributor

@Dandandan Dandandan left a comment

Great, looking forward to using those!

Contributor Author

alamb commented Sep 17, 2021

I think something unrelated to this PR is causing the tests to fail. I will look into it later today or tomorrow if no one else gets around to it

@Dandandan
Contributor

> I think something unrelated to this PR is causing the tests to fail. I will look into it later today or tomorrow if no one else gets around to it

Looks to me like it is just the test in csv_explain_analyze, which now contains metrics for the CoalescePartitionsExec that it didn't contain before.

@Dandandan Dandandan merged commit 7d8a5cf into apache:master Sep 18, 2021
@Dandandan
Contributor

Thanks 👍

@alamb alamb deleted the alamb/stats branch September 19, 2021 10:31
@houqp houqp added the enhancement New feature or request label Nov 6, 2021
Labels: enhancement (New feature or request)

Successfully merging this pull request may close these issues: Add "baseline" metrics to all built in operators (#866)