Skip to content

HDDS-1224. Restructure code to validate the response from server in the Read path #806

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Jun 4, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,14 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.HddsUtils;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.XceiverClientProtocolServiceGrpc;
import org.apache.hadoop.hdds.protocol.datanode.proto.XceiverClientProtocolServiceGrpc.XceiverClientProtocolServiceStub;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.storage.CheckedBiFunction;
import org.apache.hadoop.hdds.security.exception.SCMSecurityException;
import org.apache.hadoop.hdds.security.x509.SecurityConfig;
import org.apache.hadoop.hdds.tracing.GrpcClientInterceptor;
Expand Down Expand Up @@ -62,7 +62,6 @@
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;

/**
* A Client for the storageContainer protocol.
Expand All @@ -83,15 +82,15 @@ public class XceiverClientGrpc extends XceiverClientSpi {
* data nodes.
*
* @param pipeline - Pipeline that defines the machines.
* @param config -- Ozone Config
* @param config -- Ozone Config
*/
public XceiverClientGrpc(Pipeline pipeline, Configuration config) {
super();
Preconditions.checkNotNull(pipeline);
Preconditions.checkNotNull(config);
this.pipeline = pipeline;
this.config = config;
this.secConfig = new SecurityConfig(config);
this.secConfig = new SecurityConfig(config);
this.semaphore =
new Semaphore(HddsClientUtils.getMaxOutstandingRequests(config));
this.metrics = XceiverClientManager.getXceiverClientMetrics();
Expand All @@ -101,7 +100,7 @@ public XceiverClientGrpc(Pipeline pipeline, Configuration config) {

/**
* To be used when grpc token is not enabled.
* */
*/
@Override
public void connect() throws Exception {
// leader by default is the 1st datanode in the datanode list of pipleline
Expand All @@ -112,7 +111,7 @@ public void connect() throws Exception {

/**
* Passed encoded token to GRPC header when security is enabled.
* */
*/
@Override
public void connect(String encodedToken) throws Exception {
// leader by default is the 1st datanode in the datanode list of pipleline
Expand All @@ -132,11 +131,10 @@ private void connectToDatanode(DatanodeDetails dn, String encodedToken)
}

// Add credential context to the client call
String userName = UserGroupInformation.getCurrentUser()
.getShortUserName();
String userName = UserGroupInformation.getCurrentUser().getShortUserName();
LOG.debug("Connecting to server Port : " + dn.getIpAddress());
NettyChannelBuilder channelBuilder = NettyChannelBuilder.forAddress(dn
.getIpAddress(), port).usePlaintext()
NettyChannelBuilder channelBuilder =
NettyChannelBuilder.forAddress(dn.getIpAddress(), port).usePlaintext()
.maxInboundMessageSize(OzoneConsts.OZONE_SCM_CHUNK_MAX_SIZE)
.intercept(new ClientCredentialInterceptor(userName, encodedToken),
new GrpcClientInterceptor());
Expand All @@ -149,8 +147,8 @@ private void connectToDatanode(DatanodeDetails dn, String encodedToken)
if (trustCertCollectionFile != null) {
sslContextBuilder.trustManager(trustCertCollectionFile);
}
if (secConfig.isGrpcMutualTlsRequired() && clientCertChainFile != null &&
privateKeyFile != null) {
if (secConfig.isGrpcMutualTlsRequired() && clientCertChainFile != null
&& privateKeyFile != null) {
sslContextBuilder.keyManager(clientCertChainFile, privateKeyFile);
}

Expand Down Expand Up @@ -216,77 +214,83 @@ public ContainerCommandResponseProto sendCommand(
}

@Override
public XceiverClientReply sendCommand(
ContainerCommandRequestProto request, List<DatanodeDetails> excludeDns)
public ContainerCommandResponseProto sendCommand(
ContainerCommandRequestProto request, List<CheckedBiFunction> validators)
throws IOException {
Preconditions.checkState(HddsUtils.isReadOnly(request));
return sendCommandWithTraceIDAndRetry(request, excludeDns);
try {
XceiverClientReply reply;
reply = sendCommandWithTraceIDAndRetry(request, validators);
ContainerCommandResponseProto responseProto = reply.getResponse().get();
return responseProto;
} catch (ExecutionException | InterruptedException e) {
throw new IOException("Failed to execute command " + request, e);
}
}

private XceiverClientReply sendCommandWithTraceIDAndRetry(
ContainerCommandRequestProto request, List<DatanodeDetails> excludeDns)
ContainerCommandRequestProto request, List<CheckedBiFunction> validators)
throws IOException {
try (Scope scope = GlobalTracer.get()
.buildSpan("XceiverClientGrpc." + request.getCmdType().name())
.startActive(true)) {
ContainerCommandRequestProto finalPayload =
ContainerCommandRequestProto.newBuilder(request)
.setTraceID(TracingUtil.exportCurrentSpan())
.build();
return sendCommandWithRetry(finalPayload, excludeDns);
.setTraceID(TracingUtil.exportCurrentSpan()).build();
return sendCommandWithRetry(finalPayload, validators);
}
}

private XceiverClientReply sendCommandWithRetry(
ContainerCommandRequestProto request, List<DatanodeDetails> excludeDns)
ContainerCommandRequestProto request, List<CheckedBiFunction> validators)
throws IOException {
ContainerCommandResponseProto responseProto = null;
IOException ioException = null;

// In case of an exception or an error, we will try to read from the
// datanodes in the pipeline in a round robin fashion.

// TODO: cache the correct leader info in here, so that any subsequent calls
// should first go to leader
List<DatanodeDetails> dns = pipeline.getNodes();
List<DatanodeDetails> healthyDns =
excludeDns != null ? dns.stream().filter(dnId -> {
for (DatanodeDetails excludeId : excludeDns) {
if (dnId.equals(excludeId)) {
return false;
}
}
return true;
}).collect(Collectors.toList()) : dns;
XceiverClientReply reply = new XceiverClientReply(null);
for (DatanodeDetails dn : healthyDns) {
for (DatanodeDetails dn : pipeline.getNodes()) {
try {
LOG.debug("Executing command " + request + " on datanode " + dn);
// In case the command gets retried on a 2nd datanode,
// sendCommandAsyncCall will create a new channel and async stub
// in case these don't exist for the specific datanode.
reply.addDatanode(dn);
responseProto = sendCommandAsync(request, dn).getResponse().get();
if (responseProto.getResult() == ContainerProtos.Result.SUCCESS) {
break;
if (validators != null && !validators.isEmpty()) {
for (CheckedBiFunction validator : validators) {
validator.apply(request, responseProto);
}
}
} catch (ExecutionException | InterruptedException e) {
break;
} catch (ExecutionException | InterruptedException | IOException e) {
LOG.debug("Failed to execute command " + request + " on datanode " + dn
.getUuidString(), e);
if (Status.fromThrowable(e.getCause()).getCode()
== Status.UNAUTHENTICATED.getCode()) {
throw new SCMSecurityException("Failed to authenticate with "
+ "GRPC XceiverServer with Ozone block token.");
if (!(e instanceof IOException)) {
if (Status.fromThrowable(e.getCause()).getCode()
== Status.UNAUTHENTICATED.getCode()) {
throw new SCMSecurityException("Failed to authenticate with "
+ "GRPC XceiverServer with Ozone block token.");
}
ioException = new IOException(e);
} else {
ioException = (IOException) e;
}
responseProto = null;
}
}

if (responseProto != null) {
reply.setResponse(CompletableFuture.completedFuture(responseProto));
return reply;
} else {
throw new IOException(
"Failed to execute command " + request + " on the pipeline "
+ pipeline.getId());
Preconditions.checkNotNull(ioException);
LOG.error("Failed to execute command " + request + " on the pipeline "
+ pipeline.getId());
throw ioException;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,29 +21,29 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.scm.XceiverClientReply;
import org.apache.hadoop.hdds.scm.container.common.helpers
.StorageContainerException;
import org.apache.hadoop.ozone.common.Checksum;
import org.apache.hadoop.ozone.common.ChecksumData;
import org.apache.hadoop.ozone.common.OzoneChecksumException;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import org.apache.hadoop.fs.Seekable;
import org.apache.hadoop.hdds.scm.XceiverClientManager;
import org.apache.hadoop.hdds.scm.XceiverClientSpi;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChunkInfo;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
.ReadChunkResponseProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.
ContainerCommandResponseProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.
ContainerCommandRequestProto;
import org.apache.hadoop.hdds.client.BlockID;

import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutionException;

/**
* An {@link InputStream} used by the REST service in combination with the
Expand Down Expand Up @@ -74,7 +74,7 @@ public class BlockInputStream extends InputStream implements Seekable {
private List<ByteBuffer> buffers;
private int bufferIndex;
private long bufferPosition;
private final boolean verifyChecksum;
private boolean verifyChecksum;

/**
* Creates a new BlockInputStream.
Expand Down Expand Up @@ -323,41 +323,8 @@ private boolean chunksRemaining() {
private synchronized void readChunkFromContainer() throws IOException {
// Read the chunk at chunkIndex
final ChunkInfo chunkInfo = chunks.get(chunkIndex);
List<DatanodeDetails> excludeDns = null;
ByteString byteString;
List<DatanodeDetails> dnList = getDatanodeList();
while (true) {
List<DatanodeDetails> dnListFromReadChunkCall = new ArrayList<>();
byteString = readChunk(chunkInfo, excludeDns, dnListFromReadChunkCall);
try {
if (byteString.size() != chunkInfo.getLen()) {
// Bytes read from chunk should be equal to chunk size.
throw new IOException(String
.format("Inconsistent read for chunk=%s len=%d bytesRead=%d",
chunkInfo.getChunkName(), chunkInfo.getLen(),
byteString.size()));
}
ChecksumData checksumData =
ChecksumData.getFromProtoBuf(chunkInfo.getChecksumData());
if (verifyChecksum) {
Checksum.verifyChecksum(byteString, checksumData);
}
break;
} catch (IOException ioe) {
// we will end up in this situation only if the checksum mismatch
// happens or the length of the chunk mismatches.
// In this case, read should be retried on a different replica.
// TODO: Inform SCM of a possible corrupt container replica here
if (excludeDns == null) {
excludeDns = new ArrayList<>();
}
excludeDns.addAll(dnListFromReadChunkCall);
if (excludeDns.size() == dnList.size()) {
throw ioe;
}
}
}

byteString = readChunk(chunkInfo);
buffers = byteString.asReadOnlyByteBufferList();
bufferIndex = 0;
chunkIndexOfCurrentBuffer = chunkIndex;
Expand All @@ -372,28 +339,20 @@ private synchronized void readChunkFromContainer() throws IOException {
* Send RPC call to get the chunk from the container.
*/
@VisibleForTesting
protected ByteString readChunk(final ChunkInfo chunkInfo,
List<DatanodeDetails> excludeDns, List<DatanodeDetails> dnListFromReply)
protected ByteString readChunk(final ChunkInfo chunkInfo)
throws IOException {
XceiverClientReply reply;
ReadChunkResponseProto readChunkResponse = null;
ReadChunkResponseProto readChunkResponse;
try {
reply = ContainerProtocolCalls
.readChunk(xceiverClient, chunkInfo, blockID, traceID, excludeDns);
ContainerProtos.ContainerCommandResponseProto response;
response = reply.getResponse().get();
ContainerProtocolCalls.validateContainerResponse(response);
readChunkResponse = response.getReadChunk();
dnListFromReply.addAll(reply.getDatanodes());
List<CheckedBiFunction> validators =
ContainerProtocolCalls.getValidatorList();
validators.add(validator);
readChunkResponse = ContainerProtocolCalls
.readChunk(xceiverClient, chunkInfo, blockID, traceID, validators);
} catch (IOException e) {
if (e instanceof StorageContainerException) {
throw e;
}
throw new IOException("Unexpected OzoneException: " + e.toString(), e);
} catch (ExecutionException | InterruptedException e) {
throw new IOException(
"Failed to execute ReadChunk command for chunk " + chunkInfo
.getChunkName(), e);
}
return readChunkResponse.getData();
}
Expand All @@ -403,6 +362,26 @@ protected List<DatanodeDetails> getDatanodeList() {
return xceiverClient.getPipeline().getNodes();
}

private CheckedBiFunction<ContainerCommandRequestProto,
ContainerCommandResponseProto, IOException> validator =
(request, response) -> {
ReadChunkResponseProto readChunkResponse = response.getReadChunk();
final ChunkInfo chunkInfo = readChunkResponse.getChunkData();
ByteString byteString = readChunkResponse.getData();
if (byteString.size() != chunkInfo.getLen()) {
// Bytes read from chunk should be equal to chunk size.
throw new OzoneChecksumException(String
.format("Inconsistent read for chunk=%s len=%d bytesRead=%d",
chunkInfo.getChunkName(), chunkInfo.getLen(),
byteString.size()));
}
ChecksumData checksumData =
ChecksumData.getFromProtoBuf(chunkInfo.getChecksumData());
if (verifyChecksum) {
Checksum.verifyChecksum(byteString, checksumData);
}
};

@Override
public synchronized void seek(long pos) throws IOException {
if (pos < 0 || (chunks.size() == 0 && pos > 0)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,8 +114,7 @@ private static class DummyBlockInputStream extends BlockInputStream {
}

@Override
protected ByteString readChunk(final ChunkInfo chunkInfo,
List<DatanodeDetails> excludeDns, List<DatanodeDetails> dnListFromReply)
protected ByteString readChunk(final ChunkInfo chunkInfo)
throws IOException {
return getByteString(chunkInfo.getChunkName(), (int) chunkInfo.getLen());
}
Expand Down
Loading