Skip to content

Commit 76e462f

Browse files
committed
HADOOP-19221. S3A: Unable to recover from failure of multipart block upload attempt (#6938)
This is a major change which handles 400 error responses when uploading large files from memory heap/buffer (or staging committer) and the remote S3 store returns a 500 response from an upload of a block in a multipart upload. The SDK's own streaming code seems unable to fully replay the upload; it attempts to, but then blocks, and the S3 store returns a 400 response "Your socket connection to the server was not read from or written to within the timeout period. Idle connections will be closed. (Service: S3, Status Code: 400...)". There is an option to control whether or not the S3A client itself attempts to retry on a 50x error other than 503 throttling events (which are independently processed as before). Option: fs.s3a.retry.http.5xx.errors. Default: true. 500 errors are very rare from standard AWS S3, which has a five nines SLA. They may be more common against S3 Express, which has lower guarantees. Third party stores have unknown guarantees, and the exception may indicate a bad server configuration. Consider setting fs.s3a.retry.http.5xx.errors to false when working with such stores. Significant code changes: There is now a custom set of implementations of software.amazon.awssdk.http.ContentStreamProvider in the class org.apache.hadoop.fs.s3a.impl.UploadContentProviders. These: * Restart on failures. * Do not copy buffers/byte buffers into new private byte arrays, so avoid exacerbating memory problems. There are new IOStatistics for specific HTTP error codes; these are collected even when all recovery is performed within the SDK. S3ABlockOutputStream has major changes, including handling of Thread.interrupt() on the main thread, which now triggers and briefly awaits cancellation of any ongoing uploads. If the writing thread is interrupted in close(), it is mapped to an InterruptedIOException. Applications like Hive and Spark must catch these after cancelling a worker thread. Contributed by Steve Loughran
1 parent 5635e34 commit 76e462f

File tree

53 files changed

+4250
-985
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

53 files changed

+4250
-985
lines changed

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StoreStatisticNames.java

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -384,6 +384,47 @@ public final class StoreStatisticNames {
384384
public static final String ACTION_HTTP_PATCH_REQUEST
385385
= "action_http_patch_request";
386386

387+
/**
388+
* HTTP error response: {@value}.
389+
*/
390+
public static final String HTTP_RESPONSE_400
391+
= "http_response_400";
392+
393+
/**
394+
* HTTP error response: {@value}.
395+
* Returned by some stores for throttling events.
396+
*/
397+
public static final String HTTP_RESPONSE_429
398+
= "http_response_429";
399+
400+
/**
401+
* Other 4XX HTTP response: {@value}.
402+
* (404 responses are excluded as they are rarely 'errors'
403+
* and will be reported differently if they are).
404+
*/
405+
public static final String HTTP_RESPONSE_4XX
406+
= "http_response_4XX";
407+
408+
/**
409+
* HTTP error response: {@value}.
410+
* Sign of server-side problems, possibly transient.
411+
*/
412+
public static final String HTTP_RESPONSE_500
413+
= "http_response_500";
414+
415+
/**
416+
* HTTP error response: {@value}.
417+
* AWS Throttle.
418+
*/
419+
public static final String HTTP_RESPONSE_503
420+
= "http_response_503";
421+
422+
/**
423+
* Other 5XX HTTP response: {@value}.
424+
*/
425+
public static final String HTTP_RESPONSE_5XX
426+
= "http_response_5XX";
427+
387428
/**
388429
* An HTTP POST request was made: {@value}.
389430
*/
Lines changed: 199 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,199 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.hadoop.fs.store;
20+
21+
import java.io.EOFException;
22+
import java.io.IOException;
23+
import java.io.InputStream;
24+
import java.nio.ByteBuffer;
25+
26+
import org.slf4j.Logger;
27+
import org.slf4j.LoggerFactory;
28+
29+
import org.apache.hadoop.fs.FSExceptionMessages;
30+
import org.apache.hadoop.util.Preconditions;
31+
32+
/**
33+
* Provide an input stream from a byte buffer; supporting
34+
* {@link #mark(int)}.
35+
*/
36+
public final class ByteBufferInputStream extends InputStream {
37+
private static final Logger LOG =
38+
LoggerFactory.getLogger(ByteBufferInputStream.class);
39+
40+
/** Size of the buffer. */
41+
private final int size;
42+
43+
/**
44+
* Not final so that in close() it will be set to null, which
45+
* may result in faster cleanup of the buffer.
46+
*/
47+
private ByteBuffer byteBuffer;
48+
49+
public ByteBufferInputStream(int size,
50+
ByteBuffer byteBuffer) {
51+
LOG.debug("Creating ByteBufferInputStream of size {}", size);
52+
this.size = size;
53+
this.byteBuffer = byteBuffer;
54+
}
55+
56+
/**
57+
* After the stream is closed, set the local reference to the byte
58+
* buffer to null; this guarantees that future attempts to use
59+
* stream methods will fail.
60+
*/
61+
@Override
62+
public synchronized void close() {
63+
LOG.debug("ByteBufferInputStream.close()");
64+
byteBuffer = null;
65+
}
66+
67+
/**
68+
* Is the stream open?
69+
* @return true if the stream has not been closed.
70+
*/
71+
public synchronized boolean isOpen() {
72+
return byteBuffer != null;
73+
}
74+
75+
/**
76+
* Verify that the stream is open.
77+
* @throws IOException if the stream is closed
78+
*/
79+
private void verifyOpen() throws IOException {
80+
if (byteBuffer == null) {
81+
throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
82+
}
83+
}
84+
85+
/**
86+
* Check the open state.
87+
* @throws IllegalStateException if the stream is closed.
88+
*/
89+
private void checkOpenState() {
90+
Preconditions.checkState(isOpen(),
91+
FSExceptionMessages.STREAM_IS_CLOSED);
92+
}
93+
94+
public synchronized int read() throws IOException {
95+
if (available() > 0) {
96+
return byteBuffer.get() & 0xFF;
97+
} else {
98+
return -1;
99+
}
100+
}
101+
102+
@Override
103+
public synchronized long skip(long offset) throws IOException {
104+
verifyOpen();
105+
long newPos = position() + offset;
106+
if (newPos < 0) {
107+
throw new EOFException(FSExceptionMessages.NEGATIVE_SEEK);
108+
}
109+
if (newPos > size) {
110+
throw new EOFException(FSExceptionMessages.CANNOT_SEEK_PAST_EOF);
111+
}
112+
byteBuffer.position((int) newPos);
113+
return newPos;
114+
}
115+
116+
@Override
117+
public synchronized int available() {
118+
checkOpenState();
119+
return byteBuffer.remaining();
120+
}
121+
122+
/**
123+
* Get the current buffer position.
124+
* @return the buffer position
125+
*/
126+
public synchronized int position() {
127+
checkOpenState();
128+
return byteBuffer.position();
129+
}
130+
131+
/**
132+
* Check if there is data left.
133+
* @return true if there is data remaining in the buffer.
134+
*/
135+
public synchronized boolean hasRemaining() {
136+
checkOpenState();
137+
return byteBuffer.hasRemaining();
138+
}
139+
140+
@Override
141+
public synchronized void mark(int readlimit) {
142+
LOG.debug("mark at {}", position());
143+
checkOpenState();
144+
byteBuffer.mark();
145+
}
146+
147+
@Override
148+
public synchronized void reset() throws IOException {
149+
LOG.debug("reset");
150+
checkOpenState();
151+
byteBuffer.reset();
152+
}
153+
154+
@Override
155+
public boolean markSupported() {
156+
return true;
157+
}
158+
159+
/**
160+
* Read in data.
161+
* @param b destination buffer.
162+
* @param offset offset within the buffer.
163+
* @param length length of bytes to read.
164+
* @throws EOFException if the position is negative
165+
* @throws IndexOutOfBoundsException if there isn't space for the
166+
* amount of data requested.
167+
* @throws IllegalArgumentException other arguments are invalid.
168+
*/
169+
@SuppressWarnings("NullableProblems")
170+
public synchronized int read(byte[] b, int offset, int length)
171+
throws IOException {
172+
Preconditions.checkArgument(length >= 0, "length is negative");
173+
Preconditions.checkArgument(b != null, "Null buffer");
174+
if (b.length - offset < length) {
175+
throw new IndexOutOfBoundsException(
176+
FSExceptionMessages.TOO_MANY_BYTES_FOR_DEST_BUFFER
177+
+ ": request length =" + length
178+
+ ", with offset =" + offset
179+
+ "; buffer capacity =" + (b.length - offset));
180+
}
181+
verifyOpen();
182+
if (!hasRemaining()) {
183+
return -1;
184+
}
185+
186+
int toRead = Math.min(length, available());
187+
byteBuffer.get(b, offset, toRead);
188+
return toRead;
189+
}
190+
191+
@Override
192+
public String toString() {
193+
return "ByteBufferInputStream{" +
194+
"size=" + size +
195+
", byteBuffer=" + byteBuffer +
196+
((byteBuffer != null) ? ", available=" + byteBuffer.remaining() : "") +
197+
"} " + super.toString();
198+
}
199+
}

0 commit comments

Comments
 (0)