Skip to content

Make tokio concurrency settings dynamic#21817

Open
AjayRajNelapudi wants to merge 1 commit into
opensearch-project:mainfrom
AjayRajNelapudi:feature/concurrency-settings
Open

Make tokio concurrency settings dynamic#21817
AjayRajNelapudi wants to merge 1 commit into
opensearch-project:mainfrom
AjayRajNelapudi:feature/concurrency-settings

Conversation

@AjayRajNelapudi
Copy link
Copy Markdown
Contributor

Description

Make tokio concurrency settings dynamic

% curl -s 'localhost:9200/_cluster/settings?include_defaults=true&flat_settings=true&pretty' | grep -A0 'concurrency'
    "datafusion.concurrency.coordinator_multiplier" : "1.5",
    "datafusion.concurrency.datanode_multiplier" : "1.5",
    "node.auto_force_merge.threads.concurrency_multiplier" : "2",

% curl -s 'localhost:9200/_plugins/analytics_backend_datafusion/stats?pretty' | grep -A8 'datanode_gate\|coordinator_gate'
  "datanode_gate" : {
    "max_permits" : 12,
    "active_permits" : 0,
    "total_wait_duration_ms" : 0,
    "total_batches_started" : 0,
    "poison_permits" : 0,
    "target_max_permits" : 12
  },
  "coordinator_gate" : {
    "max_permits" : 12,
    "active_permits" : 0,
    "total_wait_duration_ms" : 0,
    "total_batches_started" : 0,
    "poison_permits" : 0,
    "target_max_permits" : 12
  }
}

% curl -s -X PUT 'localhost:9200/_cluster/settings?pretty' -H 'Content-Type: application/json' -d '{
  "persistent": { "datafusion.concurrency.datanode_multiplier": 5.0 }
}'
{
  "acknowledged" : true,
  "persistent" : {
    "datafusion" : {
      "concurrency" : {
        "datanode_multiplier" : "5.0"
      }
    }
  },
  "transient" : { }
}

% curl -s -X PUT 'localhost:9200/_cluster/settings?pretty' -H 'Content-Type: application/json' -d '{
  "persistent": { "datafusion.concurrency.coordinator_multiplier": 5.0 }
}'
{
  "acknowledged" : true,
  "persistent" : {
    "datafusion" : {
      "concurrency" : {
        "coordinator_multiplier" : "5.0"
      }
    }
  },
  "transient" : { }
}

% curl -s 'localhost:9200/_plugins/analytics_backend_datafusion/stats?pretty' | grep -A8 'datanode_gate\|coordinator_gate'
  "datanode_gate" : {
    "max_permits" : 40,
    "active_permits" : 0,
    "total_wait_duration_ms" : 0,
    "total_batches_started" : 0,
    "poison_permits" : 0,
    "target_max_permits" : 40
  },
  "coordinator_gate" : {
    "max_permits" : 40,
    "active_permits" : 0,
    "total_wait_duration_ms" : 0,
    "total_batches_started" : 0,
    "poison_permits" : 0,
    "target_max_permits" : 40
  }
}

% curl -s -X PUT 'localhost:9200/_cluster/settings?pretty' -H 'Content-Type: application/json' -d '{
  "persistent": { "datafusion.concurrency.datanode_multiplier": 0.5 }
}'
{
  "acknowledged" : true,
  "persistent" : {
    "datafusion" : {
      "concurrency" : {
        "datanode_multiplier" : "0.5"
      }
    }
  },
  "transient" : { }
}

% curl -s -X PUT 'localhost:9200/_cluster/settings?pretty' -H 'Content-Type: application/json' -d '{
  "persistent": { "datafusion.concurrency.coordinator_multiplier": 0.5 }
}'
{
  "acknowledged" : true,
  "persistent" : {
    "datafusion" : {
      "concurrency" : {
        "coordinator_multiplier" : "0.5"
      }
    }
  },
  "transient" : { }
}

% curl -s 'localhost:9200/_plugins/analytics_backend_datafusion/stats?pretty' | grep -A8 'datanode_gate\|coordinator_gate'
  "datanode_gate" : {
    "max_permits" : 4,
    "active_permits" : 0,
    "total_wait_duration_ms" : 0,
    "total_batches_started" : 0,
    "poison_permits" : 1,
    "target_max_permits" : 4
  },
  "coordinator_gate" : {
    "max_permits" : 4,
    "active_permits" : 0,
    "total_wait_duration_ms" : 0,
    "total_batches_started" : 0,
    "poison_permits" : 1,
    "target_max_permits" : 4
  }
}

% curl -s -X PUT 'localhost:9200/_cluster/settings?pretty' -H 'Content-Type: application/json' -d '{
  "persistent": {
    "datafusion.concurrency.datanode_multiplier": null,
    "datafusion.concurrency.coordinator_multiplier": null
  }
}'
{
  "acknowledged" : true,
  "persistent" : { },
  "transient" : { }
}

% curl -s 'localhost:9200/_plugins/analytics_backend_datafusion/stats?pretty' | grep -A8 'datanode_gate\|coordinator_gate'
  "datanode_gate" : {
    "max_permits" : 12,
    "active_permits" : 4294967261,
    "total_wait_duration_ms" : 0,
    "total_batches_started" : 0,
    "poison_permits" : 0,
    "target_max_permits" : 12
  },
  "coordinator_gate" : {
    "max_permits" : 12,
    "active_permits" : 4294967261,
    "total_wait_duration_ms" : 0,
    "total_batches_started" : 0,
    "poison_permits" : 0,
    "target_max_permits" : 12
  }
}

% curl -s -X PUT 'localhost:9200/_cluster/settings?pretty' -H 'Content-Type: application/json' -d '{
  "persistent": { "datafusion.concurrency.datanode_multiplier": 0.05 }
}'
{
  "error" : {
    "root_cause" : [
      {
        "type" : "illegal_argument_exception",
        "reason" : "Failed to parse value [0.05] for setting [datafusion.concurrency.datanode_multiplier] must be >= 0.1"
      }
    ],
    "type" : "illegal_argument_exception",
    "reason" : "Failed to parse value [0.05] for setting [datafusion.concurrency.datanode_multiplier] must be >= 0.1"
  },
  "status" : 400
}

% curl -s -X PUT 'localhost:9200/_cluster/settings?pretty' -H 'Content-Type: application/json' -d '{
  "persistent": { "datafusion.concurrency.datanode_multiplier": 15.0 }
}'
{
  "error" : {
    "root_cause" : [
      {
        "type" : "illegal_argument_exception",
        "reason" : "Failed to parse value [15.0] for setting [datafusion.concurrency.datanode_multiplier] must be <= 10.0"
      }
    ],
    "type" : "illegal_argument_exception",
    "reason" : "Failed to parse value [15.0] for setting [datafusion.concurrency.datanode_multiplier] must be <= 10.0"
  },
  "status" : 400
}
% 

Related Issues

Resolves #[Issue number to be closed when this PR is merged]

Check List

  • Functionality includes testing.
  • API changes companion pull request created, if applicable.
  • Public documentation issue/PR created, if applicable.

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

Signed-off-by: Ajay Raj Nelapudi <ajnelapu@amazon.com>
@AjayRajNelapudi AjayRajNelapudi requested a review from a team as a code owner May 24, 2026 10:41
@github-actions
Copy link
Copy Markdown
Contributor

PR Code Analyzer ❗

AI-powered 'Code-Diff-Analyzer' found issues on commit c4e2de2.

PathLineSeverityDescription
sandbox/plugins/analytics-backend-datafusion/rust/src/executor.rs367highNew `use proptest::prelude::*;` import added in the test module implies the addition of the 'proptest' crate as a dependency. No corresponding Cargo.toml change is visible in this diff, which may mean the dependency change is in a separate commit or file not included here. Per mandatory policy, any dependency addition must be flagged regardless of apparent legitimacy — maintainers should verify proptest appears in Cargo.toml (dev-dependencies) and confirm no Cargo.lock changes introduce unexpected transitive dependencies.

The table above displays the top 10 most important findings.

Total: 1 | Critical: 0 | High: 1 | Medium: 0 | Low: 0


Pull Requests Author(s): Please update your Pull Request according to the report above.

Repository Maintainer(s): You can bypass diff analyzer by adding label skip-diff-analyzer after reviewing the changes carefully, then re-run failed actions. To re-enable the analyzer, remove the label, then re-run all actions.


⚠️ Note: The Code-Diff-Analyzer helps protect against potentially harmful code patterns. Please ensure you have thoroughly reviewed the changes beforehand.

Thanks.

@bharath-techie bharath-techie added the skip-diff-analyzer Maintainer to skip code-diff-analyzer check, after reviewing issues in AI analysis. label May 24, 2026
@github-actions
Copy link
Copy Markdown
Contributor

PR Reviewer Guide 🔍

Here are some key observations to aid the review process:

🧪 PR contains tests
🔒 No security concerns identified
✅ No TODO sections
🔀 No multiple PR themes
⚡ Recommended focus areas for review

Possible Issue

In active_permits(), the calculation self.max_permits.load(Ordering::Acquire) - self.semaphore.available_permits() as u32 can underflow if available_permits() exceeds max_permits during a scale-up-after-scale-down. This occurs because the bulk release of poison permits (via acquire_many_owned) can temporarily make available_permits() larger than max_permits. The subtraction wraps around on unsigned integers, producing an incorrect large value.

    self.max_permits.load(Ordering::Acquire) - self.semaphore.available_permits() as u32
}
Possible Issue

In resize() scale-down path, if existing_poison >= total_poison_needed, the code releases excess poison permits by popping from the vector. However, acquire_many_owned(N) returns a single OwnedSemaphorePermit representing N permits. Popping one entry releases all N permits at once, not one permit. If existing_poison counts vector entries (not individual permits), the logic is incorrect when a single entry holds multiple permits. This can cause the effective capacity to differ from the target.

// We already hold enough poison permits; release excess
let excess = existing_poison - total_poison_needed;
for _ in 0..excess {
    state.poison_permits.pop();
}
self.max_permits.store(new_max, Ordering::Release);
Possible Issue

In resize() scale-up path, the calculation permits_from_poison = poison_count.min(new_max - current_max) assumes each vector entry represents one permit. If a single OwnedSemaphorePermit entry holds multiple permits (from acquire_many_owned), popping one entry releases more permits than intended. This causes the semaphore to have more available permits than new_max, breaking the capacity invariant.

let permits_from_poison = poison_count.min(new_max - current_max);
for _ in 0..permits_from_poison {
    state.poison_permits.pop();
}
let still_needed = (new_max - current_max) - permits_from_poison;

@github-actions
Copy link
Copy Markdown
Contributor

PR Code Suggestions ✨

Explore these optional code suggestions:

CategorySuggestion                                                                                                                                    Impact
General
Refresh CPU thread count dynamically

The cpuThreads value is captured once at plugin initialization and never updated. If
the JVM's available processors change at runtime (e.g., container CPU quota
adjustment), the gate permits will be computed using stale values. Consider fetching
cpuThreads inside the lambda to reflect current processor availability.

sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/DataFusionPlugin.java [317-325]

 clusterService.getClusterSettings().addSettingsUpdateConsumer(DatafusionSettings.CONCURRENCY_DATANODE_MULTIPLIER, multiplier -> {
-    int newMax = Math.max(1, (int) (cpuThreads * multiplier));
+    int currentCpuThreads = DataFusionService.cpuThreadCount();
+    int newMax = Math.max(1, (int) (currentCpuThreads * multiplier));
     NativeBridge.updateConcurrencyGate("datanode", newMax);
 });
 
 clusterService.getClusterSettings().addSettingsUpdateConsumer(DatafusionSettings.CONCURRENCY_COORDINATOR_MULTIPLIER, multiplier -> {
-    int newMax = Math.max(1, (int) (cpuThreads * multiplier));
+    int currentCpuThreads = DataFusionService.cpuThreadCount();
+    int newMax = Math.max(1, (int) (currentCpuThreads * multiplier));
     NativeBridge.updateConcurrencyGate("coordinator", newMax);
 });
Suggestion importance[1-10]: 7

__

Why: The suggestion correctly identifies that cpuThreads is captured once at initialization. While Runtime.getRuntime().availableProcessors() can change in containerized environments, this is an edge case. The suggestion improves robustness by fetching the current value on each update, though the impact is moderate since CPU quota changes are rare.

Medium
Optimize no-op resize path

The resize operation acquires the mutex before validating the new value against the
current state. If new_max equals current_max, the mutex is held unnecessarily. Move
the no-op check before acquiring the mutex to avoid contention when settings are
repeatedly applied with the same value.

sandbox/plugins/analytics-backend-datafusion/rust/src/executor.rs [129-144]

 pub async fn resize(&self, new_max: u32, gate_name: &str) {
     // Validate bounds
     if new_max < 1 || new_max > Semaphore::MAX_PERMITS as u32 {
         warn!(
             "[{}] resize rejected: new_max {} out of bounds [1, {}]",
             gate_name, new_max, Semaphore::MAX_PERMITS
         );
         return;
     }
 
+    let current_max = self.max_permits.load(Ordering::Acquire);
+    if new_max == current_max {
+        return; // No-op: avoid acquiring mutex
+    }
+
     let mut state = self.resize_mutex.lock().await;
-    let current_max = self.max_permits.load(Ordering::Acquire);
Suggestion importance[1-10]: 6

__

Why: The suggestion correctly identifies that the no-op check occurs after acquiring the mutex. Moving the check before the mutex acquisition would reduce contention when settings are repeatedly applied with the same value. However, the current code already has a no-op check at line 142-144 inside the mutex, so the optimization is valid but offers only marginal performance improvement.

Low

@github-actions
Copy link
Copy Markdown
Contributor

✅ Gradle check result for c4e2de2: SUCCESS

@codecov
Copy link
Copy Markdown

codecov Bot commented May 24, 2026

Codecov Report

✅ All modified and coverable lines are covered by tests.
✅ Project coverage is 73.46%. Comparing base (19b99d8) to head (c4e2de2).
⚠️ Report is 1 commits behind head on main.

Additional details and impacted files
@@             Coverage Diff              @@
##               main   #21817      +/-   ##
============================================
+ Coverage     73.38%   73.46%   +0.08%     
- Complexity    75271    75355      +84     
============================================
  Files          6028     6028              
  Lines        342014   342014              
  Branches      49186    49186              
============================================
+ Hits         250972   251247     +275     
+ Misses        71110    70784     -326     
- Partials      19932    19983      +51     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.

@Bukhtawar Bukhtawar self-requested a review May 25, 2026 05:23
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

skip-diff-analyzer Maintainer to skip code-diff-analyzer check, after reviewing issues in AI analysis.

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants