Skip to content

Commit 787d524

Browse files
authored
HBASE-28090 Make entryReader field final in ReplicationSourceShipper class (#5409)
Signed-off-by: Wellington Chevreuil <wchevreuil@apache.org>
1 parent ef7b854 commit 787d524

File tree

5 files changed

+20
-27
lines changed

5 files changed

+20
-27
lines changed

hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,9 @@
2727
public class RecoveredReplicationSource extends ReplicationSource {
2828

2929
@Override
30-
protected RecoveredReplicationSourceShipper createNewShipper(String walGroupId) {
31-
return new RecoveredReplicationSourceShipper(conf, walGroupId, logQueue, this, queueStorage,
30+
protected RecoveredReplicationSourceShipper createNewShipper(String walGroupId,
31+
ReplicationSourceWALReader walReader) {
32+
return new RecoveredReplicationSourceShipper(conf, walGroupId, this, walReader, queueStorage,
3233
() -> {
3334
if (workerThreads.isEmpty()) {
3435
this.getSourceMetrics().clear();

hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceShipper.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,9 +30,9 @@ public class RecoveredReplicationSourceShipper extends ReplicationSourceShipper
3030
private final Runnable tryFinish;
3131

3232
public RecoveredReplicationSourceShipper(Configuration conf, String walGroupId,
33-
ReplicationSourceLogQueue logQueue, RecoveredReplicationSource source,
33+
RecoveredReplicationSource source, ReplicationSourceWALReader walReader,
3434
ReplicationQueueStorage queueStorage, Runnable tryFinish) {
35-
super(conf, walGroupId, logQueue, source);
35+
super(conf, walGroupId, source, walReader);
3636
this.tryFinish = tryFinish;
3737
}
3838

hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -367,14 +367,13 @@ private void tryStartNewShipper(String walGroupId) {
367367
return value;
368368
} else {
369369
LOG.debug("{} starting shipping worker for walGroupId={}", logPeerId(), walGroupId);
370-
ReplicationSourceShipper worker = createNewShipper(walGroupId);
371370
ReplicationSourceWALReader walReader =
372371
createNewWALReader(walGroupId, getStartOffset(walGroupId));
372+
ReplicationSourceShipper worker = createNewShipper(walGroupId, walReader);
373373
Threads.setDaemonThreadRunning(
374374
walReader, Thread.currentThread().getName() + ".replicationSource.wal-reader."
375375
+ walGroupId + "," + queueId,
376376
(t, e) -> this.uncaughtException(t, e, this.manager, this.getPeerId()));
377-
worker.setWALReader(walReader);
378377
worker.startup((t, e) -> this.uncaughtException(t, e, this.manager, this.getPeerId()));
379378
return worker;
380379
}
@@ -428,8 +427,9 @@ private long getFileSize(Path currentPath) throws IOException {
428427
return fileSize;
429428
}
430429

431-
protected ReplicationSourceShipper createNewShipper(String walGroupId) {
432-
return new ReplicationSourceShipper(conf, walGroupId, logQueue, this);
430+
protected ReplicationSourceShipper createNewShipper(String walGroupId,
431+
ReplicationSourceWALReader walReader) {
432+
return new ReplicationSourceShipper(conf, walGroupId, this, walReader);
433433
}
434434

435435
private ReplicationSourceWALReader createNewWALReader(String walGroupId, long startPosition) {
@@ -665,7 +665,7 @@ public void terminate(String reason, Exception cause, boolean clearMetrics) {
665665
terminate(reason, cause, clearMetrics, true);
666666
}
667667

668-
public void terminate(String reason, Exception cause, boolean clearMetrics, boolean join) {
668+
private void terminate(String reason, Exception cause, boolean clearMetrics, boolean join) {
669669
if (cause == null) {
670670
LOG.info("{} Closing source {} because: {}", logPeerId(), this.queueId, reason);
671671
} else {
@@ -684,9 +684,7 @@ public void terminate(String reason, Exception cause, boolean clearMetrics, bool
684684

685685
for (ReplicationSourceShipper worker : workers) {
686686
worker.stopWorker();
687-
if (worker.entryReader != null) {
688-
worker.entryReader.setReaderRunning(false);
689-
}
687+
worker.entryReader.setReaderRunning(false);
690688
}
691689

692690
if (this.replicationEndpoint != null) {

hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java

Lines changed: 8 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -55,9 +55,8 @@ public enum WorkerState {
5555
}
5656

5757
private final Configuration conf;
58-
protected final String walGroupId;
59-
protected final ReplicationSourceLogQueue logQueue;
60-
protected final ReplicationSource source;
58+
final String walGroupId;
59+
private final ReplicationSource source;
6160

6261
// Last position in the log that we sent to ZooKeeper
6362
// It will be accessed by the stats thread so make it volatile
@@ -66,22 +65,22 @@ public enum WorkerState {
6665
private Path currentPath;
6766
// Current state of the worker thread
6867
private volatile WorkerState state;
69-
protected ReplicationSourceWALReader entryReader;
68+
final ReplicationSourceWALReader entryReader;
7069

7170
// How long should we sleep for each retry
72-
protected final long sleepForRetries;
71+
private final long sleepForRetries;
7372
// Maximum number of retries before taking bold actions
74-
protected final int maxRetriesMultiplier;
73+
private final int maxRetriesMultiplier;
7574
private final int DEFAULT_TIMEOUT = 20000;
7675
private final int getEntriesTimeout;
7776
private final int shipEditsTimeout;
7877

79-
public ReplicationSourceShipper(Configuration conf, String walGroupId,
80-
ReplicationSourceLogQueue logQueue, ReplicationSource source) {
78+
public ReplicationSourceShipper(Configuration conf, String walGroupId, ReplicationSource source,
79+
ReplicationSourceWALReader walReader) {
8180
this.conf = conf;
8281
this.walGroupId = walGroupId;
83-
this.logQueue = logQueue;
8482
this.source = source;
83+
this.entryReader = walReader;
8584
// 1 second
8685
this.sleepForRetries = this.conf.getLong("replication.source.sleepforretries", 1000);
8786
// 5 minutes @ 1 sec per
@@ -295,10 +294,6 @@ long getCurrentPosition() {
295294
return currentPosition;
296295
}
297296

298-
void setWALReader(ReplicationSourceWALReader entryReader) {
299-
this.entryReader = entryReader;
300-
}
301-
302297
protected boolean isActive() {
303298
return source.isSourceActive() && state == WorkerState.RUNNING && !isInterrupted();
304299
}

hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -291,8 +291,7 @@ public void testTerminateClearsBuffer() throws Exception {
291291
mock(MetricsSource.class));
292292
ReplicationSourceWALReader reader =
293293
new ReplicationSourceWALReader(null, conf, null, 0, null, source, null);
294-
ReplicationSourceShipper shipper = new ReplicationSourceShipper(conf, null, null, source);
295-
shipper.entryReader = reader;
294+
ReplicationSourceShipper shipper = new ReplicationSourceShipper(conf, null, source, reader);
296295
source.workerThreads.put("testPeer", shipper);
297296
WALEntryBatch batch = new WALEntryBatch(10, logDir);
298297
WAL.Entry mockEntry = mock(WAL.Entry.class);

0 commit comments

Comments
 (0)