From 68055fac3c9a2a2cd5177944d7ff694e196bf554 Mon Sep 17 00:00:00 2001
From: Bryce Anderson
Date: Wed, 8 Jan 2025 10:17:26 -0700
Subject: [PATCH] http-netty: let RetryingHttpRequesterFilter return responses
 on failure (#3048)

Motivation:

Sometimes people just want to get the last failed response when the
retry loop ends. However, right now we only yield the exceptions that
were created. Users can't smuggle the response out themselves in a
generic way via the HttpResponseException because it could lead to
resource leaks.

Modifications:

- Let users simply return the last failed response when the retry loop
  exits unsuccessfully.
- Fix some potential stream leaks.
---
 .../netty/RetryingHttpRequesterFilter.java | 175 ++++++++++++++----
 .../RetryingHttpRequesterFilterTest.java   |  91 ++++++++-
 2 files changed, 226 insertions(+), 40 deletions(-)

diff --git a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/RetryingHttpRequesterFilter.java b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/RetryingHttpRequesterFilter.java
index d5431fae35..3a82dcb06f 100644
--- a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/RetryingHttpRequesterFilter.java
+++ b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/RetryingHttpRequesterFilter.java
@@ -46,6 +46,10 @@
 import io.servicetalk.transport.api.ExecutionContext;
 import io.servicetalk.transport.api.ExecutionStrategyInfluencer;
 import io.servicetalk.transport.api.RetryableException;
+import io.servicetalk.utils.internal.ThrowableUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.time.Duration;
@@ -89,17 +93,21 @@
  */
 public final class RetryingHttpRequesterFilter
         implements StreamingHttpClientFilterFactory, ExecutionStrategyInfluencer<HttpExecutionStrategy> {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(RetryingHttpRequesterFilter.class);
+
     static final int DEFAULT_MAX_TOTAL_RETRIES = 4;
     private static final RetryingHttpRequesterFilter DISABLE_AUTO_RETRIES =
-            new RetryingHttpRequesterFilter(true, false, false, 1, null,
+            new RetryingHttpRequesterFilter(true, false, false, false, 1, null,
                     (__, ___) -> NO_RETRIES, null);
     private static final RetryingHttpRequesterFilter DISABLE_ALL_RETRIES =
-            new RetryingHttpRequesterFilter(false, true, false, 0, null,
+            new RetryingHttpRequesterFilter(false, true, false, false, 0, null,
                     (__, ___) -> NO_RETRIES, null);
     private final boolean waitForLb;
     private final boolean ignoreSdErrors;
     private final boolean mayReplayRequestPayload;
+    private final boolean returnOriginalResponses;
     private final int maxTotalRetries;
     @Nullable
     private final Function<HttpResponseMetaData, HttpResponseException> responseMapper;
@@ -109,13 +117,14 @@ public final class RetryingHttpRequesterFilter
     RetryingHttpRequesterFilter(
             final boolean waitForLb, final boolean ignoreSdErrors, final boolean mayReplayRequestPayload,
-            final int maxTotalRetries,
+            final boolean returnOriginalResponses, final int maxTotalRetries,
             @Nullable final Function<HttpResponseMetaData, HttpResponseException> responseMapper,
             final BiFunction<HttpRequestMetaData, Throwable, BackOffPolicy> retryFor,
             @Nullable final RetryCallbacks onRequestRetry) {
         this.waitForLb = waitForLb;
         this.ignoreSdErrors = ignoreSdErrors;
         this.mayReplayRequestPayload = mayReplayRequestPayload;
+        this.returnOriginalResponses = returnOriginalResponses;
         this.maxTotalRetries = maxTotalRetries;
         this.responseMapper = responseMapper;
         this.retryFor = retryFor;
@@ -194,24 +203,53 @@ public Completable apply(final int count, final Throwable t) {
                             sdStatus == null ?
onHostsAvailable : onHostsAvailable.ambWith(sdStatus), count, t);
             }
-            final BackOffPolicy backOffPolicy = retryFor.apply(requestMetaData, t);
-            if (backOffPolicy != NO_RETRIES) {
-                final int offsetCount = count - lbNotReadyCount;
-                Completable retryWhen = backOffPolicy.newStrategy(executor).apply(offsetCount, t);
-                if (t instanceof DelayedRetryException) {
-                    final Duration constant = ((DelayedRetryException) t).delay();
-                    retryWhen = retryWhen.concat(executor.timer(constant));
-                }
+            try {
+                BackOffPolicy backOffPolicy = retryFor.apply(requestMetaData, t);
+                if (backOffPolicy != NO_RETRIES) {
+                    final int offsetCount = count - lbNotReadyCount;
+                    Completable retryWhen = backOffPolicy.newStrategy(executor).apply(offsetCount, t);
+                    if (t instanceof DelayedRetryException) {
+                        final Duration constant = ((DelayedRetryException) t).delay();
+                        retryWhen = retryWhen.concat(executor.timer(constant));
+                    }
 
-                return applyRetryCallbacks(retryWhen, count, t);
+                    return applyRetryCallbacks(retryWhen, count, t);
+                }
+            } catch (Throwable tt) {
+                LOGGER.error("Unexpected exception when computing and applying backoff policy for {}({}). " +
+                        "User-defined functions should not throw.",
+                        RetryingHttpRequesterFilter.class.getName(), t.getMessage(), tt);
+                Completable result = failed(ThrowableUtils.addSuppressed(tt, t));
+                if (returnOriginalResponses) {
+                    StreamingHttpResponse response = extractStreamingResponse(t);
+                    if (response != null) {
+                        result = drain(response).concat(result);
+                    }
+                }
+                return result;
             }
             return failed(t);
         }
 
         Completable applyRetryCallbacks(final Completable completable, final int retryCount, final Throwable t) {
-            return retryCallbacks == null ? completable :
-                    completable.beforeOnComplete(() -> retryCallbacks.beforeRetry(retryCount, requestMetaData, t));
+            Completable result = (retryCallbacks == null ? completable :
+                    completable.beforeOnComplete(() -> retryCallbacks.beforeRetry(retryCount, requestMetaData, t)));
+            if (returnOriginalResponses) {
+                final StreamingHttpResponse response = extractStreamingResponse(t);
+                if (response != null) {
+                    // If we succeed, we need to drain the response body before we continue. If the retry completable
+                    // fails, we want to surface the original exception and not worry about draining since the
+                    // response will be returned to the user.
+                    result = result.onErrorMap(backoffError -> ThrowableUtils.addSuppressed(t, backoffError))
+                            // If we get cancelled we also need to drain the message body as there is no guarantee
+                            // we'll ever receive a completion event, error or success. This is okay to do since
+                            // the subscriber has signaled they're no longer interested in the response.
+                            .beforeCancel(() -> drain(response).subscribe())
+                            .concat(drain(response));
+                }
+            }
+            return result;
         }
     }
 
@@ -257,20 +295,39 @@ protected Single request(final StreamingHttpRequester del
             if (responseMapper != null) {
                 single = single.flatMap(resp -> {
-                    final HttpResponseException exception = responseMapper.apply(resp);
-                    return (exception != null ?
-                            // Drain response payload body before discarding it:
-                            resp.payloadBody().ignoreElements().onErrorComplete()
-                                    .concat(Single.failed(exception)) :
-                            Single.succeeded(resp))
-                            .shareContextOnSubscribe();
+                    final HttpResponseException exception;
+                    try {
+                        exception = responseMapper.apply(resp);
+                    } catch (Throwable t) {
+                        LOGGER.error("Unexpected exception when mapping response ({}) to an exception. 
User-defined " +
+                                "functions should not throw.", resp.status(), t);
+                        return drain(resp).concat(Single.failed(t));
+                    }
+                    Single<StreamingHttpResponse> response;
+                    if (exception == null) {
+                        response = Single.succeeded(resp);
+                    } else {
+                        response = Single.failed(exception);
+                        if (!returnOriginalResponses) {
+                            response = drain(resp).concat(response);
+                        }
+                    }
+                    return response.shareContextOnSubscribe();
                 });
             }
             // 1. Metadata is shared across retries
             // 2. Publisher state is restored to original state for each retry
             // duplicatedRequest isn't used below because retryWhen must be applied outside the defer operator for (2).
-            return single.retryWhen(retryStrategy(request, executionContext(), true));
+            single = single.retryWhen(retryStrategy(request, executionContext(), true));
+            if (returnOriginalResponses) {
+                single = single.onErrorResume(HttpResponseException.class, t -> {
+                    HttpResponseMetaData metaData = t.metaData();
+                    return (metaData instanceof StreamingHttpResponse ?
+                            Single.succeeded((StreamingHttpResponse) metaData) : Single.failed(t));
+                });
+            }
+            return single;
         }
     }
 
@@ -719,6 +776,7 @@ public static final class Builder {
         private int maxTotalRetries = DEFAULT_MAX_TOTAL_RETRIES;
         private boolean retryExpectationFailed;
+        private boolean returnOriginalResponses;
         private BiFunction<HttpRequestMetaData, RetryableException, BackOffPolicy> retryRetryableExceptions =
                 (requestMetaData, e) -> BackOffPolicy.ofImmediateBounded();
@@ -796,8 +854,13 @@ public Builder maxTotalRetries(final int maxRetries) {
          * retry behaviour through {@link #retryResponses(BiFunction)}.
          *
          * @param mapper a {@link Function} that maps a {@link HttpResponseMetaData} to an
-         * {@link HttpResponseException} or returns {@code null} if there is no mapping for response meta-data. The
-         * mapper should return {@code null} if no retry is needed or if it cannot be determined that a retry is needed.
+         * {@link HttpResponseException} or returns {@code null} if there is no mapping for response meta-data.
+         * In the case that the request cannot be retried, the {@link HttpResponseException} will be returned via the
+         * error pathway.
+         * <p>
+         * It's important that this {@link Function} doesn't block to avoid performance impacts.
+         * It's important that this {@link Function} doesn't throw exceptions.
+         *
          * @return {@code this}
          */
         public Builder responseMapper(final Function<HttpResponseMetaData, HttpResponseException> mapper) {
@@ -810,7 +873,8 @@ public Builder responseMapper(final Function
          * To disable retries you can return {@link BackOffPolicy#ofNoRetries()} from the {@code mapper}.
          * <p>
-         * It's important that this {@link Function} doesn't block to avoid performance impacts.
+         * It's important that this {@link BiFunction} doesn't block to avoid performance impacts.
+         * It's important that this {@link BiFunction} doesn't throw exceptions.
          *
          * @param mapper The mapper to map the {@link HttpRequestMetaData} and the
          * {@link RetryableException} to a {@link BackOffPolicy}.
@@ -833,7 +897,8 @@ public Builder retryRetryableExceptions(
          * <p>
          * To disable retries you can return {@link BackOffPolicy#ofNoRetries()} from the {@code mapper}.
          * <p>
-         * It's important that this {@link Function} doesn't block to avoid performance impacts.
+         * It's important that this {@link BiFunction} doesn't block to avoid performance impacts.
+         * It's important that this {@link BiFunction} doesn't throw exceptions.
          *
          * @param mapper The mapper to map the {@link HttpRequestMetaData} and the
          * {@link IOException} to a {@link BackOffPolicy}.
@@ -871,7 +936,8 @@ public Builder retryExpectationFailed(boolean retryExpectationFailed) {
          * To disable retries and proceed evaluating other retry functions you can return,
          * {@link BackOffPolicy#ofNoRetries()} from the passed {@code mapper}.
          * <p>
-         * It's important that this {@link Function} doesn't block to avoid performance impacts.
+         * It's important that this {@link BiFunction} doesn't block to avoid performance impacts.
+         * It's important that this {@link BiFunction} doesn't throw exceptions.
          *
          * @param mapper The mapper to map the {@link HttpRequestMetaData} and the
          * {@link DelayedRetryException delayed-exception} to a {@link BackOffPolicy}.
@@ -893,7 +959,8 @@ public Builder retryDelayedRetryExceptions(
          * <p>
          * To disable retries you can return {@link BackOffPolicy#NO_RETRIES} from the {@code mapper}.
          * <p>
-         * It's important that this {@link Function} doesn't block to avoid performance impacts.
+         * It's important that this {@link BiFunction} doesn't block to avoid performance impacts.
+         * It's important that this {@link BiFunction} doesn't throw exceptions.
          *
          * @param mapper The mapper to map the {@link HttpRequestMetaData} and the
          * {@link DelayedRetry delayed-exception} to a {@link BackOffPolicy}.
@@ -914,7 +981,8 @@ public Builder retryDelayedRetries(// FIXME: 0.43 - remove deprecated method
          * <p>
          * To disable retries you can return {@link BackOffPolicy#NO_RETRIES} from the {@code mapper}.
          * <p>
-         * It's important that this {@link Function} doesn't block to avoid performance impacts.
+         * It's important that this {@link BiFunction} doesn't block to avoid performance impacts.
+         * It's important that this {@link BiFunction} doesn't throw exceptions.
          *
          * @param mapper The mapper to map the {@link HttpRequestMetaData} and the
          * {@link DelayedRetry delayed-exception} to a {@link BackOffPolicy}.
@@ -922,7 +990,30 @@ public Builder retryDelayedRetries(// FIXME: 0.43 - remove deprecated method
          */
         public Builder retryResponses(
                 final BiFunction<HttpRequestMetaData, HttpResponseException, BackOffPolicy> mapper) {
+            return retryResponses(mapper, false);
+        }
+
+        /**
+         * The retrying-filter will evaluate {@link HttpResponseException} that resulted from the
+         * {@link #responseMapper(Function)}, and support different retry behaviour according to the
+         * {@link HttpRequestMetaData request} and the {@link HttpResponseMetaData response}.
+         * <p>
+         * To disable retries you can return {@link BackOffPolicy#NO_RETRIES} from the {@code mapper}.
+         * <p>
+         * It's important that this {@link BiFunction} doesn't block to avoid performance impacts.
+         * It's important that this {@link BiFunction} doesn't throw exceptions.
+         *
+         * @param mapper The mapper to map the {@link HttpRequestMetaData} and the
+         * {@link HttpResponseException response-exception} to a {@link BackOffPolicy}.
+         * @param returnOriginalResponses whether to unwrap the response defined by the {@link HttpResponseException}
+         * meta-data in the case that the request is not retried.
+         * @return {@code this}.
+         */
+        public Builder retryResponses(
+                final BiFunction<HttpRequestMetaData, HttpResponseException, BackOffPolicy> mapper,
+                final boolean returnOriginalResponses) {
             this.retryResponses = requireNonNull(mapper);
+            this.returnOriginalResponses = returnOriginalResponses;
             return this;
         }
 
@@ -932,7 +1023,8 @@ public Builder retryResponses(
          * <p>
          * To disable retries you can return {@link BackOffPolicy#NO_RETRIES} from the {@code mapper}.
          * <p>
-         * It's important that this {@link Function} doesn't block to avoid performance impacts.
+         * It's important that this {@link BiFunction} doesn't block to avoid performance impacts.
+         * It's important that this {@link BiFunction} doesn't throw exceptions.
          *
          * @param mapper {@link BiFunction} that checks whether a given combination of
          * {@link HttpRequestMetaData meta-data} and {@link Throwable cause} should be retried, producing a
@@ -1041,7 +1133,7 @@ public RetryingHttpRequesterFilter build() {
                 if (retryResponses != null && throwable instanceof HttpResponseException) {
                     final BackOffPolicy backOffPolicy =
-                        retryResponses.apply(requestMetaData, (HttpResponseException) throwable);
+                            retryResponses.apply(requestMetaData, (HttpResponseException) throwable);
                     if (backOffPolicy != NO_RETRIES) {
                         return backOffPolicy;
                     }
@@ -1054,7 +1146,26 @@ public RetryingHttpRequesterFilter build() {
                 return NO_RETRIES;
             };
             return new RetryingHttpRequesterFilter(waitForLb, ignoreSdErrors, mayReplayRequestPayload,
-                    maxTotalRetries, responseMapper, allPredicate, onRequestRetry);
+                    returnOriginalResponses, maxTotalRetries, responseMapper, allPredicate, onRequestRetry);
         }
     }
 
+    private static Completable drain(StreamingHttpResponse response) {
+        return response.payloadBody().ignoreElements().onErrorComplete();
+    }
+
+    @Nullable
+    private static StreamingHttpResponse extractStreamingResponse(Throwable t) {
+        if (t instanceof HttpResponseException) {
+            HttpResponseException responseException = (HttpResponseException) t;
+            if (responseException.metaData() instanceof StreamingHttpResponse) {
+                return (StreamingHttpResponse) responseException.metaData();
+            } else {
+                LOGGER.warn("Couldn't unpack response due to unexpected dynamic types. Required " +
+                        "meta-data of type StreamingHttpResponse, found {}",
+                        responseException.metaData().getClass());
+            }
+        }
+        return null;
+    }
 }
diff --git a/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/RetryingHttpRequesterFilterTest.java b/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/RetryingHttpRequesterFilterTest.java
index d7c5136bd3..4e270d0391 100644
--- a/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/RetryingHttpRequesterFilterTest.java
+++ b/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/RetryingHttpRequesterFilterTest.java
@@ -62,12 +62,16 @@
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
 import org.junit.jupiter.params.provider.ValueSource;
 
 import java.net.InetSocketAddress;
+import java.nio.charset.StandardCharsets;
 import java.util.Collection;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Predicate;
+import java.util.stream.Stream;
 import javax.annotation.Nullable;
 
 import static io.servicetalk.concurrent.api.Single.defer;
@@ -104,6 +108,7 @@ class RetryingHttpRequesterFilterTest {
 
     private static final String RETRYABLE_HEADER = "RETRYABLE";
+    private static final String RESPONSE_BODY = "ok";
 
     private final ServerContext svcCtx;
     private final SingleAddressHttpClientBuilder<HostAndPort, InetSocketAddress> normalClientBuilder;
@@ -119,7 +124,8 @@ class RetryingHttpRequesterFilterTest {
     RetryingHttpRequesterFilterTest() throws Exception {
         svcCtx = forAddress(localAddress(0))
                 .listenBlockingAndAwait((ctx, request, responseFactory) -> responseFactory.ok()
-                        .addHeader(RETRYABLE_HEADER, "yes"));
+                        .addHeader(RETRYABLE_HEADER, "yes")
+                        
.payloadBody(ctx.executionContext().bufferAllocator().fromAscii(RESPONSE_BODY))); failingConnClientBuilder = forSingleAddress(serverHostAndPort(svcCtx)) .loadBalancerFactory(new DefaultHttpLoadBalancerFactory<>(new InspectingLoadBalancerFactory<>())) .appendConnectionFactoryFilter(ClosingConnectionFactory::new); @@ -251,21 +257,24 @@ private void assertRequestRetryingPred(final BlockingHttpClient client) { assertThat("Unexpected calls to select.", (double) lbSelectInvoked.get(), closeTo(5.0, 1.0)); } - @Test - void testResponseMapper() { + @ParameterizedTest(name = "{displayName} [{index}]: returnOriginalResponses={0}") + @ValueSource(booleans = {true, false}) + void testResponseMapper(final boolean returnOriginalResponses) throws Exception { AtomicInteger newConnectionCreated = new AtomicInteger(); AtomicInteger responseDrained = new AtomicInteger(); AtomicInteger onRequestRetryCounter = new AtomicInteger(); final int maxTotalRetries = 4; + final String retryMessage = "Retryable header"; normalClient = normalClientBuilder .appendClientFilter(new Builder() .maxTotalRetries(maxTotalRetries) .responseMapper(metaData -> metaData.headers().contains(RETRYABLE_HEADER) ? - new HttpResponseException("Retryable header", metaData) : null) + new HttpResponseException(retryMessage, metaData) : null) // Disable request retrying .retryRetryableExceptions((requestMetaData, e) -> ofNoRetries()) // Retry only responses marked so - .retryResponses((requestMetaData, throwable) -> ofImmediate(maxTotalRetries - 1)) + .retryResponses((requestMetaData, throwable) -> ofImmediate(maxTotalRetries - 1), + returnOriginalResponses) .onRequestRetry((count, req, t) -> assertThat(onRequestRetryCounter.incrementAndGet(), is(count))) .build()) @@ -281,9 +290,15 @@ public Single request(final StreamingHttpRequest request) }; }) .buildBlocking(); - HttpResponseException e = assertThrows(HttpResponseException.class, - () -> normalClient.request(normalClient.get("/"))); - assertThat("Unexpected exception.", e, instanceOf(HttpResponseException.class)); + if (returnOriginalResponses) { + HttpResponse response = normalClient.request(normalClient.get("/")); + assertThat(response.status(), is(HttpResponseStatus.OK)); + assertThat(response.payloadBody().toString(StandardCharsets.US_ASCII), equalTo(RESPONSE_BODY)); + } else { + HttpResponseException e = assertThrows(HttpResponseException.class, + () -> normalClient.request(normalClient.get("/"))); + assertThat("Unexpected exception.", e, instanceOf(HttpResponseException.class)); + } // The load balancer is allowed to be not ready one time, which is counted against total retry attempts but not // against actual requests being issued. 
         assertThat("Unexpected calls to select.", lbSelectInvoked.get(), allOf(greaterThanOrEqualTo(maxTotalRetries),
@@ -295,6 +310,66 @@ public Single request(final StreamingHttpRequest request)
         assertThat("Unexpected number of connections was created", newConnectionCreated.get(), is(1));
     }
 
+    private enum ExceptionSource {
+        RESPONSE_MAPPER,
+        RETRY_RESPONSES
+    }
+
+    private static Stream<Arguments> lambdaExceptions() {
+        return Stream.of(true, false).flatMap(returnOriginalResponses ->
+                Stream.of(ExceptionSource.values())
+                        .map(lambda -> Arguments.of(returnOriginalResponses, lambda)));
+    }
+
+    @ParameterizedTest(name = "{displayName} [{index}]: returnOriginalResponses={0}, thrower={1}")
+    @MethodSource("lambdaExceptions")
+    void lambdaExceptions(final boolean returnOriginalResponses, final ExceptionSource thrower) {
+        final AtomicInteger newConnectionCreated = new AtomicInteger();
+        final AtomicInteger requestsInitiated = new AtomicInteger();
+        final AtomicInteger responseDrained = new AtomicInteger();
+        final AtomicInteger onRequestRetryCounter = new AtomicInteger();
+        final String retryMessage = "Retryable header";
+        normalClient = normalClientBuilder
+                .appendClientFilter(new Builder()
+                        .maxTotalRetries(4)
+                        .responseMapper(metaData -> {
+                            if (thrower == ExceptionSource.RESPONSE_MAPPER) {
+                                throw new RuntimeException("responseMapper");
+                            }
+                            return metaData.headers().contains(RETRYABLE_HEADER) ?
+                                    new HttpResponseException(retryMessage, metaData) : null;
+                        })
+                        // Retry only responses marked so
+                        .retryResponses((requestMetaData, throwable) -> {
+                            if (thrower == ExceptionSource.RETRY_RESPONSES) {
+                                throw new RuntimeException("retryResponses");
+                            }
+                            return ofImmediate(3);
+                        }, returnOriginalResponses)
+                        .onRequestRetry((count, req, t) ->
+                                assertThat(onRequestRetryCounter.incrementAndGet(), is(count)))
+                        .build())
+                .appendConnectionFilter(c -> {
+                    newConnectionCreated.incrementAndGet();
+                    return new StreamingHttpConnectionFilter(c) {
+                        @Override
+                        public Single<StreamingHttpResponse> request(final StreamingHttpRequest request) {
+                            return Single.defer(() -> {
+                                requestsInitiated.incrementAndGet();
+                                return delegate().request(request)
+                                        .map(response -> response.transformPayloadBody(payload -> payload
+                                                .whenFinally(responseDrained::incrementAndGet)));
+                            });
+                        }
+                    };
+                })
+                .buildBlocking();
+        assertThrows(Exception.class, () -> normalClient.request(normalClient.get("/")));
+        assertThat("Response payload body was not drained on every mapping", responseDrained.get(), is(1));
+        assertThat("Multiple requests initiated", requestsInitiated.get(), is(1));
+        assertThat("Unexpected number of connections was created", newConnectionCreated.get(), is(1));
+    }
+
     @Test
     void singleInstanceFilter() {
         Assertions.assertThrows(IllegalStateException.class, () -> forResolvedAddress(localAddress(8888))
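
Usage, for reference: the retryResponses(mapper, returnOriginalResponses) overload introduced by this patch is wired up roughly as below. This is a minimal sketch rather than code from the change itself: the address, the 5xx predicate, and the retry count are illustrative placeholders, while the Builder, responseMapper, retryResponses, BackOffPolicy, and HttpResponseException entry points are the APIs shown in the diff above.

    import io.servicetalk.http.api.BlockingHttpClient;
    import io.servicetalk.http.api.HttpResponse;
    import io.servicetalk.http.api.HttpResponseStatus.StatusClass;
    import io.servicetalk.http.netty.HttpClients;
    import io.servicetalk.http.netty.RetryingHttpRequesterFilter;
    import io.servicetalk.http.netty.RetryingHttpRequesterFilter.BackOffPolicy;
    import io.servicetalk.http.netty.RetryingHttpRequesterFilter.HttpResponseException;

    public final class RetryToResponseExample {
        public static void main(String[] args) throws Exception {
            try (BlockingHttpClient client = HttpClients.forSingleAddress("localhost", 8080)
                    .appendClientFilter(new RetryingHttpRequesterFilter.Builder()
                            // Map 5xx responses to an exception so they enter the retry loop.
                            .responseMapper(meta -> meta.status().statusClass() == StatusClass.SERVER_ERROR_5XX ?
                                    new HttpResponseException("server error", meta) : null)
                            // Retry those responses a few times. With returnOriginalResponses=true,
                            // the last failed response is returned once retries are exhausted,
                            // instead of surfacing the HttpResponseException.
                            .retryResponses((requestMeta, responseException) -> BackOffPolicy.ofImmediate(3), true)
                            .build())
                    .buildBlocking()) {
                // If the server keeps returning 5xx, this yields the final 5xx response
                // (payload intact) rather than throwing.
                HttpResponse response = client.request(client.get("/"));
                System.out.println(response.status());
            }
        }
    }

With returnOriginalResponses left at false (the default), the same configuration surfaces the mapped HttpResponseException when retries are exhausted, matching the pre-existing behaviour.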