59 changes: 42 additions & 17 deletions docs/source/contributor-guide/plugin_overview.md

# Comet Plugin Architecture

## Overview

The Comet plugin enhances Spark SQL by introducing optimized query execution and shuffle mechanisms leveraging
native code. It integrates with Spark's plugin framework and extension API to replace or extend Spark's
default behavior.

---

# Plugin Components

## Comet SQL Plugin

The entry point to Comet is the `org.apache.spark.CometPlugin` class, which is registered in Spark using the following
configuration:

```
--conf spark.plugins=org.apache.spark.CometPlugin
```
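
The same setting can also be applied programmatically when building a `SparkSession`. The following is a minimal
sketch, assuming a local master and that the Comet jar is already on the driver classpath; everything apart from the
`spark.plugins` setting shown above is illustrative.

```scala
import org.apache.spark.sql.SparkSession

object CometPluginExample {
  def main(args: Array[String]): Unit = {
    // Register the Comet plugin through Spark's standard plugin framework.
    val spark = SparkSession.builder()
      .master("local[*]") // illustrative; any deployment mode works
      .appName("comet-plugin-example")
      .config("spark.plugins", "org.apache.spark.CometPlugin")
      .getOrCreate()

    // Run a trivial query; whether Comet accelerates it depends on the rest of
    // the Comet configuration and on the operators involved.
    spark.range(10).selectExpr("sum(id)").show()

    spark.stop()
  }
}
```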

The plugin is loaded on the Spark driver and does not provide executor-side plugins.

The plugin will update the current `SparkConf` with the extra configuration provided by Comet, such as executor memory
configuration.
Expand Down Expand Up @@ -87,37 +96,53 @@ In the native code there is a `PhysicalPlanner` struct (in `planner.rs`) which c

In the native code there is a `PhysicalPlanner` struct (in `planner.rs`) which converts the serialized plan into an
Apache DataFusion `ExecutionPlan`. In some cases, Comet provides specialized physical operators and expressions to
override the DataFusion versions to ensure compatibility with Apache Spark.

The leaf nodes in the physical plan are always `ScanExec` and each of these operators will make a JNI call to
`CometBatchIterator.next()` to fetch the next input batch. The input could be a Comet native Parquet scan,
a Spark exchange, or another native plan.

`CometNativeExec` creates a `CometExecIterator` and applies this iterator to the input RDD
partitions. Each call to `CometExecIterator.next()` will invoke `Native.executePlan`. Once the plan finishes
executing, the resulting Arrow batches are imported into the JVM using Arrow FFI.
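
The control flow is essentially a pull-based iterator over native execution. The sketch below is a hypothetical,
heavily simplified illustration of that pattern; none of the names are real Comet classes, and the native step is
stubbed with a plain Scala function where Comet would make the `Native.executePlan` JNI call and import the result
over Arrow FFI.

```scala
// Hypothetical sketch only; not Comet's actual classes.
final class NativePlanIterator[T](executePlanOnce: () => Option[T]) extends Iterator[T] {
  // Eagerly ask the "native plan" for a batch so that hasNext is accurate.
  private var buffered: Option[T] = executePlanOnce()

  override def hasNext: Boolean = buffered.isDefined

  override def next(): T = {
    val batch = buffered.getOrElse(throw new NoSuchElementException("plan exhausted"))
    buffered = executePlanOnce() // in Comet, this is where the next native call happens
    batch
  }
}

object NativePlanIteratorDemo extends App {
  // Stub standing in for the native side: yields three "batches" and then completes.
  private var remaining = 3
  private def executeOnce(): Option[String] =
    if (remaining > 0) { remaining -= 1; Some(s"batch-${3 - remaining}") } else None

  new NativePlanIterator(() => executeOnce()).foreach(println)
}
```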

## Shuffle

Comet integrates with Spark's shuffle mechanism, optimizing both shuffle writes and reads. Comet's shuffle manager
must be registered with Spark using the following configuration:

```
--conf spark.shuffle.manager=org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager
```

### Shuffle Writes

For shuffle writes, a `ShuffleMapTask` runs in the executors. This task contains a `ShuffleDependency` that is
broadcast to all of the executors. It then passes the input RDD to `ShuffleWriteProcessor.write()`, which
requests a `ShuffleWriter` from the shuffle manager, and this is where the Comet shuffle writer is obtained.

`ShuffleWriteProcessor` then evaluates the dependency RDD, fetching rows/batches and passing them to Comet's
shuffle writer, which writes the batches to disk in Arrow IPC format.

As a result, a single native plan cannot handle the whole shuffle write: one native plan produces the shuffle input
and another writes the batches to the shuffle file.

### Shuffle Reads

For shuffle reads, a `ShuffledRDD` requests a `ShuffleReader` from the shuffle manager. Comet provides a
`CometBlockStoreShuffleReader`, implemented in the JVM, which fetches blocks from Spark and then creates an
`ArrowReaderIterator` that processes the blocks using Arrow's `StreamReader` to decode the IPC batches.

## Arrow FFI

Due to the hybrid execution model, it is necessary to pass batches of data between the JVM and native code.

The foundation for Arrow FFI is the [Arrow C Data Interface], which provides a stable ABI-compatible interface for
accessing Arrow data structures from multiple languages.

[Arrow C Data Interface]: https://arrow.apache.org/docs/format/CDataInterface.html

- `CometExecIterator` invokes native plans and uses Arrow FFI to read the output batches
- Native `ScanExec` operators call `CometBatchIterator` via JNI to fetch input batches from the JVM
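
To make this concrete, the following self-contained sketch performs an export/import round trip with the Arrow Java
C Data Interface module (`arrow-c-data`). It stays inside one JVM; in Comet, only the memory addresses of the
`ArrowArray` and `ArrowSchema` structs cross the JNI boundary and the importing side is native code, so treat this
purely as an illustration of the mechanism.

```scala
import org.apache.arrow.c.{ArrowArray, ArrowSchema, Data}
import org.apache.arrow.memory.RootAllocator
import org.apache.arrow.vector.{IntVector, VectorSchemaRoot}

object ArrowFfiRoundTrip {
  def main(args: Array[String]): Unit = {
    val allocator = new RootAllocator()

    // Build a small batch with one Int32 column.
    val vector = new IntVector("a", allocator)
    vector.allocateNew(3)
    (0 until 3).foreach(i => vector.setSafe(i, i))
    vector.setValueCount(3)
    val root = VectorSchemaRoot.of(vector)

    // Export the batch into C Data Interface structs. Only the raw struct
    // addresses need to be handed to the consumer (over JNI in Comet's case).
    val cArray = ArrowArray.allocateNew(allocator)
    val cSchema = ArrowSchema.allocateNew(allocator)
    Data.exportVectorSchemaRoot(allocator, root, null, cArray, cSchema)
    println(s"array struct at ${cArray.memoryAddress()}, schema struct at ${cSchema.memoryAddress()}")

    // The consumer imports the batch from those structs without copying buffers.
    val imported = Data.importVectorSchemaRoot(allocator, cArray, cSchema, null)
    println(imported.contentToTSVString())

    // Release everything; the shared buffers are reference counted.
    imported.close()
    cArray.close()
    cSchema.close()
    root.close()
    allocator.close()
  }
}
```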

### Arrow IPC

Comet native shuffle uses Arrow IPC to write batches to the shuffle files.

- `CometShuffleWriteProcessor` invokes a native plan to fetch batches and then passes them to native `ShuffleWriterExec`
- `CometBlockStoreShuffleReader` reads batches from shuffle files
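
As an illustration of the format itself rather than Comet's shuffle writer, the sketch below uses Arrow Java's IPC
classes to write a batch in the streaming format to an in-memory buffer and read it back; Comet writes the same
encoding to shuffle files on disk.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream}

import org.apache.arrow.memory.RootAllocator
import org.apache.arrow.vector.{IntVector, VectorSchemaRoot}
import org.apache.arrow.vector.ipc.{ArrowStreamReader, ArrowStreamWriter}

object ArrowIpcRoundTrip {
  def main(args: Array[String]): Unit = {
    val allocator = new RootAllocator()

    // Build a small batch with one Int32 column.
    val vector = new IntVector("a", allocator)
    vector.allocateNew(3)
    (0 until 3).foreach(i => vector.setSafe(i, i * 10))
    vector.setValueCount(3)
    val root = VectorSchemaRoot.of(vector)

    // Write the batch in the Arrow IPC streaming format (here to an in-memory buffer).
    val out = new ByteArrayOutputStream()
    val writer = new ArrowStreamWriter(root, null, out)
    writer.start()
    writer.writeBatch()
    writer.end()
    writer.close()

    // Read the stream back, batch by batch, as a shuffle reader would.
    val reader = new ArrowStreamReader(new ByteArrayInputStream(out.toByteArray), allocator)
    while (reader.loadNextBatch()) {
      println(reader.getVectorSchemaRoot.contentToTSVString())
    }
    reader.close()

    root.close()
    allocator.close()
  }
}
```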

## End to End Flow

The following diagram shows an example of the end-to-end flow for a query stage.