Skip to content

HBASE-26323 introduce a SnapshotProcedure #3716

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1542,6 +1542,66 @@ default void snapshot(String snapshotName, TableName tableName,
void snapshot(SnapshotDescription snapshot)
throws IOException, SnapshotCreationException, IllegalArgumentException;

/**
 * Takes a {@link SnapshotType#FLUSH} snapshot of the given table and blocks until the server has
 * completed it. For callers this behaves like
 * {@link org.apache.hadoop.hbase.client.Admin#snapshot(String, TableName)}; the difference is
 * that {@link org.apache.hadoop.hbase.client.Admin#snapshotTable(String, TableName)} is based on
 * proc-v2.
 * @param snapshotName name to give the snapshot on the filesystem. Must be unique from all other
 *          snapshots stored on the cluster
 * @param tableName name of the table to snapshot
 * @throws IOException if we fail to reach the master
 * @throws SnapshotCreationException if snapshot creation failed
 * @throws IllegalArgumentException if the snapshot request is formatted incorrectly
 */
default void snapshotTable(String snapshotName, TableName tableName)
    throws IOException, SnapshotCreationException, IllegalArgumentException {
  // This overload only fixes the snapshot type; the typed overload does the rest.
  final SnapshotType defaultType = SnapshotType.FLUSH;
  snapshotTable(snapshotName, tableName, defaultType);
}

/**
 * Takes a snapshot of the requested type for the given table.
 * <p>
 * Builds a {@link SnapshotDescription} from the arguments and hands it to
 * {@code snapshotTable(SnapshotDescription)}.
 * @param snapshotName name to give the snapshot on the filesystem. Must be unique from all other
 *          snapshots stored on the cluster
 * @param tableName name of the table to snapshot
 * @param type type of snapshot to take
 * @throws IOException if we fail to reach the master
 * @throws SnapshotCreationException if snapshot creation failed
 * @throws IllegalArgumentException if the snapshot request is formatted incorrectly
 */
default void snapshotTable(String snapshotName, TableName tableName, SnapshotType type)
    throws IOException, SnapshotCreationException, IllegalArgumentException {
  SnapshotDescription description = new SnapshotDescription(snapshotName, tableName, type);
  snapshotTable(description);
}

/**
 * Takes a snapshot of the requested type for the given table, attaching additional snapshot
 * properties.
 * <p>
 * Builds a {@link SnapshotDescription} from the arguments and hands it to
 * {@code snapshotTable(SnapshotDescription)}, like the other {@code snapshotTable} overloads.
 * @param snapshotName name to give the snapshot on the filesystem. Must be unique from all other
 *          snapshots stored on the cluster
 * @param tableName name of the table to snapshot
 * @param type type of snapshot to take
 * @param snapshotProps snapshot additional properties e.g. TTL
 * @throws IOException if we fail to reach the master
 * @throws SnapshotCreationException if snapshot creation failed
 * @throws IllegalArgumentException if the snapshot request is formatted incorrectly
 */
default void snapshotTable(String snapshotName, TableName tableName,
    SnapshotType type, Map<String, Object> snapshotProps)
    throws IOException, SnapshotCreationException, IllegalArgumentException {
  // Must route to snapshotTable(...), not snapshot(...): the latter is the legacy snapshot path
  // and would silently bypass the behaviour this method family exists to expose.
  snapshotTable(new SnapshotDescription(snapshotName, tableName, type, snapshotProps));
}

/**
 * Take the snapshot described by {@code snapshot} and wait for the server to complete it
 * (blocking).
 * <p>
 * This is the abstract entry point of the {@code snapshotTable} family; the
 * {@code (String, TableName, SnapshotType)} convenience overload builds a
 * {@link SnapshotDescription} and delegates here.
 * @param snapshot snapshot to take
 * @throws IOException if we fail to reach the master
 * @throws SnapshotCreationException if snapshot creation failed
 * @throws IllegalArgumentException if the snapshot request is formatted incorrectly
 */
void snapshotTable(SnapshotDescription snapshot)
throws IOException, SnapshotCreationException, IllegalArgumentException;

/**
* Take a snapshot without waiting for the server to complete that snapshot (asynchronous).
* Snapshots are considered unique based on <b>the name of the snapshot</b>. Snapshots are taken
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -625,6 +625,12 @@ public void snapshot(SnapshotDescription snapshot)
get(admin.snapshot(snapshot));
}

/**
 * Synchronous {@code snapshotTable}: forwards the request to the wrapped async admin and passes
 * the returned future to {@code get(...)}.
 */
@Override
public void snapshotTable(SnapshotDescription snapshot)
throws IOException, SnapshotCreationException, IllegalArgumentException {
// NOTE(review): get(...) presumably blocks until the future completes and rethrows its
// failure as one of the declared exceptions - confirm against its definition.
get(admin.snapshotTable(snapshot));
}

@Override
public Future<Void> snapshotAsync(SnapshotDescription snapshot)
throws IOException, SnapshotCreationException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -825,6 +825,20 @@ default CompletableFuture<Void> snapshot(String snapshotName, TableName tableNam
*/
CompletableFuture<Void> snapshot(SnapshotDescription snapshot);

/**
 * Take a {@link SnapshotType#FLUSH} snapshot of the given table through
 * {@link #snapshotTable(SnapshotDescription)}.
 * @param snapshotName name to give the snapshot
 * @param tableName name of the table to snapshot
 * @return the future returned by {@link #snapshotTable(SnapshotDescription)}
 */
default CompletableFuture<Void> snapshotTable(String snapshotName, TableName tableName) {
  // Delegate to snapshotTable(...), not snapshot(...): the latter is the legacy snapshot call
  // and would make this overload indistinguishable from snapshot(String, TableName).
  return snapshotTable(new SnapshotDescription(snapshotName, tableName, SnapshotType.FLUSH));
}

/**
 * Take a snapshot of the requested type for the given table through
 * {@link #snapshotTable(SnapshotDescription)}.
 * @param snapshotName name to give the snapshot
 * @param tableName name of the table to snapshot
 * @param type type of snapshot to take
 * @return the future returned by {@link #snapshotTable(SnapshotDescription)}
 */
default CompletableFuture<Void> snapshotTable(String snapshotName, TableName tableName,
    SnapshotType type) {
  // Delegate to snapshotTable(...), not snapshot(...): the latter is the legacy snapshot call.
  return snapshotTable(new SnapshotDescription(snapshotName, tableName, type));
}

/**
 * Take the snapshot described by {@code snapshot}.
 * @param snapshot snapshot to take
 * @return a {@link CompletableFuture} for the snapshot operation. NOTE(review): when exactly it
 *         completes (submission vs. snapshot finished) is up to the implementation - confirm.
 */
CompletableFuture<Void> snapshotTable(SnapshotDescription snapshot);

/**
* Check the current state of the passed snapshot. There are three possible states:
* <ol>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -476,6 +476,11 @@ public CompletableFuture<Void> snapshot(SnapshotDescription snapshot) {
return wrap(rawAdmin.snapshot(snapshot));
}

/**
 * Delegates to the raw admin's {@code snapshotTable} and passes the resulting future through
 * {@code wrap(...)}, the same shape as the neighbouring {@code snapshot} method.
 */
@Override
public CompletableFuture<Void> snapshotTable(SnapshotDescription snapshot) {
return wrap(rawAdmin.snapshotTable(snapshot));
}

@Override
public CompletableFuture<Boolean> isSnapshotFinished(SnapshotDescription snapshot) {
return wrap(rawAdmin.isSnapshotFinished(snapshot));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,8 @@
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ShutdownResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SnapshotRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SnapshotResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SnapshotTableRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SnapshotTableResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SplitTableRegionRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SplitTableRegionResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.StopMasterRequest;
Expand Down Expand Up @@ -1870,11 +1872,12 @@ public CompletableFuture<Void> snapshot(SnapshotDescription snapshotDesc) {
} catch (IllegalArgumentException e) {
return failedFuture(e);
}

CompletableFuture<Void> future = new CompletableFuture<>();
final SnapshotRequest request = SnapshotRequest.newBuilder().setSnapshot(snapshot).build();
addListener(this.<Long> newMasterCaller()
.action((controller, stub) -> this.<SnapshotRequest, SnapshotResponse, Long> call(controller,
stub, request, (s, c, req, done) -> s.snapshot(c, req, done),
.action((controller, stub) -> this.<SnapshotRequest, SnapshotResponse, Long> call(
controller, stub, request, (s, c, req, done) -> s.snapshot(c, req, done),
resp -> resp.getExpectedTimeout()))
.call(), (expectedTimeout, err) -> {
if (err != null) {
Expand Down Expand Up @@ -1916,6 +1919,23 @@ public void run(Timeout timeout) throws Exception {
return future;
}

/**
 * Submits a snapshot request to the master via the {@code SnapshotTable} RPC and tracks it as a
 * procedure.
 * <p>
 * The description is converted to its protobuf form and validated on the client first; an
 * invalid request yields an already-failed future instead of an RPC.
 * @param snapshotDesc description of the snapshot to take
 * @return the future produced by {@code procedureCall} for the submitted request, or a failed
 *         future carrying the {@link IllegalArgumentException} from validation
 */
@Override
public CompletableFuture<Void> snapshotTable(SnapshotDescription snapshotDesc) {
  final SnapshotProtos.SnapshotDescription protoSnapshot =
    ProtobufUtil.createHBaseProtosSnapshotDesc(snapshotDesc);
  try {
    ClientSnapshotDescriptionUtils.assertSnapshotRequestIsValid(protoSnapshot);
  } catch (IllegalArgumentException invalid) {
    // Report validation problems through the future rather than throwing at the caller.
    return failedFuture(invalid);
  }

  // Nonce group / nonce are attached so the master can recognise a resubmitted request.
  SnapshotTableRequest.Builder requestBuilder = SnapshotTableRequest.newBuilder();
  requestBuilder.setSnapshot(protoSnapshot);
  requestBuilder.setNonceGroup(ng.getNonceGroup());
  requestBuilder.setNonce(ng.newNonce());
  final SnapshotTableRequest request = requestBuilder.build();

  final TableName table = TableName.valueOf(protoSnapshot.getTable());
  return this.<SnapshotTableRequest, SnapshotTableResponse> procedureCall(request,
    (stub, rpcController, req, done) -> stub.snapshotTable(rpcController, req, done),
    resp -> resp.getProcId(), new SnapshotTableProcedureBiConsumer(table));
}

@Override
public CompletableFuture<Boolean> isSnapshotFinished(SnapshotDescription snapshot) {
return this
Expand Down Expand Up @@ -2738,6 +2758,17 @@ String getOperationType() {
}
}

/**
 * {@link TableProcedureBiConsumer} for the {@code snapshotTable} call; reports its operation
 * type as {@code "SNAPSHOT"}.
 */
private static class SnapshotTableProcedureBiConsumer extends TableProcedureBiConsumer {
  /** Operation label returned by {@link #getOperationType()}. */
  private static final String OPERATION_TYPE = "SNAPSHOT";

  SnapshotTableProcedureBiConsumer(final TableName tableName) {
    super(tableName);
  }

  @Override
  String getOperationType() {
    return OPERATION_TYPE;
  }
}

private static class ReplicationProcedureBiConsumer extends ProcedureBiConsumer {
private final String peerId;
private final Supplier<String> getOperation;
Expand Down
15 changes: 15 additions & 0 deletions hbase-protocol-shaded/src/main/protobuf/server/master/Master.proto
Original file line number Diff line number Diff line change
Expand Up @@ -443,6 +443,16 @@ message SnapshotResponse {
required int64 expected_timeout = 1;
}

/**
 * Request for the SnapshotTable rpc.
 */
message SnapshotTableRequest {
// Description of the snapshot to take.
required SnapshotDescription snapshot = 1;
// Nonce group and nonce supplied by the client with the request; both default to 0 when unset.
optional uint64 nonce_group = 2 [default = 0];
optional uint64 nonce = 3 [default = 0];
}

/**
 * Response of the SnapshotTable rpc.
 */
message SnapshotTableResponse {
// Id of the procedure the master started for this snapshot request.
optional int64 proc_id = 1;
}

message GetCompletedSnapshotsRequest {
}

Expand Down Expand Up @@ -932,6 +942,11 @@ service MasterService {
*/
rpc Snapshot(SnapshotRequest) returns(SnapshotResponse);

/**
 * Create a snapshot for the given table.
 * Unlike Snapshot, which returns an expected timeout, the response carries
 * a procedure id (proc_id).
 */
rpc SnapshotTable(SnapshotTableRequest) returns(SnapshotTableResponse);

/**
* Get completed snapshots.
* Returns a list of snapshot descriptors for completed snapshots
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,46 @@ message RestoreParentToChildRegionsPair {
required string child2_region_name = 3;
}

// States of the snapshot state machine, numbered 1..10.
// NOTE(review): presumably executed in numeric order by SnapshotProcedure - confirm against
// the procedure implementation. Values are persisted, so existing numbers must not be reused.
enum SnapshotState {
SNAPSHOT_PREPARE = 1;
SNAPSHOT_PRE_OPERATION = 2;
SNAPSHOT_WRITE_SNAPSHOT_INFO = 3;
SNAPSHOT_SNAPSHOT_ONLINE_REGIONS = 4;
SNAPSHOT_SNAPSHOT_OFFLINE_REGIONS = 5;
SNAPSHOT_SNAPSHOT_MOB_REGION = 6;
SNAPSHOT_CONSOLIDATE_SNAPSHOT = 7;
// NOTE(review): "VERIFIER" reads like it was meant to be "VERIFY"; renaming would change the
// generated constant, so it is only flagged here.
SNAPSHOT_VERIFIER_SNAPSHOT = 8;
SNAPSHOT_COMPLETE_SNAPSHOT = 9;
SNAPSHOT_POST_OPERATION = 10;
}

// State data carrying only the snapshot description.
// NOTE(review): presumably the persisted state of SnapshotProcedure - confirm.
message SnapshotProcedureStateData {
required SnapshotDescription snapshot = 1;
}

// State data for snapshotting a single region: the region plus the snapshot it belongs to.
// NOTE(review): presumably persisted by a per-region snapshot procedure - confirm.
message SnapshotRegionProcedureStateData {
required RegionInfo region = 1;
required SnapshotDescription snapshot = 2;
}

// Parameter for a region snapshot operation; carries the same two fields as
// SnapshotRegionProcedureStateData.
// NOTE(review): presumably the payload sent to the region server - confirm.
message SnapshotRegionParameter {
required RegionInfo region = 1;
required SnapshotDescription snapshot = 2;
}

// State data for verifying a snapshot: the snapshot, the regions to check, the server chosen
// for the work and the number of regions expected.
// NOTE(review): presumably persisted by a snapshot-verify procedure - confirm.
message SnapshotVerifyProcedureStateData {
required SnapshotDescription snapshot = 1;
repeated RegionInfo region = 2;
required ServerName target_server = 3;
required uint32 expected_num_region = 4;
}

// Parameter for a snapshot verification; same fields as SnapshotVerifyProcedureStateData
// minus target_server.
// NOTE(review): presumably the payload sent to the region server - confirm.
message SnapshotVerifyParameter {
required SnapshotDescription snapshot = 1;
repeated RegionInfo region = 2;
required uint32 expected_num_region = 3;
}

enum CloneSnapshotState {
CLONE_SNAPSHOT_PRE_OPERATION = 1;
CLONE_SNAPSHOT_WRITE_FS_LAYOUT = 2;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,21 @@ public enum EventType {
*
* RS_CLAIM_REPLICATION_QUEUE
*/
RS_CLAIM_REPLICATION_QUEUE(86, ExecutorType.RS_CLAIM_REPLICATION_QUEUE);
RS_CLAIM_REPLICATION_QUEUE(86, ExecutorType.RS_CLAIM_REPLICATION_QUEUE),

/**
* RS snapshot regions.<br>
*
* RS_SNAPSHOT_REGIONS
*/
RS_SNAPSHOT_REGIONS(87, ExecutorType.RS_SNAPSHOT_OPERATIONS),

/**
* RS verify snapshot.<br>
*
* RS_VERIFY_SNAPSHOT
*/
RS_VERIFY_SNAPSHOT(88, ExecutorType.RS_SNAPSHOT_OPERATIONS);

private final int code;
private final ExecutorType executor;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,8 @@ public enum ExecutorType {
RS_REPLAY_SYNC_REPLICATION_WAL(32),
RS_SWITCH_RPC_THROTTLE(33),
RS_IN_MEMORY_COMPACTION(34),
RS_CLAIM_REPLICATION_QUEUE(35);
RS_CLAIM_REPLICATION_QUEUE(35),
RS_SNAPSHOT_OPERATIONS(36);

ExecutorType(int value) {
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -329,6 +329,8 @@
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ShutdownResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SnapshotRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SnapshotResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SnapshotTableRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SnapshotTableResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SplitTableRegionRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SplitTableRegionResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.StopMasterRequest;
Expand Down Expand Up @@ -1719,6 +1721,27 @@ public SnapshotResponse snapshot(RpcController controller,
}
}

/**
 * RPC handler for {@code SnapshotTable}: validates the requested snapshot, asks the snapshot
 * manager to take it and returns the resulting procedure id.
 * @param controller RPC controller (not used by this method)
 * @param request snapshot description plus the client's nonce group and nonce
 * @return response carrying the id returned by the snapshot manager
 * @throws ServiceException wrapping any {@link IOException}, or the cause of a
 *           {@link ForeignException}, raised while handling the request
 */
@Override
public SnapshotTableResponse snapshotTable(RpcController controller,
    SnapshotTableRequest request) throws ServiceException {
  try {
    // Refuse early when the master is not initialized or snapshots are not supported.
    server.checkInitialized();
    server.snapshotManager.checkSnapshotSupport();
    LOG.info(server.getClientIdAuditPrefix() + " snapshot request for:" +
      ClientSnapshotDescriptionUtils.toString(request.getSnapshot()));

    SnapshotDescription validated =
      SnapshotDescriptionUtils.validate(request.getSnapshot(), server.getConfiguration());
    long submittedProcId = server.snapshotManager.takeSnapshot(validated,
      request.getNonceGroup(), request.getNonce());

    SnapshotTableResponse.Builder response = SnapshotTableResponse.newBuilder();
    response.setProcId(submittedProcId);
    return response.build();
  } catch (ForeignException fe) {
    throw new ServiceException(fe.getCause());
  } catch (IOException ioe) {
    throw new ServiceException(ioe);
  }
}

@Override
public StopMasterResponse stopMaster(RpcController controller,
StopMasterRequest request) throws ServiceException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -446,7 +446,7 @@ protected ProcedureMetrics getProcedureMetrics(MasterProcedureEnv env) {
private boolean prepareMergeRegion(final MasterProcedureEnv env) throws IOException {
// Fail if we are taking snapshot for the given table
TableName tn = regionsToMerge[0].getTable();
if (env.getMasterServices().getSnapshotManager().isTakingSnapshot(tn)) {
if (env.getMasterServices().getSnapshotManager().isTakingSnapshot(tn, true)) {
throw new MergeRegionException("Skip merging regions " +
RegionInfo.getShortNameToLog(regionsToMerge) + ", because we are snapshotting " + tn);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -500,7 +500,7 @@ private byte[] getSplitRow() {
public boolean prepareSplitRegion(final MasterProcedureEnv env) throws IOException {
// Fail if we are taking snapshot for the given table
if (env.getMasterServices().getSnapshotManager()
.isTakingSnapshot(getParentRegion().getTable())) {
.isTakingSnapshot(getParentRegion().getTable(), true)) {
setFailure(new IOException("Skip splitting region " + getParentRegion().getShortNameToLog() +
", because we are taking snapshot for the table " + getParentRegion().getTable()));
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,12 @@ public enum ServerOperationType {
/**
* send the claim replication queue request to region server to actually assign it
*/
CLAIM_REPLICATION_QUEUE_REMOTE
CLAIM_REPLICATION_QUEUE_REMOTE,

/**
* send verify snapshot request to region server and handle the response
*/
VERIFY_SNAPSHOT
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ public boolean requireExclusiveLock(Procedure<?> proc) {
case SPLIT_WAL_REMOTE:
case CLAIM_REPLICATION_QUEUES:
case CLAIM_REPLICATION_QUEUE_REMOTE:
case VERIFY_SNAPSHOT:
return false;
default:
break;
Expand Down
Loading