Skip to content

Commit

Permalink
Revert "Add basic thorttler/exponential backoff policy for retry/Defi…
Browse files Browse the repository at this point in the history
…nation o… (opensearch-project#3527)" (opensearch-project#3852)

This reverts commit b8db5bf.

Signed-off-by: Daniel (dB.) Doubrovkine <dblock@amazon.com>
  • Loading branch information
dblock authored Jul 11, 2022
1 parent 33138ed commit f35b42f
Show file tree
Hide file tree
Showing 11 changed files with 81 additions and 445 deletions.
9 changes: 0 additions & 9 deletions server/src/main/java/org/opensearch/OpenSearchException.java
Original file line number Diff line number Diff line change
Expand Up @@ -1601,15 +1601,6 @@ private enum OpenSearchExceptionHandle {
org.opensearch.indices.replication.common.ReplicationFailedException::new,
161,
V_2_1_0
),
/**
* TODO: Change the version number of check as per version in which this change will be merged.
*/
MASTER_TASK_THROTTLED_EXCEPTION(
org.opensearch.cluster.service.MasterTaskThrottlingException.class,
org.opensearch.cluster.service.MasterTaskThrottlingException::new,
162,
Version.V_3_0_0
);

final Class<? extends OpenSearchException> exceptionClass;
Expand Down
73 changes: 0 additions & 73 deletions server/src/main/java/org/opensearch/action/bulk/BackoffPolicy.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@

package org.opensearch.action.bulk;

import org.opensearch.common.Randomness;
import org.opensearch.common.unit.TimeValue;

import java.util.Iterator;
Expand Down Expand Up @@ -106,19 +105,6 @@ public static BackoffPolicy exponentialBackoff(TimeValue initialDelay, int maxNu
return new ExponentialBackoff((int) checkDelay(initialDelay).millis(), maxNumberOfRetries);
}

/**
* It provides exponential backoff between retries until it reaches maxDelayForRetry.
* It uses equal jitter scheme as it is being used for throttled exceptions.
* It will make random distribution and also guarantees a minimum delay.
*
* @param baseDelay BaseDelay for exponential Backoff
* @param maxDelayForRetry MaxDelay that can be returned from backoff policy
* @return A backoff policy with exponential backoff with equal jitter which can't return delay more than given max delay
*/
public static BackoffPolicy exponentialEqualJitterBackoff(int baseDelay, int maxDelayForRetry) {
return new ExponentialEqualJitterBackoff(baseDelay, maxDelayForRetry);
}

/**
* Wraps the backoff policy in one that calls a method every time a new backoff is taken from the policy.
*/
Expand Down Expand Up @@ -211,65 +197,6 @@ public TimeValue next() {
}
}

private static class ExponentialEqualJitterBackoff extends BackoffPolicy {
private final int maxDelayForRetry;
private final int baseDelay;

private ExponentialEqualJitterBackoff(int baseDelay, int maxDelayForRetry) {
this.maxDelayForRetry = maxDelayForRetry;
this.baseDelay = baseDelay;
}

@Override
public Iterator<TimeValue> iterator() {
return new ExponentialEqualJitterBackoffIterator(baseDelay, maxDelayForRetry);
}
}

private static class ExponentialEqualJitterBackoffIterator implements Iterator<TimeValue> {
/**
* Retry limit to avoids integer overflow issues.
* Post this limit, max delay will be returned with Equal Jitter.
*
* NOTE: If the value is greater than 30, there can be integer overflow
* issues during delay calculation.
**/
private final int RETRIES_TILL_JITTER_INCREASE = 30;

/**
* Exponential increase in delay will happen till it reaches maxDelayForRetry.
* Once delay has exceeded maxDelayForRetry, it will return maxDelayForRetry only
* and not increase the delay.
*/
private final int maxDelayForRetry;
private final int baseDelay;
private int retriesAttempted;

private ExponentialEqualJitterBackoffIterator(int baseDelay, int maxDelayForRetry) {
this.baseDelay = baseDelay;
this.maxDelayForRetry = maxDelayForRetry;
}

/**
* There is not any limit for this BackOff.
* This Iterator will always return back off delay.
*
* @return true
*/
@Override
public boolean hasNext() {
return true;
}

@Override
public TimeValue next() {
int retries = Math.min(retriesAttempted, RETRIES_TILL_JITTER_INCREASE);
int exponentialDelay = (int) Math.min((1L << retries) * baseDelay, maxDelayForRetry);
retriesAttempted++;
return TimeValue.timeValueMillis((exponentialDelay / 2) + Randomness.get().nextInt(exponentialDelay / 2 + 1));
}
}

/**
* Concrete Constant Back Off Policy
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@
import org.opensearch.client.Client;
import org.opensearch.client.IndicesAdminClient;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.AdjustableSemaphore;
import org.opensearch.common.inject.Inject;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Setting;
Expand All @@ -54,6 +53,8 @@
import org.opensearch.index.Index;
import org.opensearch.index.mapper.Mapping;

import java.util.concurrent.Semaphore;

/**
* Called by shards in the cluster when their mapping was dynamically updated and it needs to be updated
* in the cluster state meta data (and broadcast to all members).
Expand Down Expand Up @@ -169,4 +170,35 @@ private static RuntimeException unwrapEsException(OpenSearchException esEx) {
}
return new UncategorizedExecutionException("Failed execution", root);
}

/**
* An adjustable semaphore
*
* @opensearch.internal
*/
static class AdjustableSemaphore extends Semaphore {

private final Object maxPermitsMutex = new Object();
private int maxPermits;

AdjustableSemaphore(int maxPermits, boolean fair) {
super(maxPermits, fair);
this.maxPermits = maxPermits;
}

void setMaxPermits(int permits) {
synchronized (maxPermitsMutex) {
final int diff = Math.subtractExact(permits, maxPermits);
if (diff > 0) {
// add permits
release(diff);
} else if (diff < 0) {
// remove permits
reducePermits(Math.negateExact(diff));
}

maxPermits = permits;
}
}
}
}

This file was deleted.

93 changes: 0 additions & 93 deletions server/src/main/java/org/opensearch/cluster/service/Throttler.java

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.routing.ShardRoutingState;
import org.opensearch.cluster.routing.TestShardRouting;
import org.opensearch.cluster.service.MasterTaskThrottlingException;
import org.opensearch.common.ParsingException;
import org.opensearch.common.Strings;
import org.opensearch.common.UUIDs;
Expand Down Expand Up @@ -852,7 +851,6 @@ public void testIds() {
ids.put(159, NodeHealthCheckFailureException.class);
ids.put(160, NoSeedNodeLeftException.class);
ids.put(161, ReplicationFailedException.class);
ids.put(162, MasterTaskThrottlingException.class);

Map<Class<? extends OpenSearchException>, Integer> reverse = new HashMap<>();
for (Map.Entry<Integer, Class<? extends OpenSearchException>> entry : ids.entrySet()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,27 +75,4 @@ public void testWrapBackoffPolicy() {
assertEquals(expectedRetries, retries.get());
}
}

public void testEqualJitterExponentialBackOffPolicy() {
int baseDelay = 10;
int maxDelay = 10000;
BackoffPolicy policy = BackoffPolicy.exponentialEqualJitterBackoff(baseDelay, maxDelay);
Iterator<TimeValue> iterator = policy.iterator();

// Assert equal jitter
int retriesTillMaxDelay = 10;
for (int i = 0; i < retriesTillMaxDelay; i++) {
TimeValue delay = iterator.next();
assertTrue(delay.getMillis() >= baseDelay * (1L << i) / 2);
assertTrue(delay.getMillis() <= baseDelay * (1L << i));
}

// Now policy should return max delay for next retries.
int retriesAfterMaxDelay = randomInt(10);
for (int i = 0; i < retriesAfterMaxDelay; i++) {
TimeValue delay = iterator.next();
assertTrue(delay.getMillis() >= maxDelay / 2);
assertTrue(delay.getMillis() <= maxDelay);
}
}
}
Loading

0 comments on commit f35b42f

Please sign in to comment.