Skip to content

Commit

Permalink
HDFS-16481. Provide support to set Http and Rpc ports in MiniJournalC…
Browse files Browse the repository at this point in the history
…luster (apache#4028). Contributed by Viraj Jasani.
  • Loading branch information
virajjasani authored and lgh committed Dec 5, 2023
1 parent eb184f6 commit e1e927e
Show file tree
Hide file tree
Showing 3 changed files with 161 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1035,6 +1035,32 @@ public static int getFreeSocketPort() {
return port;
}

/**
* Return free ports. There is no guarantee they will remain free, so
* ports should be used immediately. The number of free ports returned by
* this method should match argument {@code numOfPorts}. Num of ports
* provided in the argument should not exceed 25.
*
* @param numOfPorts Number of free ports to acquire.
* @return Free ports for binding a local socket.
*/
public static Set<Integer> getFreeSocketPorts(int numOfPorts) {
Preconditions.checkArgument(numOfPorts > 0 && numOfPorts <= 25,
"Valid range for num of ports is between 0 and 26");
final Set<Integer> freePorts = new HashSet<>(numOfPorts);
for (int i = 0; i < numOfPorts * 5; i++) {
int port = getFreeSocketPort();
if (port == 0) {
continue;
}
freePorts.add(port);
if (freePorts.size() == numOfPorts) {
return freePorts;
}
}
throw new IllegalStateException(numOfPorts + " free ports could not be acquired.");
}

/**
* Return an @{@link InetAddress} to bind to. If bindWildCardAddress is true
* than returns null.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import static org.apache.hadoop.hdfs.qjournal.QJMTestUtil.FAKE_NSINFO;
import static org.junit.Assert.fail;

import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
Expand All @@ -44,13 +45,16 @@
import org.apache.hadoop.thirdparty.com.google.common.collect.Lists;
import org.apache.hadoop.test.GenericTestUtils;

public class MiniJournalCluster {
public final class MiniJournalCluster implements Closeable {

public static final String CLUSTER_WAITACTIVE_URI = "waitactive";
public static class Builder {
private String baseDir;
private int numJournalNodes = 3;
private boolean format = true;
private final Configuration conf;
private int[] httpPorts = null;
private int[] rpcPorts = null;

static {
DefaultMetricsSystem.setMiniClusterMode(true);
Expand All @@ -75,6 +79,16 @@ public Builder format(boolean f) {
return this;
}

public Builder setHttpPorts(int... ports) {
this.httpPorts = ports;
return this;
}

public Builder setRpcPorts(int... ports) {
this.rpcPorts = ports;
return this;
}

public MiniJournalCluster build() throws IOException {
return new MiniJournalCluster(this);
}
Expand All @@ -98,6 +112,19 @@ private JNInfo(JournalNode node) {
private final JNInfo[] nodes;

private MiniJournalCluster(Builder b) throws IOException {

if (b.httpPorts != null && b.httpPorts.length != b.numJournalNodes) {
throw new IllegalArgumentException(
"Num of http ports (" + b.httpPorts.length + ") should match num of JournalNodes ("
+ b.numJournalNodes + ")");
}

if (b.rpcPorts != null && b.rpcPorts.length != b.numJournalNodes) {
throw new IllegalArgumentException(
"Num of rpc ports (" + b.rpcPorts.length + ") should match num of JournalNodes ("
+ b.numJournalNodes + ")");
}

LOG.info("Starting MiniJournalCluster with " +
b.numJournalNodes + " journal nodes");

Expand Down Expand Up @@ -172,8 +199,10 @@ private Configuration createConfForNode(Builder b, int idx) {
Configuration conf = new Configuration(b.conf);
File logDir = getStorageDir(idx);
conf.set(DFSConfigKeys.DFS_JOURNALNODE_EDITS_DIR_KEY, logDir.toString());
conf.set(DFSConfigKeys.DFS_JOURNALNODE_RPC_ADDRESS_KEY, "localhost:0");
conf.set(DFSConfigKeys.DFS_JOURNALNODE_HTTP_ADDRESS_KEY, "localhost:0");
int httpPort = b.httpPorts != null ? b.httpPorts[idx] : 0;
int rpcPort = b.rpcPorts != null ? b.rpcPorts[idx] : 0;
conf.set(DFSConfigKeys.DFS_JOURNALNODE_RPC_ADDRESS_KEY, "localhost:" + rpcPort);
conf.set(DFSConfigKeys.DFS_JOURNALNODE_HTTP_ADDRESS_KEY, "localhost:" + httpPort);
return conf;
}

Expand Down Expand Up @@ -273,4 +302,10 @@ public void setNamenodeSharedEditsConf(String jid) {
.DFS_NAMENODE_SHARED_EDITS_DIR_KEY, quorumJournalURI.toString());
}
}

@Override
public void close() throws IOException {
this.shutdown();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,23 @@
import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.util.Set;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.qjournal.server.JournalNode;
import org.junit.Test;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.test.LambdaTestUtils;

import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TestMiniJournalCluster {

private static final Logger LOG = LoggerFactory.getLogger(TestMiniJournalCluster.class);

@Test
public void testStartStop() throws IOException {
Configuration conf = new Configuration();
Expand All @@ -52,4 +60,92 @@ public void testStartStop() throws IOException {
c.shutdown();
}
}

@Test
public void testStartStopWithPorts() throws Exception {
Configuration conf = new Configuration();

LambdaTestUtils.intercept(
IllegalArgumentException.class,
"Num of http ports (1) should match num of JournalNodes (3)",
"MiniJournalCluster port validation failed",
() -> {
new MiniJournalCluster.Builder(conf).setHttpPorts(8481).build();
});

LambdaTestUtils.intercept(
IllegalArgumentException.class,
"Num of rpc ports (2) should match num of JournalNodes (3)",
"MiniJournalCluster port validation failed",
() -> {
new MiniJournalCluster.Builder(conf).setRpcPorts(8481, 8482).build();
});

LambdaTestUtils.intercept(
IllegalArgumentException.class,
"Num of rpc ports (1) should match num of JournalNodes (3)",
"MiniJournalCluster port validation failed",
() -> {
new MiniJournalCluster.Builder(conf).setHttpPorts(800, 9000, 10000).setRpcPorts(8481)
.build();
});

LambdaTestUtils.intercept(
IllegalArgumentException.class,
"Num of http ports (4) should match num of JournalNodes (3)",
"MiniJournalCluster port validation failed",
() -> {
new MiniJournalCluster.Builder(conf).setHttpPorts(800, 9000, 1000, 2000)
.setRpcPorts(8481, 8482, 8483).build();
});

final Set<Integer> httpAndRpcPorts = NetUtils.getFreeSocketPorts(6);
LOG.info("Free socket ports: {}", httpAndRpcPorts);

for (Integer httpAndRpcPort : httpAndRpcPorts) {
assertNotEquals("None of the acquired socket port should not be zero", 0,
httpAndRpcPort.intValue());
}

final int[] httpPorts = new int[3];
final int[] rpcPorts = new int[3];
int httpPortIdx = 0;
int rpcPortIdx = 0;
for (Integer httpAndRpcPort : httpAndRpcPorts) {
if (httpPortIdx < 3) {
httpPorts[httpPortIdx++] = httpAndRpcPort;
} else {
rpcPorts[rpcPortIdx++] = httpAndRpcPort;
}
}

LOG.info("Http ports selected: {}", httpPorts);
LOG.info("Rpc ports selected: {}", rpcPorts);

try (MiniJournalCluster miniJournalCluster = new MiniJournalCluster.Builder(conf)
.setHttpPorts(httpPorts)
.setRpcPorts(rpcPorts).build()) {
miniJournalCluster.waitActive();
URI uri = miniJournalCluster.getQuorumJournalURI("myjournal");
String[] addrs = uri.getAuthority().split(";");
assertEquals(3, addrs.length);

assertEquals(httpPorts[0], miniJournalCluster.getJournalNode(0).getHttpAddress().getPort());
assertEquals(httpPorts[1], miniJournalCluster.getJournalNode(1).getHttpAddress().getPort());
assertEquals(httpPorts[2], miniJournalCluster.getJournalNode(2).getHttpAddress().getPort());

assertEquals(rpcPorts[0],
miniJournalCluster.getJournalNode(0).getRpcServer().getAddress().getPort());
assertEquals(rpcPorts[1],
miniJournalCluster.getJournalNode(1).getRpcServer().getAddress().getPort());
assertEquals(rpcPorts[2],
miniJournalCluster.getJournalNode(2).getRpcServer().getAddress().getPort());

JournalNode node = miniJournalCluster.getJournalNode(0);
String dir = node.getConf().get(DFSConfigKeys.DFS_JOURNALNODE_EDITS_DIR_KEY);
assertEquals(new File(MiniDFSCluster.getBaseDirectory() + "journalnode-0").getAbsolutePath(),
dir);
}
}

}

0 comments on commit e1e927e

Please sign in to comment.