Skip to content

Commit e297d23

Browse files
authored
feat: Improve shuffle metrics (second attempt) (#1175)
* improve shuffle metrics * docs * more metrics * refactor * address feedback
1 parent 655081b commit e297d23

File tree

9 files changed

+261
-140
lines changed

9 files changed

+261
-140
lines changed

docs/source/index.rst

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ as a native runtime to achieve improvement in terms of query efficiency and quer
5151
Configuration Settings <user-guide/configs>
5252
Compatibility Guide <user-guide/compatibility>
5353
Tuning Guide <user-guide/tuning>
54+
Metrics Guide <user-guide/metrics>
5455

5556
.. _toc.contributor-guide-links:
5657
.. toctree::

docs/source/user-guide/metrics.md

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
<!---
2+
Licensed to the Apache Software Foundation (ASF) under one
3+
or more contributor license agreements. See the NOTICE file
4+
distributed with this work for additional information
5+
regarding copyright ownership. The ASF licenses this file
6+
to you under the Apache License, Version 2.0 (the
7+
"License"); you may not use this file except in compliance
8+
with the License. You may obtain a copy of the License at
9+
10+
http://www.apache.org/licenses/LICENSE-2.0
11+
12+
Unless required by applicable law or agreed to in writing,
13+
software distributed under the License is distributed on an
14+
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
KIND, either express or implied. See the License for the
16+
specific language governing permissions and limitations
17+
under the License.
18+
-->
19+
20+
# Comet Metrics
21+
22+
## Spark SQL Metrics
23+
24+
Set `spark.comet.metrics.detailed=true` to see all available Comet metrics.
25+
26+
### CometScanExec
27+
28+
| Metric | Description |
29+
| ----------- | ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
30+
| `scan time` | Total time to scan a Parquet file. This is not comparable to the same metric in Spark because Comet's scan metric is more accurate. Although both Comet and Spark measure the time in nanoseconds, Spark rounds this time to the nearest millisecond per batch and Comet does not. |
31+
32+
### Exchange
33+
34+
Comet adds some additional metrics:
35+
36+
| Metric | Description |
37+
| ------------------------------- | ------------------------------------------------------------- |
38+
| `native shuffle time` | Total time in native code excluding any child operators. |
39+
| `repartition time` | Time to repartition batches. |
40+
| `memory pool time` | Time interacting with memory pool. |
41+
| `encoding and compression time` | Time to encode batches in IPC format and compress using ZSTD. |
42+
43+
## Native Metrics
44+
45+
Setting `spark.comet.explain.native.enabled=true` will cause native plans to be logged in each executor. Metrics are
46+
logged for each native plan (and there is one plan per task, so this is very verbose).
47+
48+
Here is a guide to some of the native metrics.
49+
50+
### ScanExec
51+
52+
| Metric | Description |
53+
| ----------------- | --------------------------------------------------------------------------------------------------- |
54+
| `elapsed_compute` | Total time spent in this operator, fetching batches from a JVM iterator. |
55+
| `jvm_fetch_time` | Time spent in the JVM fetching input batches to be read by this `ScanExec` instance. |
56+
| `arrow_ffi_time` | Time spent using Arrow FFI to create Arrow batches from the memory addresses returned from the JVM. |
57+
58+
### ShuffleWriterExec
59+
60+
| Metric | Description |
61+
| ----------------- | ------------------------------------------------------------- |
62+
| `elapsed_compute` | Total time excluding any child operators. |
63+
| `repart_time` | Time to repartition batches. |
64+
| `ipc_time` | Time to encode batches in IPC format and compress using ZSTD. |
65+
| `mempool_time` | Time interacting with memory pool. |
66+
| `write_time` | Time spent writing bytes to disk. |

docs/source/user-guide/tuning.md

Lines changed: 0 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -103,31 +103,6 @@ native shuffle currently only supports `HashPartitioning` and `SinglePartitionin
103103
To enable native shuffle, set `spark.comet.exec.shuffle.mode` to `native`. If this mode is explicitly set,
104104
then any shuffle operations that cannot be supported in this mode will fall back to Spark.
105105

106-
## Metrics
107-
108-
### Spark SQL Metrics
109-
110-
Some Comet metrics are not directly comparable to Spark metrics in some cases:
111-
112-
- `CometScanExec` uses nanoseconds for total scan time. Spark also measures scan time in nanoseconds but converts to
113-
milliseconds _per batch_ which can result in a large loss of precision, making it difficult to compare scan times
114-
between Spark and Comet.
115-
116-
### Native Metrics
117-
118-
Setting `spark.comet.explain.native.enabled=true` will cause native plans to be logged in each executor. Metrics are
119-
logged for each native plan (and there is one plan per task, so this is very verbose).
120-
121-
Here is a guide to some of the native metrics.
122-
123-
### ScanExec
124-
125-
| Metric | Description |
126-
| ----------------- | --------------------------------------------------------------------------------------------------- |
127-
| `elapsed_compute` | Total time spent in this operator, fetching batches from a JVM iterator. |
128-
| `jvm_fetch_time` | Time spent in the JVM fetching input batches to be read by this `ScanExec` instance. |
129-
| `arrow_ffi_time` | Time spent using Arrow FFI to create Arrow batches from the memory addresses returned from the JVM. |
130-
131106
## Explain Plan
132107
### Extended Explain
133108
With Spark 4.0.0 and newer, Comet can provide extended explain plan information in the Spark UI. Currently this lists

native/core/src/execution/shuffle/row.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ use arrow_array::{
4040
Array, ArrayRef, RecordBatch, RecordBatchOptions,
4141
};
4242
use arrow_schema::{ArrowError, DataType, Field, Schema, TimeUnit};
43+
use datafusion::physical_plan::metrics::Time;
4344
use jni::sys::{jint, jlong};
4445
use std::{
4546
fs::OpenOptions,
@@ -3354,7 +3355,10 @@ pub fn process_sorted_row_partition(
33543355
let mut frozen: Vec<u8> = vec![];
33553356
let mut cursor = Cursor::new(&mut frozen);
33563357
cursor.seek(SeekFrom::End(0))?;
3357-
written += write_ipc_compressed(&batch, &mut cursor)?;
3358+
3359+
// we do not collect metrics in Native_writeSortedFileNative
3360+
let ipc_time = Time::default();
3361+
written += write_ipc_compressed(&batch, &mut cursor, &ipc_time)?;
33583362

33593363
if let Some(checksum) = &mut current_checksum {
33603364
checksum.update(&mut cursor)?;

0 commit comments

Comments
 (0)