Skip to content

Commit

Permalink
add parameter to balancer to specify target node list so balancer onl…
Browse files Browse the repository at this point in the history
…y moves blocks to those nodes
  • Loading branch information
Jtdellaringa committed Oct 21, 2024
1 parent e4b0700 commit c1bf31c
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,8 @@ public class Balancer {
+ "\tIncludes only the specified datanodes."
+ "\n\t[-source [-f <hosts-file> | <comma-separated list of hosts>]]"
+ "\tPick only the specified datanodes as source nodes."
+ "\n\t[-target [-f <hosts-file> | <comma-separated list of hosts>]]"
+ "\tPick only the specified datanodes as target nodes."
+ "\n\t[-blockpools <comma-separated list of blockpool ids>]"
+ "\tThe balancer will only run on blockpools included in this list."
+ "\n\t[-idleiterations <idleiterations>]"
Expand Down Expand Up @@ -222,6 +224,7 @@ public class Balancer {
private final NameNodeConnector nnc;
private final BalancingPolicy policy;
private final Set<String> sourceNodes;
private final Set<String> targetNodes;
private final boolean runDuringUpgrade;
private final double threshold;
private final long maxSizeToMove;
Expand Down Expand Up @@ -350,6 +353,7 @@ static int getFailedTimesSinceLastSuccessfulBalance() {
this.threshold = p.getThreshold();
this.policy = p.getBalancingPolicy();
this.sourceNodes = p.getSourceNodes();
this.targetNodes = p.getTargetNodes();
this.runDuringUpgrade = p.getRunDuringUpgrade();
this.sortTopNodes = p.getSortTopNodes();

Expand Down Expand Up @@ -408,6 +412,7 @@ private long init(List<DatanodeStorageReport> reports) {
for(DatanodeStorageReport r : reports) {
final DDatanode dn = dispatcher.newDatanode(r.getDatanodeInfo());
final boolean isSource = Util.isIncluded(sourceNodes, dn.getDatanodeInfo());
final boolean isTarget = Util.isIncluded(targetNodes, dn.getDatanodeInfo());
for(StorageType t : StorageType.getMovableTypes()) {
final Double utilization = policy.getUtilization(r, t);
if (utilization == null) { // datanode does not have such storage type
Expand All @@ -421,6 +426,12 @@ private long init(List<DatanodeStorageReport> reports) {
+ " but it is not specified as a source; skipping it.");
continue;
}
if (utilization <= average && !isTarget) {
LOG.info(dn + "[" + t + "] has utilization=" + utilization
+ " <= average=" + average
+ " but it is not specified as a target; skipping it.");
continue;
}

final double utilizationDiff = utilization - average;
final long capacity = getCapacity(r, t);
Expand Down Expand Up @@ -804,6 +815,7 @@ static private int doBalance(Collection<URI> namenodes,
LOG.info("included nodes = " + p.getIncludedNodes());
LOG.info("excluded nodes = " + p.getExcludedNodes());
LOG.info("source nodes = " + p.getSourceNodes());
LOG.info("target nodes = " + p.getTargetNodes());
checkKeytabAndInit(conf);
System.out.println("Time Stamp Iteration#"
+ " Bytes Already Moved Bytes Left To Move Bytes Being Moved"
Expand Down Expand Up @@ -1034,6 +1046,10 @@ static BalancerParameters parse(String[] args) {
Set<String> sourceNodes = new HashSet<>();
i = processHostList(args, i, "source", sourceNodes);
b.setSourceNodes(sourceNodes);
} else if ("-target".equalsIgnoreCase(args[i])) {
Set<String> targetNodes = new HashSet<>();
i = processHostList(args, i, "target", targetNodes);
b.setTargetNodes(targetNodes);
} else if ("-blockpools".equalsIgnoreCase(args[i])) {
Preconditions.checkArgument(
++i < args.length,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,11 @@ final class BalancerParameters {
* source nodes.
*/
private final Set<String> sourceNodes;
/**
* If empty, any node can be a target; otherwise, use only these nodes as
* target nodes.
*/
private final Set<String> targetNodes;
/**
* A set of block pools to run the balancer on.
*/
Expand All @@ -63,6 +68,7 @@ private BalancerParameters(Builder builder) {
this.excludedNodes = builder.excludedNodes;
this.includedNodes = builder.includedNodes;
this.sourceNodes = builder.sourceNodes;
this.targetNodes = builder.targetNodes;
this.blockpools = builder.blockpools;
this.runDuringUpgrade = builder.runDuringUpgrade;
this.runAsService = builder.runAsService;
Expand Down Expand Up @@ -94,6 +100,10 @@ Set<String> getSourceNodes() {
return this.sourceNodes;
}

Set<String> getTargetNodes() {
return this.targetNodes;
}

Set<String> getBlockPools() {
return this.blockpools;
}
Expand All @@ -119,12 +129,13 @@ public String toString() {
return String.format("%s.%s [%s," + " threshold = %s,"
+ " max idle iteration = %s," + " #excluded nodes = %s,"
+ " #included nodes = %s," + " #source nodes = %s,"
+ " #target nodes = %s,"
+ " #blockpools = %s," + " run during upgrade = %s,"
+ " sort top nodes = %s,"
+ " hot block time interval = %s]",
Balancer.class.getSimpleName(), getClass().getSimpleName(), policy,
threshold, maxIdleIteration, excludedNodes.size(),
includedNodes.size(), sourceNodes.size(), blockpools.size(),
includedNodes.size(), sourceNodes.size(), targetNodes.size(), blockpools.size(),
runDuringUpgrade, sortTopNodes, hotBlockTimeInterval);
}

Expand All @@ -137,6 +148,7 @@ static class Builder {
private Set<String> excludedNodes = Collections.<String> emptySet();
private Set<String> includedNodes = Collections.<String> emptySet();
private Set<String> sourceNodes = Collections.<String> emptySet();
private Set<String> targetNodes = Collections.<String> emptySet();
private Set<String> blockpools = Collections.<String> emptySet();
private boolean runDuringUpgrade = false;
private boolean runAsService = false;
Expand Down Expand Up @@ -181,6 +193,11 @@ Builder setSourceNodes(Set<String> nodes) {
return this;
}

Builder setTargetNodes(Set<String> nodes) {
this.targetNodes = nodes;
return this;
}

Builder setBlockpools(Set<String> pools) {
this.blockpools = pools;
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1247,6 +1247,14 @@ public void testBalancerCliParseWithWrongParams() {
} catch (IllegalArgumentException ignored) {
// expected
}

parameters = new String[] {"-target"};
try {
Balancer.Cli.parse(parameters);
fail(reason + " for -target parameter");
} catch (IllegalArgumentException ignored) {
// expected
}
}

@Test
Expand Down

0 comments on commit c1bf31c

Please sign in to comment.