Skip to content

Commit

Permalink
Merge branch 'apache:main' into bloom_field_agg
Browse files Browse the repository at this point in the history
  • Loading branch information
mbutrovich authored Oct 3, 2024
2 parents 7a81f35 + 3413397 commit 013513e
Show file tree
Hide file tree
Showing 5 changed files with 125 additions and 41 deletions.
2 changes: 1 addition & 1 deletion common/src/main/java/org/apache/comet/NativeBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ static synchronized void load() {

// Try to load Comet library from the java.library.path.
try {
System.loadLibrary(libraryToLoad);
System.loadLibrary(NATIVE_LIB_NAME);
loaded = true;
} catch (UnsatisfiedLinkError ex) {
// Doesn't exist, so proceed to loading bundled library.
Expand Down
3 changes: 2 additions & 1 deletion docs/source/contributor-guide/benchmarking.md
Original file line number Diff line number Diff line change
Expand Up @@ -57,13 +57,14 @@ $SPARK_HOME/bin/spark-submit \
--conf spark.executor.memory=32G \
--conf spark.executor.cores=8 \
--conf spark.cores.max=8 \
--conf spark.memory.offHeap.enabled=true \
--conf spark.memory.offHeap.size=10g \
--jars $COMET_JAR \
--conf spark.driver.extraClassPath=$COMET_JAR \
--conf spark.executor.extraClassPath=$COMET_JAR \
--conf spark.plugins=org.apache.spark.CometPlugin \
--conf spark.comet.cast.allowIncompatible=true \
--conf spark.comet.exec.shuffle.enabled=true \
--conf spark.comet.exec.shuffle.mode=auto \
--conf spark.shuffle.manager=org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager \
tpcbench.py \
--benchmark tpch \
Expand Down
102 changes: 65 additions & 37 deletions docs/source/user-guide/tuning.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,45 +21,66 @@ under the License.

Comet provides some tuning options to help you get the best performance from your queries.

## Metrics
## Memory Tuning

Comet metrics are not directly comparable to Spark metrics in some cases.
Comet provides two options for memory management:

`CometScanExec` uses nanoseconds for total scan time. Spark also measures scan time in nanoseconds but converts to
milliseconds *per batch* which can result in a large loss of precision. In one case we saw total scan time
of 41 seconds reported as 23 seconds for example.
- **Unified Memory Management** shares an off-heap memory pool between Spark and Comet. This is the recommended option.
- **Native Memory Management** leverages DataFusion's memory management for the native plans and allocates memory independently of Spark.

## Memory Tuning
### Unified Memory Management

This option is automatically enabled when `spark.memory.offHeap.enabled=true`.

Each executor will have a single memory pool which will be shared by all native plans being executed within that
process, and by Spark itself. The size of the pool is specified by `spark.memory.offHeap.size`.

### Native Memory Management

This option is automatically enabled when `spark.memory.offHeap.enabled=false`.

Each native plan has a dedicated memory pool.

By default, the size of each pool is `spark.comet.memory.overhead.factor * spark.executor.memory`. The default value
for `spark.comet.memory.overhead.factor` is `0.2`.

Comet currently doesn't share the memory allocation from Spark but owns its own memory allocation.
That's said, Comet requires additional memory to be allocated. Comet provides some memory related configurations to help you tune the memory usage.
It is important to take executor concurrency into account. The maximum number of concurrent plans in an executor can
be calculated with `spark.executor.cores / spark.task.cpus`.

By default, the amount of memory is `spark.comet.memory.overhead.factor` * `spark.executor.memory`.
The default value for `spark.comet.memory.overhead.factor` is 0.2. You can increase the factor to require more
memory for Comet to use, if you see OOM error. Generally, increasing memory overhead will improve the performance of your queries.
For example, some operators like `SortMergeJoin` and `HashAggregate` may require more memory to run.
Once the memory is not enough, the operator will spill to disk, which will slow down the query.
For example, if the executor can execute 4 plans concurrently, then the total amount of memory allocated will be
`4 * spark.comet.memory.overhead.factor * spark.executor.memory`.

Besides, you can also set the memory explicitly by setting `spark.comet.memoryOverhead` to the desired value.
Comet will allocate at least `spark.comet.memory.overhead.min` memory.
It is also possible to set `spark.comet.memoryOverhead` to the desired size for each pool, rather than calculating
it based on `spark.comet.memory.overhead.factor`.

If both `spark.comet.memoryOverhead` and `spark.comet.memory.overhead.factor` are set, the former will be used.

## Memory Tuning using CometPlugin
Configuring memory for Spark and Comet might be a tedious task as it requires to tune Spark executor overhead memory and Comet memory overhead configs. Comet provides a Spark plugin `CometPlugin` which can be set up to your Spark application to help memory settings.
Comet will allocate at least `spark.comet.memory.overhead.min` memory per pool.

For users running the Comet in clusters like Kubernetes or YARN, `CometPlugin` can also make the resource manager respect correctly Comet memory parameters `spark.comet.memory*`.
it is needed to pass to the starting command line additional Spark configuration parameter `--conf spark.plugins=org.apache.spark.CometPlugin`
### Determining How Much Memory to Allocate

The resource managers respects Apache Spark memory configuration before starting the containers.
Generally, increasing memory overhead will improve query performance, especially for queries containing joins and
aggregates.

The `CometPlugin` plugin overrides `spark.executor.memoryOverhead` adding up the Comet memory configuration.
Once a memory pool is exhausted, the native plan will start spilling to disk, which will slow down the query.

Insufficient memory allocation can also lead to out-of-memory (OOM) errors.

## Configuring spark.executor.memoryOverhead

In some environments, such as Kubernetes and YARN, it is important to correctly set `spark.executor.memoryOverhead` so
that it is possible to allocate off-heap memory.

Comet will automatically set `spark.executor.memoryOverhead` based on the `spark.comet.memory*` settings so that
resource managers respect Apache Spark memory configuration before starting the containers.

Note that there is currently a known issue where this will be inaccurate when using Native Memory Management because it
does not take executor concurrency into account. The tracking issue for this is
https://github.com/apache/datafusion-comet/issues/949.

## Shuffle

Comet provides Comet shuffle features that can be used to improve the performance of your queries.
The following sections describe the different shuffle options available in Comet.
Comet provides accelerated shuffle implementations that can be used to improve the performance of your queries.

To enable Comet shuffle, set the following configuration in your Spark configuration:

Expand All @@ -71,30 +92,37 @@ spark.comet.exec.shuffle.enabled=true
`spark.shuffle.manager` is a Spark static configuration which cannot be changed at runtime.
It must be set before the Spark context is created. You can enable or disable Comet shuffle
at runtime by setting `spark.comet.exec.shuffle.enabled` to `true` or `false`.
Once it is disabled, Comet will fallback to the default Spark shuffle manager.

> **_NOTE:_** At the moment Comet Shuffle is not compatible with Spark AQE partition coalesce. To disable set `spark.sql.adaptive.coalescePartitions.enabled` to `false`.
Once it is disabled, Comet will fall back to the default Spark shuffle manager.

### Shuffle Mode

Comet provides three shuffle modes: Columnar Shuffle, Native Shuffle and Auto Mode.

#### Columnar Shuffle
#### Auto Mode

`spark.comet.exec.shuffle.mode` to `auto` will let Comet choose the best shuffle mode based on the query plan. This
is the default.

By default, once `spark.comet.exec.shuffle.enabled` is enabled, Comet uses JVM-based columnar shuffle
to improve the performance of shuffle operations. Columnar shuffle supports HashPartitioning,
RoundRobinPartitioning, RangePartitioning and SinglePartitioning. This mode has the highest
query coverage.
#### Columnar (JVM) Shuffle

Columnar shuffle can be enabled by setting `spark.comet.exec.shuffle.mode` to `jvm`.
Comet Columnar shuffle is JVM-based and supports `HashPartitioning`, `RoundRobinPartitioning`, `RangePartitioning`, and
`SinglePartitioning`. This mode has the highest query coverage.

Columnar shuffle can be enabled by setting `spark.comet.exec.shuffle.mode` to `jvm`. If this mode is explicitly set,
then any shuffle operations that cannot be supported in this mode will fall back to Spark.

#### Native Shuffle

Comet also provides a fully native shuffle implementation that can be used to improve the performance.
To enable native shuffle, just set `spark.comet.exec.shuffle.mode` to `native`
Comet also provides a fully native shuffle implementation, which generally provides the best performance. However,
native shuffle currently only supports `HashPartitioning` and `SinglePartitioning`.

To enable native shuffle, set `spark.comet.exec.shuffle.mode` to `native`. If this mode is explicitly set,
then any shuffle operations that cannot be supported in this mode will fall back to Spark.

Native shuffle only supports HashPartitioning and SinglePartitioning.
## Metrics

### Auto Mode
Comet metrics are not directly comparable to Spark metrics in some cases.

`spark.comet.exec.shuffle.mode` to `auto` will let Comet choose the best shuffle mode based on the query plan.
`CometScanExec` uses nanoseconds for total scan time. Spark also measures scan time in nanoseconds but converts to
milliseconds _per batch_ which can result in a large loss of precision. In one case we saw total scan time
of 41 seconds reported as 23 seconds for example.
8 changes: 6 additions & 2 deletions native/core/src/jvm_bridge/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -339,7 +339,7 @@ fn get_throwable_message(
throwable: &JThrowable,
) -> CometResult<String> {
unsafe {
let message = env
let message: JString = env
.call_method_unchecked(
throwable,
jvm_classes.throwable_get_message_method,
Expand All @@ -348,7 +348,11 @@ fn get_throwable_message(
)?
.l()?
.into();
let message_str = env.get_string(&message)?.into();
let message_str = if !message.is_null() {
env.get_string(&message)?.into()
} else {
String::from("null")
};

let cause: JThrowable = env
.call_method_unchecked(
Expand Down
51 changes: 51 additions & 0 deletions spark/src/test/scala/org/apache/comet/CometNativeSuite.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.comet

import org.apache.spark.SparkException
import org.apache.spark.sql.CometTestBase
import org.apache.spark.sql.catalyst.expressions.PrettyAttribute
import org.apache.spark.sql.comet.{CometExec, CometExecUtils}
import org.apache.spark.sql.types.LongType
import org.apache.spark.sql.vectorized.ColumnarBatch

class CometNativeSuite extends CometTestBase {
test("test handling NPE thrown by JVM") {
val rdd = spark.range(0, 1).rdd.map { value =>
val limitOp =
CometExecUtils.getLimitNativePlan(Seq(PrettyAttribute("test", LongType)), 100).get
val cometIter = CometExec.getCometIterator(
Seq(new Iterator[ColumnarBatch] {
override def hasNext: Boolean = true
override def next(): ColumnarBatch = throw new NullPointerException()
}),
1,
limitOp)
cometIter.next()
cometIter.close()
value
}

val exception = intercept[SparkException] {
rdd.collect()
}
assert(exception.getMessage contains "java.lang.NullPointerException")
}
}

0 comments on commit 013513e

Please sign in to comment.