 */
package org.apache.hadoop.hbase.rsgroup;

- import com.google.protobuf.ServiceException;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.TreeSet;
import org.apache.hadoop.hbase.Coprocessor;
import org.apache.hadoop.hbase.DoNotRetryIOException;
+ import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
+ import org.apache.hadoop.hbase.client.AsyncClusterConnection;
+ import org.apache.hadoop.hbase.client.AsyncTable;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
- import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.CoprocessorDescriptorBuilder;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
- import org.apache.hadoop.hbase.client.Scan;
- import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.constraint.ConstraintException;
import org.apache.hadoop.hbase.coprocessor.MultiRowMutationEndpoint;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
- import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.master.ServerListener;
import org.apache.hadoop.hbase.master.TableStateManager;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.protobuf.ProtobufMagic;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
- import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProtos;
+ import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto;
+ import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProtos.MultiRowMutationService;
+ import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProtos.MutateRowsRequest;
+ import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProtos.MutateRowsResponse;
import org.apache.hadoop.hbase.protobuf.generated.RSGroupProtos;
import org.apache.hadoop.hbase.regionserver.DisabledRegionSplitPolicy;
import org.apache.hadoop.hbase.util.Bytes;
+ import org.apache.hadoop.hbase.util.FutureUtils;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

+ import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
import org.apache.hbase.thirdparty.com.google.common.collect.Sets;

 * RSGroupInfo Map at {@link #rsGroupMap} and a Map of tables to the name of the rsgroup they belong
 * to (in {@link #tableMap}). These Maps are persisted to the hbase:rsgroup table (and cached in
 * zk) on each modification.
- * <p>
+ * <p/>
 * Mutations on state are synchronized but reads can continue without having to wait on an instance
 * monitor, mutations do wholesale replace of the Maps on update -- Copy-On-Write; the local Maps of
 * state are read-only, just-in-case (see flushConfig).
- * <p>
+ * <p/>
 * Reads must not block else there is a danger we'll deadlock.
- * <p>
+ * <p/>
 * Clients of this class, the {@link RSGroupAdminEndpoint} for example, want to query and then act
 * on the results of the query modifying cache in zookeeper without another thread making
 * intermediate modifications. These clients synchronize on the 'this' instance so no other has
final class RSGroupInfoManagerImpl implements RSGroupInfoManager {
  private static final Logger LOG = LoggerFactory.getLogger(RSGroupInfoManagerImpl.class);

+   private static final String REASSIGN_WAIT_INTERVAL_KEY = "hbase.rsgroup.reassign.wait";
+   private static final long DEFAULT_REASSIGN_WAIT_INTERVAL = 30 * 1000L;
+
+   // Assigned before user tables
+   @VisibleForTesting
+   static final TableName RSGROUP_TABLE_NAME =
+     TableName.valueOf(NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR, "rsgroup");
+
+   private static final String RS_GROUP_ZNODE = "rsgroup";
+
+   @VisibleForTesting
+   static final byte[] META_FAMILY_BYTES = Bytes.toBytes("m");
+
+   @VisibleForTesting
+   static final byte[] META_QUALIFIER_BYTES = Bytes.toBytes("i");
+
+   private static final byte[] ROW_KEY = { 0 };
+
  /** Table descriptor for <code>hbase:rsgroup</code> catalog table */
  private static final TableDescriptor RSGROUP_TABLE_DESC;
  static {
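
The class comment above carries the main design point of this file: writers build a complete replacement Map and swap it in, while readers only ever dereference an immutable snapshot, so reads never take the monitor. A minimal sketch of that copy-on-write idiom, with illustrative names rather than the class's actual fields:

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

class CopyOnWriteSketch {
  // Readers see this reference without synchronization; it always points at an immutable map.
  private volatile Map<String, String> snapshot = Collections.emptyMap();

  // Writers serialize on the instance monitor and publish a wholly new map.
  synchronized void put(String key, String value) {
    Map<String, String> next = new HashMap<>(snapshot);
    next.put(key, value);
    snapshot = Collections.unmodifiableMap(next);
  }

  // Lock-free read: it cannot block, which is what the "Reads must not block" note relies on.
  String get(String key) {
    return snapshot.get(key);
  }
}
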
@@ -125,7 +146,7 @@ final class RSGroupInfoManagerImpl implements RSGroupInfoManager {
  private volatile Map<TableName, String> tableMap = Collections.emptyMap();

  private final MasterServices masterServices;
- private final Connection conn;
+ private final AsyncClusterConnection conn;
  private final ZKWatcher watcher;
  private final RSGroupStartupWorker rsGroupStartupWorker;
  // contains list of groups that were last flushed to persistent store
@@ -136,7 +157,7 @@ final class RSGroupInfoManagerImpl implements RSGroupInfoManager {
  private RSGroupInfoManagerImpl(MasterServices masterServices) throws IOException {
    this.masterServices = masterServices;
    this.watcher = masterServices.getZooKeeper();
-   this.conn = masterServices.getConnection();
+   this.conn = masterServices.getAsyncClusterConnection();
    this.rsGroupStartupWorker = new RSGroupStartupWorker();
  }
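
getAsyncClusterConnection() hands back the master's shared async connection, and the call sites in this class that still need a synchronous answer block on the returned CompletableFuture through FutureUtils.get, which rethrows the failure as an IOException (the same idiom replaces the try-with-resources probe read in waitForGroupTableOnline further down). A minimal sketch of that sync-over-async pattern, written against the public AsyncConnection interface with made-up class and method names:

import java.io.IOException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.AsyncConnection;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.FutureUtils;

final class SyncOverAsyncSketch {
  private final AsyncConnection conn;

  SyncOverAsyncSketch(AsyncConnection conn) {
    this.conn = conn;
  }

  // Blocks the caller until the async get completes; FutureUtils.get rethrows the
  // failure cause as an IOException instead of leaking ExecutionException.
  Result readRow(TableName table, byte[] row) throws IOException {
    return FutureUtils.get(conn.getTable(table).get(new Get(row)));
  }
}
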
@@ -349,25 +370,25 @@ public synchronized void removeServers(Set<Address> servers) throws IOException
    }
  }

- List<RSGroupInfo> retrieveGroupListFromGroupTable() throws IOException {
+ private List<RSGroupInfo> retrieveGroupListFromGroupTable() throws IOException {
    List<RSGroupInfo> rsGroupInfoList = Lists.newArrayList();
-   try (Table table = conn.getTable(RSGROUP_TABLE_NAME);
-       ResultScanner scanner = table.getScanner(new Scan())) {
+   AsyncTable<?> table = conn.getTable(RSGROUP_TABLE_NAME);
+   try (ResultScanner scanner = table.getScanner(META_FAMILY_BYTES, META_QUALIFIER_BYTES)) {
      for (Result result;;) {
        result = scanner.next();
        if (result == null) {
          break;
        }
        RSGroupProtos.RSGroupInfo proto = RSGroupProtos.RSGroupInfo
-         .parseFrom(result.getValue(META_FAMILY_BYTES, META_QUALIFIER_BYTES));
+           .parseFrom(result.getValue(META_FAMILY_BYTES, META_QUALIFIER_BYTES));
        rsGroupInfoList.add(ProtobufUtil.toGroupInfo(proto));
      }
    }
    return rsGroupInfoList;
  }

- List<RSGroupInfo> retrieveGroupListFromZookeeper() throws IOException {
-   String groupBasePath = ZNodePaths.joinZNode(watcher.getZNodePaths().baseZNode, rsGroupZNode);
+ private List<RSGroupInfo> retrieveGroupListFromZookeeper() throws IOException {
+   String groupBasePath = ZNodePaths.joinZNode(watcher.getZNodePaths().baseZNode, RS_GROUP_ZNODE);
    List<RSGroupInfo> RSGroupInfoList = Lists.newArrayList();
    // Overwrite any info stored by table, this takes precedence
    try {
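
One detail worth calling out for reviewers: AsyncTable still hands back a blocking ResultScanner, so the loop above keeps its shape; the column restriction simply moved from a Scan object into the getScanner(family, qualifier) overload. A hedged sketch of that usage against an arbitrary table (table, family, and qualifier names here are made up):

import java.io.IOException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.AsyncConnection;
import org.apache.hadoop.hbase.client.AsyncTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.util.Bytes;

final class ScanSketch {
  // Reads one column from every row; scanner.next() blocks even though the table is async.
  static int countCells(AsyncConnection conn) throws IOException {
    AsyncTable<?> table = conn.getTable(TableName.valueOf("demo"));
    int cells = 0;
    try (ResultScanner scanner =
        table.getScanner(Bytes.toBytes("f"), Bytes.toBytes("q"))) {
      for (Result result; (result = scanner.next()) != null;) {
        if (result.getValue(Bytes.toBytes("f"), Bytes.toBytes("q")) != null) {
          cells++;
        }
      }
    }
    return cells;
  }
}
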
@@ -519,7 +540,8 @@ private synchronized void flushConfig(Map<String, RSGroupInfo> newGroupMap) thro
    resetRSGroupAndTableMaps(newGroupMap, newTableMap);

    try {
-     String groupBasePath = ZNodePaths.joinZNode(watcher.getZNodePaths().baseZNode, rsGroupZNode);
+     String groupBasePath =
+       ZNodePaths.joinZNode(watcher.getZNodePaths().baseZNode, RS_GROUP_ZNODE);
      ZKUtil.createAndFailSilent(watcher, groupBasePath, ProtobufMagic.PB_MAGIC);

      List<ZKUtil.ZKUtilOp> zkOps = new ArrayList<>(newGroupMap.size());
@@ -702,11 +724,8 @@ private boolean waitForGroupTableOnline() {
            createRSGroupTable();
          }
          // try reading from the table
-         try (Table table = conn.getTable(RSGROUP_TABLE_NAME)) {
-           table.get(new Get(ROW_KEY));
-         }
-         LOG.info(
-           "RSGroup table=" + RSGROUP_TABLE_NAME + " is online, refreshing cached information");
+         FutureUtils.get(conn.getTable(RSGROUP_TABLE_NAME).get(new Get(ROW_KEY)));
+         LOG.info("RSGroup table={} is online, refreshing cached information", RSGROUP_TABLE_NAME);
          RSGroupInfoManagerImpl.this.refresh(true);
          online = true;
          // flush any inconsistencies between ZK and HTable
@@ -748,8 +767,8 @@ private void createRSGroupTable() throws IOException {
      } else {
        Procedure<?> result = masterServices.getMasterProcedureExecutor().getResult(procId);
        if (result != null && result.isFailed()) {
-         throw new IOException(
-           "Failed to create group table. " + MasterProcedureUtil.unwrapRemoteIOException(result));
+         throw new IOException("Failed to create group table. " +
+           MasterProcedureUtil.unwrapRemoteIOException(result));
        }
      }
    }
@@ -764,33 +783,24 @@ private static boolean isMasterRunning(MasterServices masterServices) {
  }

  private void multiMutate(List<Mutation> mutations) throws IOException {
-   try (Table table = conn.getTable(RSGROUP_TABLE_NAME)) {
-     CoprocessorRpcChannel channel = table.coprocessorService(ROW_KEY);
-     MultiRowMutationProtos.MutateRowsRequest.Builder mmrBuilder =
-       MultiRowMutationProtos.MutateRowsRequest.newBuilder();
-     for (Mutation mutation : mutations) {
-       if (mutation instanceof Put) {
-         mmrBuilder.addMutationRequest(org.apache.hadoop.hbase.protobuf.ProtobufUtil.toMutation(
-           org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType.PUT,
-           mutation));
-       } else if (mutation instanceof Delete) {
-         mmrBuilder.addMutationRequest(org.apache.hadoop.hbase.protobuf.ProtobufUtil.toMutation(
-           org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType.DELETE,
-           mutation));
-       } else {
-         throw new DoNotRetryIOException(
+   MutateRowsRequest.Builder builder = MutateRowsRequest.newBuilder();
+   for (Mutation mutation : mutations) {
+     if (mutation instanceof Put) {
+       builder
+         .addMutationRequest(ProtobufUtil.toMutation(MutationProto.MutationType.PUT, mutation));
+     } else if (mutation instanceof Delete) {
+       builder.addMutationRequest(
+         ProtobufUtil.toMutation(MutationProto.MutationType.DELETE, mutation));
+     } else {
+       throw new DoNotRetryIOException(
          "multiMutate doesn't support " + mutation.getClass().getName());
-       }
-     }
-
-     MultiRowMutationProtos.MultiRowMutationService.BlockingInterface service =
-       MultiRowMutationProtos.MultiRowMutationService.newBlockingStub(channel);
-     try {
-       service.mutateRows(null, mmrBuilder.build());
-     } catch (ServiceException ex) {
-       ProtobufUtil.toIOException(ex);
      }
    }
+   MutateRowsRequest request = builder.build();
+   AsyncTable<?> table = conn.getTable(RSGROUP_TABLE_NAME);
+   FutureUtils.get(table.<MultiRowMutationService, MutateRowsResponse> coprocessorService(
+     MultiRowMutationService::newStub,
+     (stub, controller, done) -> stub.mutateRows(controller, request, done), ROW_KEY));
  }

  private void checkGroupName(String groupName) throws ConstraintException {
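
The substantive change in this last hunk is swapping the blocking CoprocessorRpcChannel plumbing for AsyncTable.coprocessorService plus FutureUtils.get. A condensed sketch of that call shape, pulled out into a standalone helper (atomicMutate is a hypothetical name, not a method of this class):

import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import org.apache.hadoop.hbase.client.AsyncTable;
import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProtos.MultiRowMutationService;
import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProtos.MutateRowsRequest;
import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProtos.MutateRowsResponse;
import org.apache.hadoop.hbase.util.FutureUtils;

final class CoprocessorCallSketch {
  // The three pieces of an AsyncTable coprocessor call:
  // 1) the stub maker turns the region's RpcChannel into a service stub,
  // 2) the ServiceCaller lambda invokes one RPC on that stub,
  // 3) the row key picks which region hosts the endpoint.
  static MutateRowsResponse atomicMutate(AsyncTable<?> table, byte[] row, MutateRowsRequest request)
      throws IOException {
    CompletableFuture<MutateRowsResponse> future =
      table.<MultiRowMutationService, MutateRowsResponse> coprocessorService(
        MultiRowMutationService::newStub,
        (stub, controller, done) -> stub.mutateRows(controller, request, done), row);
    // Block here because the rsgroup manager's own API is synchronous.
    return FutureUtils.get(future);
  }
}

Blocking on the future keeps multiMutate's contract unchanged for callers while still routing the RPC through the async connection's single code path.
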