18 changes: 13 additions & 5 deletions docs/source/user-guide/tuning.md
@@ -23,7 +23,7 @@ Comet provides some tuning options to help you get the best performance from you

## Memory Tuning

 Comet shares an off-heap memory pool between Spark and Comet. This requires setting `spark.memory.offHeap.enabled=true`.
+If this setting is not enabled, Comet will not accelerate queries and will fall back to Spark.

Each executor will have a single memory pool which will be shared by all native plans being executed within that
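An aside on the off-heap requirement above: in a Spark application this typically means setting both the enable flag and an explicit pool size. A minimal sketch follows; the `8g` value is an arbitrary example, not a recommendation, and Comet-specific settings are omitted:

```scala
import org.apache.spark.sql.SparkSession

// Enable the shared off-heap pool that Comet requires; without the
// `enabled` flag, Comet falls back to Spark instead of accelerating queries.
val spark = SparkSession
  .builder()
  .appName("comet-offheap-example")
  .config("spark.memory.offHeap.enabled", "true")
  .config("spark.memory.offHeap.size", "8g") // sizes the shared pool
  .getOrCreate()
```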
@@ -105,8 +105,16 @@ then any shuffle operations that cannot be supported in this mode will fall back

## Metrics

-Comet metrics are not directly comparable to Spark metrics in some cases.
+Some Comet metrics are not directly comparable to Spark metrics:

-`CometScanExec` uses nanoseconds for total scan time. Spark also measures scan time in nanoseconds but converts to
-milliseconds _per batch_ which can result in a large loss of precision. In one case we saw total scan time
-of 41 seconds reported as 23 seconds for example.
+- `CometScanExec` uses nanoseconds for total scan time. Spark also measures scan time in nanoseconds but converts to
+  milliseconds _per batch_, which can result in a large loss of precision, making it difficult to compare scan times
+  between Spark and Comet.
+
+Comet also adds some custom metrics:
+
+### ShuffleWriterExec
+
+| Metric           | Description |
+| ---------------- | ----------- |
+| `jvm_fetch_time` | Measures the time it takes for `ShuffleWriterExec` to fetch batches from the JVM. Note that this does not include the execution time of the query that produced the input batches. |
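To make the precision-loss point concrete, here is a small self-contained sketch. The numbers are invented, and it assumes the per-batch millisecond conversion truncates toward zero, which is consistent with the 41-seconds-reported-as-23 example removed above:

```scala
object ScanTimePrecisionDemo extends App {
  // Hypothetical workload: 10,000 batches, each scanned in 0.4 ms.
  val numBatches = 10000
  val perBatchNanos = 400000L // 0.4 ms

  // Accumulating nanoseconds and converting once at the end (Comet-style).
  val totalMsFromNanos = numBatches * perBatchNanos / 1000000L // 4000 ms

  // Converting each batch to whole milliseconds first (assumed Spark-style).
  val totalMsPerBatch = (1 to numBatches).map(_ => perBatchNanos / 1000000L).sum // 0 ms

  println(s"accumulated in ns: $totalMsFromNanos ms; converted per batch: $totalMsPerBatch ms")
}
```

Every sub-millisecond remainder is dropped once per batch, so the reported total can undershoot the true scan time by up to one millisecond per batch.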
26 changes: 20 additions & 6 deletions native/core/src/execution/datafusion/shuffle_writer.rs
@@ -139,6 +139,7 @@ impl ExecutionPlan for ShuffleWriterExec {
    ) -> Result<SendableRecordBatchStream> {
        let input = self.input.execute(partition, Arc::clone(&context))?;
        let metrics = ShuffleRepartitionerMetrics::new(&self.metrics, 0);
+       let jvm_fetch_time = MetricBuilder::new(&self.metrics).subset_time("jvm_fetch_time", 0);

        Ok(Box::pin(RecordBatchStreamAdapter::new(
            self.schema(),
@@ -151,6 +152,7 @@ impl ExecutionPlan for ShuffleWriterExec {
                self.partitioning.clone(),
                metrics,
                context,
+               jvm_fetch_time,
            )
            .map_err(|e| ArrowError::ExternalError(Box::new(e))),
        )
@@ -1083,6 +1085,7 @@ impl Debug for ShuffleRepartitioner {
    }
}

+#[allow(clippy::too_many_arguments)]
async fn external_shuffle(
    mut input: SendableRecordBatchStream,
    partition_id: usize,
@@ -1091,6 +1094,7 @@ async fn external_shuffle(
    partitioning: Partitioning,
    metrics: ShuffleRepartitionerMetrics,
    context: Arc<TaskContext>,
+   jvm_fetch_time: Time,
) -> Result<SendableRecordBatchStream> {
    let schema = input.schema();
    let mut repartitioner = ShuffleRepartitioner::new(
@@ -1104,13 +1108,23 @@ async fn external_shuffle(
        context.session_config().batch_size(),
    );

-   while let Some(batch) = input.next().await {
-       // Block on the repartitioner to insert the batch and shuffle the rows
-       // into the corresponding partition buffer.
-       // Otherwise, pull the next batch from the input stream might overwrite the
-       // current batch in the repartitioner.
-       block_on(repartitioner.insert_batch(batch?))?;
+   loop {
+       let mut timer = jvm_fetch_time.timer();
+       let b = input.next().await;
+       timer.stop();
+
+       match b {
+           Some(batch_result) => {
+               // Block on the repartitioner to insert the batch and shuffle the rows
+               // into the corresponding partition buffer.
+               // Otherwise, pulling the next batch from the input stream might overwrite the
+               // current batch in the repartitioner.
+               block_on(repartitioner.insert_batch(batch_result?))?;
+           }
+           _ => break,
+       }
    }

    repartitioner.shuffle_write().await
}

> **Contributor:** so it's a timer for how long it takes to get a new item from the stream
>
> **Member (author):** yes, and in the case of `ShuffleWriterExec`, we know the input is always a `ScanExec` that is reading batches from the JVM

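The loop above only times the `input.next().await` call; the timer is stopped before `insert_batch` runs, so repartitioning cost is excluded from `jvm_fetch_time`. A rough JVM-side analogue of that measurement pattern, with hypothetical names, might look like:

```scala
// Sketch only: times the fetch from an input iterator, not the work done
// on each batch, mirroring how jvm_fetch_time brackets input.next().await.
def drainWithFetchTimer[B](input: Iterator[B])(insertBatch: B => Unit): Long = {
  var fetchNanos = 0L
  var done = false
  while (!done) {
    val start = System.nanoTime()
    val next = if (input.hasNext) Some(input.next()) else None // the "fetch"
    fetchNanos += System.nanoTime() - start // equivalent of timer.stop()
    next match {
      case Some(batch) => insertBatch(batch) // excluded from the fetch time
      case None => done = true
    }
  }
  fetchNanos // total nanoseconds spent waiting on the input
}
```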
5 changes: 4 additions & 1 deletion native/core/src/execution/operators/scan.rs
@@ -52,7 +52,9 @@ use std::{
};

 /// ScanExec reads batches of data from Spark via JNI. The source of the scan could be a file
-/// scan or the result of reading a broadcast or shuffle exchange.
+/// scan or the result of reading a broadcast or shuffle exchange. ScanExec isn't invoked
+/// until the data is already available in the JVM. When CometExecIterator invokes
+/// Native.executePlan, it passes in the memory addresses of the input batches.
#[derive(Debug, Clone)]
pub struct ScanExec {
    /// The ID of the execution context that owns this subquery. We use this ID to retrieve the JVM
@@ -73,6 +75,7 @@ pub struct ScanExec {
    cache: PlanProperties,
    /// Metrics collector
    metrics: ExecutionPlanMetricsSet,
+   /// Baseline metrics
    baseline_metrics: BaselineMetrics,
}

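The expanded `ScanExec` doc comment describes the JVM-to-native handoff. Here is a purely illustrative sketch of that flow; all names and signatures are hypothetical, since the diff only names `CometExecIterator` and `Native.executePlan`:

```scala
// Hypothetical stand-ins; Comet's real CometExecIterator and Native differ.
object NativeSketch {
  // Stand-in for the JNI entry point; the real signature is not shown in this diff.
  def executePlan(planId: Long, inputBatchAddresses: Array[Long]): Long = ???
}

class CometExecIteratorSketch(planId: Long, inputs: Iterator[Array[Long]]) {
  // Batches are materialized in the JVM first; only their memory addresses
  // cross the JNI boundary, where the native ScanExec picks them up.
  def nextResultAddress(): Option[Long] =
    if (inputs.hasNext) Some(NativeSketch.executePlan(planId, inputs.next()))
    else None
}
```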
Changes to `CometShuffleExchangeExec`:

@@ -77,6 +77,9 @@ case class CometShuffleExchangeExec(
    SQLShuffleReadMetricsReporter.createShuffleReadMetrics(sparkContext)
  override lazy val metrics: Map[String, SQLMetric] = Map(
    "dataSize" -> SQLMetrics.createSizeMetric(sparkContext, "data size"),
+   "jvm_fetch_time" -> SQLMetrics.createNanoTimingMetric(
+     sparkContext,
+     "time fetching batches from JVM"),
    "numPartitions" -> SQLMetrics.createMetric(
      sparkContext,
      "number of partitions")) ++ readMetrics ++ writeMetrics
@@ -480,7 +483,14 @@ class CometShuffleWriteProcessor(
"output_rows" -> metrics(SQLShuffleWriteMetricsReporter.SHUFFLE_RECORDS_WRITTEN),
"data_size" -> metrics("dataSize"),
"elapsed_compute" -> metrics(SQLShuffleWriteMetricsReporter.SHUFFLE_WRITE_TIME))
val nativeMetrics = CometMetricNode(nativeSQLMetrics)

val nativeMetrics = if (metrics.contains("jvm_fetch_time")) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't know why, but data_size is also repeated between metrics and nativeMetrics yet handled differently. Not super important though.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, will look into that

CometMetricNode(
nativeSQLMetrics ++ Map("jvm_fetch_time" ->
metrics("jvm_fetch_time")))
} else {
CometMetricNode(nativeSQLMetrics)
}

// Getting rid of the fake partitionId
val newInputs = inputs.asInstanceOf[Iterator[_ <: Product2[Any, Any]]].map(_._2)