
Fix PartialAggregationOptimizer for multi-partition plans#20719

Merged
bharath-techie merged 3 commits into opensearch-project:feature/datafusion from sandeshkr419:dcp on Feb 24, 2026
Conversation


@sandeshkr419 sandeshkr419 commented Feb 23, 2026

Description

  • Fix PartialAggregationOptimizer for multi-partition plans by removing the FinalPartitioned AggregateExec layer
  • Use Partial mode for all aggregations, not just approx_distinct, to make the plan simpler and consistent across aggs (keeping the distinct-count check for now)
  • Remove dead code made obsolete by these changes
  • Tested distinct count with/without other metrics, with/without group-by fields, and with/without aliases
  • Tested the above queries with partition counts of 1 and above

Problem:
With multi-partition setups, DataFusion generates a two-level aggregation plan:

ProjectionExec
  CoalescePartitionsExec
    FinalPartitioned AggregateExec    ← merges intermediate state → final values (the problematic layer)
      CoalesceBatchesExec
        RepartitionExec (Hash)
          RepartitionExec (RoundRobin)
            Partial AggregateExec     ← raw input → intermediate state
              DataSourceExec

The old optimizer converted the FinalPartitioned aggregate to Partial mode, creating two stacked Partial aggregates. The top one received intermediate state (e.g. HLL binary registers) from the bottom Partial and tried to interpret it as raw input, failing with: Cast error: Casting from Binary to Int64 not supported.

Single-partition plans worked because DataFusion skips the repartition/Final layer entirely.
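The failure mode above can be modeled in miniature (toy types, not the DataFusion API): a Partial aggregate consumes raw Int64 input and emits opaque Binary intermediate state, so stacking a second Partial on top hands Binary state to an operator that expects raw Int64 values.

```rust
#[derive(Debug)]
enum Column {
    Int64(Vec<i64>),
    Binary(Vec<Vec<u8>>), // stand-in for serialized HLL registers
}

// One Partial aggregation step: raw Int64 input -> opaque Binary state.
// Feeding Binary state back in reproduces the cast failure in miniature.
fn partial_agg(input: Column) -> Result<Column, String> {
    match input {
        Column::Int64(vals) => Ok(Column::Binary(
            vals.iter().map(|v| v.to_le_bytes().to_vec()).collect(),
        )),
        Column::Binary(_) => Err("Casting from Binary to Int64 not supported".into()),
    }
}
```

Applying `partial_agg` once succeeds; applying it to its own output errors, which is exactly what two stacked Partial aggregates did.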

Fix:
Instead of converting FinalPartitioned to Partial, simply remove the FinalPartitioned node and return its child. This preserves the repartition/coalesce layers, so group-by keys stay correctly co-located across partitions, while keeping the output in intermediate (partial) form. The Java side already handles merging partial results for all aggregations.
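The rewrite rule can be sketched on a toy plan tree (hypothetical enum, not the real `ExecutionPlan` API): every node is rebuilt as-is except the FinalPartitioned aggregate, which is replaced by its rewritten child.

```rust
#[derive(Debug, PartialEq)]
enum Plan {
    Projection(Box<Plan>),
    CoalescePartitions(Box<Plan>),
    CoalesceBatches(Box<Plan>),
    Repartition(Box<Plan>),
    AggFinalPartitioned(Box<Plan>),
    AggPartial(Box<Plan>),
    DataSource,
}

// Bottom-up rewrite: drop every FinalPartitioned aggregate and splice in
// its (already rewritten) child; all other operators, including the
// repartition/coalesce layers, are preserved unchanged.
fn remove_final(plan: Plan) -> Plan {
    use Plan::*;
    match plan {
        Projection(c) => Projection(Box::new(remove_final(*c))),
        CoalescePartitions(c) => CoalescePartitions(Box::new(remove_final(*c))),
        CoalesceBatches(c) => CoalesceBatches(Box::new(remove_final(*c))),
        Repartition(c) => Repartition(Box::new(remove_final(*c))),
        AggFinalPartitioned(c) => remove_final(*c), // remove node, keep child
        AggPartial(c) => AggPartial(Box::new(remove_final(*c))),
        DataSource => DataSource,
    }
}
```

Running this on the multi-partition shape from the Problem section yields the Resulting plan shown below: same tree, minus the FinalPartitioned layer.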

Resulting plan:

ProjectionExec
  CoalescePartitionsExec
    CoalesceBatchesExec
      RepartitionExec (Hash)          ← preserved
        RepartitionExec (RoundRobin)  ← preserved
          Partial AggregateExec       ← preserved
            DataSourceExec

Also generalized the optimizer to apply to all aggregation types (not just approx_distinct), since partial and single mode produce equivalent intermediate state for all aggregates, and the Java side merges all of them.
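Why this generalization is safe can be illustrated with toy functions (not the actual Java-side code): for ordinary aggregates like min and count, merging per-partition partial states gives the same answer as aggregating all the raw rows in one pass, which is the merge the Java side performs.

```rust
// Partial state for a single partition: (running min, row count).
fn partial_min_count(rows: &[i64]) -> (Option<i64>, u64) {
    (rows.iter().copied().min(), rows.len() as u64)
}

// Merge per-partition partial states into one combined state.
fn merge_states(states: &[(Option<i64>, u64)]) -> (Option<i64>, u64) {
    states.iter().fold((None, 0u64), |(min_acc, cnt_acc), &(m, c)| {
        let min = match (min_acc, m) {
            (Some(a), Some(b)) => Some(a.min(b)),
            (x, None) | (None, x) => x,
        };
        (min, cnt_acc + c)
    })
}
```

Merging the states from partitions `[5, 3]` and `[7, 1]` produces the same `(min, count)` as a single pass over `[5, 3, 7, 1]`, so emitting only partial state loses nothing.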


Debug logs (before/after):

[DEBUG] Before: ProjectionExec: expr=[min(test-grab-index.common_passenger_id)@1 as min(common_passenger_id), auth.metadata.instance_id@0 as auth.metadata.instance_id, count(Int64(1))@2 as agg_for_doc_count]
  CoalescePartitionsExec: fetch=10000
    AggregateExec: mode=FinalPartitioned, gby=[auth.metadata.instance_id@0 as auth.metadata.instance_id], aggr=[min(test-grab-index.common_passenger_id), count(Int64(1))]
      CoalesceBatchesExec: target_batch_size=8192
        RepartitionExec: partitioning=Hash([auth.metadata.instance_id@0], 2), input_partitions=2
          RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
            AggregateExec: mode=Partial, gby=[auth.metadata.instance_id@0 as auth.metadata.instance_id], aggr=[min(test-grab-index.common_passenger_id), count(Int64(1))]
              DataSourceExec: file_groups={1 group: [[Users/kusandes/workplace/opensearch/OpenSearch/build/testclusters/runTask-0/data/nodes/0/indices/JiZBefTzTj-Sh7378LfMFw/0/parquet/_parquet_file_generation_0.parquet]]}, projection=[auth.metadata.instance_id, common_passenger_id], file_type=parquet

[DEBUG] After: ProjectionExec: expr=[min(test-grab-index.common_passenger_id)@1 as min(common_passenger_id), auth.metadata.instance_id@0 as auth.metadata.instance_id, count(Int64(1))@2 as agg_for_doc_count]
  CoalescePartitionsExec: fetch=10000
    <Layer to be removed - start> 
    AggregateExec: mode=FinalPartitioned, gby=[auth.metadata.instance_id@0 as auth.metadata.instance_id], aggr=[min(test-grab-index.common_passenger_id), count(Int64(1))]
    <Layer to be removed - end>
      CoalesceBatchesExec: target_batch_size=8192
        RepartitionExec: partitioning=Hash([auth.metadata.instance_id@0], 2), input_partitions=2
          RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
            AggregateExec: mode=Partial, gby=[auth.metadata.instance_id@0 as auth.metadata.instance_id], aggr=[min(test-grab-index.common_passenger_id), count(Int64(1))]
              DataSourceExec: file_groups={1 group: [[Users/kusandes/workplace/opensearch/OpenSearch/build/testclusters/runTask-0/data/nodes/0/indices/JiZBefTzTj-Sh7378LfMFw/0/parquet/_parquet_file_generation_0.parquet]]}, projection=[auth.metadata.instance_id, common_passenger_id], file_type=parquet

Related Issues

Resolves #[Issue number to be closed when this PR is merged]

Check List

  • Functionality includes testing.
  • API changes companion pull request created, if applicable.
  • Public documentation issue/PR created, if applicable.

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

Signed-off-by: Sandesh Kumar <sandeshkr419@gmail.com>
@sandeshkr419 sandeshkr419 requested a review from a team as a code owner February 23, 2026 23:09

github-actions bot commented Feb 23, 2026

PR Reviewer Guide 🔍

(Review updated until commit 8251612)

Here are some key observations to aid the review process:

🧪 No relevant tests
🔒 No security concerns identified
✅ No TODO sections
🔀 No multiple PR themes
⚡ Recommended focus areas for review

Logic Inconsistency

The needs_partial check at line 48-50 only evaluates to true for approx_distinct, but the comment at line 54-58 states "The Java side handles merging partial results across partitions" suggesting this should apply to all aggregations. The PR description mentions "Use partial for all aggs - not just approx distinct" but the code still only checks for approx_distinct. This creates a mismatch between intent and implementation.

let needs_partial = agg.aggr_expr().iter().any(|e| {
    e.fun().name().eq_ignore_ascii_case("approx_distinct")
});

if needs_partial {
    return match agg.mode() {
        AggregateMode::Final | AggregateMode::FinalPartitioned => {
            // Remove the Final/FinalPartitioned node, preserving the
            // repartition/coalesce layers underneath. The Java side
            // handles merging partial results across partitions.
            Ok(new_children[0].clone())
        }
        AggregateMode::Partial => {
            plan.with_new_children(new_children)
        }
        _ => {
            // Single/SinglePartitioned → convert to Partial
            let new_agg = AggregateExec::try_new(
                AggregateMode::Partial,
                agg.group_expr().clone(),
                agg.aggr_expr().to_vec(),
                agg.filter_expr().to_vec(),
                new_children[0].clone(),
                new_children[0].schema(),
            )?;
            Ok(Arc::new(new_agg))
        }
    };
}
Missing Recursion

When needs_partial is false at line 77, the function returns plan.with_new_children(new_children) without recursively processing the children. The children were already processed at lines 41-45, but for non-aggregate nodes below this point, they won't be recursively optimized if the parent aggregate doesn't need partial mode.

return plan.with_new_children(new_children);


github-actions bot commented Feb 23, 2026

PR Code Suggestions ✨

Latest suggestions up to 8251612
Explore these optional code suggestions:

Category: Possible issue
Handle Result type consistently

The Partial mode branch returns a Result from plan.with_new_children(), but other
branches return Ok(...) directly. This inconsistency could cause compilation errors
if with_new_children() returns a different error type. Wrap the Partial case in Ok()
or use ? operator consistently.

plugins/engine-datafusion/jni/src/partial_agg_optimizer.rs [53-75]

 return match agg.mode() {
     AggregateMode::Final | AggregateMode::FinalPartitioned => {
         // Remove the Final/FinalPartitioned node, preserving the
         // repartition/coalesce layers underneath. The Java side
         // handles merging partial results across partitions.
         Ok(new_children[0].clone())
     }
     AggregateMode::Partial => {
-        plan.with_new_children(new_children)
+        Ok(plan.with_new_children(new_children)?)
     }
     _ => {
         // Single/SinglePartitioned → convert to Partial
         let new_agg = AggregateExec::try_new(
             AggregateMode::Partial,
             agg.group_expr().clone(),
             agg.aggr_expr().to_vec(),
             agg.filter_expr().to_vec(),
             new_children[0].clone(),
             new_children[0].schema(),
         )?;
         Ok(Arc::new(new_agg))
     }
 };
Suggestion importance[1-10]: 9


Why: The Partial mode branch returns a Result directly from plan.with_new_children(), while other branches wrap values in Ok(). This type inconsistency will cause a compilation error since the match arms must return the same type. The suggestion correctly identifies and fixes this critical bug.

Impact: High

Previous suggestions

Suggestions up to commit 06e8379
Category: Possible issue
Validate schema compatibility before removal

Verify that removing Final/FinalPartitioned aggregates doesn't break schema
compatibility. The removed node may have a different output schema than its child,
potentially causing downstream operators to fail when they expect specific column
names or types from the final aggregation.

plugins/engine-datafusion/jni/src/partial_agg_optimizer.rs [48-70]

 return match agg.mode() {
     AggregateMode::Final | AggregateMode::FinalPartitioned => {
-        // Remove the Final/FinalPartitioned node, preserving the
-        // repartition/coalesce layers underneath. The Java side
-        // handles merging partial results across partitions.
+        // Verify schema compatibility before removing Final node
+        if agg.schema() != new_children[0].schema() {
+            return plan.with_new_children(new_children);
+        }
         Ok(new_children[0].clone())
     }
     AggregateMode::Partial => {
         plan.with_new_children(new_children)
     }
     _ => {
-        // Single/SinglePartitioned → convert to Partial
         let new_agg = AggregateExec::try_new(
             AggregateMode::Partial,
             agg.group_expr().clone(),
             agg.aggr_expr().to_vec(),
             agg.filter_expr().to_vec(),
             new_children[0].clone(),
             new_children[0].schema(),
         )?;
         Ok(Arc::new(new_agg))
     }
 };
Suggestion importance[1-10]: 7


Why: The suggestion raises a valid concern about schema compatibility when removing Final/FinalPartitioned nodes. However, the comment in the code explicitly states "The Java side handles merging partial results across partitions," suggesting this is an intentional design decision. The suggestion asks for verification rather than identifying a definite bug, and the schema difference between partial and final aggregations is likely expected behavior in this optimization strategy.

Impact: Medium

@github-actions

❌ Gradle check result for 06e8379: FAILURE

Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?

Signed-off-by: Sandesh Kumar <sandeshkr419@gmail.com>
@github-actions

PR Code Analyzer ❗

AI-powered 'Code-Diff-Analyzer' found issues on commit 306796f.

Path: distribution/src/config/jvm.options | Line: 100 | Severity: critical
Description: JDWP debug agent enabled without address restriction (address=5006). This creates a remote code execution backdoor - anyone who can reach port 5006 can attach a debugger and execute arbitrary code, bypass authentication, and extract credentials. Debug agents should never be enabled in production configurations and must be restricted to localhost if needed for development.

The table above displays the top 10 most important findings.

Total: 1 | Critical: 1 | High: 0 | Medium: 0 | Low: 0


Pull Requests Author(s): Please update your Pull Request according to the report above.

Repository Maintainer(s): You can bypass diff analyzer by adding label skip-diff-analyzer after reviewing the changes carefully, then re-run failed actions. To re-enable the analyzer, remove the label, then re-run all actions.


⚠️ Note: The Code-Diff-Analyzer helps protect against potentially harmful code patterns. Please ensure you have thoroughly reviewed the changes beforehand.

Thanks.
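As an aside to the analyzer finding above: if the JDWP agent is genuinely needed for local debugging, restricting the bind address contains the exposure. A hedged sketch of what the jvm.options line could look like (assuming port 5006 is still wanted locally; development use only):

```
# Development only - never ship enabled. Binding to 127.0.0.1 keeps the
# debug port off the network.
-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=127.0.0.1:5006
```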

@github-actions

Persistent review updated to latest commit 8251612

@bharath-techie bharath-techie merged commit 47e7fa2 into opensearch-project:feature/datafusion Feb 24, 2026
9 of 32 checks passed
@github-actions

❌ Gradle check result for 8251612: null

Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?
