Added in-flight cancellation of SearchShardTask based on resource consumption #4575

Merged

Conversation


@ketanv3 ketanv3 commented Sep 23, 2022

Description

This feature aims to identify and cancel resource-intensive SearchShardTasks once they breach certain thresholds. This will help terminate problematic queries that can put nodes under duress and degrade cluster performance.

This PR is intended to add the core framework for in-flight cancellation of search requests. The actual implementations of ResourceUsageTrackers (which decide whether a task should be cancelled), along with the node stats API, will be added in a later PR.

Issues Resolved

#1181

Check List

  • New functionality includes testing.
    • All tests pass
  • New functionality has been documented.
    • New functionality has javadoc added
  • Commits are signed per the DCO using --signoff
  • Commit changes are listed out in CHANGELOG.md file (See: Changelog)

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

Signed-off-by: Ketan Verma <ketan9495@gmail.com>

@codecov-commenter

codecov-commenter commented Sep 23, 2022

Codecov Report

Merging #4575 (6424470) into main (5d7d83e) will increase coverage by 0.15%.
The diff coverage is 77.92%.

@@             Coverage Diff              @@
##               main    #4575      +/-   ##
============================================
+ Coverage     70.60%   70.75%   +0.15%     
- Complexity    57603    57794     +191     
============================================
  Files          4675     4687      +12     
  Lines        276925   277290     +365     
  Branches      40347    40376      +29     
============================================
+ Hits         195517   196193     +676     
+ Misses        65156    64779     -377     
- Partials      16252    16318      +66     
Impacted Files Coverage Δ
...ain/java/org/opensearch/cluster/ClusterModule.java 100.00% <ø> (ø)
...rg/opensearch/common/settings/ClusterSettings.java 91.89% <ø> (ø)
...ain/java/org/opensearch/tasks/CancellableTask.java 82.35% <ø> (-5.89%) ⬇️
...backpressure/settings/SearchShardTaskSettings.java 51.42% <51.42%> (ø)
...kpressure/settings/SearchBackpressureSettings.java 67.34% <67.34%> (ø)
...arch/backpressure/settings/NodeDuressSettings.java 70.00% <70.00%> (ø)
...n/java/org/opensearch/common/util/TokenBucket.java 74.28% <74.28%> (ø)
...search/backpressure/SearchBackpressureService.java 77.31% <77.31%> (ø)
.../opensearch/tasks/TaskResourceTrackingService.java 80.00% <81.81%> (+0.23%) ⬆️
...in/java/org/opensearch/tasks/TaskCancellation.java 89.65% <89.65%> (ø)
... and 502 more


@ketanv3 ketanv3 marked this pull request as ready for review September 23, 2022 09:10
@ketanv3 ketanv3 requested review from a team and reta as code owners September 23, 2022 09:10

@Bukhtawar Bukhtawar left a comment

Thanks @ketanv3, I have shared a few thoughts on how we could break down the core logic into separate classes for better maintainability.

Comment on lines +34 to +44
public synchronized double record(long value) {
    long delta = value - observations[(int) (count % observations.length)];
    observations[(int) (count % observations.length)] = value;

    count++;
    sum += delta;
    average = (double) sum / Math.min(count, observations.length);
    return average;
}
Collaborator

Can we use a queue and see if we can avoid the synchronized block by using CAS, if possible?

Contributor Author

I actually plan to benchmark CAS vs synchronized approaches before committing to either one of them, especially since the operations are pretty simple and quick to execute.

If there are major gains with CAS then it makes sense to go with it. Otherwise, I would prefer keeping it simple and readable.

Contributor Author

@ketanv3 ketanv3 Oct 2, 2022

I did not observe any major gains with the queue + CAS approach used in indexing back-pressure (here). With higher thread contention, it performed slightly worse.

# JMH version: 1.35
# VM version: JDK 17.0.3, OpenJDK 64-Bit Server VM, 17.0.3+7

Benchmark (1 thread - no contention)           Mode  Cnt   Score   Error  Units
MovingAverageBenchmark.timeMovingAverage       avgt    5  25.669 ± 1.884  ns/op
MovingAverageBenchmark.timeMovingAverageQueue  avgt    5  25.213 ± 0.383  ns/op

Benchmark (4 threads - low contention)         Mode  Cnt    Score   Error  Units
MovingAverageBenchmark.timeMovingAverage       avgt    5  217.714 ± 6.676  ns/op
MovingAverageBenchmark.timeMovingAverageQueue  avgt    5  223.088 ± 3.651  ns/op

Benchmark (16 threads - high contention)       Mode  Cnt    Score   Error   Units
MovingAverageBenchmark.timeMovingAverage       avgt    5  785.830 ± 13.446  ns/op
MovingAverageBenchmark.timeMovingAverageQueue  avgt    5  792.442 ± 64.234  ns/op

It is also worth noting that the current implementation of the moving average in shard indexing back-pressure has subtle race-condition bugs. Two or more concurrent threads could reach this point and remove too many elements from the queue, leading to incorrect results or even an NPE in the worst case. Another problem is the lack of causal ordering – an older thread may overwrite a newer thread's average at this point.
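
For context, a JMH harness along these lines could produce numbers like the ones above. It is only a sketch: the MovingAverage constructor and the MovingAverageQueue class stand in for the two implementations being compared and are assumptions, not the exact benchmark code used for these runs.

import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;

import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.Threads;

@State(Scope.Benchmark)
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.NANOSECONDS)
public class MovingAverageBenchmark {
    private MovingAverage movingAverage;           // synchronized ring-buffer variant
    private MovingAverageQueue movingAverageQueue; // hypothetical queue + CAS variant

    @Setup
    public void setup() {
        movingAverage = new MovingAverage(100);           // assumed (windowSize) constructor
        movingAverageQueue = new MovingAverageQueue(100); // assumed (windowSize) constructor
    }

    @Benchmark
    @Threads(4) // vary to 1 / 4 / 16 to reproduce the contention levels above
    public double timeMovingAverage() {
        return movingAverage.record(ThreadLocalRandom.current().nextLong(1000));
    }

    @Benchmark
    @Threads(4)
    public double timeMovingAverageQueue() {
        return movingAverageQueue.record(ThreadLocalRandom.current().nextLong(1000));
    }
}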

@ketanv3 ketanv3 force-pushed the feature/inflight-cancellation-core branch from 4456edd to caad78d Compare September 24, 2022 13:17
@ketanv3 ketanv3 force-pushed the feature/inflight-cancellation-core branch from caad78d to ef01ba3 Compare September 24, 2022 14:09
@ketanv3 ketanv3 force-pushed the feature/inflight-cancellation-core branch from ef01ba3 to 9d0f2f0 Compare September 24, 2022 14:37
@ketanv3 ketanv3 force-pushed the feature/inflight-cancellation-core branch from 9d0f2f0 to 1f33317 Compare October 4, 2022 13:43
@ketanv3 ketanv3 force-pushed the feature/inflight-cancellation-core branch from 1f33317 to 6445b4a Compare October 4, 2022 14:13
@ketanv3 ketanv3 requested review from nssuresh2007 and Bukhtawar and removed request for Bukhtawar and nssuresh2007 October 4, 2022 20:08
@nssuresh2007 nssuresh2007 left a comment

LGTM!

server/src/main/java/org/opensearch/common/Streak.java (outdated, resolved)
Comment on lines +35 to +44
*/
public synchronized double record(long value) {
    long delta = value - observations[(int) (count % observations.length)];
    observations[(int) (count % observations.length)] = value;

    count++;
    sum += delta;
    average = (double) sum / Math.min(count, observations.length);
    return average;
}
Collaborator

Can we use CAS here with a do-while loop?

Contributor Author

@ketanv3 ketanv3 Oct 10, 2022

Using CAS backed by a queue or a ring-buffer, it may be possible to track the moving average similar to how it's done in indexing backpressure (though that implementation has subtle race-condition bugs, which I have highlighted in a comment above).

To successfully implement this, we need to ensure the last 'n' items, the running sum, and the count of inserted items are updated atomically, which may not be possible with CAS alone. We need to treat the entire state (including the backing queue/buffer) as immutable and create a new copy with every update (a similar concept to CopyOnWriteArrayList).

Since our use-case is write-heavy (on task completion) with infrequent reads (on each search backpressure service iteration), creating copies may be very expensive, especially for larger window sizes. I'm still inclined to use the current approach.
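
A minimal sketch of that copy-on-write idea, assuming illustrative names rather than the PR's actual classes: the whole (buffer, count, sum) tuple is published atomically through a single AtomicReference, and every update clones the window.

import java.util.concurrent.atomic.AtomicReference;

public class ImmutableMovingAverage {
    private static final class State {
        final long[] observations;
        final long count;
        final long sum;

        State(long[] observations, long count, long sum) {
            this.observations = observations;
            this.count = count;
            this.sum = sum;
        }
    }

    private final AtomicReference<State> state;

    public ImmutableMovingAverage(int windowSize) {
        state = new AtomicReference<>(new State(new long[windowSize], 0, 0));
    }

    public double record(long value) {
        while (true) {
            State current = state.get();
            // Clone the backing buffer so that buffer, count and sum stay mutually consistent.
            long[] copy = current.observations.clone();
            int index = (int) (current.count % copy.length);
            long sum = current.sum - copy[index] + value;
            copy[index] = value;
            State next = new State(copy, current.count + 1, sum);
            if (state.compareAndSet(current, next)) {
                return (double) next.sum / Math.min(next.count, copy.length);
            }
            // Another thread published a newer state first; retry against it.
        }
    }
}

The clone on every record() call is exactly the copying cost that makes this unattractive for a write-heavy path with large windows, as the benchmarks below bear out.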

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Benchmark results comparing the existing approach vs. CAS backed by an immutable ring-buffer.

Using synchronized (current approach):

                  10 observations    100 observations    1000 observations
1 thread          14.123 ns/op       14.258 ns/op        14.544 ns/op
4 threads         368.087 ns/op      364.879 ns/op       378.378 ns/op
16 threads        1400.703 ns/op     1506.456 ns/op      1809.835 ns/op

Using compare-and-set backed by an immutable ring-buffer (implementation reference):

                  10 observations    100 observations    1000 observations
1 thread          27.438 ns/op       112.920 ns/op       1082.057 ns/op
4 threads         798.966 ns/op      1077.040 ns/op      4675.375 ns/op
16 threads        5820.276 ns/op     8374.579 ns/op      36474.605 ns/op

Key observations:

  • The synchronized approach doesn't have to clone the backing buffer, so its time complexity doesn't grow proportionally with the window size. This is a major drawback of the CAS approach, as repeated array copying is expensive and creates significant GC overhead too.
  • Even with smaller window sizes, CAS performance degrades as the number of threads (and contention) grows.
  • The synchronized approach performed between 2–20x better across these measurements.

Comment on lines 82 to 90
synchronized (this) {
    refill();

    if (tokens >= n) {
        tokens -= n;
        return true;
    }

    return false;
Collaborator

Can we use CAS here as well?

Contributor Author

This should be doable. Will make the changes.
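
For illustration, a lock-free variant could fold the refill and the consumption into a single immutable state transition published with compareAndSet. The class name and the (clock, rate, burst) constructor below are assumptions for the sketch, not necessarily what was committed.

import java.util.concurrent.atomic.AtomicReference;
import java.util.function.LongSupplier;

public class CasTokenBucket {
    private static final class State {
        final double tokens;
        final long lastRefilledAt;

        State(double tokens, long lastRefilledAt) {
            this.tokens = tokens;
            this.lastRefilledAt = lastRefilledAt;
        }
    }

    private final LongSupplier clock;
    private final double rate;   // tokens added per clock tick
    private final double burst;  // maximum number of tokens the bucket can hold
    private final AtomicReference<State> state;

    public CasTokenBucket(LongSupplier clock, double rate, double burst) {
        this.clock = clock;
        this.rate = rate;
        this.burst = burst;
        this.state = new AtomicReference<>(new State(burst, clock.getAsLong()));
    }

    public boolean request(double n) {
        while (true) {
            State current = state.get();
            long now = clock.getAsLong();
            // Refill lazily based on elapsed time, capped at the burst capacity.
            double tokens = Math.min(burst, current.tokens + (now - current.lastRefilledAt) * rate);
            if (tokens < n) {
                return false; // not enough tokens; no state change needed
            }
            if (state.compareAndSet(current, new State(tokens - n, now))) {
                return true;
            }
            // Lost the race to another thread; retry with the latest state.
        }
    }
}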

import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongSupplier;

public class TokenBucketTests extends OpenSearchTestCase {
Collaborator

We might need multi-threaded tests to guarantee thread safety for all the new utilities we are building.
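
One possible shape for such a test, sketched here for TokenBucket: hammer it from several threads against a frozen clock and assert that no more tokens are granted than the bucket could ever hold. The constructor and request(n) signatures, and the assumption that the bucket starts full, are illustrative only.

public void testTokenBucketIsThreadSafe() throws Exception {
    AtomicLong clock = new AtomicLong(0);                         // frozen clock: no refills during the test
    TokenBucket bucket = new TokenBucket(clock::get, 0.0, 100.0); // assumed (clock, rate, burst) constructor
    AtomicLong granted = new AtomicLong(0);

    Thread[] workers = new Thread[8];
    for (int i = 0; i < workers.length; i++) {
        workers[i] = new Thread(() -> {
            for (int j = 0; j < 1000; j++) {
                if (bucket.request(1.0)) { // assumed request(n) method, per the snippet above
                    granted.incrementAndGet();
                }
            }
        });
        workers[i].start();
    }
    for (Thread worker : workers) {
        worker.join();
    }

    // With zero refill and an assumed initial fill of 100 tokens, at most 100 requests may succeed.
    assertEquals(100, granted.get());
}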

Comment on lines +28 to +34
public long incrementCancellations() {
    return cancellations.incrementAndGet();
}

public long getCancellations() {
    return cancellations.get();
}
Collaborator

Can we delegate this to TaskCancellation instead?

Contributor Author

Here we are tracking the total cancellations done by a task resource usage tracker.

On the other hand, TaskCancellation is a wrapper for a single cancellable task along with the list of cancellation reasons and callbacks. The callbacks in it are responsible for calling incrementCancellations() on the relevant trackers.

Snippet for reference:

TaskCancellation getTaskCancellation(CancellableTask task) {
    List<TaskCancellation.Reason> reasons = new ArrayList<>();
    List<Runnable> callbacks = new ArrayList<>();

    for (TaskResourceUsageTracker tracker : taskResourceUsageTrackers) {
        Optional<TaskCancellation.Reason> reason = tracker.cancellationReason(task);
        if (reason.isPresent()) {
            reasons.add(reason.get());
            callbacks.add(tracker::incrementCancellations);
        }
    }

    return new TaskCancellation(task, reasons, callbacks);
}
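
For illustration, a tracker that cancels long-running tasks (similar in spirit to the ElapsedTimeTracker mentioned in the follow-up backports) could plug into this flow roughly as follows. The base-class surface, the threshold wiring, and the Reason constructor are simplified assumptions, not the PR's exact code.

import java.util.Optional;
import java.util.function.LongSupplier;

public class ElapsedTimeTrackerSketch extends TaskResourceUsageTracker {
    private final LongSupplier nanoTimeSupplier;
    private final long elapsedTimeNanosThreshold; // would come from cluster settings in practice

    public ElapsedTimeTrackerSketch(LongSupplier nanoTimeSupplier, long elapsedTimeNanosThreshold) {
        this.nanoTimeSupplier = nanoTimeSupplier;
        this.elapsedTimeNanosThreshold = elapsedTimeNanosThreshold;
    }

    @Override
    public Optional<TaskCancellation.Reason> cancellationReason(CancellableTask task) {
        // Assumes the task exposes its start time in nanos; other base-class overrides omitted.
        long elapsed = nanoTimeSupplier.getAsLong() - task.getStartTimeNanos();
        if (elapsed < elapsedTimeNanosThreshold) {
            return Optional.empty();
        }
        return Optional.of(new TaskCancellation.Reason("elapsed time exceeded " + elapsedTimeNanosThreshold + " ns", 1));
    }
}

getTaskCancellation() above would then pick up the reason and register this tracker's incrementCancellations() as the callback to run if the task is actually cancelled.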

@ketanv3 ketanv3 force-pushed the feature/inflight-cancellation-core branch 3 times, most recently from 64b3123 to 8d46e74 Compare October 10, 2022 12:38
@ketanv3 ketanv3 force-pushed the feature/inflight-cancellation-core branch from 8d46e74 to 69a2ff8 Compare October 10, 2022 22:57
Collaborator

@Bukhtawar Bukhtawar left a comment

Thanks @ketanv3, please add links to follow-ups for:

  1. Multi-threaded tests
  2. Coordinator cancellation at parent

…sumption

This feature aims to identify and cancel resource intensive SearchShardTasks if they have breached certain
thresholds. This will help in terminating problematic queries which can put nodes in duress and degrade the
cluster performance.

Signed-off-by: Ketan Verma <ketan9495@gmail.com>
@ketanv3 ketanv3 force-pushed the feature/inflight-cancellation-core branch from 2421979 to 3c19186 Compare October 14, 2022 05:18
@ketanv3 ketanv3 force-pushed the feature/inflight-cancellation-core branch from 3c19186 to bfbebef Compare October 14, 2022 05:51
@ketanv3 ketanv3 force-pushed the feature/inflight-cancellation-core branch from bfbebef to 6282515 Compare October 14, 2022 06:19
Signed-off-by: Ketan Verma <ketan9495@gmail.com>
@ketanv3 ketanv3 force-pushed the feature/inflight-cancellation-core branch from 6282515 to 6424470 Compare October 14, 2022 07:01
@Bukhtawar Bukhtawar merged commit d708860 into opensearch-project:main Oct 14, 2022
ketanv3 added a commit to ketanv3/OpenSearch that referenced this pull request Nov 1, 2022
…on resource consumption (opensearch-project#4575)

This feature aims to identify and cancel resource intensive SearchShardTasks if they have breached certain
thresholds. This will help in terminating problematic queries which can put nodes in duress and degrade the
cluster performance.

Signed-off-by: Ketan Verma <ketan9495@gmail.com>
ketanv3 added a commit to ketanv3/OpenSearch that referenced this pull request Nov 2, 2022
…on resource consumption (opensearch-project#4575)

This feature aims to identify and cancel resource intensive SearchShardTasks if they have breached certain
thresholds. This will help in terminating problematic queries which can put nodes in duress and degrade the
cluster performance.

Signed-off-by: Ketan Verma <ketan9495@gmail.com>
Bukhtawar pushed a commit that referenced this pull request Nov 3, 2022
…ource consumption (#5039)

* [Backport 2.x] Added in-flight cancellation of SearchShardTask based on resource consumption (#4575)

This feature aims to identify and cancel resource intensive SearchShardTasks if they have breached certain
thresholds. This will help in terminating problematic queries which can put nodes in duress and degrade the
cluster performance.

* [Backport 2.x] Added resource usage trackers for in-flight cancellation of SearchShardTask (#4805)

1. CpuUsageTracker: cancels tasks if they consume too much CPU
2. ElapsedTimeTracker: cancels tasks if they consume too much time
3. HeapUsageTracker: cancels tasks if they consume too much heap

* [Backport 2.x] Added search backpressure stats API

Added search backpressure stats to the existing node/stats API to describe:
1. the number of cancellations (currently for SearchShardTask only)
2. the current state of TaskResourceUsageTracker

Signed-off-by: Ketan Verma <ketan9495@gmail.com>
ketanv3 added a commit to ketanv3/OpenSearch that referenced this pull request Nov 3, 2022
…ource consumption (opensearch-project#5039)

* [Backport 2.x] Added in-flight cancellation of SearchShardTask based on resource consumption (opensearch-project#4575)

This feature aims to identify and cancel resource intensive SearchShardTasks if they have breached certain
thresholds. This will help in terminating problematic queries which can put nodes in duress and degrade the
cluster performance.

* [Backport 2.x] Added resource usage trackers for in-flight cancellation of SearchShardTask (opensearch-project#4805)

1. CpuUsageTracker: cancels tasks if they consume too much CPU
2. ElapsedTimeTracker: cancels tasks if they consume too much time
3. HeapUsageTracker: cancels tasks if they consume too much heap

* [Backport 2.x] Added search backpressure stats API

Added search backpressure stats to the existing node/stats API to describe:
1. the number of cancellations (currently for SearchShardTask only)
2. the current state of TaskResourceUsageTracker

Signed-off-by: Ketan Verma <ketan9495@gmail.com>
opensearch-trigger-bot bot pushed a commit that referenced this pull request Nov 3, 2022
…ource consumption (#5039)

* [Backport 2.x] Added in-flight cancellation of SearchShardTask based on resource consumption (#4575)

This feature aims to identify and cancel resource intensive SearchShardTasks if they have breached certain
thresholds. This will help in terminating problematic queries which can put nodes in duress and degrade the
cluster performance.

* [Backport 2.x] Added resource usage trackers for in-flight cancellation of SearchShardTask (#4805)

1. CpuUsageTracker: cancels tasks if they consume too much CPU
2. ElapsedTimeTracker: cancels tasks if they consume too much time
3. HeapUsageTracker: cancels tasks if they consume too much heap

* [Backport 2.x] Added search backpressure stats API

Added search backpressure stats to the existing node/stats API to describe:
1. the number of cancellations (currently for SearchShardTask only)
2. the current state of TaskResourceUsageTracker

Signed-off-by: Ketan Verma <ketan9495@gmail.com>
(cherry picked from commit 7c521b9)
Bukhtawar pushed a commit that referenced this pull request Nov 3, 2022
…ource consumption (#5039) (#5058)

* [Backport 2.x] Added in-flight cancellation of SearchShardTask based on resource consumption (#4575)

This feature aims to identify and cancel resource intensive SearchShardTasks if they have breached certain
thresholds. This will help in terminating problematic queries which can put nodes in duress and degrade the
cluster performance.

* [Backport 2.x] Added resource usage trackers for in-flight cancellation of SearchShardTask (#4805)

1. CpuUsageTracker: cancels tasks if they consume too much CPU
2. ElapsedTimeTracker: cancels tasks if they consume too much time
3. HeapUsageTracker: cancels tasks if they consume too much heap

* [Backport 2.x] Added search backpressure stats API

Added search backpressure stats to the existing node/stats API to describe:
1. the number of cancellations (currently for SearchShardTask only)
2. the current state of TaskResourceUsageTracker

Signed-off-by: Ketan Verma <ketan9495@gmail.com>
(cherry picked from commit 7c521b9)

Co-authored-by: Ketan Verma <ketanv3@users.noreply.github.com>
ashking94 pushed a commit to ashking94/OpenSearch that referenced this pull request Nov 7, 2022
…sumption (opensearch-project#4575)

* Added in-flight cancellation of SearchShardTask based on resource consumption

This feature aims to identify and cancel resource intensive SearchShardTasks if they have breached certain
thresholds. This will help in terminating problematic queries which can put nodes in duress and degrade the
cluster performance.

Signed-off-by: Ketan Verma <ketan9495@gmail.com>