Skip to content

Commit

Permalink
RetryingHttpRequesterFilter: add on request retry callback (#2916)
Browse files Browse the repository at this point in the history
Motivation:

There are use-cases when users need to perform an action before the
retry, like updating meta-data or logging/metrics. Because all retry
functions return `BackoffPolicy`, it's still not known inside the
function of `BackoffPolicy` will retry or not because of the retry
counts limit.

Modifications:
- Add `RetryingHttpRequesterFilter.Builder.onRequestRetry(RetryCallbacks)`
that users can use to intercept every retry attempt;
- Test that the new callback works for request retries;

Result:

Users can see when the retry actually happens, after backoff time.

If there will be a similar use-case for `reserveConnection` in the
future, we will add a separate callback.
  • Loading branch information
idelpivnitskiy authored May 14, 2024
1 parent a65238c commit 7919d3a
Show file tree
Hide file tree
Showing 3 changed files with 119 additions and 35 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright © 2021 Apple Inc. and the ServiceTalk project authors
* Copyright © 2021-2024 Apple Inc. and the ServiceTalk project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -36,6 +36,7 @@
import io.servicetalk.http.api.HttpRequestMetaData;
import io.servicetalk.http.api.HttpResponseMetaData;
import io.servicetalk.http.api.HttpResponseStatus;
import io.servicetalk.http.api.StreamingHttpClient;
import io.servicetalk.http.api.StreamingHttpClientFilter;
import io.servicetalk.http.api.StreamingHttpClientFilterFactory;
import io.servicetalk.http.api.StreamingHttpRequest;
Expand Down Expand Up @@ -90,10 +91,10 @@ public final class RetryingHttpRequesterFilter
static final int DEFAULT_MAX_TOTAL_RETRIES = 4;
private static final RetryingHttpRequesterFilter DISABLE_AUTO_RETRIES =
new RetryingHttpRequesterFilter(true, false, false, 1, null,
(__, ___) -> NO_RETRIES);
(__, ___) -> NO_RETRIES, null);
private static final RetryingHttpRequesterFilter DISABLE_ALL_RETRIES =
new RetryingHttpRequesterFilter(false, true, false, 0, null,
(__, ___) -> NO_RETRIES);
(__, ___) -> NO_RETRIES, null);

private final boolean waitForLb;
private final boolean ignoreSdErrors;
Expand All @@ -102,18 +103,22 @@ public final class RetryingHttpRequesterFilter
@Nullable
private final Function<HttpResponseMetaData, HttpResponseException> responseMapper;
private final BiFunction<HttpRequestMetaData, Throwable, BackOffPolicy> retryFor;
@Nullable
private final RetryCallbacks onRequestRetry;

RetryingHttpRequesterFilter(
final boolean waitForLb, final boolean ignoreSdErrors, final boolean mayReplayRequestPayload,
final int maxTotalRetries,
@Nullable final Function<HttpResponseMetaData, HttpResponseException> responseMapper,
final BiFunction<HttpRequestMetaData, Throwable, BackOffPolicy> retryFor) {
final BiFunction<HttpRequestMetaData, Throwable, BackOffPolicy> retryFor,
@Nullable final RetryCallbacks onRequestRetry) {
this.waitForLb = waitForLb;
this.ignoreSdErrors = ignoreSdErrors;
this.mayReplayRequestPayload = mayReplayRequestPayload;
this.maxTotalRetries = maxTotalRetries;
this.responseMapper = responseMapper;
this.retryFor = retryFor;
this.onRequestRetry = onRequestRetry;
}

@Override
Expand Down Expand Up @@ -151,15 +156,20 @@ void inject(@Nullable final Publisher<Object> lbEventStream,
private final class OuterRetryStrategy implements BiIntFunction<Throwable, Completable> {
private final Executor executor;
private final HttpRequestMetaData requestMetaData;
@Nullable
private final RetryCallbacks retryCallbacks;
/**
* The outer retry strategy handles both "load balancer not ready" and "request failed" cases. This count
* discounts the former so the ladder strategies only count actual request attempts.
*/
private int lbNotReadyCount;

private OuterRetryStrategy(final Executor executor, final HttpRequestMetaData requestMetaData) {
private OuterRetryStrategy(final Executor executor,
final HttpRequestMetaData requestMetaData,
@Nullable final RetryCallbacks retryCallbacks) {
this.executor = executor;
this.requestMetaData = requestMetaData;
this.retryCallbacks = retryCallbacks;
}

@Override
Expand All @@ -179,40 +189,48 @@ public Completable apply(final int count, final Throwable t) {
!(lbEvent instanceof LoadBalancerReadyEvent &&
((LoadBalancerReadyEvent) lbEvent).isReady()))
.ignoreElements();
return sdStatus == null ? onHostsAvailable : onHostsAvailable.ambWith(sdStatus);
return applyRetryCallbacks(
sdStatus == null ? onHostsAvailable : onHostsAvailable.ambWith(sdStatus), count, t);
}

final BackOffPolicy backOffPolicy = retryFor.apply(requestMetaData, t);
if (backOffPolicy != NO_RETRIES) {
final int offsetCount = count - lbNotReadyCount;
Completable retryWhen = backOffPolicy.newStrategy(executor).apply(offsetCount, t);
if (t instanceof DelayedRetry) {
final Duration constant = ((DelayedRetry) t).delay();
return backOffPolicy.newStrategy(executor).apply(offsetCount, t)
.concat(executor.timer(constant));
retryWhen = retryWhen.concat(executor.timer(constant));
}

return backOffPolicy.newStrategy(executor).apply(offsetCount, t);
return applyRetryCallbacks(retryWhen, count, t);
}

return failed(t);
}

Completable applyRetryCallbacks(final Completable completable, final int retryCount, final Throwable t) {
return retryCallbacks == null ? completable :
completable.beforeOnComplete(() -> retryCallbacks.beforeRetry(retryCount, requestMetaData, t));
}
}

// Visible for testing
BiIntFunction<Throwable, Completable> retryStrategy(final HttpRequestMetaData requestMetaData,
final ExecutionContext<HttpExecutionStrategy> context) {
final ExecutionContext<HttpExecutionStrategy> context,
final boolean forRequest) {
final HttpExecutionStrategy strategy = requestMetaData.context()
.getOrDefault(HTTP_EXECUTION_STRATEGY_KEY, context.executionStrategy());
assert strategy != null;
return new OuterRetryStrategy(strategy.isRequestResponseOffloaded() ?
context.executor() : context.ioExecutor(), requestMetaData);
context.executor() : context.ioExecutor(), requestMetaData,
forRequest ? onRequestRetry : null);
}

@Override
public Single<? extends FilterableReservedStreamingHttpConnection> reserveConnection(
final HttpRequestMetaData metaData) {
return delegate().reserveConnection(metaData)
.retryWhen(retryStrategy(metaData, executionContext()));
.retryWhen(retryStrategy(metaData, executionContext(), false));
}

@Override
Expand Down Expand Up @@ -251,7 +269,7 @@ protected Single<StreamingHttpResponse> request(final StreamingHttpRequester del
// 1. Metadata is shared across retries
// 2. Publisher state is restored to original state for each retry
// duplicatedRequest isn't used below because retryWhen must be applied outside the defer operator for (2).
return single.retryWhen(retryStrategy(request, executionContext()));
return single.retryWhen(retryStrategy(request, executionContext(), true));
}
}

Expand Down Expand Up @@ -402,6 +420,18 @@ public static final class BackOffPolicy {
this.maxRetries = ensureNonNegative(maxRetries, "maxRetries");
}

@Override
public String toString() {
return getClass().getSimpleName() +
"{maxRetries=" + maxRetries +
", initialDelay=" + initialDelay +
", jitter=" + jitter +
", maxDelay=" + maxDelay +
", exponential=" + exponential +
", timerExecutor=" + timerExecutor +
'}';
}

/**
* Creates a new {@link BackOffPolicy} that retries failures instantly up-to 3 max retries.
*
Expand Down Expand Up @@ -642,6 +672,22 @@ public interface DelayedRetry {
Duration delay();
}

/**
* Callbacks invoked on a retry attempt.
*/
@FunctionalInterface
public interface RetryCallbacks {

/**
* Called after a retry decision has been made, but before the retry is performed.
*
* @param retryCount a current retry counter value for this attempt
* @param requestMetaData {@link HttpRequestMetaData} that is being retried
* @param cause {@link Throwable} cause for the retry
*/
void beforeRetry(int retryCount, HttpRequestMetaData requestMetaData, Throwable cause);
}

/**
* A builder for {@link RetryingHttpRequesterFilter}, which puts an upper bound on retry attempts.
* To configure the maximum number of retry attempts see {@link #maxTotalRetries(int)}.
Expand All @@ -665,20 +711,19 @@ public static final class Builder {
private Function<HttpResponseMetaData, HttpResponseException> responseMapper;

@Nullable
private BiFunction<HttpRequestMetaData, IOException, BackOffPolicy>
retryIdempotentRequests;
private BiFunction<HttpRequestMetaData, IOException, BackOffPolicy> retryIdempotentRequests;

@Nullable
private BiFunction<HttpRequestMetaData, DelayedRetry, BackOffPolicy> retryDelayedRetries;

@Nullable
private BiFunction<HttpRequestMetaData, DelayedRetry, BackOffPolicy>
retryDelayedRetries;
private BiFunction<HttpRequestMetaData, HttpResponseException, BackOffPolicy> retryResponses;

@Nullable
private BiFunction<HttpRequestMetaData, HttpResponseException, BackOffPolicy>
retryResponses;
private BiFunction<HttpRequestMetaData, Throwable, BackOffPolicy> retryOther;

@Nullable
private BiFunction<HttpRequestMetaData, Throwable, BackOffPolicy>
retryOther;
private RetryCallbacks onRequestRetry;

/**
* By default, automatic retries wait for the associated {@link LoadBalancer} to be
Expand Down Expand Up @@ -842,6 +887,22 @@ public Builder retryOther(
return this;
}

/**
* Callback invoked on every {@link StreamingHttpClient#request(StreamingHttpRequest) request} retry attempt.
* <p>
* This can be used to track when {@link BackOffPolicy} actually decides to retry a request, to update
* {@link HttpRequestMetaData request meta-data} before a retry, or implement logging/metrics. However, it
* can not be used to influence the retry decision, use other "retry*" functions for that purpose.
*
* @param onRequestRetry {@link RetryCallbacks} to get notified on every
* {@link StreamingHttpClient#request(StreamingHttpRequest) request} retry attempt
* @return {@code this}
*/
public Builder onRequestRetry(final RetryCallbacks onRequestRetry) {
this.onRequestRetry = requireNonNull(onRequestRetry);
return this;
}

/**
* Builds a retrying {@link RetryingHttpRequesterFilter} with this' builders configuration.
*
Expand Down Expand Up @@ -921,7 +982,7 @@ public RetryingHttpRequesterFilter build() {
return NO_RETRIES;
};
return new RetryingHttpRequesterFilter(waitForLb, ignoreSdErrors, mayReplayRequestPayload,
maxTotalRetries, responseMapper, allPredicate);
maxTotalRetries, responseMapper, allPredicate, onRequestRetry);
}
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright © 2021 Apple Inc. and the ServiceTalk project authors
* Copyright © 2019-2024 Apple Inc. and the ServiceTalk project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -38,6 +38,7 @@
import org.mockito.stubbing.Answer;

import java.net.UnknownHostException;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.Nonnull;

import static io.servicetalk.buffer.netty.BufferAllocators.DEFAULT_ALLOCATOR;
Expand Down Expand Up @@ -101,13 +102,16 @@ void disableWaitForLb(boolean offloading) {
@ParameterizedTest
@ValueSource(booleans = {false, true})
void disableRetryAllRetryableExWithRetryable(boolean offloading) {
final ContextAwareRetryingHttpClientFilter filter =
newFilter(new RetryingHttpRequesterFilter.Builder()
.retryRetryableExceptions((__, ___) -> ofNoRetries()), offloading);
AtomicInteger onRequestRetryCounter = new AtomicInteger();
final ContextAwareRetryingHttpClientFilter filter = newFilter(new RetryingHttpRequesterFilter.Builder()
.retryRetryableExceptions((__, ___) -> ofNoRetries())
.onRequestRetry((count, req, t) -> assertThat(onRequestRetryCounter.incrementAndGet(), is(count))),
offloading);

Completable retry = applyRetry(filter, 1, RETRYABLE_EXCEPTION);
toSource(retry).subscribe(retrySubscriber);
verifyRetryResultError(RETRYABLE_EXCEPTION);
assertThat("Unexpected calls to onRequestRetry.", onRequestRetryCounter.get(), is(0));
}

@ParameterizedTest
Expand Down Expand Up @@ -170,13 +174,16 @@ void defaultForRetryableEx(boolean offloading) {
@ParameterizedTest
@ValueSource(booleans = {false, true})
void defaultForNoAvailableHost(boolean offloading) {
final ContextAwareRetryingHttpClientFilter filter =
newFilter(new RetryingHttpRequesterFilter.Builder(), offloading);
AtomicInteger onRequestRetryCounter = new AtomicInteger();
final ContextAwareRetryingHttpClientFilter filter = newFilter(new RetryingHttpRequesterFilter.Builder()
.onRequestRetry((count, req, t) -> assertThat(onRequestRetryCounter.incrementAndGet(), is(count))),
offloading);
Completable retry = applyRetry(filter, 1, NO_AVAILABLE_HOST);
toSource(retry).subscribe(retrySubscriber);
assertThat(retrySubscriber.pollTerminal(10, MILLISECONDS), is(nullValue()));
lbEvents.onNext(LOAD_BALANCER_READY_EVENT);
verifyRetryResultCompleted();
assertThat("Unexpected calls to onRequestRetry.", onRequestRetryCounter.get(), is(1));
}

@ParameterizedTest
Expand Down Expand Up @@ -255,7 +262,7 @@ void noActiveHostException(boolean offloading) {
.maxTotalRetries(Integer.MAX_VALUE), offloading);
lbEvents.onNext(LOAD_BALANCER_READY_EVENT); // LB is ready before subscribing to the response
BiIntFunction<Throwable, Completable> retryStrategy =
filter.retryStrategy(REQUEST_META_DATA, filter.executionContext());
filter.retryStrategy(REQUEST_META_DATA, filter.executionContext(), true);
for (int i = 1; i <= DEFAULT_MAX_TOTAL_RETRIES * 2; i++) {
Completable retry = retryStrategy.apply(i, NO_ACTIVE_HOST);
TestCompletableSubscriber subscriber = new TestCompletableSubscriber();
Expand Down Expand Up @@ -316,6 +323,6 @@ private ContextAwareRetryingHttpClientFilter newFilter(final RetryingHttpRequest
@Nonnull
private Completable applyRetry(final ContextAwareRetryingHttpClientFilter filter,
final int count, final Throwable t) {
return filter.retryStrategy(REQUEST_META_DATA, filter.executionContext()).apply(count, t);
return filter.retryStrategy(REQUEST_META_DATA, filter.executionContext(), true).apply(count, t);
}
}
Loading

0 comments on commit 7919d3a

Please sign in to comment.