2758: docs: Add new section to contributor guide, explaining how to add a new operator #17
base: main
Conversation
**Walkthrough**

Adds comprehensive documentation for implementing new Spark physical operators in Apache DataFusion Comet, covering native (Rust) and JVM (Scala) implementations, protobuf integration, serde creation, registration procedures, and testing strategies. Updates the contributor guide index accordingly.
**PR Review: Adding a New Operator Guide**

I've completed a comprehensive review of this documentation PR. Overall, this is an excellent addition to the contributor guide that will be invaluable for developers adding new operators to DataFusion Comet. The documentation is well-structured, technically accurate, and follows best practices.

✅ **Strengths**

📝 **Suggestions for Improvement**

**1. Minor Technical Clarifications**

Line 270: The test example shows:

```scala
// Suggestion: Add a brief comment
checkSparkAnswerAndOperator(s"SELECT * FROM $table")
// This validates: 1) Results match Spark's native execution
// 2) Comet operator is actually used (no fallback)
```

The documentation does explain this on lines 274-277, but having a comment in the code example would reinforce the learning.

**2. Protobuf Field Numbering**

Lines 65-76: The example shows choosing field number 112 for a new operator. While the actual …

**3. Rust Planner Example**

Lines 208-226: The planner example shows the pattern but could mention where in the file to add this code. Consider adding:

```rust
// In the create_plan or similar method:
// (Located around line 300-500 depending on the match statement)
match op.op_struct.as_ref() {
```

**4. Missing Configuration Documentation**

Lines 199-203: The configuration entry example is good, but it could mention that these configs should also be documented in the user-facing configuration documentation:

```scala
// Add this config, then update docs/source/user-guide/latest/configs.md
val COMET_EXEC_YOUR_OPERATOR_ENABLED: ConfigEntry[Boolean] = ...
```

**5. Test Suite Location**

Lines 255-278: The testing section mentions:

```scala
// For operator tests, add to:
// spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
```

🔍 **Verification Notes**

I've verified the following against the actual codebase: ✅

🎯 **Best Practices Observed**

🔐 **Security Considerations**

No security concerns identified.

📊 **Test Coverage**

The documentation adequately covers testing requirements.

📚 **Documentation Quality**

✨ **Final Recommendation**

**Approve with minor suggestions.** This documentation is production-ready and will significantly help contributors. The suggestions above are optional enhancements that would make a great document even better, but they're not blocking issues. The guide successfully complements the existing "Adding a New Expression" documentation and provides crucial knowledge for extending Comet's operator support.

Estimated Impact: This will likely reduce the learning curve for new contributors by several hours and decrease the number of questions about operator implementation patterns.

Review generated with assistance from Claude Code
Actionable comments posted: 2
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
- `docs/source/contributor-guide/adding_a_new_operator.md` (1 hunks)
- `docs/source/contributor-guide/index.md` (1 hunks)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: claude-review
🔇 Additional comments (10)
docs/source/contributor-guide/index.md (1)
33-33: Index entry placement is logical and well-positioned. The new entry is correctly placed between Benchmarking Guide and Adding a New Expression, which follows a sensible progression from framework understanding to concrete extension patterns.
docs/source/contributor-guide/adding_a_new_operator.md (9)
20-56: Overview section clearly explains operator types. The categorization of Comet operators (native, JVM, and sinks) is well-structured and helps guide readers to the correct implementation path.
57-102: Step 1-2 protobuf and Serde pattern guidance is clear. The examples for defining protobuf messages and implementing CometOperatorSerde are realistic, but could benefit from verification that the `CometFilter` and `CometProject` patterns match actual implementations in the codebase. The inheritance pattern and method signatures should be confirmed against the actual trait definition.
104-156: Code examples are well-commented but need verification for accuracy. The Filter and Project implementation examples follow sensible patterns, but I recommend verifying:

- That the `exprToProto` helper exists and has the documented signature
- That the `withInfo` fallback tagging pattern is accurate
- That the builder pattern matches actual protobuf code generation

#!/bin/bash
# Description: Verify specific code patterns in QueryPlanSerde

# Check for exprToProto method
echo "=== Checking exprToProto helper ==="
rg -A5 "def exprToProto" --type=scala | head -20

# Check for withInfo pattern
echo "=== Checking withInfo fallback helper ==="
rg -n "def withInfo|withInfo\(" --type=scala | grep -i query | head -10
158-183: `getSupportLevel` pattern documentation is clear. The three support levels (Compatible, Incompatible, Unsupported) are well-explained with good conditional logic examples.
184-208: Registration and configuration steps are practical. Lines 184-208 provide clear guidance for the `opSerdeMap` registration and `CometConf` entry pattern. These should match actual project patterns.

Verify that the configuration key pattern `spark.comet.exec.yourOperator.enabled` matches conventions used for existing operators.
317-347: Common patterns section provides useful helper reference. The section on expression conversion, fallback handling, and child validation patterns is well-organized and references actual QueryPlanSerde utilities.
257-281: Test guidance for the `checkSparkAnswerAndOperator` helper is accurate. Verification confirms the function exists in `spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala` as a protected member with multiple overloaded signatures. It is extensively used throughout the test suite to verify both correctness and operator execution. The documentation guidance is correct.
210-256: All documented file references and error handling patterns are verified and accurate. The documentation correctly references:

- `expand.rs` exists at `native/core/src/execution/operators/expand.rs`
- `scan.rs` exists at `native/core/src/execution/operators/scan.rs`
- `ExecutionError` is defined as a public enum in `native/core/src/execution/operators/mod.rs`

The code examples showing `ExecutionError` return types and `ExecutionPlan` trait usage match actual project patterns used throughout the codebase.
282-300: Clarify documentation update guidance: reference `operators.md`, not `compatibility.md`. The path `docs/source/user-guide/latest/compatibility.md` exists and is correct, but it documents known incompatibilities and limitations, not the supported operators list. The actual list of supported Spark operators is in `docs/source/user-guide/latest/operators.md`, which contains a compatibility table. Step 7 should direct contributors to update that operators table instead, or clarify specifically what needs to be added to compatibility.md (e.g., documenting any known limitations of the new operator).

There is no `OPERATORS.md` file; `operators.md` is a reference table, not a location for operator examples.
<!---
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at

  http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->

# Adding a New Operator

This guide explains how to add support for a new Spark physical operator in Apache DataFusion Comet.

## Overview

`CometExecRule` is responsible for replacing Spark operators with Comet operators. There are different approaches to implementing Comet operators depending on where they execute and how they integrate with the native execution engine.

### Types of Comet Operators

#### 1. Comet Native Operators

These operators run entirely in native Rust code and are the primary way to accelerate Spark workloads. For native operators, `CometExecRule` delegates to `QueryPlanSerde.operator2Proto` to:

- Check if the operator is enabled or disabled via configuration
- Validate if the operator can be supported
- Tag the operator with fallback reasons if conversion fails
- Serialize the operator to protobuf for native execution

Examples: `ProjectExec`, `FilterExec`, `SortExec`, `HashAggregateExec`, `SortMergeJoinExec`

#### 2. Comet JVM Operators

These operators run in the JVM but are part of the Comet execution path. For JVM operators, all checks happen in `CometExecRule` rather than `QueryPlanSerde`, because they don't need protobuf serialization.

Examples: `CometBroadcastExchangeExec`, `CometShuffleExchangeExec`

#### 3. Comet Sinks

Some operators serve as "sinks" or entry points for native execution, meaning they can be leaf nodes that feed data into native execution blocks.

Examples: `CometScanExec`, `CometBatchScanExec`, `UnionExec`, `CometSparkToColumnarExec`

## Implementing a Native Operator

This section focuses on adding a native operator, which is the most common and complex case.

### Step 1: Define the Protobuf Message

First, add the operator definition to `native/proto/src/proto/operator.proto`.

#### Add to the Operator Message

Add your new operator to the `oneof op_struct` in the main `Operator` message:

```proto
message Operator {
  repeated Operator children = 1;
  uint32 plan_id = 2;

  oneof op_struct {
    Scan scan = 100;
    Projection projection = 101;
    Filter filter = 102;
    // ... existing operators ...
    YourNewOperator your_new_operator = 112; // Choose next available number
  }
}
```

#### Define the Operator Message

Create a message for your operator with the necessary fields:

```proto
message YourNewOperator {
  // Fields specific to your operator
  repeated spark.spark_expression.Expr expressions = 1;
  // Add other configuration fields as needed
}
```

For reference, see existing operators like `Filter` (simple), `HashAggregate` (complex), or `Sort` (with ordering).

### Step 2: Create a CometOperatorSerde Implementation

Create a new Scala file in `spark/src/main/scala/org/apache/comet/serde/operator/` (e.g., `CometYourOperator.scala`) that extends `CometOperatorSerde[T]` where `T` is the Spark operator type.

The `CometOperatorSerde` trait provides three key methods:

- `enabledConfig: Option[ConfigEntry[Boolean]]` - Configuration to enable/disable this operator
- `getSupportLevel(operator: T): SupportLevel` - Determines if the operator is supported
- `convert(op: T, builder: Operator.Builder, childOp: Operator*): Option[Operator]` - Converts to protobuf
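For orientation, the shape assumed for the trait throughout this guide is roughly the following. This is only a sketch; the authoritative definition (including exact type parameters and default implementations) is the one in the Comet source tree.

```scala
// Rough sketch of the assumed trait shape; see the actual CometOperatorSerde
// definition in the Comet codebase for the authoritative version.
// (Imports of SparkPlan, ConfigEntry, SupportLevel, and the protobuf Operator
// types are omitted here; they come from Spark and Comet.)
trait CometOperatorSerde[T <: SparkPlan] {

  /** Config that lets users enable or disable conversion of this operator. */
  def enabledConfig: Option[ConfigEntry[Boolean]]

  /** Whether this particular operator instance can be converted. */
  def getSupportLevel(operator: T): SupportLevel = Compatible()

  /** Serialize to protobuf, or return None to fall back to Spark. */
  def convert(
      op: T,
      builder: Operator.Builder,
      childOp: Operator*): Option[Operator]
}
```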
#### Simple Example (Filter)

```scala
object CometFilter extends CometOperatorSerde[FilterExec] {

  override def enabledConfig: Option[ConfigEntry[Boolean]] =
    Some(CometConf.COMET_EXEC_FILTER_ENABLED)

  override def convert(
      op: FilterExec,
      builder: Operator.Builder,
      childOp: OperatorOuterClass.Operator*): Option[OperatorOuterClass.Operator] = {
    val cond = exprToProto(op.condition, op.child.output)

    if (cond.isDefined && childOp.nonEmpty) {
      val filterBuilder = OperatorOuterClass.Filter
        .newBuilder()
        .setPredicate(cond.get)
      Some(builder.setFilter(filterBuilder).build())
    } else {
      withInfo(op, op.condition, op.child)
      None
    }
  }
}
```

#### More Complex Example (Project)

```scala
object CometProject extends CometOperatorSerde[ProjectExec] {

  override def enabledConfig: Option[ConfigEntry[Boolean]] =
    Some(CometConf.COMET_EXEC_PROJECT_ENABLED)

  override def convert(
      op: ProjectExec,
      builder: Operator.Builder,
      childOp: Operator*): Option[OperatorOuterClass.Operator] = {
    val exprs = op.projectList.map(exprToProto(_, op.child.output))

    if (exprs.forall(_.isDefined) && childOp.nonEmpty) {
      val projectBuilder = OperatorOuterClass.Projection
        .newBuilder()
        .addAllProjectList(exprs.map(_.get).asJava)
      Some(builder.setProjection(projectBuilder).build())
    } else {
      withInfo(op, op.projectList: _*)
      None
    }
  }
}
```

#### Using getSupportLevel

Override `getSupportLevel` to control operator support based on specific conditions:

```scala
override def getSupportLevel(operator: YourOperatorExec): SupportLevel = {
  // Check for unsupported features
  if (operator.hasUnsupportedFeature) {
    return Unsupported(Some("Feature X is not supported"))
  }

  // Check for incompatible behavior
  if (operator.hasKnownDifferences) {
    return Incompatible(Some("Known differences in edge case Y"))
  }

  Compatible()
}
```

Support levels:

- **`Compatible()`** - Fully compatible with Spark (default)
- **`Incompatible()`** - Supported but may differ; requires explicit opt-in
- **`Unsupported()`** - Not supported under current conditions
### Step 3: Register the Operator

Add your operator to the `opSerdeMap` in `QueryPlanSerde.scala`:

```scala
private val opSerdeMap: Map[Class[_ <: SparkPlan], CometOperatorSerde[_]] =
  Map(
    classOf[ProjectExec] -> CometProject,
    classOf[FilterExec] -> CometFilter,
    // ... existing operators ...
    classOf[YourOperatorExec] -> CometYourOperator,
  )
```

### Step 4: Add Configuration Entry

Add a configuration entry in `common/src/main/scala/org/apache/comet/CometConf.scala`:

```scala
val COMET_EXEC_YOUR_OPERATOR_ENABLED: ConfigEntry[Boolean] =
  conf("spark.comet.exec.yourOperator.enabled")
    .doc("Whether to enable your operator in Comet")
    .booleanConf
    .createWithDefault(true)
```

### Step 5: Implement the Native Operator in Rust

#### Update the Planner

In `native/core/src/execution/planner.rs`, add a match case in the operator deserialization logic to handle your new protobuf message:

```rust
use datafusion_comet_proto::spark_operator::operator::OpStruct;

// In the create_plan or similar method:
match op.op_struct.as_ref() {
    Some(OpStruct::Scan(scan)) => {
        // ... existing cases ...
    }
    Some(OpStruct::YourNewOperator(your_op)) => {
        create_your_operator_exec(your_op, children, session_ctx)
    }
    // ... other cases ...
}
```

#### Implement the Operator

Create the operator implementation, either in an existing file or a new file in `native/core/src/execution/operators/`:

```rust
use datafusion::physical_plan::{ExecutionPlan, ...};
use datafusion_comet_proto::spark_operator::YourNewOperator;

pub fn create_your_operator_exec(
    op: &YourNewOperator,
    children: Vec<Arc<dyn ExecutionPlan>>,
    session_ctx: &SessionContext,
) -> Result<Arc<dyn ExecutionPlan>, ExecutionError> {
    // Deserialize expressions and configuration
    // Create and return the execution plan

    // Option 1: Use existing DataFusion operator
    // Ok(Arc::new(SomeDataFusionExec::try_new(...)?))

    // Option 2: Implement custom operator (see ExpandExec for example)
    // Ok(Arc::new(YourCustomExec::new(...)))
}
```

For custom operators, you'll need to implement the `ExecutionPlan` trait. See `native/core/src/execution/operators/expand.rs` or `scan.rs` for examples.
### Step 6: Add Tests

#### Scala Integration Tests

Add tests in `spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala` or a related test suite:

```scala
test("your operator") {
  withTable("test_table") {
    sql("CREATE TABLE test_table(col1 INT, col2 STRING) USING parquet")
    sql("INSERT INTO test_table VALUES (1, 'a'), (2, 'b')")

    // Test query that uses your operator
    checkSparkAnswerAndOperator(
      "SELECT * FROM test_table WHERE col1 > 1"
    )
  }
}
```

The `checkSparkAnswerAndOperator` helper verifies:

1. Results match Spark's native execution
2. Your operator is actually being used (not falling back)
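It can also be worth exercising the fallback path explicitly by disabling the operator's config and comparing results only. The sketch below assumes test helpers along the lines of `withSQLConf` and `checkSparkAnswer`, mirroring patterns used elsewhere in the Comet test suites; check `CometTestBase` for the exact helper names.

```scala
test("your operator falls back to Spark when disabled") {
  // Assumed helpers: withSQLConf (standard Spark test utility) and
  // checkSparkAnswer (result comparison without requiring a Comet operator).
  withSQLConf(CometConf.COMET_EXEC_YOUR_OPERATOR_ENABLED.key -> "false") {
    // Only the results are compared here; the operator is expected
    // to fall back to Spark because it is disabled.
    checkSparkAnswer("SELECT * FROM test_table WHERE col1 > 1")
  }
}
```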
#### Rust Unit Tests

Add unit tests in your Rust implementation file:

```rust
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_your_operator() {
        // Test operator creation and execution
    }
}
```

### Step 7: Update Documentation

Add your operator to the supported operators list in `docs/source/user-guide/latest/compatibility.md` or similar documentation.

## Implementing a JVM Operator

For operators that run in the JVM:

1. Create a new operator class extending appropriate Spark base classes in `spark/src/main/scala/org/apache/comet/`
2. Add matching logic in `CometExecRule.scala` to transform the Spark operator
3. No protobuf or Rust implementation needed

Example pattern from `CometExecRule.scala`:

```scala
case s: ShuffleExchangeExec if nativeShuffleSupported(s) =>
  CometShuffleExchangeExec(s, shuffleType = CometNativeShuffle)
```
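For a rough sense of what a JVM-side operator can look like, here is a schematic skeleton. The class name and members are purely illustrative (not taken from the Comet codebase), and real operators such as `CometBroadcastExchangeExec` carry considerably more logic (metrics, columnar support, broadcast handling, and so on).

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}

// Hypothetical skeleton of a JVM-side Comet operator.
case class CometYourJvmExec(originalPlan: SparkPlan, child: SparkPlan)
    extends UnaryExecNode {

  // Preserve the schema of the Spark operator being replaced.
  override def output: Seq[Attribute] = originalPlan.output

  // A real operator would implement its own execution logic here; this
  // placeholder simply delegates to the child.
  override protected def doExecute(): RDD[InternalRow] = child.execute()

  override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan =
    copy(child = newChild)
}
```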
## Common Patterns and Helpers

### Expression Conversion

Use `QueryPlanSerde.exprToProto` to convert Spark expressions to protobuf:

```scala
val protoExpr = exprToProto(sparkExpr, inputSchema)
```

### Handling Fallback

Use `withInfo` to tag operators with fallback reasons:

```scala
if (!canConvert) {
  withInfo(op, "Reason for fallback", childNodes: _*)
  return None
}
```

### Child Operator Validation

Always check that child operators were successfully converted:

```scala
if (childOp.isEmpty) {
  // Cannot convert if children failed
  return None
}
```

## Debugging Tips

1. **Enable verbose logging**: Set `spark.comet.explain.verbose=true` to see detailed plan transformations
2. **Check fallback reasons**: Set `spark.comet.logFallbackReasons=true` to log why operators fall back to Spark
3. **Verify protobuf**: Add debug prints in Rust to inspect deserialized operators
4. **Use EXPLAIN**: Run `EXPLAIN EXTENDED` on queries to see the physical plan
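To experiment with these settings interactively, they can be set on the Spark session before running a query. This is a minimal sketch; the exact config key names should be verified against `CometConf`, since the spellings listed above may not match the current definitions.

```scala
// Minimal spark-shell sketch; verify exact Comet config key names against CometConf.
spark.conf.set("spark.comet.enabled", "true")
spark.conf.set("spark.comet.explain.format", "verbose")           // assumed key for verbose explain output
spark.conf.set("spark.comet.logFallbackReasons.enabled", "true")  // assumed key for fallback-reason logging

// Inspect the physical plan to confirm Comet operators are present.
spark.sql("EXPLAIN EXTENDED SELECT * FROM test_table WHERE col1 > 1").show(truncate = false)
```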
🧩 Analysis chain
Comprehensive and well-structured operator implementation guide.
The documentation provides clear step-by-step guidance for contributors implementing new operators. The structure progresses logically from protobuf definition through Rust implementation and testing.
However, I recommend verifying that specific code utilities and file paths referenced in the guide actually exist in the current codebase:
🏁 Script executed:
#!/bin/bash
# Description: Verify key file paths and utilities referenced in the operator guide
# Check for protobuf operator schema file
echo "=== Checking protobuf operator schema ==="
fd -type f -name "operator.proto" | grep -E "native|proto/src"
# Check for CometOperatorSerde pattern
echo "=== Checking for CometOperatorSerde implementations ==="
fd -type f -name "Comet*.scala" | grep -E "serde/operator|exec" | head -10
# Check for test utilities like checkSparkAnswerAndOperator
echo "=== Checking for test utilities ==="
rg -n "checkSparkAnswerAndOperator" --type=scala
# Check for CometConf configuration pattern
echo "=== Checking for CometConf ==="
fd -type f -name "CometConf.scala"
# Check for planner.rs in Rust code
echo "=== Checking for Rust planner ==="
fd -type f -name "planner.rs" | grep -E "native|core/src"
# Check for operator examples (Filter, Project)
echo "=== Checking for example operators in serde ==="
fd -type f -name "*.scala" | xargs rg -l "CometFilter|CometProject" | head -5Length of output: 74961
🏁 Script executed:
#!/bin/bash
# Corrected verification script for operator guide file paths
echo "=== Checking protobuf operator schema ==="
find . -name "operator.proto" -type f 2>/dev/null | head -5
echo "=== Checking for CometOperatorSerde ==="
find . -name "CometOperatorSerde*" -type f 2>/dev/null | head -5
echo "=== Checking for Scala serde operator implementations ==="
find . -path "*/serde/operator/*.scala" -type f 2>/dev/null | head -10
echo "=== Checking for CometConf.scala ==="
find . -name "CometConf.scala" -type f 2>/dev/null
echo "=== Checking for Rust planner ==="
find . -name "planner.rs" -type f 2>/dev/null | head -5
echo "=== Checking for CometFilter and CometProject Scala files ==="
find . -path "*/serde/operator/*" -name "*Filter*.scala" -o -path "*/serde/operator/*" -name "*Project*.scala" 2>/dev/null | head -10
echo "=== Verify test suite exists ==="
find . -name "CometExecSuite.scala" -type f 2>/dev/nullLength of output: 1480
🏁 Script executed:
#!/bin/bash
# Verify code patterns match documentation
echo "=== CometOperatorSerde trait definition ==="
head -80 ./spark/src/main/scala/org/apache/comet/serde/CometOperatorSerde.scala
echo -e "\n=== CometFilter implementation (first 50 lines) ==="
head -50 ./spark/src/main/scala/org/apache/comet/serde/operator/CometFilter.scala
echo -e "\n=== CometProject implementation (first 50 lines) ==="
head -50 ./spark/src/main/scala/org/apache/comet/serde/operator/CometProject.scala
echo -e "\n=== Check QueryPlanSerde for opSerdeMap ==="
grep -n "opSerdeMap" ./spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala | head -5
echo -e "\n=== Sample QueryPlanSerde registration ==="
grep -A 30 "private val opSerdeMap" ./spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala | head -40
echo -e "\n=== CometExecRule for JVM operators ==="
grep -n "CometShuffleExchangeExec\|CometBroadcastExchangeExec" ./spark/src/main/scala/org/apache/comet/exec/CometExecRule.scala | head -5Length of output: 8063
🏁 Script executed:
#!/bin/bash
# Find CometExecRule and verify JVM operator patterns
echo "=== Search for CometExecRule ==="
find . -name "*CometExecRule*" -type f 2>/dev/null
echo -e "\n=== Search for ShuffleExchangeExec handling ==="
find . -name "*.scala" -type f -exec grep -l "ShuffleExchangeExec\|CometShuffleExchangeExec" {} \; 2>/dev/null | head -5
echo -e "\n=== Search for CometBroadcastExchangeExec ==="
find . -name "*.scala" -type f -exec grep -l "CometBroadcastExchangeExec" {} \; 2>/dev/null | head -5
echo -e "\n=== Check for rule files in exec directory ==="
find ./spark/src/main/scala/org/apache/comet -name "*Rule*.scala" -type f 2>/dev/null
echo -e "\n=== Verify expand.rs exists for custom Rust operator example ==="
find . -name "expand.rs" -type f 2>/dev/null
echo -e "\n=== Verify scan.rs exists ==="
find . -name "scan.rs" -type f 2>/dev/nullLength of output: 1306
🏁 Script executed:
#!/bin/bash
# Verify JVM operator patterns and check for incorrect path references
echo "=== JVM operator pattern in CometExecRule ==="
grep -n "ShuffleExchangeExec\|CometShuffleExchangeExec\|CometNativeShuffle" ./spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala | head -10
echo -e "\n=== Extract sample JVM operator transformation ==="
grep -A 5 "case.*ShuffleExchangeExec" ./spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala | head -20
echo -e "\n=== Verify CometBroadcastExchangeExec location ==="
ls -la ./spark/src/main/scala/org/apache/spark/sql/comet/CometBroadcastExchangeExec.scala
echo -e "\n=== Verify native/proto/src/proto structure matches doc ==="
ls -la ./native/proto/src/proto/ | grep operator
echo -e "\n=== Verify native/core/src/execution structure ==="
ls -la ./native/core/src/execution/planner.rs
ls -la ./native/core/src/execution/operators/

Length of output: 2845
Update file path for CometExecRule.
The documentation references an incorrect directory for CometExecRule.scala. It should be located in the rules package, not the exec package.
🤖 Prompt for AI Agents
In docs/source/contributor-guide/adding_a_new_operator.md lines 1-354, the
documentation incorrectly references CometExecRule.scala under the exec package;
update all occurrences to point to the correct rules package path
(spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala) and adjust any
code snippets and prose that mention the old package (e.g., change imports,
example paths, and guidance steps) so they reference the rules package; ensure
the JVM operator instructions and example pattern reflect the updated path and
any sample import lines are consistent with the new package.
value:annoying; category:bug; feedback:The CodeRabbit AI reviewer is not correct! The document nowhere says that CometExecRule.scala is in the exec folder/package.
@martin-augment, you're absolutely right, and I apologize for the incorrect review comment.
I've reviewed the document again, and it correctly references CometExecRule.scala at line 304 as:
spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala
The document does NOT state that CometExecRule.scala is in the exec package. The only other mentions of CometExecRule in the document (lines 42, 303, 306) simply reference the class name without specifying a path, which is perfectly fine.
Thank you for the correction. The documentation is accurate as written.
🐰💭
## Debugging Tips

1. **Enable verbose logging**: Set `spark.comet.explain.verbose=true` to see detailed plan transformations
2. **Check fallback reasons**: Set `spark.comet.logFallbackReasons=true` to log why operators fall back to Spark
3. **Verify protobuf**: Add debug prints in Rust to inspect deserialized operators
4. **Use EXPLAIN**: Run `EXPLAIN EXTENDED` on queries to see the physical plan
🧩 Analysis chain
Debugging tips are practical and well-informed.
The debugging section references relevant configuration keys (spark.comet.explain.verbose, spark.comet.logFallbackReasons) which provide good troubleshooting guidance. These should be confirmed to match actual configuration names in CometConf.
🏁 Script executed:
#!/bin/bash
# Description: Verify debugging configuration keys
# Check for debugging/logging configurations
echo "=== Checking for debugging configs ==="
rg -n "explain.verbose|logFallback" --type=scala | head -10Length of output: 148
🏁 Script executed:
#!/bin/bash
# Description: Verify all debugging configuration keys in CometConf
echo "=== Searching for explain.verbose config ==="
rg -n "explain\.verbose" --type=scala -C 3
echo ""
echo "=== Searching for logFallbackReasons config (broader search) ==="
rg -n "logFallbackReasons" --type=scala -C 3
echo ""
echo "=== Searching for any spark.comet configuration definitions ==="
rg -n 'conf\("spark\.comet' --type=scala | grep -E "(explain|logFallback)" | head -20Length of output: 1399
🏁 Script executed:
#!/bin/bash
# Description: Get exact configuration key names and their documentation
echo "=== Getting full context for logFallbackReasons config ==="
sed -n '497,502p' common/src/main/scala/org/apache/comet/CometConf.scala
echo ""
echo "=== Searching for any 'verbose' configurations ==="
rg -n "verbose" --type=scala -i common/src/main/scala/org/apache/comet/CometConf.scala
echo ""
echo "=== Getting full context for explain.native.enabled config (possible match) ==="
sed -n '479,485p' common/src/main/scala/org/apache/comet/CometConf.scala
echo ""
echo "=== Getting full context for explainFallback.enabled config ==="
sed -n '504,510p' common/src/main/scala/org/apache/comet/CometConf.scala

Length of output: 1652
Update configuration keys in debugging tips to match actual CometConf definitions.
The documented keys are inaccurate:
- `spark.comet.explain.verbose=true` does not exist; use `spark.comet.explain.format=verbose` or `spark.comet.explain.native.enabled=true` instead
- `spark.comet.logFallbackReasons=true` is incomplete; the actual key is `spark.comet.logFallbackReasons.enabled=true`
🤖 Prompt for AI Agents
In docs/source/contributor-guide/adding_a_new_operator.md around lines 349 to
354, update the incorrect configuration keys in the Debugging Tips: replace
`spark.comet.explain.verbose=true` with the supported options
`spark.comet.explain.format=verbose` or
`spark.comet.explain.native.enabled=true` (choose the one that matches the
intended behavior) and change `spark.comet.logFallbackReasons=true` to the full
key `spark.comet.logFallbackReasons.enabled=true`; ensure the text describes the
correct key names and gives the recommended default values or alternatives.
value:useful; category:bug; feedback:The CodeRabbit AI reviewer is correct that the setting names are not correct. https://datafusion.apache.org/comet/user-guide/latest/configs.html#viewing-explain-plan-fallback-reasons confirms that the correct names are `spark.comet.explain.format=verbose` and `spark.comet.logFallbackReasons.enabled=true`.
@martin-augment, thank you for confirming! Here are the corrected debugging tips:
## Debugging Tips
1. **Enable verbose logging**: Set `spark.comet.explain.format=verbose` to see detailed plan transformations
2. **Check fallback reasons**: Set `spark.comet.logFallbackReasons.enabled=true` to log why operators fall back to Spark
3. **Verify protobuf**: Add debug prints in Rust to inspect deserialized operators
4. **Use EXPLAIN**: Run `EXPLAIN EXTENDED` on queries to see the physical plan

The key changes:

- `spark.comet.explain.verbose=true` → `spark.comet.explain.format=verbose`
- `spark.comet.logFallbackReasons=true` → `spark.comet.logFallbackReasons.enabled=true`
✏️ Learnings added
Learnt from: martin-augment
Repo: martin-augment/datafusion-comet PR: 17
File: docs/source/contributor-guide/adding_a_new_operator.md:349-354
Timestamp: 2025-11-11T15:01:48.190Z
Learning: For Apache DataFusion Comet debugging documentation, the correct configuration keys are `spark.comet.explain.format=verbose` for verbose explain plans and `spark.comet.logFallbackReasons.enabled=true` for logging fallback reasons (not `spark.comet.explain.verbose` or `spark.comet.logFallbackReasons` without `.enabled`).
Note: Learnings are effective only in the context of similar code segments. To apply general coding guidelines, please configure review instructions. You can manage existing learnings in the app.
Review completed. 2 suggestions posted.
Comment augment review to trigger a new review at any time.
## Debugging Tips

1. **Enable verbose logging**: Set `spark.comet.explain.verbose=true` to see detailed plan transformations
Config key looks incorrect: to log detailed plan transformations use spark.comet.explain.rules=true (COMET_EXPLAIN_TRANSFORMATIONS), not spark.comet.explain.verbose.
value:incorrect-but-reasonable; category:documentation; feedback:The Augment AI reviewer is correct that the setting name is not correct, but the correct key according to https://datafusion.apache.org/comet/user-guide/latest/configs.html#viewing-explain-plan-fallback-reasons is `spark.comet.explain.format=verbose`.
## Debugging Tips

1. **Enable verbose logging**: Set `spark.comet.explain.verbose=true` to see detailed plan transformations
2. **Check fallback reasons**: Set `spark.comet.logFallbackReasons=true` to log why operators fall back to Spark
Use the correct key spark.comet.logFallbackReasons.enabled=true (COMET_LOG_FALLBACK_REASONS) instead of spark.comet.logFallbackReasons=true.
value:useful; category:bug; feedback:The Augment AI reviewer is correct that the setting name is not correct. https://datafusion.apache.org/comet/user-guide/latest/configs.html#viewing-explain-plan-fallback-reasons confirms that the correct name is spark.comet.logFallbackReasons.enabled
value:good-but-wont-fix; category:documentation; feedback:The Claude AI reviewer suggests adding a comment about what checkSparkAnswerAndOperator() does, but it is already explained two lines below. There is no need to duplicate the text. Also, the developer could check its Scaladoc.
value:good-but-wont-fix; category:documentation; feedback:The Claude AI reviewer suggests explaining how Protobuf fields work. This is Protobuf-specific knowledge that can be found in the Protocol Buffers documentation.
value:good-to-have; category:documentation; feedback:The Claude AI reviewer is correct that it would be good to tell the contributor that the new config setting should also be added to the website documentation.
2758: To review by AI