Skip to content

Conversation

@martin-augment
Copy link
Owner

2758: To review by AI

@coderabbitai
Copy link

coderabbitai bot commented Nov 11, 2025

Walkthrough

Adds comprehensive documentation for implementing new Spark physical operators in Apache DataFusion Comet, covering native (Rust) and JVM (Scala) implementations, protobuf integration, serde creation, registration procedures, and testing strategies. Updates the contributor guide index accordingly.

Changes

Cohort / File(s) Change Summary
Documentation - Contributor Guide
docs/source/contributor-guide/adding_a_new_operator.md, docs/source/contributor-guide/index.md
Introduces new guide documenting the complete workflow for adding Spark physical operators to Comet, including protobuf schema updates, operator registration in opSerdeMap, support level determination via getSupportLevel, native Rust implementation patterns, JVM operator guidance, and integration testing approaches. Updates contributor guide index to reference the new operator implementation documentation.
✨ Finishing touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Post copyable unit tests in a comment
  • Commit unit tests in branch pr-2758-2025-11-11-14-20-09

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

@claude
Copy link

claude bot commented Nov 11, 2025

PR Review: Adding a New Operator Guide

I've completed a comprehensive review of this documentation PR. Overall, this is an excellent addition to the contributor guide that will be invaluable for developers adding new operators to DataFusion Comet. The documentation is well-structured, technically accurate, and follows best practices.

✅ Strengths

  1. Comprehensive Coverage: The guide covers all three types of operators (Native, JVM, and Sinks) with clear explanations of when to use each approach.

  2. Step-by-Step Structure: The numbered steps for implementing a native operator are logical and easy to follow, from protobuf definition through Rust implementation and testing.

  3. Excellent Code Examples: The examples match the actual codebase patterns:

    • The CometFilter and CometProject examples accurately reflect the real implementations
    • Protobuf examples match the structure in operator.proto
    • The QueryPlanSerde registration examples are correct
  4. Support Level Guidance: The detailed explanation of getSupportLevel with Compatible, Incompatible, and Unsupported states is very helpful and matches the framework's design.

  5. Consistency with Existing Docs: The document follows the same Apache license header format, markdown style, and structure as the companion "Adding a New Expression" guide.

📝 Suggestions for Improvement

1. Minor Technical Clarifications

Line 270: The test example shows checkSparkAnswerAndOperator usage but could benefit from a note about what this method validates:

// Suggestion: Add a brief comment
checkSparkAnswerAndOperator(s"SELECT * FROM $table")
// This validates: 1) Results match Spark's native execution
//                 2) Comet operator is actually used (no fallback)

The documentation does explain this on line 274-277, but having a comment in the code example would reinforce the learning.

2. Protobuf Field Numbering

Lines 65-76: The example shows choosing field number 112 for a new operator. While the actual operator.proto ends at 111, it would be helpful to add a note:

YourNewOperator your_new_operator = 112;  // Choose next available number
// Note: Field numbers should never be reused after deletion

3. Rust Planner Example

Lines 208-226: The planner example shows the pattern but could mention where in the file to add this code. Consider adding:

// In the create_plan or similar method:
// (Located around line 300-500 depending on the match statement)
match op.op_struct.as_ref() {

4. Missing Configuration Documentation

Lines 199-203: The configuration entry example is good, but could mention that these configs should also be documented in the user-facing configuration documentation:

// Add this config, then update docs/source/user-guide/latest/configs.md
val COMET_EXEC_YOUR_OPERATOR_ENABLED: ConfigEntry[Boolean] = ...

5. Test Suite Location

Lines 255-278: The testing section mentions CometExecSuite but doesn't clarify that expression-related tests go in CometExpressionSuite (as shown in the expression guide). For consistency, perhaps add:

// For operator tests, add to:
// spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala

🔍 Verification Notes

I've verified the following against the actual codebase:

CometFilter example matches /spark/src/main/scala/org/apache/comet/serde/operator/CometFilter.scala
CometProject example matches /spark/src/main/scala/org/apache/comet/serde/operator/CometProject.scala
✅ Protobuf structure matches /native/proto/src/proto/operator.proto
✅ QueryPlanSerde registration pattern matches actual implementation
✅ Test patterns align with CometExecSuite structure
✅ Documentation style consistent with adding_a_new_expression.md

🎯 Best Practices Observed

  1. Clear Type Safety: Examples show proper use of type-safe CometOperatorSerde[T] trait
  2. Error Handling: Proper use of Option types and withInfo for fallback tagging
  3. Child Validation: Examples correctly check childOp.nonEmpty before building operators
  4. Configuration Management: Good guidance on enabledConfig and support levels

🔐 Security Considerations

No security concerns identified. The documentation:

  • Doesn't expose sensitive information
  • Follows secure coding patterns (no unsafe operations)
  • Properly handles configuration and validation

📊 Test Coverage

The documentation adequately covers testing requirements:

  • Integration tests with checkSparkAnswerAndOperator
  • Rust unit tests mention
  • Explains what each test validates

📚 Documentation Quality

  • Clarity: Excellent. Technical concepts are well-explained
  • Completeness: Comprehensive coverage of the topic
  • Organization: Logical flow from overview to implementation
  • Examples: High-quality, accurate code samples

✨ Final Recommendation

Approve with minor suggestions. This documentation is production-ready and will significantly help contributors. The suggestions above are optional enhancements that would make a great document even better, but they're not blocking issues.

The guide successfully complements the existing "Adding a New Expression" documentation and provides crucial knowledge for extending Comet's operator support.

Estimated Impact: This will likely reduce the learning curve for new contributors by several hours and decrease the number of questions about operator implementation patterns.


Review generated with assistance from Claude Code

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 1e90cce and 692b83c.

📒 Files selected for processing (2)
  • docs/source/contributor-guide/adding_a_new_operator.md (1 hunks)
  • docs/source/contributor-guide/index.md (1 hunks)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: claude-review
🔇 Additional comments (10)
docs/source/contributor-guide/index.md (1)

33-33: Index entry placement is logical and well-positioned.

The new entry is correctly placed between Benchmarking Guide and Adding a New Expression, which follows a sensible progression from framework understanding to concrete extension patterns.

docs/source/contributor-guide/adding_a_new_operator.md (9)

20-56: Overview section clearly explains operator types.

The categorization of Comet operators (native, JVM, and sinks) is well-structured and helps guide readers to the correct implementation path.


57-102: Step 1-2 protobuf and Serde pattern guidance is clear.

The examples for defining protobuf messages and implementing CometOperatorSerde are realistic, but could benefit from verification that the CometFilter and CometProject patterns match actual implementations in the codebase. The inheritance pattern and method signatures should be confirmed against the actual trait definition.


104-156: Code examples are well-commented but need verification for accuracy.

The Filter and Project implementation examples follow sensible patterns, but I recommend verifying:

  1. That exprToProto helper exists and has the documented signature
  2. That the withInfo fallback tagging pattern is accurate
  3. That the builder pattern matches actual protobuf code generation
#!/bin/bash
# Description: Verify specific code patterns in QueryPlanSerde

# Check for exprToProto method
echo "=== Checking exprToProto helper ==="
rg -A5 "def exprToProto" --type=scala | head -20

# Check for withInfo pattern
echo "=== Checking withInfo fallback helper ==="
rg -n "def withInfo|withInfo\(" --type=scala | grep -i query | head -10

158-183: getSupportLevel pattern documentation is clear.

The three support levels (Compatible, Incompatible, Unsupported) are well-explained with good conditional logic examples.


184-208: Registration and configuration steps are practical.

Lines 184-208 provide clear guidance for the opSerdeMap registration and CometConf entry pattern. These should match actual project patterns.

Verify that the configuration key pattern spark.comet.exec.yourOperator.enabled matches conventions used for existing operators.


317-347: Common patterns section provides useful helper reference.

The section on expression conversion, fallback handling, and child validation patterns is well-organized and references actual QueryPlanSerde utilities.


257-281: Test guidance for checkSparkAnswerAndOperator helper is accurate.

Verification confirms the function exists in spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala as a protected member with multiple overloaded signatures. It is extensively used throughout the test suite to verify both correctness and operator execution. The documentation guidance is correct.


210-256: All documented file references and error handling patterns are verified and accurate.

The documentation correctly references:

  • expand.rs exists at native/core/src/execution/operators/expand.rs
  • scan.rs exists at native/core/src/execution/operators/scan.rs
  • ExecutionError is defined as a public enum in native/core/src/execution/operators/mod.rs

The code examples showing ExecutionError return types and ExecutionPlan trait usage match actual project patterns used throughout the codebase.


282-300: Clarify documentation update guidance: reference operators.md not compatibility.md.

The path docs/source/user-guide/latest/compatibility.md exists and is correct, but it documents known incompatibilities and limitations—not the supported operators list. The actual list of supported Spark operators is in docs/source/user-guide/latest/operators.md, which contains a compatibility table. Step 7 should direct contributors to update that operators table instead, or clarify specifically what needs to be added to compatibility.md (e.g., documenting any known limitations of the new operator).

There is no OPERATORS.md file; operators.md is a reference table, not a location for operator examples.

Comment on lines +1 to +354
<!---
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->

# Adding a New Operator

This guide explains how to add support for a new Spark physical operator in Apache DataFusion Comet.

## Overview

`CometExecRule` is responsible for replacing Spark operators with Comet operators. There are different approaches to implementing Comet operators depending on where they execute and how they integrate with the native execution engine.

### Types of Comet Operators

#### 1. Comet Native Operators

These operators run entirely in native Rust code and are the primary way to accelerate Spark workloads. For native operators, `CometExecRule` delegates to `QueryPlanSerde.operator2Proto` to:

- Check if the operator is enabled or disabled via configuration
- Validate if the operator can be supported
- Tag the operator with fallback reasons if conversion fails
- Serialize the operator to protobuf for native execution

Examples: `ProjectExec`, `FilterExec`, `SortExec`, `HashAggregateExec`, `SortMergeJoinExec`

#### 2. Comet JVM Operators

These operators run in the JVM but are part of the Comet execution path. For JVM operators, all checks happen in `CometExecRule` rather than `QueryPlanSerde`, because they don't need protobuf serialization.

Examples: `CometBroadcastExchangeExec`, `CometShuffleExchangeExec`

#### 3. Comet Sinks

Some operators serve as "sinks" or entry points for native execution, meaning they can be leaf nodes that feed data into native execution blocks.

Examples: `CometScanExec`, `CometBatchScanExec`, `UnionExec`, `CometSparkToColumnarExec`

## Implementing a Native Operator

This section focuses on adding a native operator, which is the most common and complex case.

### Step 1: Define the Protobuf Message

First, add the operator definition to `native/proto/src/proto/operator.proto`.

#### Add to the Operator Message

Add your new operator to the `oneof op_struct` in the main `Operator` message:

```proto
message Operator {
repeated Operator children = 1;
uint32 plan_id = 2;

oneof op_struct {
Scan scan = 100;
Projection projection = 101;
Filter filter = 102;
// ... existing operators ...
YourNewOperator your_new_operator = 112; // Choose next available number
}
}
```

#### Define the Operator Message

Create a message for your operator with the necessary fields:

```proto
message YourNewOperator {
// Fields specific to your operator
repeated spark.spark_expression.Expr expressions = 1;
// Add other configuration fields as needed
}
```

For reference, see existing operators like `Filter` (simple), `HashAggregate` (complex), or `Sort` (with ordering).

### Step 2: Create a CometOperatorSerde Implementation

Create a new Scala file in `spark/src/main/scala/org/apache/comet/serde/operator/` (e.g., `CometYourOperator.scala`) that extends `CometOperatorSerde[T]` where `T` is the Spark operator type.

The `CometOperatorSerde` trait provides three key methods:

- `enabledConfig: Option[ConfigEntry[Boolean]]` - Configuration to enable/disable this operator
- `getSupportLevel(operator: T): SupportLevel` - Determines if the operator is supported
- `convert(op: T, builder: Operator.Builder, childOp: Operator*): Option[Operator]` - Converts to protobuf

#### Simple Example (Filter)

```scala
object CometFilter extends CometOperatorSerde[FilterExec] {

override def enabledConfig: Option[ConfigEntry[Boolean]] =
Some(CometConf.COMET_EXEC_FILTER_ENABLED)

override def convert(
op: FilterExec,
builder: Operator.Builder,
childOp: OperatorOuterClass.Operator*): Option[OperatorOuterClass.Operator] = {
val cond = exprToProto(op.condition, op.child.output)

if (cond.isDefined && childOp.nonEmpty) {
val filterBuilder = OperatorOuterClass.Filter
.newBuilder()
.setPredicate(cond.get)
Some(builder.setFilter(filterBuilder).build())
} else {
withInfo(op, op.condition, op.child)
None
}
}
}
```

#### More Complex Example (Project)

```scala
object CometProject extends CometOperatorSerde[ProjectExec] {

override def enabledConfig: Option[ConfigEntry[Boolean]] =
Some(CometConf.COMET_EXEC_PROJECT_ENABLED)

override def convert(
op: ProjectExec,
builder: Operator.Builder,
childOp: Operator*): Option[OperatorOuterClass.Operator] = {
val exprs = op.projectList.map(exprToProto(_, op.child.output))

if (exprs.forall(_.isDefined) && childOp.nonEmpty) {
val projectBuilder = OperatorOuterClass.Projection
.newBuilder()
.addAllProjectList(exprs.map(_.get).asJava)
Some(builder.setProjection(projectBuilder).build())
} else {
withInfo(op, op.projectList: _*)
None
}
}
}
```

#### Using getSupportLevel

Override `getSupportLevel` to control operator support based on specific conditions:

```scala
override def getSupportLevel(operator: YourOperatorExec): SupportLevel = {
// Check for unsupported features
if (operator.hasUnsupportedFeature) {
return Unsupported(Some("Feature X is not supported"))
}

// Check for incompatible behavior
if (operator.hasKnownDifferences) {
return Incompatible(Some("Known differences in edge case Y"))
}

Compatible()
}
```

Support levels:

- **`Compatible()`** - Fully compatible with Spark (default)
- **`Incompatible()`** - Supported but may differ; requires explicit opt-in
- **`Unsupported()`** - Not supported under current conditions

### Step 3: Register the Operator

Add your operator to the `opSerdeMap` in `QueryPlanSerde.scala`:

```scala
private val opSerdeMap: Map[Class[_ <: SparkPlan], CometOperatorSerde[_]] =
Map(
classOf[ProjectExec] -> CometProject,
classOf[FilterExec] -> CometFilter,
// ... existing operators ...
classOf[YourOperatorExec] -> CometYourOperator,
)
```

### Step 4: Add Configuration Entry

Add a configuration entry in `common/src/main/scala/org/apache/comet/CometConf.scala`:

```scala
val COMET_EXEC_YOUR_OPERATOR_ENABLED: ConfigEntry[Boolean] =
conf("spark.comet.exec.yourOperator.enabled")
.doc("Whether to enable your operator in Comet")
.booleanConf
.createWithDefault(true)
```

### Step 5: Implement the Native Operator in Rust

#### Update the Planner

In `native/core/src/execution/planner.rs`, add a match case in the operator deserialization logic to handle your new protobuf message:

```rust
use datafusion_comet_proto::spark_operator::operator::OpStruct;

// In the create_plan or similar method:
match op.op_struct.as_ref() {
Some(OpStruct::Scan(scan)) => {
// ... existing cases ...
}
Some(OpStruct::YourNewOperator(your_op)) => {
create_your_operator_exec(your_op, children, session_ctx)
}
// ... other cases ...
}
```

#### Implement the Operator

Create the operator implementation, either in an existing file or a new file in `native/core/src/execution/operators/`:

```rust
use datafusion::physical_plan::{ExecutionPlan, ...};
use datafusion_comet_proto::spark_operator::YourNewOperator;

pub fn create_your_operator_exec(
op: &YourNewOperator,
children: Vec<Arc<dyn ExecutionPlan>>,
session_ctx: &SessionContext,
) -> Result<Arc<dyn ExecutionPlan>, ExecutionError> {
// Deserialize expressions and configuration
// Create and return the execution plan

// Option 1: Use existing DataFusion operator
// Ok(Arc::new(SomeDataFusionExec::try_new(...)?))

// Option 2: Implement custom operator (see ExpandExec for example)
// Ok(Arc::new(YourCustomExec::new(...)))
}
```

For custom operators, you'll need to implement the `ExecutionPlan` trait. See `native/core/src/execution/operators/expand.rs` or `scan.rs` for examples.

### Step 6: Add Tests

#### Scala Integration Tests

Add tests in `spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala` or a related test suite:

```scala
test("your operator") {
withTable("test_table") {
sql("CREATE TABLE test_table(col1 INT, col2 STRING) USING parquet")
sql("INSERT INTO test_table VALUES (1, 'a'), (2, 'b')")

// Test query that uses your operator
checkSparkAnswerAndOperator(
"SELECT * FROM test_table WHERE col1 > 1"
)
}
}
```

The `checkSparkAnswerAndOperator` helper verifies:

1. Results match Spark's native execution
2. Your operator is actually being used (not falling back)

#### Rust Unit Tests

Add unit tests in your Rust implementation file:

```rust
#[cfg(test)]
mod tests {
use super::*;

#[test]
fn test_your_operator() {
// Test operator creation and execution
}
}
```

### Step 7: Update Documentation

Add your operator to the supported operators list in `docs/source/user-guide/latest/compatibility.md` or similar documentation.

## Implementing a JVM Operator

For operators that run in the JVM:

1. Create a new operator class extending appropriate Spark base classes in `spark/src/main/scala/org/apache/comet/`
2. Add matching logic in `CometExecRule.scala` to transform the Spark operator
3. No protobuf or Rust implementation needed

Example pattern from `CometExecRule.scala`:

```scala
case s: ShuffleExchangeExec if nativeShuffleSupported(s) =>
CometShuffleExchangeExec(s, shuffleType = CometNativeShuffle)
```

## Common Patterns and Helpers

### Expression Conversion

Use `QueryPlanSerde.exprToProto` to convert Spark expressions to protobuf:

```scala
val protoExpr = exprToProto(sparkExpr, inputSchema)
```

### Handling Fallback

Use `withInfo` to tag operators with fallback reasons:

```scala
if (!canConvert) {
withInfo(op, "Reason for fallback", childNodes: _*)
return None
}
```

### Child Operator Validation

Always check that child operators were successfully converted:

```scala
if (childOp.isEmpty) {
// Cannot convert if children failed
return None
}
```

## Debugging Tips

1. **Enable verbose logging**: Set `spark.comet.explain.verbose=true` to see detailed plan transformations
2. **Check fallback reasons**: Set `spark.comet.logFallbackReasons=true` to log why operators fall back to Spark
3. **Verify protobuf**: Add debug prints in Rust to inspect deserialized operators
4. **Use EXPLAIN**: Run `EXPLAIN EXTENDED` on queries to see the physical plan
Copy link

@coderabbitai coderabbitai bot Nov 11, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

Comprehensive and well-structured operator implementation guide.

The documentation provides clear step-by-step guidance for contributors implementing new operators. The structure progresses logically from protobuf definition through Rust implementation and testing.

However, I recommend verifying that specific code utilities and file paths referenced in the guide actually exist in the current codebase:


🏁 Script executed:

#!/bin/bash
# Description: Verify key file paths and utilities referenced in the operator guide

# Check for protobuf operator schema file
echo "=== Checking protobuf operator schema ==="
fd -type f -name "operator.proto" | grep -E "native|proto/src"

# Check for CometOperatorSerde pattern
echo "=== Checking for CometOperatorSerde implementations ==="
fd -type f -name "Comet*.scala" | grep -E "serde/operator|exec" | head -10

# Check for test utilities like checkSparkAnswerAndOperator
echo "=== Checking for test utilities ==="
rg -n "checkSparkAnswerAndOperator" --type=scala

# Check for CometConf configuration pattern
echo "=== Checking for CometConf ==="
fd -type f -name "CometConf.scala"

# Check for planner.rs in Rust code
echo "=== Checking for Rust planner ==="
fd -type f -name "planner.rs" | grep -E "native|core/src"

# Check for operator examples (Filter, Project)
echo "=== Checking for example operators in serde ==="
fd -type f -name "*.scala" | xargs rg -l "CometFilter|CometProject" | head -5

Length of output: 74961


[/run_scripts]


🏁 Script executed:

#!/bin/bash
# Corrected verification script for operator guide file paths

echo "=== Checking protobuf operator schema ==="
find . -name "operator.proto" -type f 2>/dev/null | head -5

echo "=== Checking for CometOperatorSerde ==="
find . -name "CometOperatorSerde*" -type f 2>/dev/null | head -5

echo "=== Checking for Scala serde operator implementations ==="
find . -path "*/serde/operator/*.scala" -type f 2>/dev/null | head -10

echo "=== Checking for CometConf.scala ==="
find . -name "CometConf.scala" -type f 2>/dev/null

echo "=== Checking for Rust planner ==="
find . -name "planner.rs" -type f 2>/dev/null | head -5

echo "=== Checking for CometFilter and CometProject Scala files ==="
find . -path "*/serde/operator/*" -name "*Filter*.scala" -o -path "*/serde/operator/*" -name "*Project*.scala" 2>/dev/null | head -10

echo "=== Verify test suite exists ==="
find . -name "CometExecSuite.scala" -type f 2>/dev/null

Length of output: 1480


[/run_scripts]


🏁 Script executed:

#!/bin/bash
# Verify code patterns match documentation

echo "=== CometOperatorSerde trait definition ==="
head -80 ./spark/src/main/scala/org/apache/comet/serde/CometOperatorSerde.scala

echo -e "\n=== CometFilter implementation (first 50 lines) ==="
head -50 ./spark/src/main/scala/org/apache/comet/serde/operator/CometFilter.scala

echo -e "\n=== CometProject implementation (first 50 lines) ==="
head -50 ./spark/src/main/scala/org/apache/comet/serde/operator/CometProject.scala

echo -e "\n=== Check QueryPlanSerde for opSerdeMap ==="
grep -n "opSerdeMap" ./spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala | head -5

echo -e "\n=== Sample QueryPlanSerde registration ==="
grep -A 30 "private val opSerdeMap" ./spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala | head -40

echo -e "\n=== CometExecRule for JVM operators ==="
grep -n "CometShuffleExchangeExec\|CometBroadcastExchangeExec" ./spark/src/main/scala/org/apache/comet/exec/CometExecRule.scala | head -5

Length of output: 8063


[/run_scripts]


🏁 Script executed:

#!/bin/bash
# Find CometExecRule and verify JVM operator patterns

echo "=== Search for CometExecRule ==="
find . -name "*CometExecRule*" -type f 2>/dev/null

echo -e "\n=== Search for ShuffleExchangeExec handling ==="
find . -name "*.scala" -type f -exec grep -l "ShuffleExchangeExec\|CometShuffleExchangeExec" {} \; 2>/dev/null | head -5

echo -e "\n=== Search for CometBroadcastExchangeExec ==="
find . -name "*.scala" -type f -exec grep -l "CometBroadcastExchangeExec" {} \; 2>/dev/null | head -5

echo -e "\n=== Check for rule files in exec directory ==="
find ./spark/src/main/scala/org/apache/comet -name "*Rule*.scala" -type f 2>/dev/null

echo -e "\n=== Verify expand.rs exists for custom Rust operator example ==="
find . -name "expand.rs" -type f 2>/dev/null

echo -e "\n=== Verify scan.rs exists ==="
find . -name "scan.rs" -type f 2>/dev/null

Length of output: 1306


[/run_scripts]


🏁 Script executed:

#!/bin/bash
# Verify JVM operator patterns and check for incorrect path references

echo "=== JVM operator pattern in CometExecRule ==="
grep -n "ShuffleExchangeExec\|CometShuffleExchangeExec\|CometNativeShuffle" ./spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala | head -10

echo -e "\n=== Extract sample JVM operator transformation ==="
grep -A 5 "case.*ShuffleExchangeExec" ./spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala | head -20

echo -e "\n=== Verify CometBroadcastExchangeExec location ==="
ls -la ./spark/src/main/scala/org/apache/spark/sql/comet/CometBroadcastExchangeExec.scala

echo -e "\n=== Verify native/proto/src/proto structure matches doc ==="
ls -la ./native/proto/src/proto/ | grep operator

echo -e "\n=== Verify native/core/src/execution structure ==="
ls -la ./native/core/src/execution/planner.rs
ls -la ./native/core/src/execution/operators/

Length of output: 2845


Update file path for CometExecRule.

The documentation references an incorrect directory for CometExecRule.scala. It should be located in the rules package, not the exec package.


Adding a New Operator

This guide explains how to add support for a new Spark physical operator in Apache DataFusion Comet.

Overview

CometExecRule is responsible for replacing Spark operators with Comet operators. There are different approaches to implementing Comet operators depending on where they execute and how they integrate with the native execution engine.

Types of Comet Operators

1. Comet Native Operators

These operators run entirely in native Rust code and are the primary way to accelerate Spark workloads. For native operators, CometExecRule delegates to QueryPlanSerde.operator2Proto to:

  • Check if the operator is enabled or disabled via configuration
  • Validate if the operator can be supported
  • Tag the operator with fallback reasons if conversion fails
  • Serialize the operator to protobuf for native execution

Examples: ProjectExec, FilterExec, SortExec, HashAggregateExec, SortMergeJoinExec

2. Comet JVM Operators

These operators run in the JVM but are part of the Comet execution path. For JVM operators, all checks happen in CometExecRule rather than QueryPlanSerde, because they don't need protobuf serialization.

Examples: CometBroadcastExchangeExec, CometShuffleExchangeExec

3. Comet Sinks

Some operators serve as "sinks" or entry points for native execution, meaning they can be leaf nodes that feed data into native execution blocks.

Examples: CometScanExec, CometBatchScanExec, UnionExec, CometSparkToColumnarExec

Implementing a Native Operator

This section focuses on adding a native operator, which is the most common and complex case.

Step 1: Define the Protobuf Message

First, add the operator definition to native/proto/src/proto/operator.proto.

Add to the Operator Message

Add your new operator to the oneof op_struct in the main Operator message:

message Operator {
  repeated Operator children = 1;
  uint32 plan_id = 2;

  oneof op_struct {
    Scan scan = 100;
    Projection projection = 101;
    Filter filter = 102;
    // ... existing operators ...
    YourNewOperator your_new_operator = 112;  // Choose next available number
  }
}

Define the Operator Message

Create a message for your operator with the necessary fields:

message YourNewOperator {
  // Fields specific to your operator
  repeated spark.spark_expression.Expr expressions = 1;
  // Add other configuration fields as needed
}

For reference, see existing operators like Filter (simple), HashAggregate (complex), or Sort (with ordering).

Step 2: Create a CometOperatorSerde Implementation

Create a new Scala file in spark/src/main/scala/org/apache/comet/serde/operator/ (e.g., CometYourOperator.scala) that extends CometOperatorSerde[T] where T is the Spark operator type.

The CometOperatorSerde trait provides three key methods:

  • enabledConfig: Option[ConfigEntry[Boolean]] - Configuration to enable/disable this operator
  • getSupportLevel(operator: T): SupportLevel - Determines if the operator is supported
  • convert(op: T, builder: Operator.Builder, childOp: Operator*): Option[Operator] - Converts to protobuf

Simple Example (Filter)

object CometFilter extends CometOperatorSerde[FilterExec] {

  override def enabledConfig: Option[ConfigEntry[Boolean]] =
    Some(CometConf.COMET_EXEC_FILTER_ENABLED)

  override def convert(
      op: FilterExec,
      builder: Operator.Builder,
      childOp: OperatorOuterClass.Operator*): Option[OperatorOuterClass.Operator] = {
    val cond = exprToProto(op.condition, op.child.output)

    if (cond.isDefined && childOp.nonEmpty) {
      val filterBuilder = OperatorOuterClass.Filter
        .newBuilder()
        .setPredicate(cond.get)
      Some(builder.setFilter(filterBuilder).build())
    } else {
      withInfo(op, op.condition, op.child)
      None
    }
  }
}

More Complex Example (Project)

object CometProject extends CometOperatorSerde[ProjectExec] {

  override def enabledConfig: Option[ConfigEntry[Boolean]] =
    Some(CometConf.COMET_EXEC_PROJECT_ENABLED)

  override def convert(
      op: ProjectExec,
      builder: Operator.Builder,
      childOp: Operator*): Option[OperatorOuterClass.Operator] = {
    val exprs = op.projectList.map(exprToProto(_, op.child.output))

    if (exprs.forall(_.isDefined) && childOp.nonEmpty) {
      val projectBuilder = OperatorOuterClass.Projection
        .newBuilder()
        .addAllProjectList(exprs.map(_.get).asJava)
      Some(builder.setProjection(projectBuilder).build())
    } else {
      withInfo(op, op.projectList: _*)
      None
    }
  }
}

Using getSupportLevel

Override getSupportLevel to control operator support based on specific conditions:

override def getSupportLevel(operator: YourOperatorExec): SupportLevel = {
  // Check for unsupported features
  if (operator.hasUnsupportedFeature) {
    return Unsupported(Some("Feature X is not supported"))
  }

  // Check for incompatible behavior
  if (operator.hasKnownDifferences) {
    return Incompatible(Some("Known differences in edge case Y"))
  }

  Compatible()
}

Support levels:

  • Compatible() - Fully compatible with Spark (default)
  • Incompatible() - Supported but may differ; requires explicit opt-in
  • Unsupported() - Not supported under current conditions

Step 3: Register the Operator

Add your operator to the opSerdeMap in QueryPlanSerde.scala:

private val opSerdeMap: Map[Class[_ <: SparkPlan], CometOperatorSerde[_]] =
  Map(
    classOf[ProjectExec] -> CometProject,
    classOf[FilterExec] -> CometFilter,
    // ... existing operators ...
    classOf[YourOperatorExec] -> CometYourOperator,
  )

Step 4: Add Configuration Entry

Add a configuration entry in common/src/main/scala/org/apache/comet/CometConf.scala:

val COMET_EXEC_YOUR_OPERATOR_ENABLED: ConfigEntry[Boolean] =
  conf("spark.comet.exec.yourOperator.enabled")
    .doc("Whether to enable your operator in Comet")
    .booleanConf
    .createWithDefault(true)

Step 5: Implement the Native Operator in Rust

Update the Planner

In native/core/src/execution/planner.rs, add a match case in the operator deserialization logic to handle your new protobuf message:

use datafusion_comet_proto::spark_operator::operator::OpStruct;

// In the create_plan or similar method:
match op.op_struct.as_ref() {
    Some(OpStruct::Scan(scan)) => {
        // ... existing cases ...
    }
    Some(OpStruct::YourNewOperator(your_op)) => {
        create_your_operator_exec(your_op, children, session_ctx)
    }
    // ... other cases ...
}

Implement the Operator

Create the operator implementation, either in an existing file or a new file in native/core/src/execution/operators/:

use datafusion::physical_plan::{ExecutionPlan, ...};
use datafusion_comet_proto::spark_operator::YourNewOperator;

pub fn create_your_operator_exec(
    op: &YourNewOperator,
    children: Vec<Arc<dyn ExecutionPlan>>,
    session_ctx: &SessionContext,
) -> Result<Arc<dyn ExecutionPlan>, ExecutionError> {
    // Deserialize expressions and configuration
    // Create and return the execution plan

    // Option 1: Use existing DataFusion operator
    // Ok(Arc::new(SomeDataFusionExec::try_new(...)?))

    // Option 2: Implement custom operator (see ExpandExec for example)
    // Ok(Arc::new(YourCustomExec::new(...)))
}

For custom operators, you'll need to implement the ExecutionPlan trait. See native/core/src/execution/operators/expand.rs or scan.rs for examples.

Step 6: Add Tests

Scala Integration Tests

Add tests in spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala or a related test suite:

test("your operator") {
  withTable("test_table") {
    sql("CREATE TABLE test_table(col1 INT, col2 STRING) USING parquet")
    sql("INSERT INTO test_table VALUES (1, 'a'), (2, 'b')")

    // Test query that uses your operator
    checkSparkAnswerAndOperator(
      "SELECT * FROM test_table WHERE col1 > 1"
    )
  }
}

The checkSparkAnswerAndOperator helper verifies:

  1. Results match Spark's native execution
  2. Your operator is actually being used (not falling back)

Rust Unit Tests

Add unit tests in your Rust implementation file:

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_your_operator() {
        // Test operator creation and execution
    }
}

Step 7: Update Documentation

Add your operator to the supported operators list in docs/source/user-guide/latest/compatibility.md or similar documentation.

Implementing a JVM Operator

For operators that run in the JVM:

  1. Create a new operator class extending appropriate Spark base classes in spark/src/main/scala/org/apache/spark/sql/comet/
  2. Add matching logic in spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala to transform the Spark operator
  3. No protobuf or Rust implementation needed

Example pattern from CometExecRule.scala:

case s: ShuffleExchangeExec if nativeShuffleSupported(s) =>
  CometShuffleExchangeExec(s, shuffleType = CometNativeShuffle)

Common Patterns and Helpers

Expression Conversion

Use QueryPlanSerde.exprToProto to convert Spark expressions to protobuf:

val protoExpr = exprToProto(sparkExpr, inputSchema)

Handling Fallback

Use withInfo to tag operators with fallback reasons:

if (!canConvert) {
  withInfo(op, "Reason for fallback", childNodes: _*)
  return None
}

Child Operator Validation

Always check that child operators were successfully converted:

if (childOp.isEmpty) {
  // Cannot convert if children failed
  return None
}

Debugging Tips

  1. Enable verbose logging: Set spark.comet.explain.verbose=true to see detailed plan transformations
  2. Check fallback reasons: Set spark.comet.logFallbackReasons=true to log why operators fall back to Spark
  3. Verify protobuf: Add debug prints in Rust to inspect deserialized operators
  4. Use EXPLAIN: Run EXPLAIN EXTENDED on queries to see the physical plan
🤖 Prompt for AI Agents
In docs/source/contributor-guide/adding_a_new_operator.md lines 1-354, the
documentation incorrectly references CometExecRule.scala under the exec package;
update all occurrences to point to the correct rules package path
(spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala) and adjust any
code snippets and prose that mention the old package (e.g., change imports,
example paths, and guidance steps) so they reference the rules package; ensure
the JVM operator instructions and example pattern reflect the updated path and
any sample import lines are consistent with the new package.

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

value:annoying; category:bug; feedback:The CodeRabbit AI reviewer is not correct! The document no where says that CometExecRule.scala is in exec folder/package.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@martin-augment, you're absolutely right, and I apologize for the incorrect review comment.

I've reviewed the document again, and it correctly references CometExecRule.scala at line 304 as:

spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala

The document does NOT state that CometExecRule.scala is in the exec package. The only other mentions of CometExecRule in the document (lines 42, 303, 306) simply reference the class name without specifying a path, which is perfectly fine.

Thank you for the correction. The documentation is accurate as written.

🐰💭

Comment on lines +349 to +354
## Debugging Tips

1. **Enable verbose logging**: Set `spark.comet.explain.verbose=true` to see detailed plan transformations
2. **Check fallback reasons**: Set `spark.comet.logFallbackReasons=true` to log why operators fall back to Spark
3. **Verify protobuf**: Add debug prints in Rust to inspect deserialized operators
4. **Use EXPLAIN**: Run `EXPLAIN EXTENDED` on queries to see the physical plan
Copy link

@coderabbitai coderabbitai bot Nov 11, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

Debugging tips are practical and well-informed.

The debugging section references relevant configuration keys (spark.comet.explain.verbose, spark.comet.logFallbackReasons) which provide good troubleshooting guidance. These should be confirmed to match actual configuration names in CometConf.


🏁 Script executed:

#!/bin/bash
# Description: Verify debugging configuration keys

# Check for debugging/logging configurations
echo "=== Checking for debugging configs ==="
rg -n "explain.verbose|logFallback" --type=scala | head -10

Length of output: 148


🏁 Script executed:

#!/bin/bash
# Description: Verify all debugging configuration keys in CometConf

echo "=== Searching for explain.verbose config ==="
rg -n "explain\.verbose" --type=scala -C 3

echo ""
echo "=== Searching for logFallbackReasons config (broader search) ==="
rg -n "logFallbackReasons" --type=scala -C 3

echo ""
echo "=== Searching for any spark.comet configuration definitions ==="
rg -n 'conf\("spark\.comet' --type=scala | grep -E "(explain|logFallback)" | head -20

Length of output: 1399


🏁 Script executed:

#!/bin/bash
# Description: Get exact configuration key names and their documentation

echo "=== Getting full context for logFallbackReasons config ==="
sed -n '497,502p' common/src/main/scala/org/apache/comet/CometConf.scala

echo ""
echo "=== Searching for any 'verbose' configurations ==="
rg -n "verbose" --type=scala -i common/src/main/scala/org/apache/comet/CometConf.scala

echo ""
echo "=== Getting full context for explain.native.enabled config (possible match) ==="
sed -n '479,485p' common/src/main/scala/org/apache/comet/CometConf.scala

echo ""
echo "=== Getting full context for explainFallback.enabled config ==="
sed -n '504,510p' common/src/main/scala/org/apache/comet/CometConf.scala

Length of output: 1652


Update configuration keys in debugging tips to match actual CometConf definitions.

The documented keys are inaccurate:

  • spark.comet.explain.verbose=true does not exist; use spark.comet.explain.format=verbose or spark.comet.explain.native.enabled=true instead
  • spark.comet.logFallbackReasons=true is incomplete; the actual key is spark.comet.logFallbackReasons.enabled=true
🤖 Prompt for AI Agents
In docs/source/contributor-guide/adding_a_new_operator.md around lines 349 to
354, update the incorrect configuration keys in the Debugging Tips: replace
`spark.comet.explain.verbose=true` with the supported options
`spark.comet.explain.format=verbose` or
`spark.comet.explain.native.enabled=true` (choose the one that matches the
intended behavior) and change `spark.comet.logFallbackReasons=true` to the full
key `spark.comet.logFallbackReasons.enabled=true`; ensure the text describes the
correct key names and gives the recommended default values or alternatives.

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

value:useful; category:bug; feedback:The CodeRabbit AI reviewer is correct that the setting names are not correct. https://datafusion.apache.org/comet/user-guide/latest/configs.html#viewing-explain-plan-fallback-reasons confirms that the correct names are spark.comet.explain.format=verbose spark.comet.logFallbackReasons.enabled=true

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@martin-augment, thank you for confirming! Here are the corrected debugging tips:

## Debugging Tips

1. **Enable verbose logging**: Set `spark.comet.explain.format=verbose` to see detailed plan transformations
2. **Check fallback reasons**: Set `spark.comet.logFallbackReasons.enabled=true` to log why operators fall back to Spark
3. **Verify protobuf**: Add debug prints in Rust to inspect deserialized operators
4. **Use EXPLAIN**: Run `EXPLAIN EXTENDED` on queries to see the physical plan

The key changes:

  • spark.comet.explain.verbose=truespark.comet.explain.format=verbose
  • spark.comet.logFallbackReasons=truespark.comet.logFallbackReasons.enabled=true

✏️ Learnings added
Learnt from: martin-augment
Repo: martin-augment/datafusion-comet PR: 17
File: docs/source/contributor-guide/adding_a_new_operator.md:349-354
Timestamp: 2025-11-11T15:01:48.190Z
Learning: For Apache DataFusion Comet debugging documentation, the correct configuration keys are `spark.comet.explain.format=verbose` for verbose explain plans and `spark.comet.logFallbackReasons.enabled=true` for logging fallback reasons (not `spark.comet.explain.verbose` or `spark.comet.logFallbackReasons` without `.enabled`).

Note: Learnings are effective only in the context of similar code segments. To apply general coding guidelines, please configure review instructions. You can manage existing learnings in the app.

Copy link

@augmentcode augmentcode bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Review completed. 2 suggestions posted.

Comment augment review to trigger a new review at any time.


## Debugging Tips

1. **Enable verbose logging**: Set `spark.comet.explain.verbose=true` to see detailed plan transformations
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Config key looks incorrect: to log detailed plan transformations use spark.comet.explain.rules=true (COMET_EXPLAIN_TRANSFORMATIONS), not spark.comet.explain.verbose.

🤖 Was this useful? React with 👍 or 👎

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

value:incorrect-but-reasonable; category:documentation; feedback:The Augment AI reviewer is correct the setting name is not correct. But the correct one according to https://datafusion.apache.org/comet/user-guide/latest/configs.html#viewing-explain-plan-fallback-reasons is spark.comet.explain.format=verbose

## Debugging Tips

1. **Enable verbose logging**: Set `spark.comet.explain.verbose=true` to see detailed plan transformations
2. **Check fallback reasons**: Set `spark.comet.logFallbackReasons=true` to log why operators fall back to Spark
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use the correct key spark.comet.logFallbackReasons.enabled=true (COMET_LOG_FALLBACK_REASONS) instead of spark.comet.logFallbackReasons=true.

🤖 Was this useful? React with 👍 or 👎

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

value:useful; category:bug; feedback:The Augment AI reviewer is correct that the setting name is not correct. https://datafusion.apache.org/comet/user-guide/latest/configs.html#viewing-explain-plan-fallback-reasons confirms that the correct name is spark.comet.logFallbackReasons.enabled

@martin-augment
Copy link
Owner Author

1. Minor Technical Clarifications

Line 270: The test example shows checkSparkAnswerAndOperator usage but could benefit from a note about what this method validates:

// Suggestion: Add a brief comment
checkSparkAnswerAndOperator(s"SELECT * FROM $table")
// This validates: 1) Results match Spark's native execution
//                 2) Comet operator is actually used (no fallback)

The documentation does explain this on line 274-277, but having a comment in the code example would reinforce the learning.

value:good-but-wont-fix; category:documentation; feedback:The Claude AI reviewer suggests to add a comment about what checkSparkAnswerAndOperator() does but it is already explained two lines below. There is no need to duplicate the text. Also the developer could check its Scaladoc too

@martin-augment
Copy link
Owner Author

2. Protobuf Field Numbering

Lines 65-76: The example shows choosing field number 112 for a new operator. While the actual operator.proto ends at 111, it would be helpful to add a note:

YourNewOperator your_new_operator = 112;  // Choose next available number
// Note: Field numbers should never be reused after deletion

value:good-but-wont-fix; category:documentation; feedback:The Claude AI reviewer suggests to explain how Protobuf fields work. This is Protobuf specific that could be found in the Protocol buffers documentation.

@martin-augment
Copy link
Owner Author

4. Missing Configuration Documentation

Lines 199-203: The configuration entry example is good, but could mention that these configs should also be documented in the user-facing configuration documentation:

// Add this config, then update docs/source/user-guide/latest/configs.md
val COMET_EXEC_YOUR_OPERATOR_ENABLED: ConfigEntry[Boolean] = ...

value:good-to-have; category:documentation; feedback:The Claude AI reviewer is correct that it would be good to tell the contributor that the new config setting should be added also to the website documentation

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants