Skip to content

HDDS-2032. Ozone client should retry writes in case of any ratis/stateMachine exceptions. #1420

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Sep 18, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
import org.apache.hadoop.hdds.protocolPB.SCMSecurityProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdds.protocolPB.SCMSecurityProtocolPB;
import org.apache.hadoop.hdds.scm.XceiverClientManager.ScmClientConfig;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerNotOpenException;
import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol;
import org.apache.hadoop.hdds.scm.protocolPB.ScmBlockLocationProtocolPB;
import org.apache.hadoop.io.retry.RetryPolicies;
Expand Down Expand Up @@ -86,7 +86,7 @@ private HddsClientUtils() {
private static final List<Class<? extends Exception>> EXCEPTION_LIST =
new ArrayList<Class<? extends Exception>>() {{
add(TimeoutException.class);
add(ContainerNotOpenException.class);
add(StorageContainerException.class);
add(RaftRetryFailureException.class);
add(AlreadyClosedException.class);
add(GroupMismatchException.class);
Expand Down Expand Up @@ -313,7 +313,7 @@ public static SCMSecurityProtocol getScmSecurityClient(
return scmSecurityClient;
}

public static Throwable checkForException(Exception e) throws IOException {
public static Throwable checkForException(Exception e) {
Throwable t = e;
while (t != null) {
for (Class<? extends Exception> cls : getExceptionList()) {
Expand All @@ -323,8 +323,7 @@ public static Throwable checkForException(Exception e) throws IOException {
}
t = t.getCause();
}

throw e instanceof IOException ? (IOException)e : new IOException(e);
return t;
}

public static RetryPolicy createRetryPolicy(int maxRetryCount,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChecksumType;
import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerNotOpenException;
import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
import org.apache.hadoop.io.retry.RetryPolicies;
Expand All @@ -37,8 +37,6 @@
import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol;
import org.apache.hadoop.hdds.scm.XceiverClientManager;
import org.apache.ratis.protocol.AlreadyClosedException;
import org.apache.ratis.protocol.GroupMismatchException;
import org.apache.ratis.protocol.NotReplicatedException;
import org.apache.ratis.protocol.RaftRetryFailureException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -49,7 +47,6 @@
import java.util.List;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -256,18 +253,19 @@ private void handleWrite(byte[] b, int off, long len, boolean retry)
private void handleException(BlockOutputStreamEntry streamEntry,
IOException exception) throws IOException {
Throwable t = HddsClientUtils.checkForException(exception);
Preconditions.checkNotNull(t);
boolean retryFailure = checkForRetryFailure(t);
boolean closedContainerException = false;
boolean containerExclusionException = false;
if (!retryFailure) {
closedContainerException = checkIfContainerIsClosed(t);
containerExclusionException = checkIfContainerToExclude(t);
}
Pipeline pipeline = streamEntry.getPipeline();
PipelineID pipelineId = pipeline.getId();
long totalSuccessfulFlushedData = streamEntry.getTotalAckDataLength();
//set the correct length for the current stream
streamEntry.setCurrentPosition(totalSuccessfulFlushedData);
long bufferedDataLen = blockOutputStreamEntryPool.computeBufferData();
if (closedContainerException) {
if (containerExclusionException) {
LOG.debug(
"Encountered exception {}. The last committed block length is {}, "
+ "uncommitted data length is {} retry count {}", exception,
Expand All @@ -290,11 +288,12 @@ private void handleException(BlockOutputStreamEntry streamEntry,
if (!failedServers.isEmpty()) {
excludeList.addDatanodes(failedServers);
}
if (closedContainerException) {

// if the container needs to be excluded , add the container to the
// exclusion list , otherwise add the pipeline to the exclusion list
if (containerExclusionException) {
excludeList.addConatinerId(ContainerID.valueof(containerId));
} else if (retryFailure || t instanceof TimeoutException
|| t instanceof GroupMismatchException
|| t instanceof NotReplicatedException) {
} else {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So apart from SCE, all exceptions are expected to be related to the pipeline ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes...If dn reports an StorageContainerException , its specific to containers in dns but other that if ratis reports any other exceptions , it implies issues in the pipeline itself

excludeList.addPipeline(pipelineId);
}
// just clean up the current stream.
Expand All @@ -303,7 +302,7 @@ private void handleException(BlockOutputStreamEntry streamEntry,
// discard all subsequent blocks the containers and pipelines which
// are in the exclude list so that, the very next retry should never
// write data on the closed container/pipeline
if (closedContainerException) {
if (containerExclusionException) {
// discard subsequent pre allocated blocks from the streamEntries list
// from the closed container
blockOutputStreamEntryPool
Expand Down Expand Up @@ -386,8 +385,10 @@ private boolean checkForRetryFailure(Throwable t) {
|| t instanceof AlreadyClosedException;
}

private boolean checkIfContainerIsClosed(Throwable t) {
return t instanceof ContainerNotOpenException;
// Every container specific exception from datatnode will be seen as
// StorageContainerException
private boolean checkIfContainerToExclude(Throwable t) {
return t instanceof StorageContainerException;
}

@Override
Expand Down