HBASE-26867 Introduce a FlushProcedure #4246

Closed · wants to merge 6 commits
hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java
@@ -53,6 +53,7 @@
import org.apache.hadoop.hbase.ClusterMetrics;
import org.apache.hadoop.hbase.ClusterMetrics.Option;
import org.apache.hadoop.hbase.ClusterMetricsBuilder;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.NamespaceDescriptor;
@@ -180,6 +181,8 @@
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ExecProcedureResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FlushMasterStoreRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FlushMasterStoreResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FlushTableRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FlushTableResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterStatusRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterStatusResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetCompletedSnapshotsRequest;
@@ -925,7 +928,38 @@ public CompletableFuture<Void> flush(TableName tableName) {

@Override
public CompletableFuture<Void> flush(TableName tableName, byte[] columnFamily) {
// This is for keeping compatibility with the old implementation.
// If the server version is lower than the client version, the flushTable method
// may not be present on the server side; if so, we need to fall back to the
// old implementation.
FlushTableRequest request = RequestConverter.buildFlushTableRequest(tableName, columnFamily,
ng.getNonceGroup(), ng.newNonce());
CompletableFuture<Void> procFuture = this.<FlushTableRequest, FlushTableResponse> procedureCall(
tableName, request, (s, c, req, done) -> s.flushTable(c, req, done),
(resp) -> resp.getProcId(), new FlushTableProcedureBiConsumer(tableName));
// Here we wrap procFuture in a new CompletableFuture, because procFuture is
// not fully under our control.
CompletableFuture<Void> future = new CompletableFuture<>();
addListener(procFuture, (ret, error) -> {
if (error != null) {
if (error instanceof DoNotRetryIOException) {
// Usually this is because the flushTable method is not present on the
// server side, i.e. the running server does not support it yet. If that
// happens, we need to fall back to the old flush implementation.
LOG.info("Unrecoverable error on the master side. Falling back to the V1 flush table procedure",
error);
legacyFlush(future, tableName, columnFamily);
} else {
future.completeExceptionally(error);
}
} else {
future.complete(ret);
}
});
return future;
}
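
For context (not part of this diff): a minimal client-side sketch of the new API, flushing a single column family through AsyncAdmin. The table and family names are illustrative.

// Usage sketch only, not part of this PR. Flushes one column family through
// the new AsyncAdmin.flush(TableName, byte[]) overload added above.
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.AsyncAdmin;
import org.apache.hadoop.hbase.client.AsyncConnection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.util.Bytes;

public class FlushColumnFamilyExample {
  public static void main(String[] args) throws Exception {
    try (AsyncConnection conn =
      ConnectionFactory.createAsyncConnection(HBaseConfiguration.create()).get()) {
      AsyncAdmin admin = conn.getAdmin();
      // Flushes only the "cf" family; the single-argument flush(TableName)
      // overload flushes the whole table.
      admin.flush(TableName.valueOf("test_table"), Bytes.toBytes("cf")).join();
    }
  }
}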

private void legacyFlush(CompletableFuture<Void> future, TableName tableName,
byte[] columnFamily) {
addListener(tableExists(tableName), (exists, err) -> {
if (err != null) {
future.completeExceptionally(err);
@@ -955,7 +989,6 @@ public CompletableFuture<Void> flush(TableName tableName, byte[] columnFamily) {
});
}
});
return future;
}

@Override
@@ -2611,6 +2644,18 @@ String getOperationType() {
}
}

private static class FlushTableProcedureBiConsumer extends TableProcedureBiConsumer {

FlushTableProcedureBiConsumer(TableName tableName) {
super(tableName);
}

@Override
String getOperationType() {
return "FLUSH";
}
}

private static class ModifyTableProcedureBiConsumer extends TableProcedureBiConsumer {

ModifyTableProcedureBiConsumer(AsyncAdmin admin, TableName tableName) {
hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java
@@ -116,6 +116,7 @@
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DisableTableRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableCatalogJanitorRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableTableRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FlushTableRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterStatusRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetNamespaceDescriptorRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableDescriptorsRequest;
@@ -746,6 +747,16 @@ public static GetOnlineRegionRequest buildGetOnlineRegionRequest() {
return GetOnlineRegionRequest.newBuilder().build();
}

public static FlushTableRequest buildFlushTableRequest(final TableName tableName,
final byte[] columnFamily, final long nonceGroup, final long nonce) {
FlushTableRequest.Builder builder = FlushTableRequest.newBuilder();
builder.setTableName(ProtobufUtil.toProtoTableName(tableName));
if (columnFamily != null) {
builder.setColumnFamily(UnsafeByteOperations.unsafeWrap(columnFamily));
}
return builder.setNonceGroup(nonceGroup).setNonce(nonce).build();
}

/**
* Create a protocol buffer FlushRegionRequest for a given region name
* @param regionName the name of the region to get info
14 changes: 14 additions & 0 deletions hbase-protocol-shaded/src/main/protobuf/server/master/Master.proto
@@ -200,6 +200,17 @@ message ModifyTableResponse {
optional uint64 proc_id = 1;
}

message FlushTableRequest {
required TableName table_name = 1;
optional bytes column_family = 2;
optional uint64 nonce_group = 3 [default = 0];
optional uint64 nonce = 4 [default = 0];
}

message FlushTableResponse {
optional uint64 proc_id = 1;
}

/* Namespace-level protobufs */

message CreateNamespaceRequest {
@@ -1203,6 +1214,9 @@ service MasterService {

rpc FlushMasterStore(FlushMasterStoreRequest)
returns(FlushMasterStoreResponse);

rpc FlushTable(FlushTableRequest)
returns(FlushTableResponse);
}

// HBCK Service definitions.
hbase-protocol-shaded/src/main/protobuf/server/master/MasterProcedure.proto
@@ -191,6 +191,26 @@ message RestoreParentToChildRegionsPair {
required string child2_region_name = 3;
}

enum FlushTableState {
FLUSH_TABLE_PREPARE = 1;
FLUSH_TABLE_FLUSH_REGIONS = 2;
}

message FlushTableProcedureStateData {
required TableName table_name = 1;
optional bytes column_family = 2;
}

message FlushRegionProcedureStateData {
required RegionInfo region = 1;
optional bytes column_family = 2;
}

message FlushRegionParameter {
required RegionInfo region = 1;
optional bytes column_family = 2;
}
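
The FlushTableProcedure driven by FlushTableState above is added elsewhere in this PR. As a hedged sketch of how a StateMachineProcedure over these two states might dispatch (the method signature follows the standard StateMachineProcedure API; the body, the getTableName()/columnFamily members, and the region lookup are illustrative, not the actual implementation):

// Hedged sketch only: how a StateMachineProcedure might walk the two
// FlushTableState values above. Error handling and rollback are elided.
@Override
protected Flow executeFromState(MasterProcedureEnv env, FlushTableState state)
  throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException {
  switch (state) {
    case FLUSH_TABLE_PREPARE:
      // Preconditions (table exists and is enabled) would be checked here.
      setNextState(FlushTableState.FLUSH_TABLE_FLUSH_REGIONS);
      return Flow.HAS_MORE_STATE;
    case FLUSH_TABLE_FLUSH_REGIONS:
      // Fan out one FlushRegionProcedure per region; each carries the
      // optional column_family from FlushTableProcedureStateData.
      addChildProcedure(
        env.getAssignmentManager().getRegionStates().getRegionsOfTable(getTableName()).stream()
          .map(region -> new FlushRegionProcedure(region, columnFamily))
          .toArray(FlushRegionProcedure[]::new));
      return Flow.NO_MORE_STATE;
    default:
      throw new UnsupportedOperationException("unhandled state=" + state);
  }
}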

enum SnapshotState {
SNAPSHOT_PREPARE = 1;
SNAPSHOT_PRE_OPERATION = 2;
hbase-server/src/main/java/org/apache/hadoop/hbase/executor/EventType.java
@@ -291,7 +291,13 @@ public enum EventType {
* RS verify snapshot.<br>
* RS_VERIFY_SNAPSHOT
*/
RS_VERIFY_SNAPSHOT(88, ExecutorType.RS_SNAPSHOT_OPERATIONS);
RS_VERIFY_SNAPSHOT(88, ExecutorType.RS_SNAPSHOT_OPERATIONS),

/**
* RS flush regions.<br>
* RS_FLUSH_REGIONS
*/
RS_FLUSH_REGIONS(89, ExecutorType.RS_FLUSH_OPERATIONS);

private final int code;
private final ExecutorType executor;
hbase-server/src/main/java/org/apache/hadoop/hbase/executor/ExecutorType.java
@@ -54,7 +54,8 @@ public enum ExecutorType {
RS_SWITCH_RPC_THROTTLE(33),
RS_IN_MEMORY_COMPACTION(34),
RS_CLAIM_REPLICATION_QUEUE(35),
RS_SNAPSHOT_OPERATIONS(36);
RS_SNAPSHOT_OPERATIONS(36),
RS_FLUSH_OPERATIONS(37);

ExecutorType(int value) {
}
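
The new executor type only takes effect once the region server starts a thread pool for it; that wiring lives elsewhere in this PR. A hedged sketch following the pattern of the other RS executors started in HRegionServer, where the property key is a hypothetical name used for illustration:

// Hedged sketch of the RS-side wiring (done elsewhere in this PR).
// "hbase.regionserver.executor.flush.operations.threads" is a hypothetical
// property name, not taken from this diff.
executorService.startExecutorService(executorService.new ExecutorConfig()
  .setExecutorType(ExecutorType.RS_FLUSH_OPERATIONS)
  .setCorePoolSize(conf.getInt("hbase.regionserver.executor.flush.operations.threads", 3)));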
hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
@@ -150,6 +150,7 @@
import org.apache.hadoop.hbase.master.procedure.DeleteTableProcedure;
import org.apache.hadoop.hbase.master.procedure.DisableTableProcedure;
import org.apache.hadoop.hbase.master.procedure.EnableTableProcedure;
import org.apache.hadoop.hbase.master.procedure.FlushTableProcedure;
import org.apache.hadoop.hbase.master.procedure.InitMetaProcedure;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureConstants;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
@@ -2691,6 +2692,36 @@ protected String getDescription() {
});
}

@Override
public long flushTable(TableName tableName, byte[] columnFamily, long nonceGroup, long nonce)
throws IOException {
checkInitialized();

if (
!getConfiguration().getBoolean(MasterFlushTableProcedureManager.FLUSH_PROCEDURE_ENABLED,
MasterFlushTableProcedureManager.FLUSH_PROCEDURE_ENABLED_DEFAULT)
) {
throw new DoNotRetryIOException("FlushProcedure is DISABLED");
}

return MasterProcedureUtil
.submitProcedure(new MasterProcedureUtil.NonceProcedureRunnable(this, nonceGroup, nonce) {
@Override
protected void run() throws IOException {
getMaster().getMasterCoprocessorHost().preTableFlush(tableName);
LOG.info(getClientIdAuditPrefix() + " flush " + tableName);
submitProcedure(
new FlushTableProcedure(procedureExecutor.getEnvironment(), tableName, columnFamily));
getMaster().getMasterCoprocessorHost().postTableFlush(tableName);
}

@Override
protected String getDescription() {
return "FlushTableProcedure";
}
});
}
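
The new code path is gated by the master-side switch checked above. A hedged sketch of enabling it programmatically; the constant comes from this diff, but its literal property key is not shown here, so the sketch references the constant itself:

// Sketch only: enable the new flush procedure on the master.
Configuration conf = HBaseConfiguration.create();
// With this switch off, flushTable() above throws DoNotRetryIOException and
// the client falls back to the V1 flush table procedure.
conf.setBoolean(MasterFlushTableProcedureManager.FLUSH_PROCEDURE_ENABLED, true);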

private long modifyTable(final TableName tableName,
final TableDescriptorGetter newDescriptorGetter, final long nonceGroup, final long nonce,
final boolean shouldCheckDescriptor) throws IOException {
hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
@@ -232,6 +232,8 @@
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FixMetaResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FlushMasterStoreRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FlushMasterStoreResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FlushTableRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FlushTableResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterStatusRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterStatusResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetCompletedSnapshotsRequest;
@@ -3483,4 +3485,17 @@ public FlushMasterStoreResponse flushMasterStore(RpcController controller,
}
return FlushMasterStoreResponse.newBuilder().build();
}

@Override
public FlushTableResponse flushTable(RpcController controller, FlushTableRequest req)
throws ServiceException {
TableName tableName = ProtobufUtil.toTableName(req.getTableName());
byte[] columnFamily = req.hasColumnFamily() ? req.getColumnFamily().toByteArray() : null;
try {
long procId = server.flushTable(tableName, columnFamily, req.getNonceGroup(), req.getNonce());
return FlushTableResponse.newBuilder().setProcId(procId).build();
} catch (IOException ioe) {
throw new ServiceException(ioe);
}
}
}
hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java
@@ -212,6 +212,17 @@ long enableTable(final TableName tableName, final long nonceGroup, final long no
long disableTable(final TableName tableName, final long nonceGroup, final long nonce)
throws IOException;

/**
* Flush an existing table
* @param tableName The table name
* @param columnFamily The column family to flush, or null to flush all column families
* @param nonceGroup the nonce group
* @param nonce the nonce
* @return the flush procedure id
*/
long flushTable(final TableName tableName, final byte[] columnFamily, final long nonceGroup,
final long nonce) throws IOException;

/**
* Add a new column to an existing table
* @param tableName The table name
hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/FlushRegionProcedure.java
@@ -0,0 +1,112 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.procedure;

import java.io.IOException;
import java.util.Optional;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.master.RegionState.State;
import org.apache.hadoop.hbase.master.assignment.RegionStateNode;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteOperation;
import org.apache.hadoop.hbase.regionserver.FlushRegionCallable;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.FlushRegionParameter;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.FlushRegionProcedureStateData;

@InterfaceAudience.Private
public class FlushRegionProcedure extends IdempotentRegionRemoteProcedureBase {
private static final Logger LOG = LoggerFactory.getLogger(FlushRegionProcedure.class);

private byte[] columnFamily;

public FlushRegionProcedure() {
}

public FlushRegionProcedure(RegionInfo region) {
this(region, null);
}

public FlushRegionProcedure(RegionInfo region, byte[] columnFamily) {
super(region);
this.columnFamily = columnFamily;
}

@Override
protected Procedure<MasterProcedureEnv>[] execute(MasterProcedureEnv env)
throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException {
RegionStateNode regionNode =
env.getAssignmentManager().getRegionStates().getRegionStateNode(region);
if (!regionNode.isInState(State.OPEN) || regionNode.isInTransition()) {
LOG.info("State of region {} is not OPEN or in transition. Skip {} ...", region, this);
return null;
}
return super.execute(env);
}

@Override
protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
FlushRegionProcedureStateData.Builder builder = FlushRegionProcedureStateData.newBuilder();
builder.setRegion(ProtobufUtil.toRegionInfo(region));
if (columnFamily != null) {
builder.setColumnFamily(UnsafeByteOperations.unsafeWrap(columnFamily));
}
serializer.serialize(builder.build());
}

@Override
protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException {
FlushRegionProcedureStateData data =
serializer.deserialize(FlushRegionProcedureStateData.class);
this.region = ProtobufUtil.toRegionInfo(data.getRegion());
if (data.hasColumnFamily()) {
this.columnFamily = data.getColumnFamily().toByteArray();
}
}

@Override
public Optional<RemoteOperation> remoteCallBuild(MasterProcedureEnv env, ServerName serverName) {
FlushRegionParameter.Builder builder = FlushRegionParameter.newBuilder();
builder.setRegion(ProtobufUtil.toRegionInfo(region));
if (columnFamily != null) {
builder.setColumnFamily(UnsafeByteOperations.unsafeWrap(columnFamily));
}
return Optional.of(new RSProcedureDispatcher.ServerOperation(this, getProcId(),
FlushRegionCallable.class, builder.build().toByteArray()));
}

@Override
public TableOperationType getTableOperationType() {
return TableOperationType.FLUSH;
}

@Override
protected boolean waitInitialized(MasterProcedureEnv env) {
return env.waitInitialized(this);
}
}
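
The RS-side FlushRegionCallable referenced in remoteCallBuild above is added elsewhere in this PR. A hedged sketch of its likely shape, following the existing BaseRSProcedureCallable pattern; the flushcache calls are illustrative of intent, not a copy of the actual implementation:

// Hedged sketch only; the real FlushRegionCallable is added elsewhere in
// this PR. It decodes FlushRegionParameter and flushes the region, optionally
// restricted to one column family.
package org.apache.hadoop.hbase.regionserver;

import java.util.Collections;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.executor.EventType;
import org.apache.hadoop.hbase.procedure2.BaseRSProcedureCallable;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.FlushRegionParameter;

public class FlushRegionCallable extends BaseRSProcedureCallable {

  private RegionInfo regionInfo;
  private byte[] columnFamily; // null means flush all column families

  @Override
  protected void initParameter(byte[] parameter) throws Exception {
    FlushRegionParameter param = FlushRegionParameter.parseFrom(parameter);
    this.regionInfo = ProtobufUtil.toRegionInfo(param.getRegion());
    if (param.hasColumnFamily()) {
      this.columnFamily = param.getColumnFamily().toByteArray();
    }
  }

  @Override
  protected void doCall() throws Exception {
    HRegion region = rs.getRegion(regionInfo.getEncodedName());
    if (columnFamily == null) {
      // Flush every store of the region.
      region.flushcache(true, false, FlushLifeCycleTracker.DUMMY);
    } else {
      // Flush only the requested family.
      region.flushcache(Collections.singletonList(columnFamily), false,
        FlushLifeCycleTracker.DUMMY);
    }
  }

  @Override
  public EventType getEventType() {
    return EventType.RS_FLUSH_REGIONS;
  }
}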