Skip to content

HDDS-1586. Allow Ozone RPC client to read with topology awareness. #931

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.HddsUtils;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.XceiverClientProtocolServiceGrpc;
Expand Down Expand Up @@ -64,7 +65,7 @@
import java.util.concurrent.TimeoutException;

/**
* A Client for the storageContainer protocol.
* A Client for the storageContainer protocol used to read object data.
*/
public class XceiverClientGrpc extends XceiverClientSpi {
static final Logger LOG = LoggerFactory.getLogger(XceiverClientGrpc.class);
Expand All @@ -76,6 +77,7 @@ public class XceiverClientGrpc extends XceiverClientSpi {
private final Semaphore semaphore;
private boolean closed = false;
private SecurityConfig secConfig;
private final boolean topologyAwareRead;

/**
* Constructs a client that can communicate with the Container framework on
Expand All @@ -96,16 +98,20 @@ public XceiverClientGrpc(Pipeline pipeline, Configuration config) {
this.metrics = XceiverClientManager.getXceiverClientMetrics();
this.channels = new HashMap<>();
this.asyncStubs = new HashMap<>();
this.topologyAwareRead = Boolean.parseBoolean(config.get(
ScmConfigKeys.DFS_NETWORK_TOPOLOGY_AWARE_READ_ENABLED,
ScmConfigKeys.DFS_NETWORK_TOPOLOGY_AWARE_READ_ENABLED_DEFAULT));
}

/**
* To be used when grpc token is not enabled.
*/
@Override
public void connect() throws Exception {
// leader by default is the 1st datanode in the datanode list of pipleline
DatanodeDetails dn = this.pipeline.getFirstNode();
// just make a connection to the 1st datanode at the beginning
// Connect to the closest node; if no closest node exists, fall back to
// the first node, which is usually the leader in the pipeline.
DatanodeDetails dn = this.pipeline.getClosestNode();
// just make a connection to the picked datanode at the beginning
connectToDatanode(dn, null);
}

Expand All @@ -114,9 +120,11 @@ public void connect() throws Exception {
*/
@Override
public void connect(String encodedToken) throws Exception {
// leader by default is the 1st datanode in the datanode list of pipleline
DatanodeDetails dn = this.pipeline.getFirstNode();
// just make a connection to the 1st datanode at the beginning
// Connect to the closest node; if no closest node exists, fall back to
// the first node, which is usually the leader in the pipeline.
DatanodeDetails dn;
dn = this.pipeline.getClosestNode();
// just make a connection to the picked datanode at the beginning
connectToDatanode(dn, encodedToken);
}

Expand All @@ -132,7 +140,8 @@ private void connectToDatanode(DatanodeDetails dn, String encodedToken)

// Add credential context to the client call
String userName = UserGroupInformation.getCurrentUser().getShortUserName();
LOG.debug("Connecting to server Port : " + dn.getIpAddress());
LOG.debug("Nodes in pipeline : {}", pipeline.getNodes().toString());
LOG.debug("Connecting to server : {}", dn.getIpAddress());
NettyChannelBuilder channelBuilder =
NettyChannelBuilder.forAddress(dn.getIpAddress(), port).usePlaintext()
.maxInboundMessageSize(OzoneConsts.OZONE_SCM_CHUNK_MAX_SIZE)
Expand Down Expand Up @@ -252,7 +261,15 @@ private XceiverClientReply sendCommandWithRetry(
// TODO: cache the correct leader info in here, so that any subsequent calls
// should first go to leader
XceiverClientReply reply = new XceiverClientReply(null);
for (DatanodeDetails dn : pipeline.getNodes()) {
List<DatanodeDetails> datanodeList;
if ((request.getCmdType() == ContainerProtos.Type.ReadChunk ||
request.getCmdType() == ContainerProtos.Type.GetSmallFile) &&
topologyAwareRead) {
datanodeList = pipeline.getNodesInOrder();
} else {
datanodeList = pipeline.getNodes();
}
for (DatanodeDetails dn : datanodeList) {
try {
LOG.debug("Executing command " + request + " on datanode " + dn);
// In case the command gets retried on a 2nd datanode,
Expand Down Expand Up @@ -349,6 +366,8 @@ private XceiverClientReply sendCommandAsync(
reconnect(dn, token);
}

LOG.debug("Send command {} to datanode {}", request.getCmdType().toString(),
dn.getNetworkFullPath());
final CompletableFuture<ContainerCommandResponseProto> replyFuture =
new CompletableFuture<>();
semaphore.acquire();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.ozone.OzoneSecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.Closeable;
import java.io.IOException;
Expand Down Expand Up @@ -57,14 +59,16 @@
* not being used for a period of time.
*/
public class XceiverClientManager implements Closeable {

private static final Logger LOG =
LoggerFactory.getLogger(XceiverClientManager.class);
//TODO : change this to SCM configuration class
private final Configuration conf;
private final Cache<String, XceiverClientSpi> clientCache;
private final boolean useRatis;

private static XceiverClientMetrics metrics;
private boolean isSecurityEnabled;
private final boolean topologyAwareRead;
/**
* Creates a new XceiverClientManager.
*
Expand Down Expand Up @@ -98,6 +102,9 @@ public void onRemoval(
}
}
}).build();
topologyAwareRead = Boolean.parseBoolean(conf.get(
ScmConfigKeys.DFS_NETWORK_TOPOLOGY_AWARE_READ_ENABLED,
ScmConfigKeys.DFS_NETWORK_TOPOLOGY_AWARE_READ_ENABLED_DEFAULT));
}

@VisibleForTesting
Expand All @@ -118,12 +125,32 @@ public Cache<String, XceiverClientSpi> getClientCache() {
*/
public XceiverClientSpi acquireClient(Pipeline pipeline)
throws IOException {
// Default (non-read) acquisition path: the cache key is not augmented
// with the closest node, so all callers share one client per pipeline.
return acquireClient(pipeline, false);
}

/**
 * Acquires a XceiverClientSpi connected to a container for reading data.
 *
 * If there is already a cached XceiverClientSpi, simply return the cached
 * one; otherwise create a new one. When topology-aware read is enabled,
 * the cache key additionally includes the closest datanode, so read
 * clients are cached separately from write clients.
 *
 * @param pipeline the container pipeline for the client connection
 * @return XceiverClientSpi connected to a container
 * @throws IOException if a XceiverClientSpi cannot be acquired
 */
public XceiverClientSpi acquireClientForReadData(Pipeline pipeline)
throws IOException {
return acquireClient(pipeline, true);
}

private XceiverClientSpi acquireClient(Pipeline pipeline, boolean read)
throws IOException {
Preconditions.checkNotNull(pipeline);
Preconditions.checkArgument(pipeline.getNodes() != null);
Preconditions.checkArgument(!pipeline.getNodes().isEmpty());

synchronized (clientCache) {
XceiverClientSpi info = getClient(pipeline);
XceiverClientSpi info = getClient(pipeline, read);
info.incrementReference();
return info;
}
Expand All @@ -136,12 +163,28 @@ public XceiverClientSpi acquireClient(Pipeline pipeline)
* @param invalidateClient if true, invalidates the client in cache
*/
public void releaseClient(XceiverClientSpi client, boolean invalidateClient) {
// Default release path for clients obtained via acquireClient(Pipeline).
releaseClient(client, invalidateClient, false);
}

/**
 * Releases a read XceiverClientSpi after use.
 *
 * Must be used to release clients obtained via
 * acquireClientForReadData, because read clients may be cached under a
 * different (closest-node-augmented) key than write clients.
 *
 * @param client client to release
 * @param invalidateClient if true, invalidates the client in cache
 */
public void releaseClientForReadData(XceiverClientSpi client,
boolean invalidateClient) {
releaseClient(client, invalidateClient, true);
}

private void releaseClient(XceiverClientSpi client, boolean invalidateClient,
boolean read) {
Preconditions.checkNotNull(client);
synchronized (clientCache) {
client.decrementReference();
if (invalidateClient) {
Pipeline pipeline = client.getPipeline();
String key = pipeline.getId().getId().toString() + pipeline.getType();
String key = getPipelineCacheKey(pipeline, read);
XceiverClientSpi cachedClient = clientCache.getIfPresent(key);
if (cachedClient == client) {
clientCache.invalidate(key);
Expand All @@ -150,11 +193,13 @@ public void releaseClient(XceiverClientSpi client, boolean invalidateClient) {
}
}

private XceiverClientSpi getClient(Pipeline pipeline)
private XceiverClientSpi getClient(Pipeline pipeline, boolean forRead)
throws IOException {
HddsProtos.ReplicationType type = pipeline.getType();
try {
String key = pipeline.getId().getId().toString() + type;
// Create a separate client for reads so that a different pipeline node,
// selected based on network topology, can be connected to.
String key = getPipelineCacheKey(pipeline, forRead);
// Append user short name to key to prevent a different user
// from using same instance of xceiverClient.
key = isSecurityEnabled ?
Expand Down Expand Up @@ -184,6 +229,19 @@ public XceiverClientSpi call() throws Exception {
}
}

/**
 * Builds the client-cache key for a pipeline.
 *
 * The base key is the pipeline id plus its replication type. For reads
 * with topology-aware read enabled, the closest node's host name is
 * appended so read clients are cached per closest replica.
 *
 * @param pipeline the pipeline the client connects to
 * @param forRead true when the client is acquired for reading data
 * @return the cache key string
 */
private String getPipelineCacheKey(Pipeline pipeline, boolean forRead) {
  String key = pipeline.getId().getId().toString() + pipeline.getType();
  if (topologyAwareRead && forRead) {
    try {
      key += pipeline.getClosestNode().getHostName();
    } catch (IOException e) {
      // Log the full exception (not just the message) so the failure is
      // diagnosable; fall back to the base key, which makes the read
      // client behave like a regular (non-topology-aware) client.
      LOG.error("Failed to get closest node to create pipeline cache key", e);
    }
  }
  return key;
}

/**
* Close and remove all the cached clients.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ protected List<ChunkInfo> getChunkInfos() throws IOException {
pipeline = Pipeline.newBuilder(pipeline)
.setType(HddsProtos.ReplicationType.STAND_ALONE).build();
}
xceiverClient = xceiverClientManager.acquireClient(pipeline);
xceiverClient = xceiverClientManager.acquireClientForReadData(pipeline);
boolean success = false;
List<ChunkInfo> chunks;
try {
Expand All @@ -170,7 +170,7 @@ protected List<ChunkInfo> getChunkInfos() throws IOException {
success = true;
} finally {
if (!success) {
xceiverClientManager.releaseClient(xceiverClient, false);
xceiverClientManager.releaseClientForReadData(xceiverClient, false);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ protected DatanodeDetails(DatanodeDetails datanodeDetails) {
this.ipAddress = datanodeDetails.ipAddress;
this.hostName = datanodeDetails.hostName;
this.ports = datanodeDetails.ports;
this.setNetworkName(datanodeDetails.getNetworkName());
}

/**
Expand Down Expand Up @@ -192,6 +193,12 @@ public static DatanodeDetails getFromProtoBuf(
builder.addPort(newPort(
Port.Name.valueOf(port.getName().toUpperCase()), port.getValue()));
}
if (datanodeDetailsProto.hasNetworkLocation()) {
builder.setNetworkLocation(datanodeDetailsProto.getNetworkLocation());
}
if (datanodeDetailsProto.hasNetworkName()) {
builder.setNetworkName(datanodeDetailsProto.getNetworkName());
}
return builder.build();
}

Expand All @@ -213,6 +220,7 @@ public HddsProtos.DatanodeDetailsProto getProtoBufMessage() {
builder.setCertSerialId(certSerialId);
}
builder.setNetworkLocation(getNetworkLocation());
builder.setNetworkName(getNetworkName());

for (Port port : ports) {
builder.addPorts(HddsProtos.Port.newBuilder()
Expand Down Expand Up @@ -268,6 +276,7 @@ public static final class Builder {
private String id;
private String ipAddress;
private String hostName;
private String networkName;
private String networkLocation;
private List<Port> ports;
private String certSerialId;
Expand Down Expand Up @@ -313,6 +322,17 @@ public Builder setHostName(String host) {
return this;
}

/**
 * Sets the network name of the DataNode.
 *
 * @param name network name of the node
 * @return DatanodeDetails.Builder, for method chaining
 */
public Builder setNetworkName(String name) {
this.networkName = name;
return this;
}

/**
* Sets the network location of DataNode.
*
Expand Down Expand Up @@ -358,8 +378,12 @@ public DatanodeDetails build() {
if (networkLocation == null) {
networkLocation = NetConstants.DEFAULT_RACK;
}
return new DatanodeDetails(id, ipAddress, hostName, networkLocation,
ports, certSerialId);
DatanodeDetails dn = new DatanodeDetails(id, ipAddress, hostName,
networkLocation, ports, certSerialId);
if (networkName != null) {
dn.setNetworkName(networkName);
}
return dn;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -368,6 +368,10 @@ public final class ScmConfigKeys {
"ozone.scm.network.topology.schema.file";
public static final String OZONE_SCM_NETWORK_TOPOLOGY_SCHEMA_FILE_DEFAULT =
"network-topology-default.xml";
public static final String DFS_NETWORK_TOPOLOGY_AWARE_READ_ENABLED =
"dfs.network.topology.aware.read.enable";
public static final String DFS_NETWORK_TOPOLOGY_AWARE_READ_ENABLED_DEFAULT =
"true";

public static final String HDDS_TRACING_ENABLED = "hdds.tracing.enabled";
public static final boolean HDDS_TRACING_ENABLED_DEFAULT = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.hadoop.hdds.scm.net;

import java.util.Collection;
import java.util.List;

/**
* The interface defines a network topology.
Expand Down Expand Up @@ -246,5 +247,6 @@ Node getNode(int leafIndex, String scope, String excludedScope,
* @param nodes Available replicas with the requested data
* @param activeLen Number of active nodes at the front of the array
*/
void sortByDistanceCost(Node reader, Node[] nodes, int activeLen);
List<? extends Node> sortByDistanceCost(Node reader,
List<? extends Node> nodes, int activeLen);
}
Loading