Commit 9821fd8

wchevreuil authored and apurtell committed
HBASE-22784 OldWALs not cleared in a replication slave cluster (cyclic replication bw 2 clusters)
Signed-off-by: Andrew Purtell <apurtell@apache.org>
1 parent: 73a48b7 · commit: 9821fd8

File tree

4 files changed: +70 −10 lines changed

hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
1 addition, 0 deletions

@@ -783,6 +783,7 @@ protected void shipEdits(WALEntryBatch entryBatch) {
   }
 
   private void updateLogPosition(long lastReadPosition) {
+    manager.setPendingShipment(false);
     manager.logPositionAndCleanOldLogs(currentPath, peerClusterZnode, lastReadPosition,
       this.replicationQueueInfo.isQueueRecovered(), false);
     lastLoggedPosition = lastReadPosition;
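
The one-line addition above is the shipper's half of a new handshake with ReplicationSourceManager: the pending-shipment flag is cleared just before the shipped position is recorded, so the guarded update introduced in the next file is allowed through. Below is a minimal, self-contained sketch of that handshake under hypothetical names (PositionTracker is a stand-in for ReplicationSourceManager, not the real class):

import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for ReplicationSourceManager, modeling only the
// pendingShipment guard introduced by this commit.
class PositionTracker {
  private boolean pendingShipment;                        // guarded by 'this'
  private final AtomicLong loggedPosition = new AtomicLong(-1);

  // The reader sets this to true when it queues entries for shipping;
  // the shipper sets it back to false once the batch has been shipped.
  synchronized void setPendingShipment(boolean pendingShipment) {
    this.pendingShipment = pendingShipment;
  }

  // Mirrors logPositionAndCleanOldLogs(): the position only advances (and old
  // WALs only become eligible for cleanup) while no shipment is pending.
  synchronized void logPositionAndCleanOldLogs(long position) {
    if (!pendingShipment) {
      loggedPosition.set(position);
      // cleanOldLogs(...) would run here in the real code
    }
  }

  long loggedPosition() {
    return loggedPosition.get();
  }

  // Mirrors ReplicationSource.updateLogPosition(): clear the flag first,
  // then record the position, otherwise the guarded update would be skipped.
  void shipped(long lastReadPosition) {
    setPendingShipment(false);
    logPositionAndCleanOldLogs(lastReadPosition);
  }
}

The reader-side counterpart of this sketch appears after the ReplicationSourceWALReaderThread diff below.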

hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
20 additions, 10 deletions

@@ -121,9 +121,10 @@ public class ReplicationSourceManager implements ReplicationListener {
   private final Random rand;
   private final boolean replicationForBulkLoadDataEnabled;
 
-
   private AtomicLong totalBufferUsed = new AtomicLong();
 
+  private boolean pendingShipment;
+
   /**
    * Creates a replication manager and sets the watch on all the other registered region servers
    * @param replicationQueues the interface for manipulating replication queues
@@ -189,14 +190,20 @@ public ReplicationSourceManager(final ReplicationQueues replicationQueues,
    * @param queueRecovered indicates if this queue comes from another region server
    * @param holdLogInZK if true then the log is retained in ZK
    */
-  public void logPositionAndCleanOldLogs(Path log, String id, long position,
+  public synchronized void logPositionAndCleanOldLogs(Path log, String id, long position,
       boolean queueRecovered, boolean holdLogInZK) {
-    String fileName = log.getName();
-    this.replicationQueues.setLogPosition(id, fileName, position);
-    if (holdLogInZK) {
-      return;
+    if (!this.pendingShipment) {
+      String fileName = log.getName();
+      this.replicationQueues.setLogPosition(id, fileName, position);
+      if (holdLogInZK) {
+        return;
+      }
+      cleanOldLogs(fileName, id, queueRecovered);
     }
-    cleanOldLogs(fileName, id, queueRecovered);
+  }
+
+  public synchronized void setPendingShipment(boolean pendingShipment) {
+    this.pendingShipment = pendingShipment;
   }
 
   /**
@@ -209,9 +216,12 @@ public void logPositionAndCleanOldLogs(Path log, String id, long position,
   public void cleanOldLogs(String key, String id, boolean queueRecovered) {
     String logPrefix = DefaultWALProvider.getWALPrefixFromWALName(key);
     if (queueRecovered) {
-      SortedSet<String> wals = walsByIdRecoveredQueues.get(id).get(logPrefix);
-      if (wals != null && !wals.first().equals(key)) {
-        cleanOldLogs(wals, key, id);
+      Map<String, SortedSet<String>> walsForPeer = walsByIdRecoveredQueues.get(id);
+      if(walsForPeer != null) {
+        SortedSet<String> wals = walsForPeer.get(logPrefix);
+        if (wals != null && !wals.first().equals(key)) {
+          cleanOldLogs(wals, key, id);
+        }
       }
     } else {
       synchronized (this.walsById) {
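
The first two hunks above are the guard modeled by the PositionTracker sketch; the third hunk additionally hardens cleanOldLogs() for recovered queues, where walsByIdRecoveredQueues.get(id) can return null once the recovered queue has been dropped. A small stand-alone sketch of that null-safe lookup, using a hypothetical helper that returns the WALs that would be cleaned rather than deleting them (the headSet() cut-off is an assumption about how the real cleanup selects older WALs):

import java.util.Map;
import java.util.SortedSet;
import java.util.TreeSet;

// Hypothetical helper illustrating the null-safe navigation added to
// cleanOldLogs(): both map lookups may come back empty for a recovered queue.
class RecoveredQueueCleanup {
  static SortedSet<String> walsToClean(
      Map<String, Map<String, SortedSet<String>>> walsByIdRecoveredQueues,
      String id, String logPrefix, String currentWal) {
    Map<String, SortedSet<String>> walsForPeer = walsByIdRecoveredQueues.get(id);
    if (walsForPeer == null) {
      return new TreeSet<>();        // recovered queue already gone: nothing to clean
    }
    SortedSet<String> wals = walsForPeer.get(logPrefix);
    if (wals == null || wals.isEmpty() || wals.first().equals(currentWal)) {
      return new TreeSet<>();        // no WALs strictly older than the current one
    }
    return new TreeSet<>(wals.headSet(currentWal));   // candidates for removal
  }
}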

hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java
9 additions, 0 deletions

@@ -81,6 +81,8 @@ public class ReplicationSourceWALReaderThread extends Thread {
   private AtomicLong totalBufferUsed;
   private long totalBufferQuota;
 
+  private ReplicationSourceManager replicationSourceManager;
+
   /**
    * Creates a reader worker for a given WAL queue. Reads WAL entries off a given queue, batches the
    * entries, and puts them on a batch queue.
@@ -109,6 +111,7 @@ public ReplicationSourceWALReaderThread(ReplicationSourceManager manager,
     // memory used will be batchSizeCapacity * (nb.batches + 1)
     // the +1 is for the current thread reading before placing onto the queue
     int batchCount = conf.getInt("replication.source.nb.batches", 1);
+    this.replicationSourceManager = manager;
     this.totalBufferUsed = manager.getTotalBufferUsed();
     this.totalBufferQuota = conf.getLong(HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_KEY,
       HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_DFAULT);
@@ -148,6 +151,7 @@ public void run() {
                 long entrySize = getEntrySizeIncludeBulkLoad(entry);
                 long entrySizeExlucdeBulkLoad = getEntrySizeExcludeBulkLoad(entry);
                 batch.addEntry(entry);
+                replicationSourceManager.setPendingShipment(true);
                 updateBatchStats(batch, entry, entryStream.getPosition(), entrySize);
                 boolean totalBufferTooLarge = acquireBufferQuota(entrySizeExlucdeBulkLoad);
                 // Stop if too many entries or too big
@@ -156,6 +160,11 @@ public void run() {
                   break;
                 }
              }
+            } else {
+              replicationSourceManager.logPositionAndCleanOldLogs(entryStream.getCurrentPath(),
+                this.replicationQueueInfo.getPeerClusterZnode(),
+                entryStream.getPosition(),
+                this.replicationQueueInfo.isQueueRecovered(), false);
            }
          }
          if (batch != null && (!batch.getLastSeqIds().isEmpty() || batch.getNbEntries() > 0)) {
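
A hedged sketch of the reader-side logic this file gains, reusing the hypothetical PositionTracker from the sketch after the ReplicationSource diff: entries that survive the filter mark a shipment as pending, while entries the filter drops (for example, edits that originated from the peer cluster in a cyclic replication setup) advance the logged position directly, so their WAL can still leave oldWALs.

import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Simplified model of ReplicationSourceWALReaderThread.run(): 'filter' stands
// in for the chained WALEntryFilter, 'tracker' for ReplicationSourceManager.
class WalReaderSketch {
  static List<String> readBatch(List<String> walEntries, Predicate<String> filter,
      PositionTracker tracker) {
    List<String> batch = new ArrayList<>();
    for (int i = 0; i < walEntries.size(); i++) {
      String entry = walEntries.get(i);
      long positionAfterEntry = i + 1;          // stand-in for entryStream.getPosition()
      if (filter.test(entry)) {
        batch.add(entry);
        tracker.setPendingShipment(true);       // a shipper must now ack this batch
      } else {
        // Filtered out: record the position right away, as the shipper will
        // never see this entry and would otherwise never advance past it.
        tracker.logPositionAndCleanOldLogs(positionAfterEntry);
      }
    }
    return batch;                               // handed to the shipper if non-empty
  }
}

If every entry in a WAL is filtered, the batch stays empty but the recorded position still reaches the end of the file, which is exactly what the new test in the next file asserts.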

hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
40 additions, 0 deletions

@@ -25,6 +25,11 @@
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyBoolean;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 import java.io.IOException;
@@ -72,6 +77,7 @@
 import org.junit.experimental.categories.Category;
 import org.junit.rules.TestName;
 import org.junit.runner.RunWith;
+import org.mockito.ArgumentCaptor;
 import org.mockito.Mockito;
 import org.mockito.runners.MockitoJUnitRunner;
 
@@ -371,6 +377,40 @@ public void testReplicationSourceWALReaderThread() throws Exception {
     assertEquals(getRow(entryBatch.getWalEntries().get(0)), "foo");
   }
 
+  @Test
+  public void testReplicationSourceUpdatesLogPositionOnFilteredEntries() throws Exception {
+    appendEntriesToLog(3);
+    // get ending position
+    long position;
+    try (WALEntryStream entryStream =
+        new WALEntryStream(walQueue, fs, conf, new MetricsSource("1"))) {
+      entryStream.next();
+      entryStream.next();
+      entryStream.next();
+      position = entryStream.getPosition();
+    }
+    // start up a readerThread with a WALEntryFilter that always filter the entries
+    ReplicationSourceManager mockSourceManager = Mockito.mock(ReplicationSourceManager.class);
+    when(mockSourceManager.getTotalBufferUsed()).thenReturn(new AtomicLong(0));
+    ReplicationSourceWALReaderThread readerThread = new ReplicationSourceWALReaderThread(
+        mockSourceManager, getQueueInfo(), walQueue, 0, fs, conf, new WALEntryFilter() {
+          @Override
+          public Entry filter(Entry entry) {
+            return null;
+          }
+        }, new MetricsSource("1"));
+    readerThread.start();
+    Thread.sleep(100);
+    ArgumentCaptor<Long> positionCaptor = ArgumentCaptor.forClass(Long.class);
+    verify(mockSourceManager, times(3))
+        .logPositionAndCleanOldLogs(any(Path.class),
+            anyString(),
+            positionCaptor.capture(),
+            anyBoolean(),
+            anyBoolean());
+    assertEquals(position, positionCaptor.getValue().longValue());
+  }
+
   @Test
   public void testWALKeySerialization() throws Exception {
     Map<String, byte[]> attributes = new HashMap<String, byte[]>();
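
One detail behind the final assertion above: when verify() matches several invocations, Mockito's ArgumentCaptor.getValue() returns the argument of the last matched call (getAllValues() would return all of them), so comparing it against the end-of-WAL position checks that the third filtered entry advanced the position all the way to the end. A tiny stand-alone illustration with hypothetical test types, unrelated to the HBase classes:

import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;

import java.util.Arrays;
import org.junit.Test;
import org.mockito.ArgumentCaptor;

public class ArgumentCaptorLastValueTest {
  interface PositionSink {
    void record(Long position);
  }

  @Test
  public void getValueReturnsLastCapturedArgument() {
    PositionSink sink = mock(PositionSink.class);
    sink.record(10L);
    sink.record(20L);
    sink.record(30L);

    ArgumentCaptor<Long> captor = ArgumentCaptor.forClass(Long.class);
    verify(sink, times(3)).record(captor.capture());
    assertEquals(Long.valueOf(30L), captor.getValue());               // last call wins
    assertEquals(Arrays.asList(10L, 20L, 30L), captor.getAllValues());
  }
}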
