  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.hadoop.hbase.rsgroup;

-import com.google.protobuf.ServiceException;
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.util.ArrayList;
...
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Coprocessor;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.NamespaceDescriptor;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.AsyncClusterConnection;
+import org.apache.hadoop.hbase.client.AsyncTable;
 import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
-import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.CoprocessorDescriptorBuilder;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Get;
...
 import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.client.TableDescriptor;
 import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
 import org.apache.hadoop.hbase.constraint.ConstraintException;
 import org.apache.hadoop.hbase.coprocessor.MultiRowMutationEndpoint;
 import org.apache.hadoop.hbase.exceptions.DeserializationException;
-import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
 import org.apache.hadoop.hbase.master.MasterServices;
 import org.apache.hadoop.hbase.master.ServerListener;
 import org.apache.hadoop.hbase.master.TableStateManager;
...
 import org.apache.hadoop.hbase.procedure2.Procedure;
 import org.apache.hadoop.hbase.protobuf.ProtobufMagic;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
-import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProtos;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto;
+import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProtos.MultiRowMutationService;
+import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProtos.MutateRowsRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProtos.MutateRowsResponse;
 import org.apache.hadoop.hbase.protobuf.generated.RSGroupProtos;
 import org.apache.hadoop.hbase.regionserver.DisabledRegionSplitPolicy;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FutureUtils;
 import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
 import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
...
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

+import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
 import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
 import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
 import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
...
  * RSGroupInfo Map at {@link #rsGroupMap} and a Map of tables to the name of the rsgroup they belong
  * too (in {@link #tableMap}). These Maps are persisted to the hbase:rsgroup table (and cached in
  * zk) on each modification.
- * <p>
+ * <p/>
  * Mutations on state are synchronized but reads can continue without having to wait on an instance
  * monitor, mutations do wholesale replace of the Maps on update -- Copy-On-Write; the local Maps of
  * state are read-only, just-in-case (see flushConfig).
- * <p>
+ * <p/>
  * Reads must not block else there is a danger we'll deadlock.
- * <p>
+ * <p/>
  * Clients of this class, the {@link RSGroupAdminEndpoint} for example, want to query and then act
  * on the results of the query modifying cache in zookeeper without another thread making
  * intermediate modifications. These clients synchronize on the 'this' instance so no other has
...
 final class RSGroupInfoManagerImpl implements RSGroupInfoManager {
   private static final Logger LOG = LoggerFactory.getLogger(RSGroupInfoManagerImpl.class);

+  private static final String REASSIGN_WAIT_INTERVAL_KEY = "hbase.rsgroup.reassign.wait";
+  private static final long DEFAULT_REASSIGN_WAIT_INTERVAL = 30 * 1000L;
+
+  // Assigned before user tables
+  @VisibleForTesting
+  static final TableName RSGROUP_TABLE_NAME =
+    TableName.valueOf(NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR, "rsgroup");
+
+  private static final String RS_GROUP_ZNODE = "rsgroup";
+
+  @VisibleForTesting
+  static final byte[] META_FAMILY_BYTES = Bytes.toBytes("m");
+
+  @VisibleForTesting
+  static final byte[] META_QUALIFIER_BYTES = Bytes.toBytes("i");
+
+  private static final byte[] ROW_KEY = { 0 };
+
   /** Table descriptor for <code>hbase:rsgroup</code> catalog table */
   private static final TableDescriptor RSGROUP_TABLE_DESC;
   static {
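Note for reviewers: the class comment above describes the copy-on-write scheme this manager relies on, readers dereference an immutable snapshot while writers serialize on the instance monitor and swap in a rebuilt map wholesale. A minimal standalone sketch of that pattern follows; the class and field names are illustrative, not the ones used by RSGroupInfoManagerImpl.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Illustrative copy-on-write cache; hypothetical names, not the real fields.
class CopyOnWriteGroupCache {
  // Readers never block: they only dereference this volatile, immutable map.
  private volatile Map<String, String> groupByTable = Collections.emptyMap();

  String getGroupOfTable(String table) {
    return groupByTable.get(table); // lock-free read of the current snapshot
  }

  // Writers serialize on the monitor, build a fresh map, then swap it in.
  synchronized void moveTable(String table, String group) {
    Map<String, String> newMap = new HashMap<>(groupByTable);
    newMap.put(table, group);
    groupByTable = Collections.unmodifiableMap(newMap); // wholesale replace
  }
}
```

Because the published map is never mutated in place, readers never need the lock and cannot deadlock against a writer, which is the property the comment insists on.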
@@ -129,7 +149,7 @@ final class RSGroupInfoManagerImpl implements RSGroupInfoManager {
   private volatile Map<TableName, String> tableMap = Collections.emptyMap();

   private final MasterServices masterServices;
-  private final Connection conn;
+  private final AsyncClusterConnection conn;
   private final ZKWatcher watcher;
   private final RSGroupStartupWorker rsGroupStartupWorker;
   // contains list of groups that were last flushed to persistent store
@@ -141,7 +161,7 @@ final class RSGroupInfoManagerImpl implements RSGroupInfoManager {
   private RSGroupInfoManagerImpl(MasterServices masterServices) throws IOException {
     this.masterServices = masterServices;
     this.watcher = masterServices.getZooKeeper();
-    this.conn = masterServices.getConnection();
+    this.conn = masterServices.getAsyncClusterConnection();
     this.rsGroupStartupWorker = new RSGroupStartupWorker();
   }

@@ -357,25 +377,25 @@ public synchronized void removeServers(Set<Address> servers) throws IOException
     }
   }

-  List<RSGroupInfo> retrieveGroupListFromGroupTable() throws IOException {
+  private List<RSGroupInfo> retrieveGroupListFromGroupTable() throws IOException {
     List<RSGroupInfo> rsGroupInfoList = Lists.newArrayList();
-    try (Table table = conn.getTable(RSGROUP_TABLE_NAME);
-        ResultScanner scanner = table.getScanner(new Scan())) {
+    AsyncTable<?> table = conn.getTable(RSGROUP_TABLE_NAME);
+    try (ResultScanner scanner = table.getScanner(META_FAMILY_BYTES, META_QUALIFIER_BYTES)) {
       for (Result result;;) {
         result = scanner.next();
         if (result == null) {
           break;
         }
         RSGroupProtos.RSGroupInfo proto = RSGroupProtos.RSGroupInfo
-            .parseFrom(result.getValue(META_FAMILY_BYTES, META_QUALIFIER_BYTES));
+          .parseFrom(result.getValue(META_FAMILY_BYTES, META_QUALIFIER_BYTES));
         rsGroupInfoList.add(ProtobufUtil.toGroupInfo(proto));
       }
     }
     return rsGroupInfoList;
   }

-  List<RSGroupInfo> retrieveGroupListFromZookeeper() throws IOException {
-    String groupBasePath = ZNodePaths.joinZNode(watcher.getZNodePaths().baseZNode, rsGroupZNode);
+  private List<RSGroupInfo> retrieveGroupListFromZookeeper() throws IOException {
+    String groupBasePath = ZNodePaths.joinZNode(watcher.getZNodePaths().baseZNode, RS_GROUP_ZNODE);
     List<RSGroupInfo> RSGroupInfoList = Lists.newArrayList();
     // Overwrite any info stored by table, this takes precedence
     try {
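The hunk above moves retrieveGroupListFromGroupTable from the blocking Table/Scan pair to the async client; AsyncTable.getScanner(family, qualifier) still hands back a blocking ResultScanner, so the read loop keeps its shape. A self-contained sketch of that scan-and-decode pattern, assuming a plain AsyncConnection and placeholder column names rather than the manager's actual fields:

```java
import java.io.IOException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.AsyncConnection;
import org.apache.hadoop.hbase.client.AsyncTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.util.Bytes;

final class GroupTableScanSketch {
  // Reads one column from every row of the given table and prints it.
  // Family/qualifier are placeholders; the patch itself uses "m"/"i".
  static void dumpColumn(AsyncConnection conn, TableName tableName) throws IOException {
    AsyncTable<?> table = conn.getTable(tableName);
    byte[] family = Bytes.toBytes("m");
    byte[] qualifier = Bytes.toBytes("i");
    // getScanner on AsyncTable still returns the familiar blocking ResultScanner.
    try (ResultScanner scanner = table.getScanner(family, qualifier)) {
      for (Result result; (result = scanner.next()) != null;) {
        System.out.println(Bytes.toStringBinary(result.getValue(family, qualifier)));
      }
    }
  }
}
```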
@@ -527,7 +547,8 @@ private synchronized void flushConfig(Map<String, RSGroupInfo> newGroupMap) thro
     resetRSGroupAndTableMaps(newGroupMap, newTableMap);

     try {
-      String groupBasePath = ZNodePaths.joinZNode(watcher.getZNodePaths().baseZNode, rsGroupZNode);
+      String groupBasePath =
+        ZNodePaths.joinZNode(watcher.getZNodePaths().baseZNode, RS_GROUP_ZNODE);
       ZKUtil.createAndFailSilent(watcher, groupBasePath, ProtobufMagic.PB_MAGIC);

       List<ZKUtil.ZKUtilOp> zkOps = new ArrayList<>(newGroupMap.size());
@@ -790,11 +811,8 @@ private boolean waitForGroupTableOnline() {
           createRSGroupTable();
         }
         // try reading from the table
-        try (Table table = conn.getTable(RSGROUP_TABLE_NAME)) {
-          table.get(new Get(ROW_KEY));
-        }
-        LOG.info(
-          "RSGroup table=" + RSGROUP_TABLE_NAME + " is online, refreshing cached information");
+        FutureUtils.get(conn.getTable(RSGROUP_TABLE_NAME).get(new Get(ROW_KEY)));
+        LOG.info("RSGroup table={} is online, refreshing cached information", RSGROUP_TABLE_NAME);
         RSGroupInfoManagerImpl.this.refresh(true);
         online = true;
         // flush any inconsistencies between ZK and HTable
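The probe above now goes through FutureUtils.get, which joins the CompletableFuture returned by the async client and rethrows failures as IOException, so the surrounding retry loop keeps its existing error handling. A minimal sketch of that bridging pattern; the connection, table, and row below are placeholders:

```java
import java.io.IOException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.AsyncConnection;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FutureUtils;

final class BlockingProbeSketch {
  // Issues a Get through the async client but blocks the caller until it completes.
  static Result probe(AsyncConnection conn, TableName tableName) throws IOException {
    // FutureUtils.get waits on the CompletableFuture and unwraps failures into
    // IOException, so callers keep their existing throws clause and retry logic.
    return FutureUtils.get(conn.getTable(tableName).get(new Get(Bytes.toBytes("probe-row"))));
  }
}
```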
@@ -836,8 +854,8 @@ private void createRSGroupTable() throws IOException {
       } else {
         Procedure<?> result = masterServices.getMasterProcedureExecutor().getResult(procId);
         if (result != null && result.isFailed()) {
-          throw new IOException(
-            "Failed to create group table. " + MasterProcedureUtil.unwrapRemoteIOException(result));
+          throw new IOException("Failed to create group table. " +
+            MasterProcedureUtil.unwrapRemoteIOException(result));
         }
       }
     }
@@ -852,33 +870,24 @@ private static boolean isMasterRunning(MasterServices masterServices) {
   }

   private void multiMutate(List<Mutation> mutations) throws IOException {
-    try (Table table = conn.getTable(RSGROUP_TABLE_NAME)) {
-      CoprocessorRpcChannel channel = table.coprocessorService(ROW_KEY);
-      MultiRowMutationProtos.MutateRowsRequest.Builder mmrBuilder =
-        MultiRowMutationProtos.MutateRowsRequest.newBuilder();
-      for (Mutation mutation : mutations) {
-        if (mutation instanceof Put) {
-          mmrBuilder.addMutationRequest(org.apache.hadoop.hbase.protobuf.ProtobufUtil.toMutation(
-            org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType.PUT,
-            mutation));
-        } else if (mutation instanceof Delete) {
-          mmrBuilder.addMutationRequest(org.apache.hadoop.hbase.protobuf.ProtobufUtil.toMutation(
-            org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType.DELETE,
-            mutation));
-        } else {
-          throw new DoNotRetryIOException(
+    MutateRowsRequest.Builder builder = MutateRowsRequest.newBuilder();
+    for (Mutation mutation : mutations) {
+      if (mutation instanceof Put) {
+        builder
+          .addMutationRequest(ProtobufUtil.toMutation(MutationProto.MutationType.PUT, mutation));
+      } else if (mutation instanceof Delete) {
+        builder.addMutationRequest(
+          ProtobufUtil.toMutation(MutationProto.MutationType.DELETE, mutation));
+      } else {
+        throw new DoNotRetryIOException(
           "multiMutate doesn't support " + mutation.getClass().getName());
-        }
-      }
-
-      MultiRowMutationProtos.MultiRowMutationService.BlockingInterface service =
-        MultiRowMutationProtos.MultiRowMutationService.newBlockingStub(channel);
-      try {
-        service.mutateRows(null, mmrBuilder.build());
-      } catch (ServiceException ex) {
-        ProtobufUtil.toIOException(ex);
       }
     }
+    MutateRowsRequest request = builder.build();
+    AsyncTable<?> table = conn.getTable(RSGROUP_TABLE_NAME);
+    FutureUtils.get(table.<MultiRowMutationService, MutateRowsResponse> coprocessorService(
+      MultiRowMutationService::newStub,
+      (stub, controller, done) -> stub.mutateRows(controller, request, done), ROW_KEY));
   }

   private void checkGroupName(String groupName) throws ConstraintException {
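The rewritten multiMutate is the most involved change: the blocking CoprocessorRpcChannel plus BlockingInterface stub is replaced by AsyncTable.coprocessorService(stubMaker, callable, row), where the ServiceCaller lambda performs the actual RPC and the row key picks the region the endpoint runs on. A self-contained sketch of the same call shape; the connection type, table name, and Put are placeholders, and the target table is assumed to have MultiRowMutationEndpoint loaded:

```java
import java.io.IOException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.AsyncConnection;
import org.apache.hadoop.hbase.client.AsyncTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto;
import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProtos.MultiRowMutationService;
import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProtos.MutateRowsRequest;
import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProtos.MutateRowsResponse;
import org.apache.hadoop.hbase.util.FutureUtils;

final class MultiMutateSketch {
  // Applies one Put atomically through the MultiRowMutationEndpoint coprocessor.
  static void atomicPut(AsyncConnection conn, TableName tableName, Put put) throws IOException {
    MutateRowsRequest request = MutateRowsRequest.newBuilder()
      .addMutationRequest(ProtobufUtil.toMutation(MutationProto.MutationType.PUT, put))
      .build();
    AsyncTable<?> table = conn.getTable(tableName);
    byte[] row = put.getRow(); // any row in the target region works as the locator
    // Stub factory plus ServiceCaller: the lambda issues the RPC and hands the
    // response to the callback; FutureUtils.get blocks until it completes.
    FutureUtils.get(table.<MultiRowMutationService, MutateRowsResponse> coprocessorService(
      MultiRowMutationService::newStub,
      (stub, controller, done) -> stub.mutateRows(controller, request, done), row));
  }
}
```

FutureUtils.get keeps the call synchronous for the caller, matching the blocking semantics of the code being removed.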