Skip to content

Commit fa8bc25

Browse files
committed
HBASE-26080 Implement a new mini cluster class for end users (#3470)
Signed-off-by: Yulin Niu <niuyulin@apache.org>
1 parent 3294d32 commit fa8bc25

File tree

4 files changed

+707
-0
lines changed

4 files changed

+707
-0
lines changed
Lines changed: 141 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,141 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.hadoop.hbase.testing;
19+
20+
import java.util.concurrent.CompletableFuture;
21+
import org.apache.hadoop.conf.Configuration;
22+
import org.apache.hadoop.hbase.ServerName;
23+
import org.apache.yetus.audience.InterfaceAudience;
24+
25+
/**
26+
* A mini hbase cluster used for testing.
27+
* <p/>
28+
* It will also start the necessary zookeeper cluster and dfs cluster. But we will not provide
29+
* methods for controlling the zookeeper cluster and dfs cluster, as end users do not need to test
30+
* the HBase behavior when these systems are broken.
31+
* <p/>
32+
* The implementation is not required to be thread safe, so do not call different methods
33+
* concurrently.
34+
*/
35+
@InterfaceAudience.Public
36+
public interface TestingHBaseCluster {
37+
38+
/**
39+
* Get configuration of this cluster.
40+
* <p/>
41+
* You could use the returned {@link Configuration} to create
42+
* {@link org.apache.hadoop.hbase.client.Connection} for accessing the testing cluster.
43+
*/
44+
Configuration getConf();
45+
46+
/**
47+
* Start a new master with localhost and random port.
48+
*/
49+
void startMaster() throws Exception;
50+
51+
/**
52+
* Start a new master bind on the given host and port.
53+
*/
54+
void startMaster(String hostname, int port) throws Exception;
55+
56+
/**
57+
* Stop the given master.
58+
* <p/>
59+
* Wait on the returned {@link CompletableFuture} to wait on the master quit. The differences
60+
* comparing to {@link org.apache.hadoop.hbase.client.Admin#stopMaster()} is that first, we could
61+
* also stop backup masters here, second, this method does not always fail since we do not use rpc
62+
* to stop the master.
63+
*/
64+
CompletableFuture<Void> stopMaster(ServerName serverName) throws Exception;
65+
66+
/**
67+
* Start a new region server with localhost and random port.
68+
*/
69+
void startRegionServer() throws Exception;
70+
71+
/**
72+
* Start a new region server bind on the given host and port.
73+
*/
74+
void startRegionServer(String hostname, int port) throws Exception;
75+
76+
/**
77+
* Stop the given region server.
78+
* <p/>
79+
* Wait on the returned {@link CompletableFuture} to wait on the master quit. The difference
80+
* comparing to {@link org.apache.hadoop.hbase.client.Admin#stopMaster()} is that this method does
81+
* not always fail since we do not use rpc to stop the region server.
82+
*/
83+
CompletableFuture<Void> stopRegionServer(ServerName serverName) throws Exception;
84+
85+
/**
86+
* Stop the hbase cluster.
87+
* <p/>
88+
* You need to call {@link #start()} first before calling this method, otherwise an
89+
* {@link IllegalStateException} will be thrown. If the hbase is not running because you have
90+
* already stopped the cluster, an {@link IllegalStateException} will be thrown too.
91+
*/
92+
void stopHBaseCluster() throws Exception;
93+
94+
/**
95+
* Start the hbase cluster.
96+
* <p/>
97+
* This is used to start the hbase cluster again after you call {@link #stopHBaseCluster()}. If
98+
* the cluster is already running or you have not called {@link #start()} yet, an
99+
* {@link IllegalStateException} will be thrown.
100+
*/
101+
void startHBaseCluster() throws Exception;
102+
103+
/**
104+
* Return whether the hbase cluster is running.
105+
*/
106+
boolean isHBaseClusterRunning();
107+
108+
/**
109+
* Start the whole mini cluster, including zookeeper cluster, dfs cluster and hbase cluster.
110+
* <p/>
111+
* You can only call this method once at the beginning, unless you have called {@link #stop()} to
112+
* shutdown the cluster completely, and then you can call this method to start the whole cluster
113+
* again. An {@link IllegalStateException} will be thrown if you call this method incorrectly.
114+
*/
115+
void start() throws Exception;
116+
117+
/**
118+
* Return whether the cluster is running.
119+
* <p/>
120+
* Notice that, this only means you have called {@link #start()} and have not called
121+
* {@link #stop()} yet. If you want to make sure the hbase cluster is running, use
122+
* {@link #isHBaseClusterRunning()}.
123+
*/
124+
boolean isClusterRunning();
125+
126+
/**
127+
* Stop the whole mini cluster, including zookeeper cluster, dfs cluster and hbase cluster.
128+
* <p/>
129+
* You can only call this method after calling {@link #start()}, otherwise an
130+
* {@link IllegalStateException} will be thrown.
131+
*/
132+
void stop() throws Exception;
133+
134+
/**
135+
* Create a {@link TestingHBaseCluster}. You need to call {@link #start()} of the returned
136+
* {@link TestingHBaseCluster} to actually start the mini testing cluster.
137+
*/
138+
static TestingHBaseCluster create(TestingHBaseClusterOption option) {
139+
return new TestingHBaseClusterImpl(option);
140+
}
141+
}
Lines changed: 174 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,174 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.hadoop.hbase.testing;
19+
20+
import java.util.List;
21+
import java.util.concurrent.CompletableFuture;
22+
import java.util.concurrent.ExecutorService;
23+
import java.util.concurrent.Executors;
24+
import org.apache.hadoop.conf.Configuration;
25+
import org.apache.hadoop.hbase.HBaseTestingUtility;
26+
import org.apache.hadoop.hbase.ServerName;
27+
import org.apache.hadoop.hbase.StartMiniClusterOption;
28+
import org.apache.hadoop.hbase.util.JVMClusterUtil.MasterThread;
29+
import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
30+
import org.apache.yetus.audience.InterfaceAudience;
31+
32+
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
33+
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
34+
35+
@InterfaceAudience.Private
36+
class TestingHBaseClusterImpl implements TestingHBaseCluster {
37+
38+
private final HBaseTestingUtility util = new HBaseTestingUtility();
39+
40+
private final StartMiniClusterOption option;
41+
42+
private final ExecutorService executor = Executors.newCachedThreadPool(new ThreadFactoryBuilder()
43+
.setNameFormat(getClass().getSuperclass() + "-%d").setDaemon(true).build());
44+
45+
private boolean miniClusterRunning = false;
46+
47+
private boolean miniHBaseClusterRunning = false;
48+
49+
TestingHBaseClusterImpl(TestingHBaseClusterOption option) {
50+
this.option = option.convert();
51+
}
52+
53+
@Override
54+
public Configuration getConf() {
55+
return util.getConfiguration();
56+
}
57+
58+
private int getRegionServerIndex(ServerName serverName) {
59+
// we have a small number of region servers, this should be fine for now.
60+
List<RegionServerThread> servers = util.getMiniHBaseCluster().getRegionServerThreads();
61+
for (int i = 0; i < servers.size(); i++) {
62+
if (servers.get(i).getRegionServer().getServerName().equals(serverName)) {
63+
return i;
64+
}
65+
}
66+
return -1;
67+
}
68+
69+
private int getMasterIndex(ServerName serverName) {
70+
List<MasterThread> masters = util.getMiniHBaseCluster().getMasterThreads();
71+
for (int i = 0; i < masters.size(); i++) {
72+
if (masters.get(i).getMaster().getServerName().equals(serverName)) {
73+
return i;
74+
}
75+
}
76+
return -1;
77+
}
78+
79+
private void join(Thread thread, CompletableFuture<?> future) {
80+
executor.execute(() -> {
81+
try {
82+
thread.join();
83+
future.complete(null);
84+
} catch (InterruptedException e) {
85+
future.completeExceptionally(e);
86+
}
87+
});
88+
}
89+
90+
@Override
91+
public CompletableFuture<Void> stopMaster(ServerName serverName) throws Exception {
92+
CompletableFuture<Void> future = new CompletableFuture<>();
93+
int index = getMasterIndex(serverName);
94+
if (index == -1) {
95+
future.completeExceptionally(new IllegalArgumentException("Unknown master " + serverName));
96+
}
97+
join(util.getMiniHBaseCluster().stopMaster(index), future);
98+
return future;
99+
}
100+
101+
@Override
102+
public CompletableFuture<Void> stopRegionServer(ServerName serverName) throws Exception {
103+
CompletableFuture<Void> future = new CompletableFuture<>();
104+
int index = getRegionServerIndex(serverName);
105+
if (index == -1) {
106+
future
107+
.completeExceptionally(new IllegalArgumentException("Unknown region server " + serverName));
108+
}
109+
join(util.getMiniHBaseCluster().stopRegionServer(index), future);
110+
return future;
111+
}
112+
113+
@Override
114+
public void stopHBaseCluster() throws Exception {
115+
Preconditions.checkState(miniClusterRunning, "Cluster has already been stopped");
116+
Preconditions.checkState(miniHBaseClusterRunning, "HBase cluster has already been started");
117+
util.shutdownMiniHBaseCluster();
118+
miniHBaseClusterRunning = false;
119+
}
120+
121+
@Override
122+
public void startHBaseCluster() throws Exception {
123+
Preconditions.checkState(miniClusterRunning, "Cluster has already been stopped");
124+
Preconditions.checkState(!miniHBaseClusterRunning, "HBase cluster has already been started");
125+
util.startMiniHBaseCluster(option);
126+
miniHBaseClusterRunning = true;
127+
}
128+
129+
@Override
130+
public void start() throws Exception {
131+
Preconditions.checkState(!miniClusterRunning, "Cluster has already been started");
132+
util.startMiniCluster(option);
133+
miniClusterRunning = true;
134+
miniHBaseClusterRunning = true;
135+
}
136+
137+
@Override
138+
public void stop() throws Exception {
139+
Preconditions.checkState(miniClusterRunning, "Cluster has already been stopped");
140+
util.shutdownMiniCluster();
141+
miniClusterRunning = false;
142+
miniHBaseClusterRunning = false;
143+
}
144+
145+
@Override
146+
public boolean isHBaseClusterRunning() {
147+
return miniHBaseClusterRunning;
148+
}
149+
150+
@Override
151+
public boolean isClusterRunning() {
152+
return miniClusterRunning;
153+
}
154+
155+
@Override
156+
public void startMaster() throws Exception {
157+
util.getMiniHBaseCluster().startMaster();
158+
}
159+
160+
@Override
161+
public void startMaster(String hostname, int port) throws Exception {
162+
util.getMiniHBaseCluster().startMaster(hostname, port);
163+
}
164+
165+
@Override
166+
public void startRegionServer() throws Exception {
167+
util.getMiniHBaseCluster().startRegionServer();
168+
}
169+
170+
@Override
171+
public void startRegionServer(String hostname, int port) throws Exception {
172+
util.getMiniHBaseCluster().startRegionServer(hostname, port);
173+
}
174+
}

0 commit comments

Comments
 (0)