|
30 | 30 | import org.apache.hadoop.hbase.CellUtil;
|
31 | 31 | import org.apache.hadoop.hbase.KeyValue;
|
32 | 32 | import org.apache.hadoop.hbase.replication.WALEntryFilter;
|
| 33 | +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; |
33 | 34 | import org.apache.hadoop.hbase.util.Pair;
|
34 | 35 | import org.apache.hadoop.hbase.util.Threads;
|
35 | 36 | import org.apache.hadoop.hbase.wal.WAL.Entry;
|
@@ -69,6 +70,9 @@ class ReplicationSourceWALReader extends Thread {
|
69 | 70 | // position in the WAL to start reading at
|
70 | 71 | private long currentPosition;
|
71 | 72 | private final long sleepForRetries;
|
| 73 | + private final long sleepForQuotaCheck; |
| 74 | + |
| 75 | + private final long logQuotaThrottleInterval; |
72 | 76 | private final int maxRetriesMultiplier;
|
73 | 77 |
|
74 | 78 | // Indicates whether this particular worker is running
|
@@ -102,6 +106,10 @@ public ReplicationSourceWALReader(FileSystem fs, Configuration conf,
|
102 | 106 | int batchCount = conf.getInt("replication.source.nb.batches", 1);
|
103 | 107 | // 1 second
|
104 | 108 | this.sleepForRetries = this.conf.getLong("replication.source.sleepforretries", 1000);
|
| 109 | + // 300ms |
| 110 | + this.sleepForQuotaCheck = this.conf.getLong("replication.source.sleepforquotacheck", 300); |
| 111 | + this.logQuotaThrottleInterval = |
| 112 | + this.conf.getLong("replication.source.logintervalforquotathrottle.ms", 3000); |
105 | 113 | // 5 minutes @ 1 sec per
|
106 | 114 | this.maxRetriesMultiplier = this.conf.getInt("replication.source.maxretriesmultiplier", 300);
|
107 | 115 | this.entryBatchQueue = new LinkedBlockingQueue<>(batchCount);
|
@@ -140,9 +148,7 @@ public void run() {
|
140 | 148 | Threads.sleep(sleepForRetries);
|
141 | 149 | continue;
|
142 | 150 | }
|
143 |
| - if (!checkBufferQuota()) { |
144 |
| - continue; |
145 |
| - } |
| 151 | + blockUntilFreeBufferQuota(); |
146 | 152 | Path currentPath = entryStream.getCurrentPath();
|
147 | 153 | WALEntryStream.HasNext hasNext = entryStream.hasNext();
|
148 | 154 | if (hasNext == WALEntryStream.HasNext.NO) {
|
@@ -267,13 +273,20 @@ public Path getCurrentPath() {
|
267 | 273 | }
|
268 | 274 |
|
269 | 275 | // returns false if we've already exceeded the global quota
|
270 |
| - private boolean checkBufferQuota() { |
| 276 | + private void blockUntilFreeBufferQuota() { |
271 | 277 | // try not to go over total quota
|
272 |
| - if (!this.getSourceManager().checkBufferQuota(this.source.getPeerId())) { |
273 |
| - Threads.sleep(sleepForRetries); |
274 |
| - return false; |
| 278 | + long current = EnvironmentEdgeManager.currentTime(); |
| 279 | + while ( |
| 280 | + !this.getSourceManager().checkBufferQuota(this.source.getPeerId()) && isReaderRunning() |
| 281 | + ) { |
| 282 | + if (EnvironmentEdgeManager.currentTime() - current >= logQuotaThrottleInterval) { |
| 283 | + LOG.warn( |
| 284 | + "peer={}, source reader check buffer quota failed, current wal is {}, will sleep {}ms for next retry", |
| 285 | + this.source.getPeerId(), this.getCurrentPath(), sleepForQuotaCheck); |
| 286 | + } |
| 287 | + Threads.sleep(sleepForQuotaCheck); |
| 288 | + current = EnvironmentEdgeManager.currentTime(); |
275 | 289 | }
|
276 |
| - return true; |
277 | 290 | }
|
278 | 291 |
|
279 | 292 | private WALEntryBatch createBatch(WALEntryStream entryStream) {
|
|
0 commit comments