Skip to content

Commit d525633

Browse files
authored
HDDS-1395. Key write fails with BlockOutputStream has been closed exception (#749)
HDDS-1395. Key write fails with BlockOutputStream has been closed exception (#749).
1 parent 8ecbf61 commit d525633

File tree

17 files changed

+1193
-576
lines changed

17 files changed

+1193
-576
lines changed

hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import org.apache.hadoop.util.Time;
3030
import org.apache.ratis.grpc.GrpcTlsConfig;
3131
import org.apache.ratis.proto.RaftProtos;
32+
import org.apache.ratis.protocol.GroupMismatchException;
3233
import org.apache.ratis.protocol.RaftRetryFailureException;
3334
import org.apache.ratis.retry.RetryPolicy;
3435
import org.apache.ratis.thirdparty.com.google.protobuf
@@ -69,7 +70,8 @@
6970
* The underlying RPC mechanism can be chosen via the constructor.
7071
*/
7172
public final class XceiverClientRatis extends XceiverClientSpi {
72-
static final Logger LOG = LoggerFactory.getLogger(XceiverClientRatis.class);
73+
public static final Logger LOG =
74+
LoggerFactory.getLogger(XceiverClientRatis.class);
7375

7476
public static XceiverClientRatis newXceiverClientRatis(
7577
org.apache.hadoop.hdds.scm.pipeline.Pipeline pipeline,
@@ -248,13 +250,17 @@ public XceiverClientReply watchForCommit(long index, long timeout)
248250
return clientReply;
249251
}
250252
LOG.debug("commit index : {} watch timeout : {}", index, timeout);
251-
CompletableFuture<RaftClientReply> replyFuture = getClient()
252-
.sendWatchAsync(index, RaftProtos.ReplicationLevel.ALL_COMMITTED);
253253
RaftClientReply reply;
254254
try {
255+
CompletableFuture<RaftClientReply> replyFuture = getClient()
256+
.sendWatchAsync(index, RaftProtos.ReplicationLevel.ALL_COMMITTED);
255257
replyFuture.get(timeout, TimeUnit.MILLISECONDS);
256-
} catch (TimeoutException toe) {
257-
LOG.warn("3 way commit failed ", toe);
258+
} catch (Exception e) {
259+
Throwable t = HddsClientUtils.checkForException(e);
260+
LOG.warn("3 way commit failed ", e);
261+
if (t instanceof GroupMismatchException) {
262+
throw e;
263+
}
258264
reply = getClient()
259265
.sendWatchAsync(index, RaftProtos.ReplicationLevel.MAJORITY_COMMITTED)
260266
.get(timeout, TimeUnit.MILLISECONDS);

hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/client/HddsClientUtils.java

Lines changed: 69 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,11 @@
2828
import org.apache.hadoop.hdds.protocolPB.SCMSecurityProtocolClientSideTranslatorPB;
2929
import org.apache.hadoop.hdds.protocolPB.SCMSecurityProtocolPB;
3030
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
31+
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerNotOpenException;
3132
import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol;
3233
import org.apache.hadoop.hdds.scm.protocolPB.ScmBlockLocationProtocolPB;
34+
import org.apache.hadoop.io.retry.RetryPolicies;
35+
import org.apache.hadoop.io.retry.RetryPolicy;
3336
import org.apache.hadoop.ipc.Client;
3437
import org.apache.hadoop.ipc.ProtobufRpcEngine;
3538
import org.apache.hadoop.ipc.RPC;
@@ -40,6 +43,10 @@
4043
import org.apache.http.client.config.RequestConfig;
4144
import org.apache.http.impl.client.CloseableHttpClient;
4245
import org.apache.http.impl.client.HttpClients;
46+
import org.apache.ratis.protocol.AlreadyClosedException;
47+
import org.apache.ratis.protocol.GroupMismatchException;
48+
import org.apache.ratis.protocol.NotReplicatedException;
49+
import org.apache.ratis.protocol.RaftRetryFailureException;
4350
import org.slf4j.Logger;
4451
import org.slf4j.LoggerFactory;
4552

@@ -50,8 +57,12 @@
5057
import java.time.ZoneId;
5158
import java.time.ZonedDateTime;
5259
import java.time.format.DateTimeFormatter;
60+
import java.util.ArrayList;
61+
import java.util.HashMap;
62+
import java.util.List;
63+
import java.util.Map;
5364
import java.util.concurrent.TimeUnit;
54-
65+
import java.util.concurrent.TimeoutException;
5566

5667
/**
5768
* Utility methods for Ozone and Container Clients.
@@ -72,6 +83,18 @@ public final class HddsClientUtils {
7283
private HddsClientUtils() {
7384
}
7485

86+
private static final List<Class<? extends Exception>> EXCEPTION_LIST =
87+
new ArrayList<Class<? extends Exception>>() {{
88+
add(TimeoutException.class);
89+
add(ContainerNotOpenException.class);
90+
add(RaftRetryFailureException.class);
91+
add(AlreadyClosedException.class);
92+
add(GroupMismatchException.class);
93+
// Not Replicated Exception will be thrown if watch For commit
94+
// does not succeed
95+
add(NotReplicatedException.class);
96+
}};
97+
7598
/**
7699
* Date format that used in ozone. Here the format is thread safe to use.
77100
*/
@@ -290,4 +313,49 @@ public static SCMSecurityProtocol getScmSecurityClient(
290313
Client.getRpcTimeout(conf)));
291314
return scmSecurityClient;
292315
}
316+
317+
public static Throwable checkForException(Exception e) throws IOException {
318+
Throwable t = e;
319+
while (t != null) {
320+
for (Class<? extends Exception> cls : getExceptionList()) {
321+
if (cls.isInstance(t)) {
322+
return t;
323+
}
324+
}
325+
t = t.getCause();
326+
}
327+
328+
throw e instanceof IOException ? (IOException)e : new IOException(e);
329+
}
330+
331+
public static RetryPolicy createRetryPolicy(int maxRetryCount,
332+
long retryInterval) {
333+
// retry with fixed sleep between retries
334+
return RetryPolicies.retryUpToMaximumCountWithFixedSleep(
335+
maxRetryCount, retryInterval, TimeUnit.MILLISECONDS);
336+
}
337+
338+
public static Map<Class<? extends Throwable>,
339+
RetryPolicy> getRetryPolicyByException(int maxRetryCount,
340+
long retryInterval) {
341+
Map<Class<? extends Throwable>, RetryPolicy> policyMap = new HashMap<>();
342+
for (Class<? extends Exception> ex : EXCEPTION_LIST) {
343+
if (ex == TimeoutException.class
344+
|| ex == RaftRetryFailureException.class) {
345+
// retry without sleep
346+
policyMap.put(ex, createRetryPolicy(maxRetryCount, 0));
347+
} else {
348+
// retry with fixed sleep between retries
349+
policyMap.put(ex, createRetryPolicy(maxRetryCount, retryInterval));
350+
}
351+
}
352+
// Default retry policy
353+
policyMap
354+
.put(Exception.class, createRetryPolicy(maxRetryCount, retryInterval));
355+
return policyMap;
356+
}
357+
358+
public static List<Class<? extends Exception>> getExceptionList() {
359+
return EXCEPTION_LIST;
360+
}
293361
}

hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ public class BlockOutputStream extends OutputStream {
8080
public static final Logger LOG =
8181
LoggerFactory.getLogger(BlockOutputStream.class);
8282

83-
private BlockID blockID;
83+
private volatile BlockID blockID;
8484
private final String key;
8585
private final String traceID;
8686
private final BlockData.Builder containerBlockData;
@@ -574,14 +574,18 @@ public void cleanup(boolean invalidateClient) {
574574
* @throws IOException if stream is closed
575575
*/
576576
private void checkOpen() throws IOException {
577-
if (xceiverClient == null) {
577+
if (isClosed()) {
578578
throw new IOException("BlockOutputStream has been closed.");
579579
} else if (getIoException() != null) {
580580
adjustBuffersOnException();
581581
throw getIoException();
582582
}
583583
}
584584

585+
public boolean isClosed() {
586+
return xceiverClient == null;
587+
}
588+
585589
/**
586590
* Writes buffered data as a new chunk to the container and saves chunk
587591
* information to be used later in putKey call.
@@ -635,4 +639,9 @@ private void writeChunkToContainer(ByteBuffer chunk) throws IOException {
635639
+ " length " + effectiveChunkSize);
636640
containerBlockData.addChunks(chunkInfo);
637641
}
642+
643+
@VisibleForTesting
644+
public void setXceiverClient(XceiverClientSpi xceiverClient) {
645+
this.xceiverClient = xceiverClient;
646+
}
638647
}

hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/CommitWatcher.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -188,7 +188,6 @@ void releaseBuffersOnException() {
188188
*/
189189
public XceiverClientReply watchForCommit(long commitIndex)
190190
throws IOException {
191-
Preconditions.checkState(!commitIndex2flushedDataMap.isEmpty());
192191
long index;
193192
try {
194193
XceiverClientReply reply =

hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -121,12 +121,12 @@ public final class ScmConfigKeys {
121121
TimeDuration.valueOf(3000, TimeUnit.MILLISECONDS);
122122
public static final String DFS_RATIS_CLIENT_REQUEST_MAX_RETRIES_KEY =
123123
"dfs.ratis.client.request.max.retries";
124-
public static final int DFS_RATIS_CLIENT_REQUEST_MAX_RETRIES_DEFAULT = 20;
124+
public static final int DFS_RATIS_CLIENT_REQUEST_MAX_RETRIES_DEFAULT = 180;
125125
public static final String DFS_RATIS_CLIENT_REQUEST_RETRY_INTERVAL_KEY =
126126
"dfs.ratis.client.request.retry.interval";
127127
public static final TimeDuration
128128
DFS_RATIS_CLIENT_REQUEST_RETRY_INTERVAL_DEFAULT =
129-
TimeDuration.valueOf(500, TimeUnit.MILLISECONDS);
129+
TimeDuration.valueOf(1000, TimeUnit.MILLISECONDS);
130130
public static final String DFS_RATIS_SERVER_RETRY_CACHE_TIMEOUT_DURATION_KEY =
131131
"dfs.ratis.server.retry-cache.timeout.duration";
132132
public static final TimeDuration

hadoop-hdds/common/src/main/resources/ozone-default.xml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -237,13 +237,13 @@
237237
</property>
238238
<property>
239239
<name>dfs.ratis.client.request.max.retries</name>
240-
<value>20</value>
240+
<value>180</value>
241241
<tag>OZONE, RATIS, MANAGEMENT</tag>
242242
<description>Number of retries for ratis client request.</description>
243243
</property>
244244
<property>
245245
<name>dfs.ratis.client.request.retry.interval</name>
246-
<value>500ms</value>
246+
<value>1000ms</value>
247247
<tag>OZONE, RATIS, MANAGEMENT</tag>
248248
<description>Interval between successive retries for a ratis client request.
249249
</description>

hadoop-hdds/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ http://maven.apache.org/xsd/maven-4.0.0.xsd">
4747
<hdds.version>0.5.0-SNAPSHOT</hdds.version>
4848

4949
<!-- Apache Ratis version -->
50-
<ratis.version>0.3.0</ratis.version>
50+
<ratis.version>0.4.0-fe2b15d-SNAPSHOT</ratis.version>
5151

5252
<bouncycastle.version>1.60</bouncycastle.version>
5353

hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneClientUtils.java

Lines changed: 0 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -18,15 +18,11 @@
1818
package org.apache.hadoop.ozone.client;
1919

2020
import java.util.ArrayList;
21-
import java.util.HashMap;
2221
import java.util.List;
23-
import java.util.Map;
2422
import java.util.concurrent.TimeUnit;
25-
import java.util.concurrent.TimeoutException;
2623

2724
import org.apache.hadoop.hdds.client.OzoneQuota;
2825
import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
29-
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerNotOpenException;
3026
import org.apache.hadoop.io.retry.RetryPolicies;
3127
import org.apache.hadoop.io.retry.RetryPolicy;
3228
import org.apache.hadoop.ozone.OzoneConsts;
@@ -36,23 +32,11 @@
3632
import org.apache.hadoop.ozone.client.rest.response.KeyLocation;
3733
import org.apache.hadoop.ozone.client.rest.response.VolumeInfo;
3834
import org.apache.hadoop.ozone.client.rest.response.VolumeOwner;
39-
import org.apache.ratis.protocol.AlreadyClosedException;
40-
import org.apache.ratis.protocol.GroupMismatchException;
41-
import org.apache.ratis.protocol.RaftRetryFailureException;
4235

4336
/** A utility class for OzoneClient. */
4437
public final class OzoneClientUtils {
4538

4639
private OzoneClientUtils() {}
47-
48-
private static final List<Class<? extends Exception>> EXCEPTION_LIST =
49-
new ArrayList<Class<? extends Exception>>() {{
50-
add(TimeoutException.class);
51-
add(ContainerNotOpenException.class);
52-
add(RaftRetryFailureException.class);
53-
add(AlreadyClosedException.class);
54-
add(GroupMismatchException.class);
55-
}};
5640
/**
5741
* Returns a BucketInfo object constructed using fields of the input
5842
* OzoneBucket object.
@@ -141,26 +125,4 @@ public static RetryPolicy createRetryPolicy(int maxRetryCount,
141125
maxRetryCount, retryInterval, TimeUnit.MILLISECONDS);
142126
}
143127

144-
public static List<Class<? extends Exception>> getExceptionList() {
145-
return EXCEPTION_LIST;
146-
}
147-
148-
public static Map<Class<? extends Throwable>, RetryPolicy>
149-
getRetryPolicyByException(int maxRetryCount, long retryInterval) {
150-
Map<Class<? extends Throwable>, RetryPolicy> policyMap = new HashMap<>();
151-
for (Class<? extends Exception> ex : EXCEPTION_LIST) {
152-
if (ex == TimeoutException.class ||
153-
ex == RaftRetryFailureException.class) {
154-
// retry without sleep
155-
policyMap.put(ex, createRetryPolicy(maxRetryCount, 0));
156-
} else {
157-
// retry with fixed sleep between retries
158-
policyMap.put(ex, createRetryPolicy(maxRetryCount, retryInterval));
159-
}
160-
}
161-
// Default retry policy
162-
policyMap.put(Exception.class, createRetryPolicy(
163-
maxRetryCount, retryInterval));
164-
return policyMap;
165-
}
166128
}

hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,13 @@ public void close() throws IOException {
149149
}
150150
}
151151

152+
boolean isClosed() {
153+
if (outputStream != null) {
154+
return ((BlockOutputStream) outputStream).isClosed();
155+
}
156+
return false;
157+
}
158+
152159
long getTotalAckDataLength() {
153160
if (outputStream != null) {
154161
BlockOutputStream out = (BlockOutputStream) this.outputStream;

0 commit comments

Comments
 (0)