|
20 | 20 | import java.io.ByteArrayInputStream; |
21 | 21 | import java.io.IOException; |
22 | 22 | import java.util.ArrayList; |
| 23 | +import java.util.Collection; |
23 | 24 | import java.util.Collections; |
24 | 25 | import java.util.HashMap; |
25 | 26 | import java.util.HashSet; |
| 27 | +import java.util.Iterator; |
26 | 28 | import java.util.List; |
27 | 29 | import java.util.Map; |
28 | 30 | import java.util.OptionalLong; |
|
32 | 34 | import org.apache.hadoop.conf.Configuration; |
33 | 35 | import org.apache.hadoop.hbase.Coprocessor; |
34 | 36 | import org.apache.hadoop.hbase.DoNotRetryIOException; |
| 37 | +import org.apache.hadoop.hbase.HConstants; |
35 | 38 | import org.apache.hadoop.hbase.NamespaceDescriptor; |
36 | 39 | import org.apache.hadoop.hbase.ServerName; |
37 | 40 | import org.apache.hadoop.hbase.TableName; |
@@ -123,6 +126,9 @@ final class RSGroupInfoManagerImpl implements RSGroupInfoManager { |
123 | 126 | @VisibleForTesting |
124 | 127 | static final byte[] META_QUALIFIER_BYTES = Bytes.toBytes("i"); |
125 | 128 |
|
| 129 | + @VisibleForTesting |
| 130 | + static final String MIGRATE_THREAD_NAME = "Migrate-RSGroup-Tables"; |
| 131 | + |
126 | 132 | private static final byte[] ROW_KEY = { 0 }; |
127 | 133 |
|
128 | 134 | /** Table descriptor for <code>hbase:rsgroup</code> catalog table */ |
@@ -164,12 +170,13 @@ private RSGroupInfoManagerImpl(MasterServices masterServices) throws IOException |
164 | 170 |
|
165 | 171 |
|
166 | 172 | private synchronized void init() throws IOException { |
167 | | - refresh(); |
| 173 | + refresh(false); |
168 | 174 | serverEventsListenerThread.start(); |
169 | 175 | masterServices.getServerManager().registerListener(serverEventsListenerThread); |
170 | 176 | failedOpenUpdaterThread = new FailedOpenUpdaterThread(masterServices.getConfiguration()); |
171 | 177 | failedOpenUpdaterThread.start(); |
172 | 178 | masterServices.getServerManager().registerListener(failedOpenUpdaterThread); |
| 179 | + migrate(); |
173 | 180 | } |
174 | 181 |
|
175 | 182 | static RSGroupInfoManager getInstance(MasterServices master) throws IOException { |
@@ -356,9 +363,129 @@ private List<RSGroupInfo> retrieveGroupListFromZookeeper() throws IOException { |
356 | 363 | return RSGroupInfoList; |
357 | 364 | } |
358 | 365 |
|
359 | | - @Override |
360 | | - public void refresh() throws IOException { |
361 | | - refresh(false); |
| 366 | + private void waitUntilSomeProcsDone(Set<Long> pendingProcIds) { |
| 367 | + int size = pendingProcIds.size(); |
| 368 | + while (!masterServices.isStopped()) { |
| 369 | + for (Iterator<Long> iter = pendingProcIds.iterator(); iter.hasNext();) { |
| 370 | + long procId = iter.next(); |
| 371 | + if (masterServices.getMasterProcedureExecutor().isFinished(procId)) { |
| 372 | + iter.remove(); |
| 373 | + } |
| 374 | + } |
| 375 | + if (pendingProcIds.size() < size) { |
| 376 | + return; |
| 377 | + } |
| 378 | + try { |
| 379 | + Thread.sleep(1000); |
| 380 | + } catch (InterruptedException e) { |
| 381 | + Thread.currentThread().interrupt(); |
| 382 | + } |
| 383 | + } |
| 384 | + } |
| 385 | + |
| 386 | + private void waitUntilMasterStarted() { |
| 387 | + while (!masterServices.isInitialized() && !masterServices.isStopped()) { |
| 388 | + try { |
| 389 | + Thread.sleep(1000); |
| 390 | + } catch (InterruptedException e) { |
| 391 | + Thread.currentThread().interrupt(); |
| 392 | + } |
| 393 | + } |
| 394 | + } |
| 395 | + |
| 396 | + private void migrate(Collection<RSGroupInfo> groupList, int maxConcurrency) { |
| 397 | + waitUntilMasterStarted(); |
| 398 | + Set<Long> pendingProcIds = new HashSet<>(); |
| 399 | + for (RSGroupInfo groupInfo : groupList) { |
| 400 | + if (groupInfo.getName().equals(RSGroupInfo.DEFAULT_GROUP)) { |
| 401 | + continue; |
| 402 | + } |
| 403 | + SortedSet<TableName> failedTables = new TreeSet<>(); |
| 404 | + for (TableName tableName : groupInfo.getTables()) { |
| 405 | + LOG.info("Migrating {} in group {}", tableName, groupInfo.getName()); |
| 406 | + TableDescriptor oldTd; |
| 407 | + try { |
| 408 | + oldTd = masterServices.getTableDescriptors().get(tableName); |
| 409 | + } catch (IOException e) { |
| 410 | + LOG.warn("Failed to migrate {} in group {}", tableName, groupInfo.getName(), e); |
| 411 | + failedTables.add(tableName); |
| 412 | + continue; |
| 413 | + } |
| 414 | + if (oldTd == null) { |
| 415 | + continue; |
| 416 | + } |
| 417 | + if (oldTd.getRegionServerGroup().isPresent()) { |
| 418 | + // either we have already migrated it or that user has set the rs group using the new |
| 419 | + // code which will set the group directly on table descriptor, skip. |
| 420 | + LOG.debug("Skip migrating {} since it is already in group {}", tableName, |
| 421 | + oldTd.getRegionServerGroup().get()); |
| 422 | + continue; |
| 423 | + } |
| 424 | + TableDescriptor newTd = TableDescriptorBuilder.newBuilder(oldTd) |
| 425 | + .setRegionServerGroup(groupInfo.getName()).build(); |
| 426 | + try { |
| 427 | + pendingProcIds.add( |
| 428 | + masterServices.modifyTable(tableName, newTd, HConstants.NO_NONCE, HConstants.NO_NONCE)); |
| 429 | + } catch (IOException e) { |
| 430 | + LOG.warn("Failed to migrate {} in group {}", tableName, groupInfo.getName(), e); |
| 431 | + failedTables.add(tableName); |
| 432 | + continue; |
| 433 | + } |
| 434 | + if (pendingProcIds.size() >= maxConcurrency) { |
| 435 | + waitUntilSomeProcsDone(pendingProcIds); |
| 436 | + } |
| 437 | + } |
| 438 | + LOG.info("Done migrating {}, failed tables {}", groupInfo.getName(), failedTables); |
| 439 | + synchronized (RSGroupInfoManagerImpl.this) { |
| 440 | + RSGroupInfo currentInfo = rsGroupMap.get(groupInfo.getName()); |
| 441 | + if (currentInfo != null) { |
| 442 | + RSGroupInfo newInfo = |
| 443 | + new RSGroupInfo(currentInfo.getName(), currentInfo.getServers(), failedTables); |
| 444 | + Map<String, RSGroupInfo> newGroupMap = new HashMap<>(rsGroupMap); |
| 445 | + newGroupMap.put(groupInfo.getName(), newInfo); |
| 446 | + try { |
| 447 | + flushConfig(newGroupMap); |
| 448 | + } catch (IOException e) { |
| 449 | + LOG.warn("Failed to persist rs group", e); |
| 450 | + } |
| 451 | + } |
| 452 | + } |
| 453 | + } |
| 454 | + } |
| 455 | + |
| 456 | + // Migrate the table rs group info from RSGroupInfo into the table descriptor |
| 457 | + // Notice that we do not want to block the initialize so this will be done in background, and |
| 458 | + // during the migrating, the rs group info maybe incomplete and cause region to be misplaced. |
| 459 | + private void migrate() { |
| 460 | + Thread migrateThread = new Thread(MIGRATE_THREAD_NAME) { |
| 461 | + |
| 462 | + @Override |
| 463 | + public void run() { |
| 464 | + LOG.info("Start migrating table rs group config"); |
| 465 | + int maxConcurrency = 8; |
| 466 | + while (!masterServices.isStopped()) { |
| 467 | + Collection<RSGroupInfo> groups = rsGroupMap.values(); |
| 468 | + boolean hasTables = groups.stream().anyMatch(r -> !r.getTables().isEmpty()); |
| 469 | + if (hasTables) { |
| 470 | + migrate(groups, maxConcurrency); |
| 471 | + } else if (isOnline()) { |
| 472 | + // we have done migrating, quit. |
| 473 | + break; |
| 474 | + } else { |
| 475 | + // The rs group table is still not online yet, need to wait until it is online since the |
| 476 | + // rs groups maybe changed. |
| 477 | + try { |
| 478 | + Thread.sleep(1000); |
| 479 | + } catch (InterruptedException e) { |
| 480 | + Thread.currentThread().interrupt(); |
| 481 | + } |
| 482 | + } |
| 483 | + } |
| 484 | + LOG.info("Done migrating table rs group info"); |
| 485 | + } |
| 486 | + }; |
| 487 | + migrateThread.setDaemon(true); |
| 488 | + migrateThread.start(); |
362 | 489 | } |
363 | 490 |
|
364 | 491 | /** |
|
0 commit comments