Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ public class MesosConfiguration {
private Optional<String> mesosUsername = Optional.empty();
private Optional<String> mesosPassword = Optional.empty();

private long reconnectTimeoutMillis = 60000;
private long reconnectTimeoutMillis = 120000;
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is lengthened because the attempt time now takes into account the startup period, not just the time from POST -> subscribed event like it did before

// Set to a value at least a few seconds below the configured mesos offer timeout
private long offerTimeout = 45000;
private long offerLockTimeout = 1000;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ public abstract class SingularityMesosScheduler {
/**
* Called when an uncaught exception occurs while attempting to connect to the mesos master
*/
public abstract void onConnectException(Throwable t);
public abstract void onSubscribeException(Throwable t);

/**
* Singularity-specific methods used elsewhere in the code to determine scheduler
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ public void run() {
} catch (RuntimeException|URISyntaxException e) {
if (!Throwables.getCausalChain(e).stream().anyMatch((t) -> t instanceof InterruptedException)) {
LOG.error("Could not connect: ", e);
scheduler.onConnectException(e);
scheduler.onSubscribeException(e);
} else {
LOG.warn("Interruped stream from mesos on subscriber thread, closing");
}
Expand Down Expand Up @@ -213,10 +213,11 @@ private void connect(URI mesosMasterURI, FrameworkInfo frameworkInfo, Singularit

events.filter(event -> event.getType() == Event.Type.SUBSCRIBED)
.map(Event::getSubscribed)
.subscribe(subscribed -> {
.subscribe(
subscribed -> {
this.frameworkId = subscribed.getFrameworkId();
scheduler.subscribed(subscribed);
}, scheduler::onUncaughtException
}, scheduler::onSubscribeException
);

events.filter(event -> event.getType() == Event.Type.UPDATE)
Expand Down Expand Up @@ -266,6 +267,7 @@ public void close() {
if (subscriberThread != null) {
try {
if (!subscriberThread.isInterrupted()) {
LOG.info("Interrupting current subscriber thread");
subscriberThread.interrupt();
}
subscriberThread.join(10000);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -101,6 +102,9 @@ public class SingularityMesosSchedulerImpl extends SingularityMesosScheduler {

private final AtomicReference<MasterInfo> masterInfo = new AtomicReference<>();
private final StatusUpdateQueue queuedUpdates;
private final ExecutorService reconnectExecutor;
private final AtomicBoolean restartInProgress = new AtomicBoolean(false);
private final AtomicReference<Throwable> reconnectException = new AtomicReference<>(null);

@Inject
SingularityMesosSchedulerImpl(SingularitySchedulerLock lock,
Expand Down Expand Up @@ -139,7 +143,7 @@ public class SingularityMesosSchedulerImpl extends SingularityMesosScheduler {
this.subscribeExecutor = threadPoolFactory.getSingleThreaded("subscribe-scheduler");
this.state = new SchedulerState();
this.configuration = configuration;

this.reconnectExecutor = threadPoolFactory.getSingleThreaded("reconnect-scheduler");
}

@Override
Expand All @@ -149,7 +153,6 @@ public CompletableFuture<Void> subscribed(Subscribed subscribed) {
MasterInfo newMasterInfo = subscribed.getMasterInfo();
masterInfo.set(newMasterInfo);
Preconditions.checkState(state.getMesosSchedulerState() != MesosSchedulerState.SUBSCRIBED, "Asked to startup - but in invalid state: %s", state.getMesosSchedulerState());

double advertisedHeartbeatIntervalSeconds = subscribed.getHeartbeatIntervalSeconds();
if (advertisedHeartbeatIntervalSeconds > 0) {
heartbeatIntervalSeconds = Optional.of(advertisedHeartbeatIntervalSeconds);
Expand All @@ -162,6 +165,7 @@ public CompletableFuture<Void> subscribed(Subscribed subscribed) {
}
startup.startup(newMasterInfo);
state.setMesosSchedulerState(MesosSchedulerState.SUBSCRIBED);
restartInProgress.set(false);
handleQueuedStatusUpdates();
}, "subscribed", false),
subscribeExecutor);
Expand Down Expand Up @@ -286,9 +290,16 @@ public void heartbeat(Event event) {

@Override
public void onUncaughtException(Throwable t) {
LOG.error("uncaught exception", t);
if (t instanceof PrematureChannelClosureException) {
LOG.warn("PrematureChannelClosureException, will attempt restart");
} else if (t instanceof IllegalStateException && restartInProgress.get()) {
onSubscribeException(t);
} else {
LOG.error("uncaught exception", t);
}
callWithStateLock(() -> {
if (t instanceof PrematureChannelClosureException) {
state.setMesosSchedulerState(MesosSchedulerState.PAUSED_FOR_MESOS_RECONNECT);
reconnectMesos();
} else {
LOG.error("Aborting due to error: {}", t.getMessage(), t);
Expand All @@ -299,7 +310,11 @@ public void onUncaughtException(Throwable t) {
}

@Override
public void onConnectException(Throwable t) {
public void onSubscribeException(Throwable t) {
LOG.warn("Received {} ({}), checking for reconnect", t.getClass(), t.getMessage());
if (!state.isRunning()) {
reconnectException.set(t);
}
reconnectMesos();
}

Expand Down Expand Up @@ -383,34 +398,56 @@ private void callWithStateLock(Runnable function, String name, boolean ignoreIfN
}

public void reconnectMesos() {
callWithStateLock(() -> {
state.setMesosSchedulerState(MesosSchedulerState.PAUSED_FOR_MESOS_RECONNECT);
LOG.info("Paused scheduler actions, closing existing mesos connection");
mesosSchedulerClient.close();
LOG.info("Closed existing mesos connection");
try {
Retryer<Void> startRetryer = RetryerBuilder.<Void>newBuilder()
.retryIfException()
.retryIfRuntimeException()
.withWaitStrategy(WaitStrategies.exponentialWait())
.withStopStrategy(StopStrategies.stopAfterDelay(configuration.getMesosConfiguration().getReconnectTimeoutMillis(), TimeUnit.MILLISECONDS))
.build();
startRetryer.call(() -> {
start();
return null;
});
} catch (RetryException re) {
if (re.getLastFailedAttempt().getExceptionCause() != null) {
LOG.error("Unable to retry mesos master connection", re.getLastFailedAttempt().getExceptionCause());
notifyStopping();
abort.abort(AbortReason.MESOS_ERROR, Optional.of(re.getLastFailedAttempt().getExceptionCause()));
// Done on a separate thread so that possibly interrupting the subscriber thread will not loop around to call
// this method again on the same thread, causing an InterruptException
if (!restartInProgress.compareAndSet(false, true)) {
return;
}
CompletableFuture.runAsync(this::reconnectMesosSync, reconnectExecutor);
}

public void reconnectMesosSync() {
try {
Retryer<Boolean> startRetryer = RetryerBuilder.<Boolean>newBuilder()
.retryIfException()
.retryIfRuntimeException()
.retryIfResult((r) -> r == null || !r)
.withWaitStrategy(WaitStrategies.exponentialWait())
.withStopStrategy(StopStrategies.stopAfterDelay(configuration.getMesosConfiguration().getReconnectTimeoutMillis(), TimeUnit.MILLISECONDS))
.build();
startRetryer.call(() -> {
mesosSchedulerClient.close();
LOG.info("Closed existing mesos connection");
start();
long start = System.currentTimeMillis();
while (!state.isRunning() && System.currentTimeMillis() - start < 30000) {
Thread.sleep(50);
Throwable t = reconnectException.getAndSet(null);
if (t != null) {
if (t instanceof IllegalStateException) {
// Mesos master's redirect endpoint will return a 301 with empty location if a leader is not elected
// This results in an IllegalStateException from MesosClient
LOG.warn("IllegalStateException from MesosClient, mesos is likely in process of leader election, will retry");
} else {
LOG.warn("Exception during reconnect", t);
}
return false;
}
}
} catch (Throwable t) {
LOG.error("Unable to retry mesos master connection", t);
return state.isRunning();
});
} catch (RetryException re) {
if (re.getLastFailedAttempt().getExceptionCause() != null) {
LOG.error("Unable to retry mesos master connection", re.getLastFailedAttempt().getExceptionCause());
notifyStopping();
abort.abort(AbortReason.MESOS_ERROR, Optional.of(t));
abort.abort(AbortReason.MESOS_ERROR, Optional.of(re.getLastFailedAttempt().getExceptionCause()));
}
}, "reconnectMesos", false);
} catch (Throwable t) {
LOG.error("Unable to retry mesos master connection", t);
notifyStopping();
abort.abort(AbortReason.MESOS_ERROR, Optional.of(t));
}

}

public void setZkConnectionState(ConnectionState connectionState) {
Expand Down