Skip to content

Commit

Permalink
HDFS-16481. Provide support to set Http and Rpc ports in MiniJournalC…
Browse files Browse the repository at this point in the history
…luster
  • Loading branch information
virajjasani committed Feb 24, 2022
1 parent c18b646 commit 948305f
Show file tree
Hide file tree
Showing 2 changed files with 96 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ public static class Builder {
private int numJournalNodes = 3;
private boolean format = true;
private final Configuration conf;
private int[] httpPorts = null;
private int[] rpcPorts = null;

static {
DefaultMetricsSystem.setMiniClusterMode(true);
Expand All @@ -76,6 +78,16 @@ public Builder format(boolean f) {
return this;
}

public Builder setHttpPorts(int... httpPorts) {
this.httpPorts = httpPorts;
return this;
}

public Builder setRpcPorts(int... rpcPorts) {
this.rpcPorts = rpcPorts;
return this;
}

public MiniJournalCluster build() throws IOException {
return new MiniJournalCluster(this);
}
Expand All @@ -99,6 +111,19 @@ private JNInfo(JournalNode node) {
private final JNInfo[] nodes;

private MiniJournalCluster(Builder b) throws IOException {

if (b.httpPorts != null && b.httpPorts.length != b.numJournalNodes) {
throw new IllegalArgumentException(
"Num of http ports (" + b.httpPorts.length + ") should match num of JournalNodes ("
+ b.numJournalNodes + ")");
}

if (b.rpcPorts != null && b.rpcPorts.length != b.numJournalNodes) {
throw new IllegalArgumentException(
"Num of rpc ports (" + b.rpcPorts.length + ") should match num of JournalNodes ("
+ b.numJournalNodes + ")");
}

LOG.info("Starting MiniJournalCluster with " +
b.numJournalNodes + " journal nodes");

Expand Down Expand Up @@ -173,8 +198,10 @@ private Configuration createConfForNode(Builder b, int idx) {
Configuration conf = new Configuration(b.conf);
File logDir = getStorageDir(idx);
conf.set(DFSConfigKeys.DFS_JOURNALNODE_EDITS_DIR_KEY, logDir.toString());
conf.set(DFSConfigKeys.DFS_JOURNALNODE_RPC_ADDRESS_KEY, "localhost:0");
conf.set(DFSConfigKeys.DFS_JOURNALNODE_HTTP_ADDRESS_KEY, "localhost:0");
int httpPort = b.httpPorts != null ? b.httpPorts[idx] : 0;
int rpcPort = b.rpcPorts != null ? b.rpcPorts[idx] : 0;
conf.set(DFSConfigKeys.DFS_JOURNALNODE_RPC_ADDRESS_KEY, "localhost:" + rpcPort);
conf.set(DFSConfigKeys.DFS_JOURNALNODE_HTTP_ADDRESS_KEY, "localhost:" + httpPort);
return conf;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@


public class TestMiniJournalCluster {

@Test
public void testStartStop() throws IOException {
Configuration conf = new Configuration();
Expand All @@ -52,4 +53,70 @@ public void testStartStop() throws IOException {
c.shutdown();
}
}

@Test
public void testStartStopWithPorts() throws IOException {
Configuration conf = new Configuration();

try {
new MiniJournalCluster.Builder(conf).setHttpPorts(8481).build();
fail("Should not reach here");
} catch (IllegalArgumentException e) {
assertEquals("Num of http ports (1) should match num of JournalNodes (3)", e.getMessage());
}

try {
new MiniJournalCluster.Builder(conf).setRpcPorts(8481, 8482)
.build();
fail("Should not reach here");
} catch (IllegalArgumentException e) {
assertEquals("Num of rpc ports (2) should match num of JournalNodes (3)", e.getMessage());
}

try {
new MiniJournalCluster.Builder(conf).setHttpPorts(800, 9000, 10000).setRpcPorts(8481)
.build();
fail("Should not reach here");
} catch (IllegalArgumentException e) {
assertEquals("Num of rpc ports (1) should match num of JournalNodes (3)", e.getMessage());
}

try {
new MiniJournalCluster.Builder(conf).setHttpPorts(800, 9000, 1000, 2000)
.setRpcPorts(8481, 8482, 8483)
.build();
fail("Should not reach here");
} catch (IllegalArgumentException e) {
assertEquals("Num of http ports (4) should match num of JournalNodes (3)", e.getMessage());
}

MiniJournalCluster miniJournalCluster =
new MiniJournalCluster.Builder(conf).setHttpPorts(8481, 8482, 8483)
.setRpcPorts(8491, 8492, 8493).build();
try {
miniJournalCluster.waitActive();
URI uri = miniJournalCluster.getQuorumJournalURI("myjournal");
String[] addrs = uri.getAuthority().split(";");
assertEquals(3, addrs.length);

assertEquals(8481, miniJournalCluster.getJournalNode(0).getHttpAddress().getPort());
assertEquals(8482, miniJournalCluster.getJournalNode(1).getHttpAddress().getPort());
assertEquals(8483, miniJournalCluster.getJournalNode(2).getHttpAddress().getPort());

assertEquals(8491,
miniJournalCluster.getJournalNode(0).getRpcServer().getAddress().getPort());
assertEquals(8492,
miniJournalCluster.getJournalNode(1).getRpcServer().getAddress().getPort());
assertEquals(8493,
miniJournalCluster.getJournalNode(2).getRpcServer().getAddress().getPort());

JournalNode node = miniJournalCluster.getJournalNode(0);
String dir = node.getConf().get(DFSConfigKeys.DFS_JOURNALNODE_EDITS_DIR_KEY);
assertEquals(new File(MiniDFSCluster.getBaseDirectory() + "journalnode-0").getAbsolutePath(),
dir);
} finally {
miniJournalCluster.shutdown();
}
}

}

0 comments on commit 948305f

Please sign in to comment.