Skip to content

HBASE-24877 Add option to avoid aborting RS process upon uncaught exc… #2255

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Sep 14, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,12 @@
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Predicate;

import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.mutable.MutableBoolean;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
Expand Down Expand Up @@ -120,6 +123,14 @@ public class ReplicationSource implements ReplicationSourceInterface {
// ReplicationEndpoint which will handle the actual replication
private volatile ReplicationEndpoint replicationEndpoint;

private boolean abortOnError;
//This is needed for the startup loop to identify when there's already
//an initialization happening (but not finished yet),
//so that it doesn't try submit another initialize thread.
//NOTE: this should only be set to false at the end of initialize method, prior to return.
private AtomicBoolean startupOngoing = new AtomicBoolean(false);


/**
* A filter (or a chain of filters) for WAL entries; filters out edits.
*/
Expand Down Expand Up @@ -217,6 +228,10 @@ public void init(Configuration conf, FileSystem fs, ReplicationSourceManager man
this.throttler = new ReplicationThrottler((double) currentBandwidth / 10.0);
this.totalBufferUsed = manager.getTotalBufferUsed();
this.walFileLengthProvider = walFileLengthProvider;

this.abortOnError = this.conf.getBoolean("replication.source.regionserver.abort",
true);

LOG.info("queueId={}, ReplicationSource: {}, currentBandwidth={}", queueId,
replicationPeer.getId(), this.currentBandwidth);
}
Expand Down Expand Up @@ -372,10 +387,10 @@ private void tryStartNewShipper(String walGroupId, PriorityBlockingQueue<Path> q
createNewWALReader(walGroupId, queue, worker.getStartPosition());
Threads.setDaemonThreadRunning(
walReader, Thread.currentThread().getName()
+ ".replicationSource.wal-reader." + walGroupId + "," + queueId,
this::uncaughtException);
+ ".replicationSource.wal-reader." + walGroupId + "," + queueId,
(t,e) -> this.uncaughtException(t, e, this.manager, this.getPeerId()));
worker.setWALReader(walReader);
worker.startup(this::uncaughtException);
worker.startup((t,e) -> this.uncaughtException(t, e, this.manager, this.getPeerId()));
return worker;
}
});
Expand Down Expand Up @@ -450,11 +465,28 @@ WALEntryFilter getWalEntryFilter() {
return walEntryFilter;
}

protected final void uncaughtException(Thread t, Throwable e) {
protected final void uncaughtException(Thread t, Throwable e,
ReplicationSourceManager manager, String peerId) {
RSRpcServices.exitIfOOME(e);
LOG.error("Unexpected exception in {} currentPath={}",
t.getName(), getCurrentPath(), e);
server.abort("Unexpected exception in " + t.getName(), e);
if(abortOnError){
server.abort("Unexpected exception in " + t.getName(), e);
}
if(manager != null){
while (true) {
try {
LOG.info("Refreshing replication sources now due to previous error on thread: {}",
t.getName());
manager.refreshSources(peerId);
break;
} catch (IOException e1) {
LOG.error("Replication sources refresh failed.", e1);
sleepForRetries("Sleeping before try refreshing sources again",
maxRetriesMultiplier);
}
}
}
}

@Override
Expand Down Expand Up @@ -544,12 +576,16 @@ private void initialize() {
replicationEndpoint.stop();
if (sleepForRetries("Error starting ReplicationEndpoint", sleepMultiplier)) {
sleepMultiplier++;
} else {
this.startupOngoing.set(false);
throw new RuntimeException("Exhausted retries to start replication endpoint.");
}
}
}

if (!this.isSourceActive()) {
return;
this.startupOngoing.set(false);
throw new IllegalStateException("Source should be active.");
}

sleepMultiplier = 1;
Expand All @@ -571,7 +607,8 @@ private void initialize() {
}

if(!this.isSourceActive()) {
return;
this.startupOngoing.set(false);
throw new IllegalStateException("Source should be active.");
}
LOG.info("{} Source: {}, is now replicating from cluster: {}; to peer cluster: {};",
logPeerId(), this.replicationQueueInfo.getQueueId(), clusterId, peerClusterId);
Expand All @@ -583,16 +620,30 @@ private void initialize() {
PriorityBlockingQueue<Path> queue = entry.getValue();
tryStartNewShipper(walGroupId, queue);
}
this.startupOngoing.set(false);
}

@Override
public void startup() {
// mark we are running now
//Flag that signalizes uncaught error happening while starting up the source
// and a retry should be attempted
MutableBoolean retryStartup = new MutableBoolean(true);
this.sourceRunning = true;
initThread = new Thread(this::initialize);
Threads.setDaemonThreadRunning(initThread,
Thread.currentThread().getName() + ".replicationSource," + this.queueId,
this::uncaughtException);
do {
if(retryStartup.booleanValue()) {
retryStartup.setValue(false);
startupOngoing.set(true);
// mark we are running now
initThread = new Thread(this::initialize);
Threads.setDaemonThreadRunning(initThread,
Thread.currentThread().getName() + ".replicationSource," + this.queueId,
(t,e) -> {
sourceRunning = false;
uncaughtException(t, e, null, null);
retryStartup.setValue(!this.abortOnError);
});
}
} while (this.startupOngoing.get() && !this.abortOnError);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,8 @@ private boolean updateLogPosition(WALEntryBatch batch) {
public void startup(UncaughtExceptionHandler handler) {
String name = Thread.currentThread().getName();
Threads.setDaemonThreadRunning(this,
name + ".replicationSource.shipper" + walGroupId + "," + source.getQueueId(), handler);
name + ".replicationSource.shipper" + walGroupId + "," + source.getQueueId(),
handler::uncaughtException);
}

Path getCurrentPath() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.hadoop.hbase.replication.regionserver;
import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.META_WAL_PROVIDER_ID;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
Expand Down Expand Up @@ -396,6 +397,25 @@ protected void doStop() {
}
}

/**
* Deadend Endpoint. Does nothing.
*/
public static class FaultyReplicationEndpoint extends DoNothingReplicationEndpoint {

static int count = 0;

@Override
public synchronized UUID getPeerUUID() {
if(count==0) {
count++;
throw new RuntimeException();
} else {
return super.getPeerUUID();
}
}

}

/**
* Test HBASE-20497
* Moved here from TestReplicationSource because doesn't need cluster.
Expand Down Expand Up @@ -423,5 +443,77 @@ public void testRecoveredReplicationSourceShipperGetPosition() throws Exception
new RecoveredReplicationSourceShipper(conf, walGroupId, queue, source, storage);
assertEquals(1001L, shipper.getStartPosition());
}

/**
* Test ReplicationSource retries startup once an uncaught exception happens
* during initialization and <b>eplication.source.regionserver.abort</b> is set to false.
*/
@Test
public void testAbortFalseOnError() throws IOException {
ReplicationSource rs = new ReplicationSource();
Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
conf.setInt("replication.source.maxretriesmultiplier", 1);
conf.setBoolean("replication.source.regionserver.abort", false);
ReplicationPeer mockPeer = Mockito.mock(ReplicationPeer.class);
Mockito.when(mockPeer.getConfiguration()).thenReturn(conf);
Mockito.when(mockPeer.getPeerBandwidth()).thenReturn(0L);
ReplicationPeerConfig peerConfig = Mockito.mock(ReplicationPeerConfig.class);
Mockito.when(peerConfig.getReplicationEndpointImpl()).
thenReturn(FaultyReplicationEndpoint.class.getName());
Mockito.when(mockPeer.getPeerConfig()).thenReturn(peerConfig);
ReplicationSourceManager manager = Mockito.mock(ReplicationSourceManager.class);
Mockito.when(manager.getTotalBufferUsed()).thenReturn(new AtomicLong());
String queueId = "qid";
RegionServerServices rss =
TEST_UTIL.createMockRegionServerService(ServerName.parseServerName("a.b.c,1,1"));
rs.init(conf, null, manager, null, mockPeer, rss, queueId, null,
p -> OptionalLong.empty(), new MetricsSource(queueId));
try {
rs.startup();
assertTrue(rs.isSourceActive());
assertEquals(0, rs.getSourceMetrics().getSizeOfLogQueue());
rs.enqueueLog(new Path("a.1" + META_WAL_PROVIDER_ID));
assertEquals(0, rs.getSourceMetrics().getSizeOfLogQueue());
rs.enqueueLog(new Path("a.1"));
assertEquals(1, rs.getSourceMetrics().getSizeOfLogQueue());
} finally {
rs.terminate("Done");
rss.stop("Done");
}
}

/**
* Test ReplicationSource retries startup once an uncaught exception happens
* during initialization and <b>replication.source.regionserver.abort</b> is set to false.
*/
@Test
public void testAbortTrueOnError() throws IOException {
ReplicationSource rs = new ReplicationSource();
Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
conf.setInt("replication.source.maxretriesmultiplier", 1);
ReplicationPeer mockPeer = Mockito.mock(ReplicationPeer.class);
Mockito.when(mockPeer.getConfiguration()).thenReturn(conf);
Mockito.when(mockPeer.getPeerBandwidth()).thenReturn(0L);
ReplicationPeerConfig peerConfig = Mockito.mock(ReplicationPeerConfig.class);
Mockito.when(peerConfig.getReplicationEndpointImpl()).
thenReturn(FaultyReplicationEndpoint.class.getName());
Mockito.when(mockPeer.getPeerConfig()).thenReturn(peerConfig);
ReplicationSourceManager manager = Mockito.mock(ReplicationSourceManager.class);
Mockito.when(manager.getTotalBufferUsed()).thenReturn(new AtomicLong());
String queueId = "qid";
RegionServerServices rss =
TEST_UTIL.createMockRegionServerService(ServerName.parseServerName("a.b.c,1,1"));
rs.init(conf, null, manager, null, mockPeer, rss, queueId, null,
p -> OptionalLong.empty(), new MetricsSource(queueId));
try {
rs.startup();
Waiter.waitFor(conf, 1000, () -> FaultyReplicationEndpoint.count > 0);
assertFalse(rs.isSourceActive());
assertTrue(rss.isAborted());
} finally {
rs.terminate("Done");
rss.stop("Done");
}
}
}