Skip to content

Retry after all S3 get failures that made progress #88015

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/changelog/88015.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 88015
summary: Retry after all S3 get failures that made progress
area: Snapshot/Restore
type: enhancement
issues:
- 87243
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.util.Supplier;
import org.elasticsearch.Version;
import org.elasticsearch.core.IOUtils;

Expand Down Expand Up @@ -44,12 +45,13 @@ class S3RetryingInputStream extends InputStream {
private final String blobKey;
private final long start;
private final long end;
private final int maxAttempts;
private final List<IOException> failures;

private S3ObjectInputStream currentStream;
private long currentStreamFirstOffset;
private long currentStreamLastOffset;
private int attempt = 1;
private int failuresAfterMeaningfulProgress = 0;
private long currentOffset;
private boolean closed;
private boolean eof;
Expand All @@ -68,7 +70,6 @@ class S3RetryingInputStream extends InputStream {
}
this.blobStore = blobStore;
this.blobKey = blobKey;
this.maxAttempts = blobStore.getMaxRetries() + 1;
this.failures = new ArrayList<>(MAX_SUPPRESSED_EXCEPTIONS);
this.start = start;
this.end = end;
Expand All @@ -85,7 +86,8 @@ private void openStream() throws IOException {
getObjectRequest.setRange(Math.addExact(start, currentOffset), end);
}
final S3Object s3Object = SocketAccess.doPrivileged(() -> clientReference.client().getObject(getObjectRequest));
this.currentStreamLastOffset = Math.addExact(Math.addExact(start, currentOffset), getStreamLength(s3Object));
this.currentStreamFirstOffset = Math.addExact(start, currentOffset);
this.currentStreamLastOffset = Math.addExact(currentStreamFirstOffset, getStreamLength(s3Object));
this.currentStream = s3Object.getObjectContent();
} catch (final AmazonClientException e) {
if (e instanceof AmazonS3Exception amazonS3Exception) {
Expand Down Expand Up @@ -160,31 +162,32 @@ private void ensureOpen() {
}

private void reopenStreamOrFail(IOException e) throws IOException {
if (attempt >= maxAttempts) {
logger.debug(
() -> format(
"failed reading [%s/%s] at offset [%s], attempt [%s] of [%s], giving up",
blobStore.bucket(),
blobKey,
start + currentOffset,
attempt,
maxAttempts
),
e
);
throw addSuppressedExceptions(e);
final int maxAttempts = blobStore.getMaxRetries() + 1;

final long meaningfulProgressSize = Math.max(1L, blobStore.bufferSizeInBytes() / 100L);
final long currentStreamProgress = Math.subtractExact(Math.addExact(start, currentOffset), currentStreamFirstOffset);
if (currentStreamProgress >= meaningfulProgressSize) {
failuresAfterMeaningfulProgress += 1;
}
logger.debug(
() -> format(
"failed reading [%s/%s] at offset [%s], attempt [%s] of [%s], retrying",
blobStore.bucket(),
blobKey,
start + currentOffset,
attempt,
maxAttempts
),
e
final Supplier<String> messageSupplier = () -> format(
"""
failed reading [%s/%s] at offset [%s]; this was attempt [%s] to read this blob which yielded [%s] bytes; in total \
[%s] of the attempts to read this blob have made meaningful progress and do not count towards the maximum number of \
retries; the maximum number of read attempts which do not make meaningful progress is [%s]""",
blobStore.bucket(),
blobKey,
start + currentOffset,
attempt,
currentStreamProgress,
failuresAfterMeaningfulProgress,
maxAttempts
);
if (attempt >= maxAttempts + failuresAfterMeaningfulProgress) {
final var finalException = addSuppressedExceptions(e);
logger.warn(messageSupplier, finalException);
throw finalException;
}
logger.debug(messageSupplier, e);
attempt += 1;
if (failures.size() < MAX_SUPPRESSED_EXCEPTIONS) {
failures.add(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
import com.amazonaws.SdkClientException;
import com.amazonaws.services.s3.internal.MD5DigestCalculatingInputStream;
import com.amazonaws.util.Base16;
import com.sun.net.httpserver.HttpExchange;
import com.sun.net.httpserver.HttpHandler;

import org.apache.http.HttpStatus;
import org.elasticsearch.cluster.metadata.RepositoryMetadata;
Expand Down Expand Up @@ -43,6 +45,7 @@
import java.net.InetSocketAddress;
import java.net.SocketTimeoutException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Locale;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;
Expand All @@ -57,6 +60,7 @@
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.lessThan;

/**
* This class tests how a {@link S3BlobContainer} and its underlying AWS S3 client are retrying requests when reading or writing blobs.
Expand Down Expand Up @@ -439,6 +443,79 @@ public void testWriteLargeBlobStreaming() throws Exception {
assertEquals(blobSize, bytesReceived.get());
}

public void testReadRetriesAfterMeaningfulProgress() throws Exception {
final int maxRetries = between(0, 5);
final int bufferSizeBytes = scaledRandomIntBetween(
0,
randomFrom(1000, Math.toIntExact(S3Repository.BUFFER_SIZE_SETTING.get(Settings.EMPTY).getBytes()))
);
final BlobContainer blobContainer = createBlobContainer(maxRetries, null, true, new ByteSizeValue(bufferSizeBytes));
final int meaningfulProgressBytes = Math.max(1, bufferSizeBytes / 100);

final byte[] bytes = randomBlobContent();

@SuppressForbidden(reason = "use a http server")
class FlakyReadHandler implements HttpHandler {
private int failuresWithoutProgress;

@Override
public void handle(HttpExchange exchange) throws IOException {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So here basically you first exhaust the "meaningless" failures, and then produce a series of failures that each make meaningful progress; these should never cause an exception to be thrown, and the blob should be read successfully (either in its entirety or partly).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep, pretty much, although note that sendIncompleteContent will sometimes send a meaningful amount of data too.

Streams.readFully(exchange.getRequestBody());
if (failuresWithoutProgress >= maxRetries) {
final int rangeStart = getRangeStart(exchange);
assertThat(rangeStart, lessThan(bytes.length));
exchange.getResponseHeaders().add("Content-Type", bytesContentType());
final var remainderLength = bytes.length - rangeStart;
exchange.sendResponseHeaders(HttpStatus.SC_OK, remainderLength);
exchange.getResponseBody()
.write(
bytes,
rangeStart,
remainderLength < meaningfulProgressBytes ? remainderLength : between(meaningfulProgressBytes, remainderLength)
);
} else if (randomBoolean()) {
failuresWithoutProgress += 1;
exchange.sendResponseHeaders(
randomFrom(
HttpStatus.SC_INTERNAL_SERVER_ERROR,
HttpStatus.SC_BAD_GATEWAY,
HttpStatus.SC_SERVICE_UNAVAILABLE,
HttpStatus.SC_GATEWAY_TIMEOUT
),
-1
);
} else if (randomBoolean()) {
final var bytesSent = sendIncompleteContent(exchange, bytes);
if (bytesSent < meaningfulProgressBytes) {
failuresWithoutProgress += 1;
} else {
exchange.getResponseBody().flush();
}
} else {
failuresWithoutProgress += 1;
}
exchange.close();
}
}

httpServer.createContext(downloadStorageEndpoint(blobContainer, "read_blob_max_retries"), new FlakyReadHandler());

try (InputStream inputStream = blobContainer.readBlob("read_blob_max_retries")) {
final int readLimit;
final InputStream wrappedStream;
if (randomBoolean()) {
// read stream only partly
readLimit = randomIntBetween(0, bytes.length);
wrappedStream = Streams.limitStream(inputStream, readLimit);
} else {
readLimit = bytes.length;
wrappedStream = inputStream;
}
final byte[] bytesRead = BytesReference.toBytes(Streams.readFully(wrappedStream));
assertArrayEquals(Arrays.copyOfRange(bytes, 0, readLimit), bytesRead);
}
}

/**
* Asserts that an InputStream is fully consumed, or aborted, when it is closed
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -369,7 +369,7 @@ protected static OptionalInt getRangeEnd(HttpExchange exchange) {
return OptionalInt.of(Math.toIntExact(rangeEnd));
}

protected void sendIncompleteContent(HttpExchange exchange, byte[] bytes) throws IOException {
protected int sendIncompleteContent(HttpExchange exchange, byte[] bytes) throws IOException {
final int rangeStart = getRangeStart(exchange);
assertThat(rangeStart, lessThan(bytes.length));
final OptionalInt rangeEnd = getRangeEnd(exchange);
Expand All @@ -391,6 +391,7 @@ protected void sendIncompleteContent(HttpExchange exchange, byte[] bytes) throws
if (randomBoolean()) {
exchange.getResponseBody().flush();
}
return bytesToSend;
}

/**
Expand Down