Skip to content

Commit 13d2405

Browse files
frostruan and huiruan
authored and committed
HBASE-26867 Introduce a FlushProcedure (#5256)
Co-authored-by: huiruan <876107431@qq.com> Signed-off-by: Duo Zhang <zhangduo@apache.org> (cherry picked from commit 20c9e4b)
1 parent ca0ef68 commit 13d2405

File tree

31 files changed

+1111
-28
lines changed

31 files changed

+1111
-28
lines changed

hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -550,6 +550,15 @@ Future<Void> modifyColumnFamilyStoreFileTrackerAsync(TableName tableName, byte[]
550550
*/
551551
void flush(TableName tableName, byte[] columnFamily) throws IOException;
552552

553+
/**
554+
* Flush the specified column family stores on all regions of the passed table. This runs as a
555+
* synchronous operation.
556+
* @param tableName table to flush
557+
* @param columnFamilies column families within a table
558+
* @throws IOException if a remote or network exception occurs
559+
*/
560+
void flush(TableName tableName, List<byte[]> columnFamilies) throws IOException;
561+
553562
/**
554563
* Flush an individual region. Synchronous operation.
555564
* @param regionName region to flush

hbase-client/src/main/java/org/apache/hadoop/hbase/client/AdminOverAsyncAdmin.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -264,6 +264,11 @@ public void flush(TableName tableName, byte[] columnFamily) throws IOException {
264264
get(admin.flush(tableName, columnFamily));
265265
}
266266

267+
@Override
268+
public void flush(TableName tableName, List<byte[]> columnFamilies) throws IOException {
269+
get(admin.flush(tableName, columnFamilies));
270+
}
271+
267272
@Override
268273
public void flushRegion(byte[] regionName) throws IOException {
269274
get(admin.flushRegion(regionName));

hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -347,6 +347,14 @@ CompletableFuture<Void> modifyColumnFamilyStoreFileTracker(TableName tableName,
347347
*/
348348
CompletableFuture<Void> flush(TableName tableName, byte[] columnFamily);
349349

350+
/**
351+
* Flush the specified column family stores on all regions of the passed table. This runs as a
352+
* synchronous operation.
353+
* @param tableName table to flush
354+
* @param columnFamilies column families within a table
355+
*/
356+
CompletableFuture<Void> flush(TableName tableName, List<byte[]> columnFamilies);
357+
350358
/**
351359
* Flush an individual region.
352360
* @param regionName region to flush

hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -269,6 +269,11 @@ public CompletableFuture<Void> flush(TableName tableName, byte[] columnFamily) {
269269
return wrap(rawAdmin.flush(tableName, columnFamily));
270270
}
271271

272+
@Override
273+
public CompletableFuture<Void> flush(TableName tableName, List<byte[]> columnFamilies) {
274+
return wrap(rawAdmin.flush(tableName, columnFamilies));
275+
}
276+
272277
@Override
273278
public CompletableFuture<Void> flushRegion(byte[] regionName) {
274279
return wrap(rawAdmin.flushRegion(regionName));

hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java

Lines changed: 58 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@
5353
import org.apache.hadoop.hbase.ClusterMetrics;
5454
import org.apache.hadoop.hbase.ClusterMetrics.Option;
5555
import org.apache.hadoop.hbase.ClusterMetricsBuilder;
56+
import org.apache.hadoop.hbase.DoNotRetryIOException;
5657
import org.apache.hadoop.hbase.HConstants;
5758
import org.apache.hadoop.hbase.HRegionLocation;
5859
import org.apache.hadoop.hbase.NamespaceDescriptor;
@@ -96,6 +97,7 @@
9697
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
9798
import org.apache.hadoop.hbase.util.ForeignExceptionUtil;
9899
import org.apache.hadoop.hbase.util.Pair;
100+
import org.apache.hadoop.hbase.util.Strings;
99101
import org.apache.yetus.audience.InterfaceAudience;
100102
import org.slf4j.Logger;
101103
import org.slf4j.LoggerFactory;
@@ -180,6 +182,8 @@
180182
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ExecProcedureResponse;
181183
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FlushMasterStoreRequest;
182184
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FlushMasterStoreResponse;
185+
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FlushTableRequest;
186+
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FlushTableResponse;
183187
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterStatusRequest;
184188
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterStatusResponse;
185189
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetCompletedSnapshotsRequest;
@@ -950,12 +954,50 @@ public CompletableFuture<List<RegionInfo>> getRegions(TableName tableName) {
950954

951955
@Override
952956
public CompletableFuture<Void> flush(TableName tableName) {
953-
return flush(tableName, null);
957+
return flush(tableName, Collections.emptyList());
954958
}
955959

956960
@Override
957961
public CompletableFuture<Void> flush(TableName tableName, byte[] columnFamily) {
962+
return flush(tableName, Collections.singletonList(columnFamily));
963+
}
964+
965+
@Override
966+
public CompletableFuture<Void> flush(TableName tableName, List<byte[]> columnFamilyList) {
967+
// This is for keeping compatibility with old implementation.
968+
// If the server version is lower than the client version, it's possible that the
969+
// flushTable method is not present in the server side, if so, we need to fall back
970+
// to the old implementation.
971+
List<byte[]> columnFamilies = columnFamilyList.stream()
972+
.filter(cf -> cf != null && cf.length > 0).distinct().collect(Collectors.toList());
973+
FlushTableRequest request = RequestConverter.buildFlushTableRequest(tableName, columnFamilies,
974+
ng.getNonceGroup(), ng.newNonce());
975+
CompletableFuture<Void> procFuture = this.<FlushTableRequest, FlushTableResponse> procedureCall(
976+
tableName, request, (s, c, req, done) -> s.flushTable(c, req, done),
977+
(resp) -> resp.getProcId(), new FlushTableProcedureBiConsumer(tableName));
958978
CompletableFuture<Void> future = new CompletableFuture<>();
979+
addListener(procFuture, (ret, error) -> {
980+
if (error != null) {
981+
if (error instanceof TableNotFoundException || error instanceof TableNotEnabledException) {
982+
future.completeExceptionally(error);
983+
} else if (error instanceof DoNotRetryIOException) {
984+
// usually this is caused by the method is not present on the server or
985+
// the hbase hadoop version does not match the running hadoop version.
986+
// if that happens, we need fall back to the old flush implementation.
987+
LOG.info("Unrecoverable error in master side. Fallback to FlushTableProcedure V1", error);
988+
legacyFlush(future, tableName, columnFamilies);
989+
} else {
990+
future.completeExceptionally(error);
991+
}
992+
} else {
993+
future.complete(ret);
994+
}
995+
});
996+
return future;
997+
}
998+
999+
private void legacyFlush(CompletableFuture<Void> future, TableName tableName,
1000+
List<byte[]> columnFamilies) {
9591001
addListener(tableExists(tableName), (exists, err) -> {
9601002
if (err != null) {
9611003
future.completeExceptionally(err);
@@ -969,8 +1011,9 @@ public CompletableFuture<Void> flush(TableName tableName, byte[] columnFamily) {
9691011
future.completeExceptionally(new TableNotEnabledException(tableName));
9701012
} else {
9711013
Map<String, String> props = new HashMap<>();
972-
if (columnFamily != null) {
973-
props.put(HConstants.FAMILY_KEY_STR, Bytes.toString(columnFamily));
1014+
if (columnFamilies != null && !columnFamilies.isEmpty()) {
1015+
props.put(HConstants.FAMILY_KEY_STR, Strings.JOINER
1016+
.join(columnFamilies.stream().map(Bytes::toString).collect(Collectors.toList())));
9741017
}
9751018
addListener(
9761019
execProcedure(FLUSH_TABLE_PROCEDURE_SIGNATURE, tableName.getNameAsString(), props),
@@ -985,7 +1028,6 @@ public CompletableFuture<Void> flush(TableName tableName, byte[] columnFamily) {
9851028
});
9861029
}
9871030
});
988-
return future;
9891031
}
9901032

9911033
@Override
@@ -2768,6 +2810,18 @@ String getOperationType() {
27682810
}
27692811
}
27702812

2813+
private static class FlushTableProcedureBiConsumer extends TableProcedureBiConsumer {
2814+
2815+
FlushTableProcedureBiConsumer(TableName tableName) {
2816+
super(tableName);
2817+
}
2818+
2819+
@Override
2820+
String getOperationType() {
2821+
return "FLUSH";
2822+
}
2823+
}
2824+
27712825
private static class CreateNamespaceProcedureBiConsumer extends NamespaceProcedureBiConsumer {
27722826

27732827
CreateNamespaceProcedureBiConsumer(String namespaceName) {

hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,7 @@
116116
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DisableTableRequest;
117117
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableCatalogJanitorRequest;
118118
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableTableRequest;
119+
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FlushTableRequest;
119120
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterStatusRequest;
120121
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetNamespaceDescriptorRequest;
121122
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableDescriptorsRequest;
@@ -1714,4 +1715,16 @@ public static RemoveServersRequest buildRemoveServersRequest(Set<Address> server
17141715
}
17151716
return RemoveServersRequest.newBuilder().addAllServers(hostPorts).build();
17161717
}
1718+
1719+
public static FlushTableRequest buildFlushTableRequest(final TableName tableName,
1720+
final List<byte[]> columnFamilies, final long nonceGroup, final long nonce) {
1721+
FlushTableRequest.Builder builder = FlushTableRequest.newBuilder();
1722+
builder.setTableName(ProtobufUtil.toProtoTableName(tableName));
1723+
if (!columnFamilies.isEmpty()) {
1724+
for (byte[] columnFamily : columnFamilies) {
1725+
builder.addColumnFamily(UnsafeByteOperations.unsafeWrap(columnFamily));
1726+
}
1727+
}
1728+
return builder.setNonceGroup(nonceGroup).setNonce(nonce).build();
1729+
}
17171730
}

hbase-common/src/main/java/org/apache/hadoop/hbase/util/Strings.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,9 @@
2020
import org.apache.commons.lang3.StringUtils;
2121
import org.apache.yetus.audience.InterfaceAudience;
2222

23+
import org.apache.hbase.thirdparty.com.google.common.base.Joiner;
24+
import org.apache.hbase.thirdparty.com.google.common.base.Splitter;
25+
2326
/**
2427
* Utility for Strings.
2528
*/
@@ -28,6 +31,9 @@ public final class Strings {
2831
public static final String DEFAULT_SEPARATOR = "=";
2932
public static final String DEFAULT_KEYVALUE_SEPARATOR = ", ";
3033

34+
public static final Joiner JOINER = Joiner.on(",");
35+
public static final Splitter SPLITTER = Splitter.on(",");
36+
3137
private Strings() {
3238
}
3339

hbase-protocol-shaded/src/main/protobuf/server/master/Master.proto

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -200,6 +200,17 @@ message ModifyTableResponse {
200200
optional uint64 proc_id = 1;
201201
}
202202

203+
message FlushTableRequest {
204+
required TableName table_name = 1;
205+
repeated bytes column_family = 2;
206+
optional uint64 nonce_group = 3 [default = 0];
207+
optional uint64 nonce = 4 [default = 0];
208+
}
209+
210+
message FlushTableResponse {
211+
optional uint64 proc_id = 1;
212+
}
213+
203214
/* Namespace-level protobufs */
204215

205216
message CreateNamespaceRequest {
@@ -1239,6 +1250,9 @@ service MasterService {
12391250

12401251
rpc FlushMasterStore(FlushMasterStoreRequest)
12411252
returns(FlushMasterStoreResponse);
1253+
1254+
rpc FlushTable(FlushTableRequest)
1255+
returns(FlushTableResponse);
12421256
}
12431257

12441258
// HBCK Service definitions.

hbase-protocol-shaded/src/main/protobuf/server/master/MasterProcedure.proto

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -191,6 +191,26 @@ message RestoreParentToChildRegionsPair {
191191
required string child2_region_name = 3;
192192
}
193193

194+
enum FlushTableState {
195+
FLUSH_TABLE_PREPARE = 1;
196+
FLUSH_TABLE_FLUSH_REGIONS = 2;
197+
}
198+
199+
message FlushTableProcedureStateData {
200+
required TableName table_name = 1;
201+
repeated bytes column_family = 2;
202+
}
203+
204+
message FlushRegionProcedureStateData {
205+
required RegionInfo region = 1;
206+
repeated bytes column_family = 2;
207+
}
208+
209+
message FlushRegionParameter {
210+
required RegionInfo region = 1;
211+
repeated bytes column_family = 2;
212+
}
213+
194214
enum SnapshotState {
195215
SNAPSHOT_PREPARE = 1;
196216
SNAPSHOT_PRE_OPERATION = 2;

hbase-server/src/main/java/org/apache/hadoop/hbase/executor/EventType.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -291,7 +291,13 @@ public enum EventType {
291291
* RS verify snapshot.<br>
292292
* RS_VERIFY_SNAPSHOT
293293
*/
294-
RS_VERIFY_SNAPSHOT(88, ExecutorType.RS_SNAPSHOT_OPERATIONS);
294+
RS_VERIFY_SNAPSHOT(88, ExecutorType.RS_SNAPSHOT_OPERATIONS),
295+
296+
/**
297+
* RS flush regions.<br>
298+
* RS_FLUSH_OPERATIONS
299+
*/
300+
RS_FLUSH_REGIONS(89, ExecutorType.RS_FLUSH_OPERATIONS);
295301

296302
private final int code;
297303
private final ExecutorType executor;

hbase-server/src/main/java/org/apache/hadoop/hbase/executor/ExecutorType.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,9 @@ public enum ExecutorType {
5454
RS_SWITCH_RPC_THROTTLE(33),
5555
RS_IN_MEMORY_COMPACTION(34),
5656
RS_CLAIM_REPLICATION_QUEUE(35),
57-
RS_SNAPSHOT_OPERATIONS(36);
57+
RS_SNAPSHOT_OPERATIONS(36),
58+
59+
RS_FLUSH_OPERATIONS(37);
5860

5961
ExecutorType(int value) {
6062
}

hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -157,6 +157,7 @@
157157
import org.apache.hadoop.hbase.master.procedure.DeleteTableProcedure;
158158
import org.apache.hadoop.hbase.master.procedure.DisableTableProcedure;
159159
import org.apache.hadoop.hbase.master.procedure.EnableTableProcedure;
160+
import org.apache.hadoop.hbase.master.procedure.FlushTableProcedure;
160161
import org.apache.hadoop.hbase.master.procedure.InitMetaProcedure;
161162
import org.apache.hadoop.hbase.master.procedure.MasterProcedureConstants;
162163
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
@@ -4381,4 +4382,34 @@ private void initializeCoprocessorHost(Configuration conf) {
43814382
// initialize master side coprocessors before we start handling requests
43824383
this.cpHost = new MasterCoprocessorHost(this, conf);
43834384
}
4385+
4386+
@Override
4387+
public long flushTable(TableName tableName, List<byte[]> columnFamilies, long nonceGroup,
4388+
long nonce) throws IOException {
4389+
checkInitialized();
4390+
4391+
if (
4392+
!getConfiguration().getBoolean(MasterFlushTableProcedureManager.FLUSH_PROCEDURE_ENABLED,
4393+
MasterFlushTableProcedureManager.FLUSH_PROCEDURE_ENABLED_DEFAULT)
4394+
) {
4395+
throw new DoNotRetryIOException("FlushTableProcedureV2 is DISABLED");
4396+
}
4397+
4398+
return MasterProcedureUtil
4399+
.submitProcedure(new MasterProcedureUtil.NonceProcedureRunnable(this, nonceGroup, nonce) {
4400+
@Override
4401+
protected void run() throws IOException {
4402+
getMaster().getMasterCoprocessorHost().preTableFlush(tableName);
4403+
LOG.info(getClientIdAuditPrefix() + " flush " + tableName);
4404+
submitProcedure(
4405+
new FlushTableProcedure(procedureExecutor.getEnvironment(), tableName, columnFamilies));
4406+
getMaster().getMasterCoprocessorHost().postTableFlush(tableName);
4407+
}
4408+
4409+
@Override
4410+
protected String getDescription() {
4411+
return "FlushTableProcedure";
4412+
}
4413+
});
4414+
}
43844415
}

hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -235,6 +235,8 @@
235235
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FixMetaResponse;
236236
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FlushMasterStoreRequest;
237237
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FlushMasterStoreResponse;
238+
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FlushTableRequest;
239+
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FlushTableResponse;
238240
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterStatusRequest;
239241
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterStatusResponse;
240242
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetCompletedSnapshotsRequest;
@@ -3590,4 +3592,21 @@ public FlushMasterStoreResponse flushMasterStore(RpcController controller,
35903592
}
35913593
return FlushMasterStoreResponse.newBuilder().build();
35923594
}
3595+
3596+
@Override
3597+
public FlushTableResponse flushTable(RpcController controller, FlushTableRequest req)
3598+
throws ServiceException {
3599+
TableName tableName = ProtobufUtil.toTableName(req.getTableName());
3600+
List<byte[]> columnFamilies = req.getColumnFamilyCount() > 0
3601+
? req.getColumnFamilyList().stream().filter(cf -> !cf.isEmpty()).map(ByteString::toByteArray)
3602+
.collect(Collectors.toList())
3603+
: null;
3604+
try {
3605+
long procId =
3606+
server.flushTable(tableName, columnFamilies, req.getNonceGroup(), req.getNonce());
3607+
return FlushTableResponse.newBuilder().setProcId(procId).build();
3608+
} catch (IOException ioe) {
3609+
throw new ServiceException(ioe);
3610+
}
3611+
}
35933612
}

hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -477,4 +477,15 @@ boolean normalizeRegions(final NormalizeTableFilterParams ntfp, final boolean is
477477
* Flush master local region
478478
*/
479479
void flushMasterStore() throws IOException;
480+
481+
/**
482+
* Flush an existing table
483+
* @param tableName The table name
484+
* @param columnFamilies The column families to flush
485+
* @param nonceGroup the nonce group
486+
* @param nonce the nonce
487+
* @return the flush procedure id
488+
*/
489+
long flushTable(final TableName tableName, final List<byte[]> columnFamilies,
490+
final long nonceGroup, final long nonce) throws IOException;
480491
}

0 commit comments

Comments
 (0)