Skip to content

HBASE-25830 HBaseCluster support CompactionServer for UTs (addendum) #3464

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Jul 14, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,19 @@ public LocalHBaseCluster(final Configuration conf, final int noMasters,
LOG.debug("Setting RS InfoServer Port to random.");
conf.set(HConstants.REGIONSERVER_INFO_PORT, "0");
}
if (conf.getInt(HConstants.COMPACTION_SERVER_PORT, HConstants.DEFAULT_COMPACTION_SERVER_PORT)
== HConstants.DEFAULT_COMPACTION_SERVER_PORT) {
LOG.debug("Setting CompactionServer Port to random.");
conf.set(HConstants.COMPACTION_SERVER_PORT, "0");
}
// treat info ports special; expressly don't change '-1' (keep off)
// in case we make that the default behavior.
if (conf.getInt(HConstants.COMPACTION_SERVER_INFO_PORT, 0) != -1 && conf.getInt(
HConstants.COMPACTION_SERVER_INFO_PORT, HConstants.DEFAULT_COMPACTION_SERVER_INFOPORT)
== HConstants.DEFAULT_COMPACTION_SERVER_INFOPORT) {
LOG.debug("Setting CS InfoServer Port to random.");
conf.set(HConstants.COMPACTION_SERVER_INFO_PORT, "0");
}
if (conf.getInt(HConstants.MASTER_INFO_PORT, 0) != -1 &&
conf.getInt(HConstants.MASTER_INFO_PORT, HConstants.DEFAULT_MASTER_INFOPORT)
== HConstants.DEFAULT_MASTER_INFOPORT) {
Expand Down Expand Up @@ -308,6 +321,17 @@ public String waitOnRegionServer(int serverNumber) {
return waitOnRegionServer(regionServerThread);
}

/**
* Wait for the specified compaction server to stop. Removes this thread from list of running
* threads.
* @return Name of compaction server that just went down.
*/
public String waitOnCompactionServer(int serverNumber) {
JVMClusterUtil.CompactionServerThread regionServerThread =
this.compactionServerThreads.get(serverNumber);
return waitOnCompactionServer(regionServerThread);
}

/**
* Wait for the specified region server to stop. Removes this thread from list of running threads.
* @return Name of region server that just went down.
Expand All @@ -326,6 +350,25 @@ public String waitOnRegionServer(JVMClusterUtil.RegionServerThread rst) {
return rst.getName();
}

/**
* Wait for the specified compaction server to stop. Removes this thread from list of running
* threads.
* @return Name of compaction server that just went down.
*/
public String waitOnCompactionServer(JVMClusterUtil.CompactionServerThread cst) {
while (cst.isAlive()) {
try {
LOG.info("Waiting on " + cst.getCompactionServer().toString());
cst.join();
} catch (InterruptedException e) {
LOG.error("Interrupted while waiting for {} to finish. Retrying join", cst.getName(), e);
Thread.currentThread().interrupt();
}
}
compactionServerThreads.remove(cst);
return cst.getName();
}

/**
* @return the HMaster thread
*/
Expand Down Expand Up @@ -427,6 +470,18 @@ public void join() {
}
}
}
if (this.compactionServerThreads != null) {
for(Thread t: this.compactionServerThreads) {
if (t.isAlive()) {
try {
Threads.threadDumpingIsAlive(t);
} catch (InterruptedException e) {
LOG.debug("Interrupted", e);
}
}
}
}

}

@SuppressWarnings("unchecked")
Expand Down Expand Up @@ -491,7 +546,7 @@ public void startup() throws IOException {
* Shut down the mini HBase cluster
*/
public void shutdown() {
JVMClusterUtil.shutdown(this.masterThreads, this.regionThreads);
JVMClusterUtil.shutdown(this.masterThreads, this.regionThreads, this.compactionServerThreads);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -294,12 +294,64 @@ private static void waitForEvent(long millis, String action, Supplier<Boolean> c

}

private static <T extends Thread> boolean shutdown(final List<T> servers,
boolean wasInterrupted) {
final long maxTime = System.currentTimeMillis() + 30 * 1000;
// first try nicely.
for (Thread t : servers) {
long now = System.currentTimeMillis();
if (t.isAlive() && !wasInterrupted && now < maxTime) {
try {
t.join(maxTime - now);
} catch (InterruptedException e) {
LOG.info(
"Got InterruptedException on shutdown - " + "not waiting anymore on region server ends",
e);
wasInterrupted = true; // someone wants us to speed up.
}
}
}

// Let's try to interrupt the remaining threads if any.
for (int i = 0; i < 100; ++i) {
boolean atLeastOneLiveServer = false;
for (Thread t : servers) {
if (t.isAlive()) {
atLeastOneLiveServer = true;
try {
LOG.warn("{} remaining, give one more chance before interrupting",
t.getClass().getSimpleName());
t.join(1000);
} catch (InterruptedException e) {
wasInterrupted = true;
}
}
}
if (!atLeastOneLiveServer) {
break;
}
for (Thread t : servers) {
if (t.isAlive()) {
LOG.warn(
"{} taking too long to stop, interrupting; thread dump " + "if > 3 attempts: i=" + i,
t.getClass().getSimpleName());
if (i > 3) {
Threads.printThreadInfo(System.out, "Thread dump " + t.getName());
}
t.interrupt();
}
}
}
return wasInterrupted;
}

/**
* @param masters
* @param regionservers
*/
public static void shutdown(final List<MasterThread> masters,
final List<RegionServerThread> regionservers) {
final List<RegionServerThread> regionservers,
final List<CompactionServerThread> compactionServers) {
LOG.debug("Shutting down HBase Cluster");
if (masters != null) {
// Do backups first.
Expand Down Expand Up @@ -335,51 +387,18 @@ public static void shutdown(final List<MasterThread> masters,
}
}
boolean wasInterrupted = false;
final long maxTime = System.currentTimeMillis() + 30 * 1000;
if (regionservers != null) {
// first try nicely.
for (RegionServerThread t : regionservers) {
t.getRegionServer().stop("Shutdown requested");
}
for (RegionServerThread t : regionservers) {
long now = System.currentTimeMillis();
if (t.isAlive() && !wasInterrupted && now < maxTime) {
try {
t.join(maxTime - now);
} catch (InterruptedException e) {
LOG.info("Got InterruptedException on shutdown - " +
"not waiting anymore on region server ends", e);
wasInterrupted = true; // someone wants us to speed up.
}
}
}

// Let's try to interrupt the remaining threads if any.
for (int i = 0; i < 100; ++i) {
boolean atLeastOneLiveServer = false;
for (RegionServerThread t : regionservers) {
if (t.isAlive()) {
atLeastOneLiveServer = true;
try {
LOG.warn("RegionServerThreads remaining, give one more chance before interrupting");
t.join(1000);
} catch (InterruptedException e) {
wasInterrupted = true;
}
}
}
if (!atLeastOneLiveServer) break;
for (RegionServerThread t : regionservers) {
if (t.isAlive()) {
LOG.warn("RegionServerThreads taking too long to stop, interrupting; thread dump " +
"if > 3 attempts: i=" + i);
if (i > 3) {
Threads.printThreadInfo(System.out, "Thread dump " + t.getName());
}
t.interrupt();
}
}
wasInterrupted = shutdown(regionservers, wasInterrupted);
}
if (compactionServers != null) {
// first try nicely.
for (CompactionServerThread t : compactionServers) {
t.getCompactionServer().stop("Shutdown requested");
}
wasInterrupted = shutdown(compactionServers, wasInterrupted);
}

if (masters != null) {
Expand All @@ -398,12 +417,14 @@ public static void shutdown(final List<MasterThread> masters,
}
}
}
LOG.info("Shutdown of " +
((masters != null) ? masters.size() : "0") + " master(s) and " +
((regionservers != null) ? regionservers.size() : "0") +
" regionserver(s) " + (wasInterrupted ? "interrupted" : "complete"));
LOG.info("Shutdown of " + ((masters != null) ? masters.size() : "0") + " master(s) and "
+ ((regionservers != null) ? regionservers.size() : "0") + " regionserver(s) "
+ ((compactionServers != null) ? compactionServers.size() : "0") + " compactionServer(s) "
+ (wasInterrupted ? "interrupted" : "complete")

);

if (wasInterrupted){
if (wasInterrupted) {
Thread.currentThread().interrupt();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -436,6 +436,25 @@ public JVMClusterUtil.RegionServerThread startRegionServer()
return startRegionServer(newConf);
}

/**
* Starts a compaction server thread running
* @return New CompactionServerThread
*/
public JVMClusterUtil.CompactionServerThread startCompactionServer() throws IOException {
final Configuration configuration = HBaseConfiguration.create(conf);
User rsUser = HBaseTestingUtility.getDifferentUser(configuration, ".hfs." + index++);
JVMClusterUtil.CompactionServerThread t = null;
try {
t = hbaseCluster.addCompactionServer(configuration,
hbaseCluster.getCompactionServers().size(), rsUser);
t.start();
t.waitForServerOnline();
} catch (InterruptedException ie) {
throw new IOException("Interrupted adding compactionserver to cluster", ie);
}
return t;
}

private JVMClusterUtil.RegionServerThread startRegionServer(Configuration configuration)
throws IOException {
User rsUser =
Expand Down Expand Up @@ -502,6 +521,20 @@ public JVMClusterUtil.RegionServerThread stopRegionServer(int serverNumber) {
return stopRegionServer(serverNumber, true);
}

/**
* Shut down the specified compaction server cleanly
*
* @param serverNumber Used as index into a list.
* @return the compaction server that was stopped
*/
public JVMClusterUtil.CompactionServerThread stopCompactionServer(int serverNumber) {
JVMClusterUtil.CompactionServerThread server =
hbaseCluster.getCompactionServers().get(serverNumber);
LOG.info("Stopping " + server.toString());
server.getCompactionServer().stop("Stopping rs " + serverNumber);
return server;
}

/**
* Shut down the specified region server cleanly
*
Expand Down Expand Up @@ -557,6 +590,14 @@ public String waitOnRegionServer(final int serverNumber) {
return this.hbaseCluster.waitOnRegionServer(serverNumber);
}

/**
* Wait for the specified compaction server to stop. Removes this thread from list
* of running threads.
* @return Name of compaction server that just went down.
*/
public String waitOnCompactionServer(final int serverNumber) {
return this.hbaseCluster.waitOnCompactionServer(serverNumber);
}

/**
* Starts a master thread running
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,12 @@ public static void afterClass() throws Exception {

@Before
public void before() throws Exception {
TEST_UTIL.createTable(TABLENAME, FAMILY);
TableDescriptor tableDescriptor =
TableDescriptorBuilder.newBuilder(TABLENAME).setCompactionOffloadEnabled(true).build();
TEST_UTIL.createTable(tableDescriptor, Bytes.toByteArrays(FAMILY),
TEST_UTIL.getConfiguration());
TEST_UTIL.waitTableAvailable(TABLENAME);
COMPACTION_SERVER.requestCount.reset();
}

@After
Expand Down Expand Up @@ -169,9 +173,9 @@ public void testCompactionWithVersions() throws Exception {
TEST_UTIL.getAdmin().compactionSwitch(false, new ArrayList<>());
ColumnFamilyDescriptor cfd =
ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(FAMILY)).setMaxVersions(3).build();
TableDescriptor modifiedtableDescriptor =
TableDescriptorBuilder.newBuilder(TABLENAME).setColumnFamily(cfd).build();
TEST_UTIL.getAdmin().modifyTable(modifiedtableDescriptor);
TableDescriptor modifiedTableDescriptor = TableDescriptorBuilder.newBuilder(TABLENAME)
.setColumnFamily(cfd).setCompactionOffloadEnabled(true).build();
TEST_UTIL.getAdmin().modifyTable(modifiedTableDescriptor);
TEST_UTIL.waitTableAvailable(TABLENAME);
doFillRecord(1, 500, RandomUtils.nextBytes(20));
doFillRecord(1, 500, RandomUtils.nextBytes(20));
Expand All @@ -198,6 +202,8 @@ public void testCompactionWithVersions() throws Exception {
return hFileCount == 1;
});

// To ensure do compaction on compaction server
TEST_UTIL.waitFor(60000, () -> COMPACTION_SERVER.requestCount.sum() > 0);
kVCount = 0;
for (HRegion region : TEST_UTIL.getHBaseCluster().getRegions(TABLENAME)) {
for (HStoreFile hStoreFile : region.getStore(Bytes.toBytes(FAMILY)).getStorefiles()) {
Expand All @@ -208,11 +214,11 @@ public void testCompactionWithVersions() throws Exception {
verifyRecord(1, 500, true);
}


@Test
public void testCompactionServerDown() throws Exception {
TEST_UTIL.getAdmin().compactionSwitch(false, new ArrayList<>());
COMPACTION_SERVER.stop("test");
TEST_UTIL.getHBaseCluster().stopCompactionServer(0);
TEST_UTIL.getHBaseCluster().waitOnCompactionServer(0);
TEST_UTIL.waitFor(60000,
() -> MASTER.getCompactionOffloadManager().getOnlineServersList().size() == 0);
doPutRecord(1, 1000, true);
Expand All @@ -232,6 +238,12 @@ public void testCompactionServerDown() throws Exception {
return hFile == 1;
});
verifyRecord(1, 1000, true);
TEST_UTIL.getHBaseCluster().startCompactionServer();
COMPACTION_SERVER = TEST_UTIL.getMiniHBaseCluster().getCompactionServerThreads().get(0)
.getCompactionServer();
COMPACTION_SERVER_NAME = COMPACTION_SERVER.getServerName();
TEST_UTIL.waitFor(60000,
() -> MASTER.getCompactionOffloadManager().getOnlineServersList().size() == 1);
}

@Test
Expand Down Expand Up @@ -277,20 +289,19 @@ public void testCompactionOffloadTableDescriptor() throws Exception {

TableDescriptor htd =
TableDescriptorBuilder.newBuilder(TEST_UTIL.getAdmin().getDescriptor(TABLENAME))
.setCompactionOffloadEnabled(true).build();
.setCompactionOffloadEnabled(false).build();
TEST_UTIL.getAdmin().modifyTable(htd);
TEST_UTIL.waitUntilAllRegionsAssigned(TABLENAME);
// invoke compact
TEST_UTIL.compact(TABLENAME, false);
TEST_UTIL.waitFor(6000, () -> COMPACTION_SERVER.requestCount.sum() > 0);
long requestCount = COMPACTION_SERVER.requestCount.sum();
TEST_UTIL.waitFor(6000, () -> COMPACTION_SERVER.requestCount.sum() == 0);

htd = TableDescriptorBuilder.newBuilder(TEST_UTIL.getAdmin().getDescriptor(TABLENAME))
.setCompactionOffloadEnabled(false).build();
.setCompactionOffloadEnabled(true).build();
TEST_UTIL.getAdmin().modifyTable(htd);
TEST_UTIL.waitUntilAllRegionsAssigned(TABLENAME);
// invoke compact
TEST_UTIL.compact(TABLENAME, false);
TEST_UTIL.waitFor(6000, () -> COMPACTION_SERVER.requestCount.sum() == requestCount);
TEST_UTIL.waitFor(6000, () -> COMPACTION_SERVER.requestCount.sum() > 0);
}
}