Skip to content

Commit dde504c

Browse files
authored
HBASE-28155 RecoveredReplicationSource quit when there are still unfinished groups (#5466)
Signed-off-by: Guanghao Zhang <zghao@apache.org>
1 parent e07d1fe commit dde504c

File tree

2 files changed

+37
-8
lines changed

2 files changed

+37
-8
lines changed

hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,22 @@
2626
@InterfaceAudience.Private
2727
public class RecoveredReplicationSource extends ReplicationSource {
2828

29+
@Override
30+
protected void startShippers() {
31+
for (String walGroupId : logQueue.getQueues().keySet()) {
32+
workerThreads.put(walGroupId, createNewShipper(walGroupId));
33+
}
34+
// Start shippers only after workerThreads is fully populated. In the postFinish logic below, if
35+
// workerThreads is empty, we mark the RecoveredReplicationSource as finished. So if we
36+
// started the workers on the fly, a shipper could have already finished its work and
37+
// called postFinish, found workerThreads empty, and then marked the
38+
// RecoveredReplicationSource as finished while the next shipper had not been added to
39+
// workerThreads yet. See HBASE-28155 for more details.
40+
for (ReplicationSourceShipper shipper : workerThreads.values()) {
41+
startShipper(shipper);
42+
}
43+
}
44+
2945
@Override
3046
protected RecoveredReplicationSourceShipper createNewShipper(String walGroupId,
3147
ReplicationSourceWALReader walReader) {

hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java

Lines changed: 21 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -360,19 +360,28 @@ private long getStartOffset(String walGroupId) {
360360
}
361361
}
362362

363+
protected final ReplicationSourceShipper createNewShipper(String walGroupId) {
364+
ReplicationSourceWALReader walReader =
365+
createNewWALReader(walGroupId, getStartOffset(walGroupId));
366+
ReplicationSourceShipper worker = createNewShipper(walGroupId, walReader);
367+
Threads.setDaemonThreadRunning(walReader, Thread.currentThread().getName()
368+
+ ".replicationSource.wal-reader." + walGroupId + "," + queueId, this::retryRefreshing);
369+
return worker;
370+
}
371+
372+
protected final void startShipper(ReplicationSourceShipper worker) {
373+
worker.startup(this::retryRefreshing);
374+
}
375+
363376
private void tryStartNewShipper(String walGroupId) {
364377
workerThreads.compute(walGroupId, (key, value) -> {
365378
if (value != null) {
366379
LOG.debug("{} preempted start of shipping worker walGroupId={}", logPeerId(), walGroupId);
367380
return value;
368381
} else {
369382
LOG.debug("{} starting shipping worker for walGroupId={}", logPeerId(), walGroupId);
370-
ReplicationSourceWALReader walReader =
371-
createNewWALReader(walGroupId, getStartOffset(walGroupId));
372-
ReplicationSourceShipper worker = createNewShipper(walGroupId, walReader);
373-
Threads.setDaemonThreadRunning(walReader, Thread.currentThread().getName()
374-
+ ".replicationSource.wal-reader." + walGroupId + "," + queueId, this::retryRefreshing);
375-
worker.startup(this::retryRefreshing);
383+
ReplicationSourceShipper worker = createNewShipper(walGroupId);
384+
startShipper(worker);
376385
return worker;
377386
}
378387
});
@@ -522,7 +531,7 @@ private long getCurrentBandwidth() {
522531
* @param sleepMultiplier by how many times the default sleeping time is augmented
523532
* @return True if <code>sleepMultiplier</code> is &lt; <code>maxRetriesMultiplier</code>
524533
*/
525-
protected boolean sleepForRetries(String msg, int sleepMultiplier) {
534+
private boolean sleepForRetries(String msg, int sleepMultiplier) {
526535
try {
527536
if (LOG.isTraceEnabled()) {
528537
LOG.trace("{} {}, sleeping {} times {}", logPeerId(), msg, sleepForRetries,
@@ -605,10 +614,14 @@ private void initialize() {
605614
queueId, logQueue.getNumQueues(), clusterId, peerClusterId);
606615
initializeWALEntryFilter(peerClusterId);
607616
// Start workers
617+
startShippers();
618+
setSourceStartupStatus(false);
619+
}
620+
621+
protected void startShippers() {
608622
for (String walGroupId : logQueue.getQueues().keySet()) {
609623
tryStartNewShipper(walGroupId);
610624
}
611-
setSourceStartupStatus(false);
612625
}
613626

614627
private synchronized void setSourceStartupStatus(boolean initializing) {

0 commit comments

Comments
 (0)