Commit 0f93663

frostruan authored and Apache9 committed
HBASE-26323 Introduce a Snapshot Procedure (#4115)
Signed-off-by: Duo Zhang <zhangduo@apache.org>
1 parent 9e1de13 commit 0f93663

30 files changed: +2719 -336 lines changed

hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java

Lines changed: 65 additions & 37 deletions
@@ -1871,51 +1871,67 @@ public CompletableFuture<Void> snapshot(SnapshotDescription snapshotDesc) {
       return failedFuture(e);
     }
     CompletableFuture<Void> future = new CompletableFuture<>();
-    final SnapshotRequest request = SnapshotRequest.newBuilder().setSnapshot(snapshot).build();
-    addListener(this.<Long> newMasterCaller()
-      .action((controller, stub) -> this.<SnapshotRequest, SnapshotResponse, Long> call(controller,
-        stub, request, (s, c, req, done) -> s.snapshot(c, req, done),
-        resp -> resp.getExpectedTimeout()))
-      .call(), (expectedTimeout, err) -> {
-        if (err != null) {
-          future.completeExceptionally(err);
-          return;
-        }
-        TimerTask pollingTask = new TimerTask() {
-          int tries = 0;
-          long startTime = EnvironmentEdgeManager.currentTime();
-          long endTime = startTime + expectedTimeout;
-          long maxPauseTime = expectedTimeout / maxAttempts;
-
-          @Override
-          public void run(Timeout timeout) throws Exception {
-            if (EnvironmentEdgeManager.currentTime() < endTime) {
-              addListener(isSnapshotFinished(snapshotDesc), (done, err2) -> {
-                if (err2 != null) {
-                  future.completeExceptionally(err2);
-                } else if (done) {
-                  future.complete(null);
-                } else {
-                  // retry again after pauseTime.
-                  long pauseTime =
-                    ConnectionUtils.getPauseTime(TimeUnit.NANOSECONDS.toMillis(pauseNs), ++tries);
-                  pauseTime = Math.min(pauseTime, maxPauseTime);
-                  AsyncConnectionImpl.RETRY_TIMER.newTimeout(this, pauseTime,
-                    TimeUnit.MILLISECONDS);
-                }
-              });
-            } else {
-              future.completeExceptionally(
-                new SnapshotCreationException("Snapshot '" + snapshot.getName() +
-                  "' wasn't completed in expectedTime:" + expectedTimeout + " ms", snapshotDesc));
-            }
-          }
-        };
-        AsyncConnectionImpl.RETRY_TIMER.newTimeout(pollingTask, 1, TimeUnit.MILLISECONDS);
+    final SnapshotRequest request =
+      SnapshotRequest.newBuilder().setSnapshot(snapshot).setNonceGroup(ng.getNonceGroup())
+        .setNonce(ng.newNonce()).build();
+    addListener(this.<SnapshotResponse> newMasterCaller()
+      .action((controller, stub) ->
+        this.<SnapshotRequest, SnapshotResponse, SnapshotResponse> call(controller, stub,
+          request, (s, c, req, done) -> s.snapshot(c, req, done), resp -> resp))
+      .call(), (resp, err) -> {
+        if (err != null) {
+          future.completeExceptionally(err);
+          return;
+        }
+        waitSnapshotFinish(snapshotDesc, future, resp);
       });
     return future;
   }
 
+  // This is for keeping compatibility with old implementation.
+  // If there is a procId field in the response, then the snapshot will be operated with a
+  // SnapshotProcedure, otherwise the snapshot will be coordinated by zk.
+  private void waitSnapshotFinish(SnapshotDescription snapshot,
+      CompletableFuture<Void> future, SnapshotResponse resp) {
+    if (resp.hasProcId()) {
+      getProcedureResult(resp.getProcId(), future, 0);
+      addListener(future, new SnapshotProcedureBiConsumer(snapshot.getTableName()));
+    } else {
+      long expectedTimeout = resp.getExpectedTimeout();
+      TimerTask pollingTask = new TimerTask() {
+        int tries = 0;
+        long startTime = EnvironmentEdgeManager.currentTime();
+        long endTime = startTime + expectedTimeout;
+        long maxPauseTime = expectedTimeout / maxAttempts;
+
+        @Override
+        public void run(Timeout timeout) throws Exception {
+          if (EnvironmentEdgeManager.currentTime() < endTime) {
+            addListener(isSnapshotFinished(snapshot), (done, err2) -> {
+              if (err2 != null) {
+                future.completeExceptionally(err2);
+              } else if (done) {
+                future.complete(null);
+              } else {
+                // retry again after pauseTime.
+                long pauseTime = ConnectionUtils
+                  .getPauseTime(TimeUnit.NANOSECONDS.toMillis(pauseNs), ++tries);
+                pauseTime = Math.min(pauseTime, maxPauseTime);
+                AsyncConnectionImpl.RETRY_TIMER
+                  .newTimeout(this, pauseTime, TimeUnit.MILLISECONDS);
+              }
+            });
+          } else {
+            future.completeExceptionally(new SnapshotCreationException(
+              "Snapshot '" + snapshot.getName() + "' wasn't completed in expectedTime:"
+                + expectedTimeout + " ms", snapshot));
+          }
+        }
+      };
+      AsyncConnectionImpl.RETRY_TIMER.newTimeout(pollingTask, 1, TimeUnit.MILLISECONDS);
+    }
+  }
+
   @Override
   public CompletableFuture<Boolean> isSnapshotFinished(SnapshotDescription snapshot) {
     return this
@@ -2767,6 +2783,18 @@ String getOperationType() {
     }
   }
 
+  private static class SnapshotProcedureBiConsumer extends TableProcedureBiConsumer {
+    SnapshotProcedureBiConsumer(TableName tableName) {
+      super(tableName);
+    }
+
+    @Override
+    String getOperationType() {
+      return "SNAPSHOT";
+    }
+  }
+
+
   private static class ReplicationProcedureBiConsumer extends ProcedureBiConsumer {
     private final String peerId;
     private final Supplier<String> getOperation;

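Note: the change above is transparent to callers of AsyncAdmin.snapshot(); the branch between the procedure result (proc_id) and the legacy ZK polling path is handled inside waitSnapshotFinish. For orientation only, a minimal client-side sketch follows; the table and snapshot names are placeholders and no part of this sketch belongs to the commit.

// Illustrative sketch only: taking a snapshot through the public async client API.
// Whether the master runs it as a SnapshotProcedure or via ZK coordination is decided
// server-side and surfaced to this code only as a completed future.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.AsyncAdmin;
import org.apache.hadoop.hbase.client.AsyncConnection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.SnapshotDescription;

public class SnapshotClientSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    try (AsyncConnection conn = ConnectionFactory.createAsyncConnection(conf).get()) {
      AsyncAdmin admin = conn.getAdmin();
      // "mySnapshot" and "myTable" are placeholder names.
      SnapshotDescription desc =
        new SnapshotDescription("mySnapshot", TableName.valueOf("myTable"));
      // Completes when the master reports the snapshot finished, via either path.
      admin.snapshot(desc).get();
    }
  }
}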
hbase-protocol-shaded/src/main/protobuf/Master.proto

Lines changed: 3 additions & 0 deletions
@@ -436,10 +436,13 @@ message IsCleanerChoreEnabledResponse {
 
 message SnapshotRequest {
   required SnapshotDescription snapshot = 1;
+  optional uint64 nonce_group = 2 [default = 0];
+  optional uint64 nonce = 3 [default = 0];
 }
 
 message SnapshotResponse {
   required int64 expected_timeout = 1;
+  optional int64 proc_id = 2;
 }
 
 message GetCompletedSnapshotsRequest {

hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto

Lines changed: 39 additions & 0 deletions
@@ -191,6 +191,45 @@ message RestoreParentToChildRegionsPair {
   required string child2_region_name = 3;
 }
 
+enum SnapshotState {
+  SNAPSHOT_PREPARE = 1;
+  SNAPSHOT_PRE_OPERATION = 2;
+  SNAPSHOT_WRITE_SNAPSHOT_INFO = 3;
+  SNAPSHOT_SNAPSHOT_ONLINE_REGIONS = 4;
+  SNAPSHOT_SNAPSHOT_SPLIT_REGIONS = 5;
+  SNAPSHOT_SNAPSHOT_CLOSED_REGIONS = 6;
+  SNAPSHOT_SNAPSHOT_MOB_REGION = 7;
+  SNAPSHOT_CONSOLIDATE_SNAPSHOT = 8;
+  SNAPSHOT_VERIFIER_SNAPSHOT = 9;
+  SNAPSHOT_COMPLETE_SNAPSHOT = 10;
+  SNAPSHOT_POST_OPERATION = 11;
+}
+
+message SnapshotProcedureStateData {
+  required SnapshotDescription snapshot = 1;
+}
+
+message SnapshotRegionProcedureStateData {
+  required RegionInfo region = 1;
+  required SnapshotDescription snapshot = 2;
+}
+
+message SnapshotRegionParameter {
+  required RegionInfo region = 1;
+  required SnapshotDescription snapshot = 2;
+}
+
+message SnapshotVerifyProcedureStateData {
+  required SnapshotDescription snapshot = 1;
+  required RegionInfo region = 2;
+  optional ServerName target_server = 3;
+}
+
+message SnapshotVerifyParameter {
+  required SnapshotDescription snapshot = 1;
+  required RegionInfo region = 2;
+}
+
 enum CloneSnapshotState {
   CLONE_SNAPSHOT_PRE_OPERATION = 1;
   CLONE_SNAPSHOT_WRITE_FS_LAYOUT = 2;

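Note: the SnapshotState values above declare the step order that the new master-side SnapshotProcedure walks through. As a rough illustration only (the real procedure may skip or branch on some steps, for example the MOB and closed-region states), a sketch of that ordering using the enum generated from this definition (MasterProcedureProtos.SnapshotState) could look like this:

// Sketch of the declared state ordering; not the actual SnapshotProcedure logic.
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.SnapshotState;

public class SnapshotStateFlowSketch {
  // Returns the state that follows the given one, or null once the flow is complete.
  static SnapshotState next(SnapshotState state) {
    switch (state) {
      case SNAPSHOT_PREPARE: return SnapshotState.SNAPSHOT_PRE_OPERATION;
      case SNAPSHOT_PRE_OPERATION: return SnapshotState.SNAPSHOT_WRITE_SNAPSHOT_INFO;
      case SNAPSHOT_WRITE_SNAPSHOT_INFO: return SnapshotState.SNAPSHOT_SNAPSHOT_ONLINE_REGIONS;
      case SNAPSHOT_SNAPSHOT_ONLINE_REGIONS: return SnapshotState.SNAPSHOT_SNAPSHOT_SPLIT_REGIONS;
      case SNAPSHOT_SNAPSHOT_SPLIT_REGIONS: return SnapshotState.SNAPSHOT_SNAPSHOT_CLOSED_REGIONS;
      case SNAPSHOT_SNAPSHOT_CLOSED_REGIONS: return SnapshotState.SNAPSHOT_SNAPSHOT_MOB_REGION;
      case SNAPSHOT_SNAPSHOT_MOB_REGION: return SnapshotState.SNAPSHOT_CONSOLIDATE_SNAPSHOT;
      case SNAPSHOT_CONSOLIDATE_SNAPSHOT: return SnapshotState.SNAPSHOT_VERIFIER_SNAPSHOT;
      case SNAPSHOT_VERIFIER_SNAPSHOT: return SnapshotState.SNAPSHOT_COMPLETE_SNAPSHOT;
      case SNAPSHOT_COMPLETE_SNAPSHOT: return SnapshotState.SNAPSHOT_POST_OPERATION;
      case SNAPSHOT_POST_OPERATION: return null; // terminal state
      default: throw new IllegalStateException("unexpected state " + state);
    }
  }
}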
hbase-server/src/main/java/org/apache/hadoop/hbase/executor/EventType.java

Lines changed: 15 additions & 1 deletion
@@ -294,7 +294,21 @@ public enum EventType {
    *
    * RS_CLAIM_REPLICATION_QUEUE
    */
-  RS_CLAIM_REPLICATION_QUEUE(86, ExecutorType.RS_CLAIM_REPLICATION_QUEUE);
+  RS_CLAIM_REPLICATION_QUEUE(86, ExecutorType.RS_CLAIM_REPLICATION_QUEUE),
+
+  /**
+   * RS snapshot regions.<br>
+   *
+   * RS_SNAPSHOT_REGIONS
+   */
+  RS_SNAPSHOT_REGIONS(87, ExecutorType.RS_SNAPSHOT_OPERATIONS),
+
+  /**
+   * RS verify snapshot.<br>
+   *
+   * RS_VERIFY_SNAPSHOT
+   */
+  RS_VERIFY_SNAPSHOT(88, ExecutorType.RS_SNAPSHOT_OPERATIONS);
 
   private final int code;
   private final ExecutorType executor;

hbase-server/src/main/java/org/apache/hadoop/hbase/executor/ExecutorType.java

Lines changed: 2 additions & 1 deletion
@@ -52,7 +52,8 @@ public enum ExecutorType {
   RS_REFRESH_PEER(31),
   RS_SWITCH_RPC_THROTTLE(33),
   RS_IN_MEMORY_COMPACTION(34),
-  RS_CLAIM_REPLICATION_QUEUE(35);
+  RS_CLAIM_REPLICATION_QUEUE(35),
+  RS_SNAPSHOT_OPERATIONS(36);
 
   ExecutorType(int value) {
   }

hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterCoprocessorHost.java

Lines changed: 4 additions & 4 deletions
@@ -1097,8 +1097,8 @@ public void call(MasterObserver observer) throws IOException {
   }
 
   public void preSnapshot(final SnapshotDescription snapshot,
-      final TableDescriptor hTableDescriptor) throws IOException {
-    execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation() {
+      final TableDescriptor hTableDescriptor, final User user) throws IOException {
+    execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation(user) {
       @Override
       public void call(MasterObserver observer) throws IOException {
         observer.preSnapshot(this, snapshot, hTableDescriptor);
@@ -1107,8 +1107,8 @@ public void call(MasterObserver observer) throws IOException {
   }
 
   public void postSnapshot(final SnapshotDescription snapshot,
-      final TableDescriptor hTableDescriptor) throws IOException {
-    execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation() {
+      final TableDescriptor hTableDescriptor, final User user) throws IOException {
+    execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation(user) {
      @Override
       public void call(MasterObserver observer) throws IOException {
         observer.postSnapshot(this, snapshot, hTableDescriptor);

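Note: only the coprocessor-host plumbing changes here; the calling User is now threaded through so the hooks run in the requester's context, while the MasterObserver-facing signatures stay the same. A minimal, hypothetical observer exercising those unchanged hooks could look like the following; the class name and logging are illustrative and not part of the commit.

// Hypothetical observer: exercises the unchanged preSnapshot/postSnapshot observer API.
import java.io.IOException;
import java.util.Optional;
import org.apache.hadoop.hbase.client.SnapshotDescription;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.coprocessor.MasterCoprocessor;
import org.apache.hadoop.hbase.coprocessor.MasterCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.MasterObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LoggingSnapshotObserver implements MasterCoprocessor, MasterObserver {
  private static final Logger LOG = LoggerFactory.getLogger(LoggingSnapshotObserver.class);

  @Override
  public Optional<MasterObserver> getMasterObserver() {
    return Optional.of(this);
  }

  @Override
  public void preSnapshot(ObserverContext<MasterCoprocessorEnvironment> ctx,
      SnapshotDescription snapshot, TableDescriptor tableDescriptor) throws IOException {
    LOG.info("About to snapshot {} of {}", snapshot.getName(), tableDescriptor.getTableName());
  }

  @Override
  public void postSnapshot(ObserverContext<MasterCoprocessorEnvironment> ctx,
      SnapshotDescription snapshot, TableDescriptor tableDescriptor) throws IOException {
    LOG.info("Completed snapshot {} of {}", snapshot.getName(), tableDescriptor.getTableName());
  }
}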
hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java

Lines changed: 16 additions & 3 deletions
@@ -1664,12 +1664,25 @@ public SnapshotResponse snapshot(RpcController controller,
       // get the snapshot information
       SnapshotDescription snapshot = SnapshotDescriptionUtils.validate(
         request.getSnapshot(), master.getConfiguration());
-      master.snapshotManager.takeSnapshot(snapshot);
-
       // send back the max amount of time the client should wait for the snapshot to complete
       long waitTime = SnapshotDescriptionUtils.getMaxMasterTimeout(master.getConfiguration(),
         snapshot.getType(), SnapshotDescriptionUtils.DEFAULT_MAX_WAIT_TIME);
-      return SnapshotResponse.newBuilder().setExpectedTimeout(waitTime).build();
+
+      SnapshotResponse.Builder builder = SnapshotResponse.newBuilder().setExpectedTimeout(waitTime);
+
+      // If there is nonce group and nonce in the snapshot request, then the client can
+      // handle snapshot procedure procId. And if enable the snapshot procedure, we
+      // will do the snapshot work with proc-v2, otherwise we will fall back to zk proc.
+      if (request.hasNonceGroup() && request.hasNonce() &&
+          master.snapshotManager.snapshotProcedureEnabled()) {
+        long nonceGroup = request.getNonceGroup();
+        long nonce = request.getNonce();
+        long procId = master.snapshotManager.takeSnapshot(snapshot, nonceGroup, nonce);
+        return builder.setProcId(procId).build();
+      } else {
+        master.snapshotManager.takeSnapshot(snapshot);
+        return builder.build();
+      }
     } catch (ForeignException e) {
       throw new ServiceException(e.getCause());
     } catch (IOException e) {

hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitWALManager.java

Lines changed: 4 additions & 68 deletions
@@ -23,9 +23,7 @@
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 import java.util.Optional;
 import java.util.stream.Collectors;
 import org.apache.hadoop.conf.Configuration;
@@ -73,7 +71,7 @@ public class SplitWALManager {
   private static final Logger LOG = LoggerFactory.getLogger(SplitWALManager.class);
 
   private final MasterServices master;
-  private final SplitWorkerAssigner splitWorkerAssigner;
+  private final WorkerAssigner splitWorkerAssigner;
   private final Path rootDir;
   private final FileSystem fs;
   private final Configuration conf;
@@ -82,8 +80,9 @@ public class SplitWALManager {
   public SplitWALManager(MasterServices master) throws IOException {
     this.master = master;
     this.conf = master.getConfiguration();
-    this.splitWorkerAssigner = new SplitWorkerAssigner(this.master,
-      conf.getInt(HBASE_SPLIT_WAL_MAX_SPLITTER, DEFAULT_HBASE_SPLIT_WAL_MAX_SPLITTER));
+    this.splitWorkerAssigner = new WorkerAssigner(this.master,
+      conf.getInt(HBASE_SPLIT_WAL_MAX_SPLITTER, DEFAULT_HBASE_SPLIT_WAL_MAX_SPLITTER),
+      new ProcedureEvent<>("split-WAL-worker-assigning"));
     this.rootDir = master.getMasterFileSystem().getWALRootDir();
     this.fs = master.getMasterFileSystem().getWALFileSystem();
     this.walArchiveDir = new Path(this.rootDir, HConstants.HREGION_OLDLOGDIR_NAME);
@@ -189,67 +188,4 @@ public void releaseSplitWALWorker(ServerName worker, MasterProcedureScheduler sc
   public void addUsedSplitWALWorker(ServerName worker){
     splitWorkerAssigner.addUsedWorker(worker);
   }
-
-  /**
-   * help assign and release a worker for each WAL splitting task
-   * For each worker, concurrent running splitting task should be no more than maxSplitTasks
-   * If a task failed to acquire a worker, it will suspend and wait for workers available
-   *
-   */
-  private static final class SplitWorkerAssigner implements ServerListener {
-    private int maxSplitTasks;
-    private final ProcedureEvent<?> event;
-    private Map<ServerName, Integer> currentWorkers = new HashMap<>();
-    private MasterServices master;
-
-    public SplitWorkerAssigner(MasterServices master, int maxSplitTasks) {
-      this.maxSplitTasks = maxSplitTasks;
-      this.master = master;
-      this.event = new ProcedureEvent<>("split-WAL-worker-assigning");
-      // ServerManager might be null in a test context where we are mocking; allow for this
-      ServerManager sm = this.master.getServerManager();
-      if (sm != null) {
-        sm.registerListener(this);
-      }
-    }
-
-    public synchronized Optional<ServerName> acquire() {
-      List<ServerName> serverList = master.getServerManager().getOnlineServersList();
-      Collections.shuffle(serverList);
-      Optional<ServerName> worker = serverList.stream().filter(
-        serverName -> !currentWorkers.containsKey(serverName) || currentWorkers.get(serverName) > 0)
-        .findAny();
-      if (worker.isPresent()) {
-        currentWorkers.compute(worker.get(), (serverName,
-          availableWorker) -> availableWorker == null ? maxSplitTasks - 1 : availableWorker - 1);
-      }
-      return worker;
-    }
-
-    public synchronized void release(ServerName serverName) {
-      currentWorkers.compute(serverName, (k, v) -> v == null ? null : v + 1);
-    }
-
-    public void suspend(Procedure<?> proc) {
-      event.suspend();
-      event.suspendIfNotReady(proc);
-    }
-
-    public void wake(MasterProcedureScheduler scheduler) {
-      if (!event.isReady()) {
-        event.wake(scheduler);
-      }
-    }
-
-    @Override
-    public void serverAdded(ServerName worker) {
-      this.wake(master.getMasterProcedureExecutor().getEnvironment().getProcedureScheduler());
-    }
-
-    public synchronized void addUsedWorker(ServerName worker) {
-      // load used worker when master restart
-      currentWorkers.compute(worker, (serverName,
-        availableWorker) -> availableWorker == null ? maxSplitTasks - 1 : availableWorker - 1);
-    }
-  }
 }

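Note: the SplitWorkerAssigner inner class is hoisted into a reusable WorkerAssigner (apparently in the same org.apache.hadoop.hbase.master package, since the diff adds no new import) so the new snapshot region/verify work can be rationed per region server the same way WAL splitting is. A rough sketch of how another manager might reuse it, under that assumption, follows; the config key, default limit, and everything except WorkerAssigner's constructor shape and the acquire()/release() surface carried over from the old inner class are hypothetical.

package org.apache.hadoop.hbase.master;

// Hypothetical wiring: reusing the generalized WorkerAssigner for snapshot verify tasks.
// Assumes WorkerAssigner keeps the acquire/release surface of the old SplitWorkerAssigner.
import java.util.Optional;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.procedure2.ProcedureEvent;

public class SnapshotVerifyWorkerWiringSketch {
  private final WorkerAssigner verifyWorkerAssigner;

  public SnapshotVerifyWorkerWiringSketch(MasterServices master) {
    this.verifyWorkerAssigner = new WorkerAssigner(master,
      // Hypothetical config key and default, not taken from this commit.
      master.getConfiguration().getInt("hbase.master.snapshot.verify.max.tasks", 3),
      new ProcedureEvent<>("snapshot-verify-worker-assigning"));
  }

  // Picks an online region server with a free verify slot, if any.
  Optional<ServerName> acquireWorker() {
    return verifyWorkerAssigner.acquire();
  }

  // Returns a slot once the remote verify task has finished.
  void releaseWorker(ServerName worker) {
    verifyWorkerAssigner.release(worker);
  }
}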