Skip to content

Commit 9e40796

Browse files
committed
HBASE-22819 Automatically migrate the rs group config for table after HBASE-22695
1 parent 07fe41d commit 9e40796

File tree

4 files changed

+253
-15
lines changed

4 files changed

+253
-15
lines changed

hbase-common/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupInfo.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,8 @@
1919
package org.apache.hadoop.hbase.rsgroup;
2020

2121
import java.util.Collection;
22-
import java.util.Set;
2322
import java.util.SortedSet;
2423
import java.util.TreeSet;
25-
2624
import org.apache.hadoop.hbase.TableName;
2725
import org.apache.hadoop.hbase.net.Address;
2826
import org.apache.yetus.audience.InterfaceAudience;
@@ -104,7 +102,7 @@ public boolean containsServer(Address hostPort) {
104102
/**
105103
* Get list of servers.
106104
*/
107-
public Set<Address> getServers() {
105+
public SortedSet<Address> getServers() {
108106
return servers;
109107
}
110108

hbase-server/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupInfoManager.java

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -67,11 +67,6 @@ Set<Address> moveServers(Set<Address> servers, String srcGroup, String dstGroup)
6767
*/
6868
List<RSGroupInfo> listRSGroups() throws IOException;
6969

70-
/**
71-
* Refresh/reload the group information from the persistent store
72-
*/
73-
void refresh() throws IOException;
74-
7570
/**
7671
* Whether the manager is able to fully return group metadata
7772
* @return whether the manager is in online mode

hbase-server/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupInfoManagerImpl.java

Lines changed: 134 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,11 @@
2020
import java.io.ByteArrayInputStream;
2121
import java.io.IOException;
2222
import java.util.ArrayList;
23+
import java.util.Collection;
2324
import java.util.Collections;
2425
import java.util.HashMap;
2526
import java.util.HashSet;
27+
import java.util.Iterator;
2628
import java.util.List;
2729
import java.util.Map;
2830
import java.util.OptionalLong;
@@ -32,6 +34,7 @@
3234
import org.apache.hadoop.conf.Configuration;
3335
import org.apache.hadoop.hbase.Coprocessor;
3436
import org.apache.hadoop.hbase.DoNotRetryIOException;
37+
import org.apache.hadoop.hbase.HConstants;
3538
import org.apache.hadoop.hbase.NamespaceDescriptor;
3639
import org.apache.hadoop.hbase.ServerName;
3740
import org.apache.hadoop.hbase.TableName;
@@ -123,6 +126,9 @@ final class RSGroupInfoManagerImpl implements RSGroupInfoManager {
123126
@VisibleForTesting
124127
static final byte[] META_QUALIFIER_BYTES = Bytes.toBytes("i");
125128

129+
@VisibleForTesting
130+
static final String MIGRATE_THREAD_NAME = "Migrate-RSGroup-Tables";
131+
126132
private static final byte[] ROW_KEY = { 0 };
127133

128134
/** Table descriptor for <code>hbase:rsgroup</code> catalog table */
@@ -164,12 +170,13 @@ private RSGroupInfoManagerImpl(MasterServices masterServices) throws IOException
164170

165171

166172
private synchronized void init() throws IOException {
167-
refresh();
173+
refresh(false);
168174
serverEventsListenerThread.start();
169175
masterServices.getServerManager().registerListener(serverEventsListenerThread);
170176
failedOpenUpdaterThread = new FailedOpenUpdaterThread(masterServices.getConfiguration());
171177
failedOpenUpdaterThread.start();
172178
masterServices.getServerManager().registerListener(failedOpenUpdaterThread);
179+
migrate();
173180
}
174181

175182
static RSGroupInfoManager getInstance(MasterServices master) throws IOException {
@@ -356,9 +363,129 @@ private List<RSGroupInfo> retrieveGroupListFromZookeeper() throws IOException {
356363
return RSGroupInfoList;
357364
}
358365

359-
@Override
360-
public void refresh() throws IOException {
361-
refresh(false);
366+
private void waitUntilSomeProcsDone(Set<Long> pendingProcIds) {
367+
int size = pendingProcIds.size();
368+
while (!masterServices.isStopped()) {
369+
for (Iterator<Long> iter = pendingProcIds.iterator(); iter.hasNext();) {
370+
long procId = iter.next();
371+
if (masterServices.getMasterProcedureExecutor().isFinished(procId)) {
372+
iter.remove();
373+
}
374+
}
375+
if (pendingProcIds.size() < size) {
376+
return;
377+
}
378+
try {
379+
Thread.sleep(1000);
380+
} catch (InterruptedException e) {
381+
Thread.currentThread().interrupt();
382+
}
383+
}
384+
}
385+
386+
private void waitUntilMasterStarted() {
387+
while (!masterServices.isInitialized() && !masterServices.isStopped()) {
388+
try {
389+
Thread.sleep(1000);
390+
} catch (InterruptedException e) {
391+
Thread.currentThread().interrupt();
392+
}
393+
}
394+
}
395+
396+
private void migrate(Collection<RSGroupInfo> groupList, int maxConcurrency) {
397+
waitUntilMasterStarted();
398+
Set<Long> pendingProcIds = new HashSet<>();
399+
for (RSGroupInfo groupInfo : groupList) {
400+
if (groupInfo.getName().equals(RSGroupInfo.DEFAULT_GROUP)) {
401+
continue;
402+
}
403+
SortedSet<TableName> failedTables = new TreeSet<>();
404+
for (TableName tableName : groupInfo.getTables()) {
405+
LOG.info("Migrating {} in group {}", tableName, groupInfo.getName());
406+
TableDescriptor oldTd;
407+
try {
408+
oldTd = masterServices.getTableDescriptors().get(tableName);
409+
} catch (IOException e) {
410+
LOG.warn("Failed to migrate {} in group {}", tableName, groupInfo.getName(), e);
411+
failedTables.add(tableName);
412+
continue;
413+
}
414+
if (oldTd == null) {
415+
continue;
416+
}
417+
if (oldTd.getRegionServerGroup().isPresent()) {
418+
// either we have already migrated it or that user has set the rs group using the new
419+
// code which will set the group directly on table descriptor, skip.
420+
LOG.debug("Skip migrating {} since it is already in group {}", tableName,
421+
oldTd.getRegionServerGroup().get());
422+
continue;
423+
}
424+
TableDescriptor newTd = TableDescriptorBuilder.newBuilder(oldTd)
425+
.setRegionServerGroup(groupInfo.getName()).build();
426+
try {
427+
pendingProcIds.add(
428+
masterServices.modifyTable(tableName, newTd, HConstants.NO_NONCE, HConstants.NO_NONCE));
429+
} catch (IOException e) {
430+
LOG.warn("Failed to migrate {} in group {}", tableName, groupInfo.getName(), e);
431+
failedTables.add(tableName);
432+
continue;
433+
}
434+
if (pendingProcIds.size() >= maxConcurrency) {
435+
waitUntilSomeProcsDone(pendingProcIds);
436+
}
437+
}
438+
LOG.info("Done migrating {}, failed tables {}", groupInfo.getName(), failedTables);
439+
synchronized (RSGroupInfoManagerImpl.this) {
440+
RSGroupInfo currentInfo = rsGroupMap.get(groupInfo.getName());
441+
if (currentInfo != null) {
442+
RSGroupInfo newInfo =
443+
new RSGroupInfo(currentInfo.getName(), currentInfo.getServers(), failedTables);
444+
Map<String, RSGroupInfo> newGroupMap = new HashMap<>(rsGroupMap);
445+
newGroupMap.put(groupInfo.getName(), newInfo);
446+
try {
447+
flushConfig(newGroupMap);
448+
} catch (IOException e) {
449+
LOG.warn("Failed to persist rs group", e);
450+
}
451+
}
452+
}
453+
}
454+
}
455+
456+
// Migrate the table rs group info from RSGroupInfo into the table descriptor
457+
// Notice that we do not want to block the initialize so this will be done in background, and
458+
// during the migrating, the rs group info maybe incomplete and cause region to be misplaced.
459+
private void migrate() {
460+
Thread migrateThread = new Thread(MIGRATE_THREAD_NAME) {
461+
462+
@Override
463+
public void run() {
464+
LOG.info("Start migrating table rs group config");
465+
int maxConcurrency = 8;
466+
while (!masterServices.isStopped()) {
467+
Collection<RSGroupInfo> groups = rsGroupMap.values();
468+
boolean hasTables = groups.stream().anyMatch(r -> !r.getTables().isEmpty());
469+
if (hasTables) {
470+
migrate(groups, maxConcurrency);
471+
} else if (isOnline()) {
472+
// we have done migrating, quit.
473+
break;
474+
} else {
475+
// The rs group table is still not online yet, need to wait until it is online since the
476+
// rs groups maybe changed.
477+
try {
478+
Thread.sleep(1000);
479+
} catch (InterruptedException e) {
480+
Thread.currentThread().interrupt();
481+
}
482+
}
483+
}
484+
LOG.info("Done migrating table rs group info");
485+
}
486+
};
487+
migrateThread.setDaemon(true);
488+
migrateThread.start();
362489
}
363490

364491
/**
@@ -403,9 +530,9 @@ private void flushConfigTable(Map<String, RSGroupInfo> groupMap) throws IOExcept
403530
}
404531

405532
// populate puts
406-
for (RSGroupInfo RSGroupInfo : groupMap.values()) {
407-
RSGroupProtos.RSGroupInfo proto = ProtobufUtil.toProtoGroupInfo(RSGroupInfo);
408-
Put p = new Put(Bytes.toBytes(RSGroupInfo.getName()));
533+
for (RSGroupInfo rsGroupInfo : groupMap.values()) {
534+
RSGroupProtos.RSGroupInfo proto = ProtobufUtil.toProtoGroupInfo(rsGroupInfo);
535+
Put p = new Put(Bytes.toBytes(rsGroupInfo.getName()));
409536
p.addColumn(META_FAMILY_BYTES, META_QUALIFIER_BYTES, proto.toByteArray());
410537
mutations.add(p);
411538
}
Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,118 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.hadoop.hbase.rsgroup;
19+
20+
import static org.apache.hadoop.hbase.rsgroup.RSGroupInfoManagerImpl.META_FAMILY_BYTES;
21+
import static org.apache.hadoop.hbase.rsgroup.RSGroupInfoManagerImpl.META_QUALIFIER_BYTES;
22+
import static org.apache.hadoop.hbase.rsgroup.RSGroupInfoManagerImpl.RSGROUP_TABLE_NAME;
23+
import static org.junit.Assert.assertTrue;
24+
25+
import java.io.IOException;
26+
import org.apache.hadoop.hbase.HBaseClassTestRule;
27+
import org.apache.hadoop.hbase.TableName;
28+
import org.apache.hadoop.hbase.client.Get;
29+
import org.apache.hadoop.hbase.client.Put;
30+
import org.apache.hadoop.hbase.client.Result;
31+
import org.apache.hadoop.hbase.client.Table;
32+
import org.apache.hadoop.hbase.client.TableDescriptor;
33+
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
34+
import org.apache.hadoop.hbase.protobuf.generated.RSGroupProtos;
35+
import org.apache.hadoop.hbase.testclassification.MediumTests;
36+
import org.apache.hadoop.hbase.util.Bytes;
37+
import org.junit.AfterClass;
38+
import org.junit.BeforeClass;
39+
import org.junit.ClassRule;
40+
import org.junit.Test;
41+
import org.junit.experimental.categories.Category;
42+
43+
/**
44+
* Testcase for HBASE-22819
45+
*/
46+
@Category({ MediumTests.class })
47+
public class TestMigrateRSGroupInfo extends TestRSGroupsBase {
48+
49+
@ClassRule
50+
public static final HBaseClassTestRule CLASS_RULE =
51+
HBaseClassTestRule.forClass(TestMigrateRSGroupInfo.class);
52+
53+
private static String TABLE_NAME_PREFIX = "Table_";
54+
55+
private static int NUM_TABLES = 10;
56+
57+
private static byte[] FAMILY = Bytes.toBytes("family");
58+
59+
@BeforeClass
60+
public static void setUp() throws Exception {
61+
setUpTestBeforeClass();
62+
for (int i = 0; i < NUM_TABLES; i++) {
63+
TEST_UTIL.createTable(TableName.valueOf(TABLE_NAME_PREFIX + i), FAMILY);
64+
}
65+
}
66+
67+
@AfterClass
68+
public static void tearDown() throws Exception {
69+
tearDownAfterClass();
70+
}
71+
72+
@Test
73+
public void testMigrate() throws IOException, InterruptedException {
74+
String groupName = name.getMethodName();
75+
addGroup(groupName, TEST_UTIL.getMiniHBaseCluster().getRegionServerThreads().size() - 1);
76+
RSGroupInfo rsGroupInfo = rsGroupAdmin.getRSGroupInfo(groupName);
77+
assertTrue(rsGroupInfo.getTables().isEmpty());
78+
for (int i = 0; i < NUM_TABLES; i++) {
79+
rsGroupInfo.addTable(TableName.valueOf(TABLE_NAME_PREFIX + i));
80+
}
81+
try (Table table = TEST_UTIL.getConnection().getTable(RSGROUP_TABLE_NAME)) {
82+
RSGroupProtos.RSGroupInfo proto = ProtobufUtil.toProtoGroupInfo(rsGroupInfo);
83+
Put p = new Put(Bytes.toBytes(rsGroupInfo.getName()));
84+
p.addColumn(META_FAMILY_BYTES, META_QUALIFIER_BYTES, proto.toByteArray());
85+
table.put(p);
86+
}
87+
TEST_UTIL.getMiniHBaseCluster().stopMaster(0).join();
88+
TEST_UTIL.getMiniHBaseCluster().startMaster();
89+
TEST_UTIL.waitFor(60000, () -> {
90+
for (int i = 0; i < NUM_TABLES; i++) {
91+
TableDescriptor td;
92+
try {
93+
td = TEST_UTIL.getAdmin().getDescriptor(TableName.valueOf(TABLE_NAME_PREFIX + i));
94+
} catch (IOException e) {
95+
return false;
96+
}
97+
if (!rsGroupInfo.getName().equals(td.getRegionServerGroup().orElse(null))) {
98+
return false;
99+
}
100+
}
101+
return true;
102+
});
103+
// make sure that we persist the result to hbase, where we delete all the tables in the rs
104+
// group.
105+
TEST_UTIL.waitFor(30000, () -> {
106+
try (Table table = TEST_UTIL.getConnection().getTable(RSGROUP_TABLE_NAME)) {
107+
Result result = table.get(new Get(Bytes.toBytes(rsGroupInfo.getName())));
108+
RSGroupProtos.RSGroupInfo proto = RSGroupProtos.RSGroupInfo
109+
.parseFrom(result.getValue(META_FAMILY_BYTES, META_QUALIFIER_BYTES));
110+
RSGroupInfo gi = ProtobufUtil.toGroupInfo(proto);
111+
return gi.getTables().isEmpty();
112+
}
113+
});
114+
// make sure that the migrate thread has quit.
115+
TEST_UTIL.waitFor(30000, () -> Thread.getAllStackTraces().keySet().stream()
116+
.noneMatch(t -> t.getName().equals(RSGroupInfoManagerImpl.MIGRATE_THREAD_NAME)));
117+
}
118+
}

0 commit comments

Comments
 (0)