Commit fd301ad

HBASE-26323 Introduce a Snapshot Procedure (#4115)
Signed-off-by: Duo Zhang <zhangduo@apache.org>
1 parent c248521 commit fd301ad

30 files changed (+2713, -334 lines)

hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java

Lines changed: 65 additions & 37 deletions
@@ -1901,51 +1901,67 @@ public CompletableFuture<Void> snapshot(SnapshotDescription snapshotDesc) {
       return failedFuture(e);
     }
     CompletableFuture<Void> future = new CompletableFuture<>();
-    final SnapshotRequest request = SnapshotRequest.newBuilder().setSnapshot(snapshot).build();
-    addListener(this.<Long> newMasterCaller()
-      .action((controller, stub) -> this.<SnapshotRequest, SnapshotResponse, Long> call(controller,
-        stub, request, (s, c, req, done) -> s.snapshot(c, req, done),
-        resp -> resp.getExpectedTimeout()))
-      .call(), (expectedTimeout, err) -> {
+    final SnapshotRequest request =
+      SnapshotRequest.newBuilder().setSnapshot(snapshot).setNonceGroup(ng.getNonceGroup())
+        .setNonce(ng.newNonce()).build();
+    addListener(this.<SnapshotResponse> newMasterCaller()
+      .action((controller, stub) ->
+        this.<SnapshotRequest, SnapshotResponse, SnapshotResponse> call(controller, stub,
+          request, (s, c, req, done) -> s.snapshot(c, req, done), resp -> resp))
+      .call(), (resp, err) -> {
         if (err != null) {
           future.completeExceptionally(err);
           return;
         }
-        TimerTask pollingTask = new TimerTask() {
-          int tries = 0;
-          long startTime = EnvironmentEdgeManager.currentTime();
-          long endTime = startTime + expectedTimeout;
-          long maxPauseTime = expectedTimeout / maxAttempts;
-
-          @Override
-          public void run(Timeout timeout) throws Exception {
-            if (EnvironmentEdgeManager.currentTime() < endTime) {
-              addListener(isSnapshotFinished(snapshotDesc), (done, err2) -> {
-                if (err2 != null) {
-                  future.completeExceptionally(err2);
-                } else if (done) {
-                  future.complete(null);
-                } else {
-                  // retry again after pauseTime.
-                  long pauseTime =
-                    ConnectionUtils.getPauseTime(TimeUnit.NANOSECONDS.toMillis(pauseNs), ++tries);
-                  pauseTime = Math.min(pauseTime, maxPauseTime);
-                  AsyncConnectionImpl.RETRY_TIMER.newTimeout(this, pauseTime,
-                    TimeUnit.MILLISECONDS);
-                }
-              });
-            } else {
-              future.completeExceptionally(
-                new SnapshotCreationException("Snapshot '" + snapshot.getName() +
-                  "' wasn't completed in expectedTime:" + expectedTimeout + " ms", snapshotDesc));
-            }
-          }
-        };
-        AsyncConnectionImpl.RETRY_TIMER.newTimeout(pollingTask, 1, TimeUnit.MILLISECONDS);
+        waitSnapshotFinish(snapshotDesc, future, resp);
       });
     return future;
   }
 
+  // This is for keeping compatibility with old implementation.
+  // If there is a procId field in the response, then the snapshot will be operated with a
+  // SnapshotProcedure, otherwise the snapshot will be coordinated by zk.
+  private void waitSnapshotFinish(SnapshotDescription snapshot,
+      CompletableFuture<Void> future, SnapshotResponse resp) {
+    if (resp.hasProcId()) {
+      getProcedureResult(resp.getProcId(), future, 0);
+      addListener(future, new SnapshotProcedureBiConsumer(snapshot.getTableName()));
+    } else {
+      long expectedTimeout = resp.getExpectedTimeout();
+      TimerTask pollingTask = new TimerTask() {
+        int tries = 0;
+        long startTime = EnvironmentEdgeManager.currentTime();
+        long endTime = startTime + expectedTimeout;
+        long maxPauseTime = expectedTimeout / maxAttempts;
+
+        @Override
+        public void run(Timeout timeout) throws Exception {
+          if (EnvironmentEdgeManager.currentTime() < endTime) {
+            addListener(isSnapshotFinished(snapshot), (done, err2) -> {
+              if (err2 != null) {
+                future.completeExceptionally(err2);
+              } else if (done) {
+                future.complete(null);
+              } else {
+                // retry again after pauseTime.
+                long pauseTime = ConnectionUtils
+                  .getPauseTime(TimeUnit.NANOSECONDS.toMillis(pauseNs), ++tries);
+                pauseTime = Math.min(pauseTime, maxPauseTime);
+                AsyncConnectionImpl.RETRY_TIMER
+                  .newTimeout(this, pauseTime, TimeUnit.MILLISECONDS);
+              }
+            });
+          } else {
+            future.completeExceptionally(new SnapshotCreationException(
+              "Snapshot '" + snapshot.getName() + "' wasn't completed in expectedTime:"
+                + expectedTimeout + " ms", snapshot));
+          }
+        }
+      };
+      AsyncConnectionImpl.RETRY_TIMER.newTimeout(pollingTask, 1, TimeUnit.MILLISECONDS);
+    }
+  }
+
@@ -2800,6 +2816,18 @@ String getOperationType() {
     }
   }
 
+  private static class SnapshotProcedureBiConsumer extends TableProcedureBiConsumer {
+    SnapshotProcedureBiConsumer(TableName tableName) {
+      super(tableName);
+    }
+
+    @Override
+    String getOperationType() {
+      return "SNAPSHOT";
+    }
+  }
+
+
   private static class ReplicationProcedureBiConsumer extends ProcedureBiConsumer {
     private final String peerId;
     private final Supplier<String> getOperation;

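For orientation, a caller still goes through the same async admin API; whether the master runs the new SnapshotProcedure or the old ZooKeeper-coordinated snapshot is decided purely from the response. A minimal usage sketch, assuming an already-created AsyncConnection named conn; the table and snapshot names are illustrative, not part of this commit:

import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.AsyncAdmin;
import org.apache.hadoop.hbase.client.SnapshotDescription;
import org.apache.hadoop.hbase.client.SnapshotType;

// Hypothetical usage; `conn` is an AsyncConnection obtained elsewhere.
AsyncAdmin admin = conn.getAdmin();
SnapshotDescription desc =
    new SnapshotDescription("snap_demo", TableName.valueOf("demo_table"), SnapshotType.FLUSH);
// The returned future completes either through procedure-result polling (a master that
// returns proc_id) or through the legacy isSnapshotFinished() polling loop (an older master).
admin.snapshot(desc).join();
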
hbase-protocol-shaded/src/main/protobuf/server/master/Master.proto

Lines changed: 3 additions & 0 deletions
@@ -437,10 +437,13 @@ message IsCleanerChoreEnabledResponse {
 
 message SnapshotRequest {
   required SnapshotDescription snapshot = 1;
+  optional uint64 nonce_group = 2 [default = 0];
+  optional uint64 nonce = 3 [default = 0];
 }
 
 message SnapshotResponse {
   required int64 expected_timeout = 1;
+  optional int64 proc_id = 2;
 }
 
 message GetCompletedSnapshotsRequest {

hbase-protocol-shaded/src/main/protobuf/server/master/MasterProcedure.proto

Lines changed: 39 additions & 0 deletions
@@ -191,6 +191,45 @@ message RestoreParentToChildRegionsPair {
   required string child2_region_name = 3;
 }
 
+enum SnapshotState {
+  SNAPSHOT_PREPARE = 1;
+  SNAPSHOT_PRE_OPERATION = 2;
+  SNAPSHOT_WRITE_SNAPSHOT_INFO = 3;
+  SNAPSHOT_SNAPSHOT_ONLINE_REGIONS = 4;
+  SNAPSHOT_SNAPSHOT_SPLIT_REGIONS = 5;
+  SNAPSHOT_SNAPSHOT_CLOSED_REGIONS = 6;
+  SNAPSHOT_SNAPSHOT_MOB_REGION = 7;
+  SNAPSHOT_CONSOLIDATE_SNAPSHOT = 8;
+  SNAPSHOT_VERIFIER_SNAPSHOT = 9;
+  SNAPSHOT_COMPLETE_SNAPSHOT = 10;
+  SNAPSHOT_POST_OPERATION = 11;
+}
+
+message SnapshotProcedureStateData {
+  required SnapshotDescription snapshot = 1;
+}
+
+message SnapshotRegionProcedureStateData {
+  required RegionInfo region = 1;
+  required SnapshotDescription snapshot = 2;
+}
+
+message SnapshotRegionParameter {
+  required RegionInfo region = 1;
+  required SnapshotDescription snapshot = 2;
+}
+
+message SnapshotVerifyProcedureStateData {
+  required SnapshotDescription snapshot = 1;
+  required RegionInfo region = 2;
+  optional ServerName target_server = 3;
+}
+
+message SnapshotVerifyParameter {
+  required SnapshotDescription snapshot = 1;
+  required RegionInfo region = 2;
+}
+
 enum CloneSnapshotState {
   CLONE_SNAPSHOT_PRE_OPERATION = 1;
   CLONE_SNAPSHOT_WRITE_FS_LAYOUT = 2;

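The new SnapshotState enum spells out the steps the master-side snapshot procedure walks through, and the StateData/Parameter messages carry its serialized state plus the per-region snapshot and verify work dispatched to region servers. The SnapshotProcedure class itself is part of this commit but not shown in this excerpt; the skeleton below is only a rough sketch of how an HBase StateMachineProcedure typically dispatches on such a state enum. The class name, the particular transitions and the fan-out comment are assumptions; most states and all error handling are elided.

import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
import org.apache.hadoop.hbase.procedure2.StateMachineProcedure;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.SnapshotState;

// Illustrative skeleton only; not the committed SnapshotProcedure.
public class SnapshotProcedureSketch
    extends StateMachineProcedure<MasterProcedureEnv, SnapshotState> {

  @Override
  protected Flow executeFromState(MasterProcedureEnv env, SnapshotState state) {
    switch (state) {
      case SNAPSHOT_PREPARE:
        // verify the table and snapshot description, set up the working directory, ...
        setNextState(SnapshotState.SNAPSHOT_PRE_OPERATION);
        return Flow.HAS_MORE_STATE;
      case SNAPSHOT_SNAPSHOT_ONLINE_REGIONS:
        // presumably fans out SnapshotRegionProcedure children, one per online region
        setNextState(SnapshotState.SNAPSHOT_SNAPSHOT_SPLIT_REGIONS);
        return Flow.HAS_MORE_STATE;
      // ... remaining states elided ...
      case SNAPSHOT_POST_OPERATION:
        return Flow.NO_MORE_STATE;
      default:
        throw new UnsupportedOperationException("unhandled state=" + state);
    }
  }

  @Override
  protected void rollbackState(MasterProcedureEnv env, SnapshotState state) {
    // e.g. clean up the snapshot working directory
  }

  @Override
  protected SnapshotState getState(int stateId) {
    return SnapshotState.forNumber(stateId);
  }

  @Override
  protected int getStateId(SnapshotState state) {
    return state.getNumber();
  }

  @Override
  protected SnapshotState getInitialState() {
    return SnapshotState.SNAPSHOT_PREPARE;
  }
}
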
hbase-server/src/main/java/org/apache/hadoop/hbase/executor/EventType.java

Lines changed: 15 additions & 1 deletion
@@ -301,7 +301,21 @@ public enum EventType {
    *
    * RS_CLAIM_REPLICATION_QUEUE
    */
-  RS_CLAIM_REPLICATION_QUEUE(86, ExecutorType.RS_CLAIM_REPLICATION_QUEUE);
+  RS_CLAIM_REPLICATION_QUEUE(86, ExecutorType.RS_CLAIM_REPLICATION_QUEUE),
+
+  /**
+   * RS snapshot regions.<br>
+   *
+   * RS_SNAPSHOT_REGIONS
+   */
+  RS_SNAPSHOT_REGIONS(87, ExecutorType.RS_SNAPSHOT_OPERATIONS),
+
+  /**
+   * RS verify snapshot.<br>
+   *
+   * RS_VERIFY_SNAPSHOT
+   */
+  RS_VERIFY_SNAPSHOT(88, ExecutorType.RS_SNAPSHOT_OPERATIONS);
 
   private final int code;
   private final ExecutorType executor;

hbase-server/src/main/java/org/apache/hadoop/hbase/executor/ExecutorType.java

Lines changed: 2 additions & 1 deletion
@@ -53,7 +53,8 @@ public enum ExecutorType {
   RS_REPLAY_SYNC_REPLICATION_WAL(32),
   RS_SWITCH_RPC_THROTTLE(33),
   RS_IN_MEMORY_COMPACTION(34),
-  RS_CLAIM_REPLICATION_QUEUE(35);
+  RS_CLAIM_REPLICATION_QUEUE(35),
+  RS_SNAPSHOT_OPERATIONS(36);
 
   ExecutorType(int value) {
   }

hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterCoprocessorHost.java

Lines changed: 4 additions & 4 deletions
@@ -1092,8 +1092,8 @@ public void call(MasterObserver observer) throws IOException {
   }
 
   public void preSnapshot(final SnapshotDescription snapshot,
-      final TableDescriptor hTableDescriptor) throws IOException {
-    execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation() {
+      final TableDescriptor hTableDescriptor, final User user) throws IOException {
+    execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation(user) {
       @Override
       public void call(MasterObserver observer) throws IOException {
         observer.preSnapshot(this, snapshot, hTableDescriptor);
@@ -1102,8 +1102,8 @@ public void call(MasterObserver observer) throws IOException {
   }
 
   public void postSnapshot(final SnapshotDescription snapshot,
-      final TableDescriptor hTableDescriptor) throws IOException {
-    execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation() {
+      final TableDescriptor hTableDescriptor, final User user) throws IOException {
+    execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation(user) {
       @Override
       public void call(MasterObserver observer) throws IOException {
         observer.postSnapshot(this, snapshot, hTableDescriptor);

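The pre/postSnapshot hooks now take the requesting User and wrap the observer call in MasterObserverOperation(user), presumably so the observers still run on behalf of the original caller when the snapshot executes inside a procedure rather than directly in the RPC handler. For orientation, a minimal observer that these hooks would end up invoking; illustrative only, not part of this commit:

import java.io.IOException;
import java.util.Optional;
import org.apache.hadoop.hbase.client.SnapshotDescription;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.coprocessor.MasterCoprocessor;
import org.apache.hadoop.hbase.coprocessor.MasterCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.MasterObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;

// Hypothetical observer that audits snapshot requests; preSnapshot/postSnapshot are the
// callbacks MasterCoprocessorHost drives above.
public class SnapshotAuditObserver implements MasterCoprocessor, MasterObserver {

  @Override
  public Optional<MasterObserver> getMasterObserver() {
    return Optional.of(this);
  }

  @Override
  public void preSnapshot(ObserverContext<MasterCoprocessorEnvironment> ctx,
      SnapshotDescription snapshot, TableDescriptor tableDescriptor) throws IOException {
    // e.g. audit or reject based on snapshot.getName() and tableDescriptor.getTableName()
  }
}
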
hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java

Lines changed: 16 additions & 3 deletions
@@ -1736,12 +1736,25 @@ public SnapshotResponse snapshot(RpcController controller,
       // get the snapshot information
       SnapshotDescription snapshot = SnapshotDescriptionUtils.validate(
         request.getSnapshot(), server.getConfiguration());
-      server.snapshotManager.takeSnapshot(snapshot);
-
       // send back the max amount of time the client should wait for the snapshot to complete
       long waitTime = SnapshotDescriptionUtils.getMaxMasterTimeout(server.getConfiguration(),
         snapshot.getType(), SnapshotDescriptionUtils.DEFAULT_MAX_WAIT_TIME);
-      return SnapshotResponse.newBuilder().setExpectedTimeout(waitTime).build();
+
+      SnapshotResponse.Builder builder = SnapshotResponse.newBuilder().setExpectedTimeout(waitTime);
+
+      // If there is nonce group and nonce in the snapshot request, then the client can
+      // handle snapshot procedure procId. And if enable the snapshot procedure, we
+      // will do the snapshot work with proc-v2, otherwise we will fall back to zk proc.
+      if (request.hasNonceGroup() && request.hasNonce() &&
+          server.snapshotManager.snapshotProcedureEnabled()) {
+        long nonceGroup = request.getNonceGroup();
+        long nonce = request.getNonce();
+        long procId = server.snapshotManager.takeSnapshot(snapshot, nonceGroup, nonce);
+        return builder.setProcId(procId).build();
+      } else {
+        server.snapshotManager.takeSnapshot(snapshot);
+        return builder.build();
+      }
     } catch (ForeignException e) {
       throw new ServiceException(e.getCause());
     } catch (IOException e) {

hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitWALManager.java

Lines changed: 4 additions & 68 deletions
@@ -23,9 +23,7 @@
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 import java.util.Optional;
 import java.util.stream.Collectors;
 import org.apache.hadoop.conf.Configuration;
@@ -73,7 +71,7 @@ public class SplitWALManager {
   private static final Logger LOG = LoggerFactory.getLogger(SplitWALManager.class);
 
   private final MasterServices master;
-  private final SplitWorkerAssigner splitWorkerAssigner;
+  private final WorkerAssigner splitWorkerAssigner;
   private final Path rootDir;
   private final FileSystem fs;
   private final Configuration conf;
@@ -82,8 +80,9 @@ public class SplitWALManager {
   public SplitWALManager(MasterServices master) throws IOException {
     this.master = master;
     this.conf = master.getConfiguration();
-    this.splitWorkerAssigner = new SplitWorkerAssigner(this.master,
-      conf.getInt(HBASE_SPLIT_WAL_MAX_SPLITTER, DEFAULT_HBASE_SPLIT_WAL_MAX_SPLITTER));
+    this.splitWorkerAssigner = new WorkerAssigner(this.master,
+      conf.getInt(HBASE_SPLIT_WAL_MAX_SPLITTER, DEFAULT_HBASE_SPLIT_WAL_MAX_SPLITTER),
+      new ProcedureEvent<>("split-WAL-worker-assigning"));
     this.rootDir = master.getMasterFileSystem().getWALRootDir();
     this.fs = master.getMasterFileSystem().getWALFileSystem();
     this.walArchiveDir = new Path(this.rootDir, HConstants.HREGION_OLDLOGDIR_NAME);
@@ -189,67 +188,4 @@ public void releaseSplitWALWorker(ServerName worker, MasterProcedureScheduler sc
   public void addUsedSplitWALWorker(ServerName worker){
     splitWorkerAssigner.addUsedWorker(worker);
   }
-
-  /**
-   * help assign and release a worker for each WAL splitting task
-   * For each worker, concurrent running splitting task should be no more than maxSplitTasks
-   * If a task failed to acquire a worker, it will suspend and wait for workers available
-   *
-   */
-  private static final class SplitWorkerAssigner implements ServerListener {
-    private int maxSplitTasks;
-    private final ProcedureEvent<?> event;
-    private Map<ServerName, Integer> currentWorkers = new HashMap<>();
-    private MasterServices master;
-
-    public SplitWorkerAssigner(MasterServices master, int maxSplitTasks) {
-      this.maxSplitTasks = maxSplitTasks;
-      this.master = master;
-      this.event = new ProcedureEvent<>("split-WAL-worker-assigning");
-      // ServerManager might be null in a test context where we are mocking; allow for this
-      ServerManager sm = this.master.getServerManager();
-      if (sm != null) {
-        sm.registerListener(this);
-      }
-    }
-
-    public synchronized Optional<ServerName> acquire() {
-      List<ServerName> serverList = master.getServerManager().getOnlineServersList();
-      Collections.shuffle(serverList);
-      Optional<ServerName> worker = serverList.stream().filter(
-        serverName -> !currentWorkers.containsKey(serverName) || currentWorkers.get(serverName) > 0)
-        .findAny();
-      if (worker.isPresent()) {
-        currentWorkers.compute(worker.get(), (serverName,
-          availableWorker) -> availableWorker == null ? maxSplitTasks - 1 : availableWorker - 1);
-      }
-      return worker;
-    }
-
-    public synchronized void release(ServerName serverName) {
-      currentWorkers.compute(serverName, (k, v) -> v == null ? null : v + 1);
-    }
-
-    public void suspend(Procedure<?> proc) {
-      event.suspend();
-      event.suspendIfNotReady(proc);
-    }
-
-    public void wake(MasterProcedureScheduler scheduler) {
-      if (!event.isReady()) {
-        event.wake(scheduler);
-      }
-    }
-
-    @Override
-    public void serverAdded(ServerName worker) {
-      this.wake(master.getMasterProcedureExecutor().getEnvironment().getProcedureScheduler());
-    }
-
-    public synchronized void addUsedWorker(ServerName worker) {
-      // load used worker when master restart
-      currentWorkers.compute(worker, (serverName,
-        availableWorker) -> availableWorker == null ? maxSplitTasks - 1 : availableWorker - 1);
-    }
-  }
 }

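The SplitWorkerAssigner inner class is hoisted out of SplitWALManager into a reusable WorkerAssigner (one of the 30 changed files, not shown in this excerpt), so the new snapshot region/verify procedures can share the same "pick an online region server with free task slots" bookkeeping. Based on the deleted code and the new constructor call above, the extracted class presumably looks roughly like the sketch below; the package, visibility and exact method set are assumptions:

import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.master.ServerListener;
import org.apache.hadoop.hbase.master.ServerManager;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureScheduler;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.procedure2.ProcedureEvent;

// Sketch of the extracted WorkerAssigner: the same bookkeeping as the deleted
// SplitWorkerAssigner, but maxTasks and the ProcedureEvent are supplied by the caller.
public class WorkerAssigner implements ServerListener {
  private final int maxTasks;
  private final ProcedureEvent<?> event;
  private final Map<ServerName, Integer> currentWorkers = new HashMap<>();
  private final MasterServices master;

  public WorkerAssigner(MasterServices master, int maxTasks, ProcedureEvent<?> event) {
    this.maxTasks = maxTasks;
    this.master = master;
    this.event = event;
    // ServerManager might be null in a test context where we are mocking; allow for this
    ServerManager sm = master.getServerManager();
    if (sm != null) {
      sm.registerListener(this);
    }
  }

  public synchronized Optional<ServerName> acquire() {
    List<ServerName> serverList = master.getServerManager().getOnlineServersList();
    Collections.shuffle(serverList);
    Optional<ServerName> worker = serverList.stream()
      .filter(sn -> !currentWorkers.containsKey(sn) || currentWorkers.get(sn) > 0).findAny();
    worker.ifPresent(
      sn -> currentWorkers.compute(sn, (k, v) -> v == null ? maxTasks - 1 : v - 1));
    return worker;
  }

  public synchronized void release(ServerName serverName) {
    currentWorkers.compute(serverName, (k, v) -> v == null ? null : v + 1);
  }

  public void suspend(Procedure<?> proc) {
    event.suspend();
    event.suspendIfNotReady(proc);
  }

  public void wake(MasterProcedureScheduler scheduler) {
    if (!event.isReady()) {
      event.wake(scheduler);
    }
  }

  @Override
  public void serverAdded(ServerName worker) {
    wake(master.getMasterProcedureExecutor().getEnvironment().getProcedureScheduler());
  }

  public synchronized void addUsedWorker(ServerName worker) {
    // reload used workers when the master restarts
    currentWorkers.compute(worker, (k, v) -> v == null ? maxTasks - 1 : v - 1);
  }
}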