Skip to content

HDDS-1569 Support creating multiple pipelines with same datanode #1431

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,15 @@ public final class ScmConfigKeys {
// the max number of pipelines can a single datanode be engaged in.
public static final String OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT =
"ozone.scm.datanode.max.pipeline.engagement";
public static final int OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT_DEFAULT = 5;
// Setting to zero by default means this limit doesn't take effect.
public static final int OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT_DEFAULT = 0;

// Upper limit for how many pipelines can be created.
// Only for test purpose now.
public static final String OZONE_SCM_PIPELINE_NUMBER_LIMIT =
"ozone.scm.datanode.pipeline.number.limit";
// Setting to zero by default means this limit doesn't take effect.
public static final int OZONE_SCM_PIPELINE_NUMBER_LIMIT_DEFAULT = 0;

public static final String
OZONE_SCM_KEY_VALUE_CONTAINER_DELETION_CHOOSING_POLICY =
Expand Down
13 changes: 10 additions & 3 deletions hadoop-hdds/common/src/main/resources/ozone-default.xml
Original file line number Diff line number Diff line change
Expand Up @@ -838,10 +838,17 @@
</description>
</property>
<property>
<name>ozone.scm.datanode.max.pipeline.engagement</name>
<value>5</value>
<name>ozone.scm.datanode.max.pipeline.engagement</name>
<value>0</value>
<tag>OZONE, SCM, PIPELINE</tag>
<description>Max number of pipelines per datanode can be engaged in.
</description>
</property>
<property>
<name>ozone.scm.datanode.pipeline.number.limit</name>
<value>0</value>
<tag>OZONE, SCM, PIPELINE</tag>
<description>Max number of pipelines per datanode can be engaged in.
<description>Upper limit for how many pipelines can be created in SCM.
</description>
</property>
<property>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,10 @@ public AllocatedBlock allocateBlock(final long size, ReplicationType type,
// TODO: #CLUTIL Remove creation logic when all replication types and
// factors are handled by pipeline creator
pipeline = pipelineManager.createPipeline(type, factor);
} catch (SCMException se) {
LOG.warn("Pipeline creation failed for type:{} factor:{}. " +
"Datanodes may be used up.", type, factor, se);
break;
} catch (IOException e) {
LOG.warn("Pipeline creation failed for type:{} factor:{}. Retrying " +
"get pipelines call once.", type, factor, e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,10 @@ private ContainerPlacementPolicyFactory() {
}


public static PlacementPolicy getPolicy(Configuration conf,
final NodeManager nodeManager, NetworkTopology clusterMap,
final boolean fallback, SCMContainerPlacementMetrics metrics)
throws SCMException{
public static PlacementPolicy getPolicy(
Configuration conf, final NodeManager nodeManager,
NetworkTopology clusterMap, final boolean fallback,
SCMContainerPlacementMetrics metrics) throws SCMException{
final Class<? extends PlacementPolicy> placementClass = conf
.getClass(ScmConfigKeys.OZONE_SCM_CONTAINER_PLACEMENT_IMPL_KEY,
OZONE_SCM_CONTAINER_PLACEMENT_IMPL_DEFAULT,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,10 @@ public synchronized void addPipeline(Pipeline pipeline) {
UUID dnId = details.getUuid();
dn2ObjectMap.computeIfAbsent(dnId, k -> ConcurrentHashMap.newKeySet())
.add(pipeline.getId());
dn2ObjectMap.computeIfPresent(dnId, (k, v) -> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Line 73 should be able to add(pipeline.getId()) if the HashMap exists. Lines 74-77 seem redundant.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From the Java doc: https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ConcurrentHashMap.html#computeIfAbsent-K-java.util.function.Function- , it looks like computeIfAbsent only adds an absent member. Line 74 is trying to merge a pipelineId into an existing candidate. This happens when one datanode is assigned to multiple pipelines, which is a classic scenario for multiraft.

v.add(pipeline.getId());
return v;
});
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ private void createPipelines() {
if (scheduler.isClosed()) {
break;
}

pipelineManager.createPipeline(type, factor);
} catch (IOException ioe) {
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public void onMessage(PipelineActionsFromDatanode report,
pipelineID = PipelineID.
getFromProtobuf(action.getClosePipeline().getPipelineID());
Pipeline pipeline = pipelineManager.getPipeline(pipelineID);
LOG.error("Received pipeline action {} for {} from datanode {}. " +
LOG.info("Received pipeline action {} for {} from datanode {}. " +
"Reason : {}", action.getAction(), pipeline,
report.getDatanodeDetails(),
action.getClosePipeline().getDetailedReason());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,22 +52,25 @@ public final class PipelinePlacementPolicy extends SCMCommonPlacementPolicy {
static final Logger LOG =
LoggerFactory.getLogger(PipelinePlacementPolicy.class);
private final NodeManager nodeManager;
private final PipelineStateManager stateManager;
private final Configuration conf;
private final int heavyNodeCriteria;

/**
* Constructs a pipeline placement with considering network topology,
* load balancing and rack awareness.
*
* @param nodeManager Node Manager
* @param nodeManager NodeManager
* @param stateManager PipelineStateManager
* @param conf Configuration
*/
public PipelinePlacementPolicy(
final NodeManager nodeManager, final Configuration conf) {
public PipelinePlacementPolicy(final NodeManager nodeManager,
final PipelineStateManager stateManager, final Configuration conf) {
super(nodeManager, conf);
this.nodeManager = nodeManager;
this.conf = conf;
heavyNodeCriteria = conf.getInt(
this.stateManager = stateManager;
this.heavyNodeCriteria = conf.getInt(
ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT,
ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT_DEFAULT);
}
Expand All @@ -76,11 +79,34 @@ public PipelinePlacementPolicy(
* Returns true if this node meets the criteria.
*
* @param datanodeDetails DatanodeDetails
* @param nodesRequired nodes required count
* @return true if we have enough space.
*/
@VisibleForTesting
boolean meetCriteria(DatanodeDetails datanodeDetails, long heavyNodeLimit) {
return (nodeManager.getPipelinesCount(datanodeDetails) <= heavyNodeLimit);
boolean meetCriteria(DatanodeDetails datanodeDetails, int nodesRequired) {
if (heavyNodeCriteria == 0) {
// no limit applied.
return true;
}
// Datanodes from pipeline in some states can also be considered available
// for pipeline allocation. Thus the number of these pipeline shall be
// deducted from total heaviness calculation.
int pipelineNumDeductable = (int)stateManager.getPipelines(
HddsProtos.ReplicationType.RATIS,
HddsProtos.ReplicationFactor.valueOf(nodesRequired),
Pipeline.PipelineState.CLOSED)
.stream().filter(
p -> nodeManager.getPipelines(datanodeDetails).contains(p.getId()))
.count();
boolean meet = (nodeManager.getPipelinesCount(datanodeDetails)
- pipelineNumDeductable) < heavyNodeCriteria;
if (!meet) {
LOG.info("Pipeline Placement: can't place more pipeline on heavy " +
"datanode: " + datanodeDetails.getUuid().toString() + " Heaviness: " +
nodeManager.getPipelinesCount(datanodeDetails) + " limit: " +
heavyNodeCriteria);
}
return meet;
}

/**
Expand All @@ -102,33 +128,37 @@ List<DatanodeDetails> filterViableNodes(
if (excludedNodes != null) {
healthyNodes.removeAll(excludedNodes);
}
int initialHealthyNodesCount = healthyNodes.size();
String msg;
if (healthyNodes.size() == 0) {
if (initialHealthyNodesCount == 0) {
msg = "No healthy node found to allocate pipeline.";
LOG.error(msg);
throw new SCMException(msg, SCMException.ResultCodes
.FAILED_TO_FIND_HEALTHY_NODES);
}

if (healthyNodes.size() < nodesRequired) {
if (initialHealthyNodesCount < nodesRequired) {
msg = String.format("Not enough healthy nodes to allocate pipeline. %d "
+ " datanodes required. Found %d",
nodesRequired, healthyNodes.size());
nodesRequired, initialHealthyNodesCount);
LOG.error(msg);
throw new SCMException(msg,
SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE);
}

// filter nodes that meet the size and pipeline engagement criteria.
// Pipeline placement doesn't take node space left into account.
List<DatanodeDetails> healthyList = healthyNodes.stream().filter(d ->
meetCriteria(d, heavyNodeCriteria)).collect(Collectors.toList());
List<DatanodeDetails> healthyList = healthyNodes.stream()
.filter(d -> meetCriteria(d, nodesRequired)).limit(nodesRequired)
.collect(Collectors.toList());

if (healthyList.size() < nodesRequired) {
msg = String.format("Unable to find enough nodes that meet " +
"the criteria that cannot engage in more than %d pipelines." +
" Nodes required: %d Found: %d",
heavyNodeCriteria, nodesRequired, healthyList.size());
" Nodes required: %d Found: %d, healthy nodes count in " +
"NodeManager: %d.",
heavyNodeCriteria, nodesRequired, healthyList.size(),
initialHealthyNodesCount);
LOG.error(msg);
throw new SCMException(msg,
SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE);
Expand All @@ -155,12 +185,10 @@ public List<DatanodeDetails> chooseDatanodes(
List<DatanodeDetails> healthyNodes =
filterViableNodes(excludedNodes, nodesRequired);

// Randomly picks nodes when all nodes are equal.
// Randomly picks nodes when all nodes are equal or factor is ONE.
// This happens when network topology is absent or
// all nodes are on the same rack.
if (checkAllNodesAreEqual(nodeManager.getClusterNetworkTopologyMap())) {
LOG.info("All nodes are considered equal. Now randomly pick nodes. " +
"Required nodes: {}", nodesRequired);
return super.getResultSet(nodesRequired, healthyNodes);
} else {
// Since topology and rack awareness are available, picks nodes
Expand Down Expand Up @@ -188,8 +216,8 @@ public List<DatanodeDetails> getResultSet(
// First choose an anchor nodes randomly
DatanodeDetails anchor = chooseNode(healthyNodes);
if (anchor == null) {
LOG.error("Unable to find the first healthy nodes that " +
"meet the criteria. Required nodes: {}, Found nodes: {}",
LOG.error("Pipeline Placement: Unable to find the first healthy nodes " +
"that meet the criteria. Required nodes: {}, Found nodes: {}",
nodesRequired, results.size());
throw new SCMException("Unable to find required number of nodes.",
SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE);
Expand All @@ -204,8 +232,8 @@ public List<DatanodeDetails> getResultSet(
healthyNodes, exclude,
nodeManager.getClusterNetworkTopologyMap(), anchor);
if (nodeOnDifferentRack == null) {
LOG.error("Unable to find nodes on different racks that " +
"meet the criteria. Required nodes: {}, Found nodes: {}",
LOG.error("Pipeline Placement: Unable to find nodes on different racks " +
" that meet the criteria. Required nodes: {}, Found nodes: {}",
nodesRequired, results.size());
throw new SCMException("Unable to find required number of nodes.",
SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE);
Expand All @@ -228,9 +256,9 @@ public List<DatanodeDetails> getResultSet(
}

if (results.size() < nodesRequired) {
LOG.error("Unable to find the required number of healthy nodes that " +
"meet the criteria. Required nodes: {}, Found nodes: {}",
nodesRequired, results.size());
LOG.error("Pipeline Placement: Unable to find the required number of " +
"healthy nodes that meet the criteria. Required nodes: {}, " +
"Found nodes: {}", nodesRequired, results.size());
throw new SCMException("Unable to find required number of nodes.",
SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@

import java.io.IOException;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Predicate;
import java.util.stream.Collectors;
Expand All @@ -52,8 +53,8 @@ class PipelineStateMap {
PipelineStateMap() {

// TODO: Use TreeMap for range operations?
pipelineMap = new HashMap<>();
pipeline2container = new HashMap<>();
pipelineMap = new ConcurrentHashMap<>();
pipeline2container = new ConcurrentHashMap<>();
query2OpenPipelines = new HashMap<>();
initializeQueryMap();

Expand Down
Loading