Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
42 commits
Select commit Hold shift + click to select a range
26cf1dd
Rework scheduler connect + reconnect logic
ssalinas Oct 25, 2019
4db72c6
some cleanup, most tests passing
ssalinas Oct 25, 2019
493a908
more WIP, use session error policy not standard
ssalinas Oct 28, 2019
b185a2a
Don't immediately call notLeader
ssalinas Oct 28, 2019
15f4a97
Don't abort in pollers for KeeperException
ssalinas Oct 28, 2019
f7b2fad
no abort for zk exception on status updates
ssalinas Oct 28, 2019
707ed24
Buffering of status updates during pause for zk
ssalinas Oct 28, 2019
78f3370
resolve race in timer task
ssalinas Oct 28, 2019
95fea0b
synchronized
ssalinas Oct 28, 2019
6a6c9f9
missing toString
ssalinas Oct 28, 2019
851b179
logging, consider status update a heartbeat
ssalinas Oct 29, 2019
337f3df
Better handling of observable restart
ssalinas Oct 29, 2019
ffc8424
Add test resource call to more easily retry mesos reconnect
ssalinas Oct 29, 2019
528280a
fix queued status updates and handling of mesos reconnect
ssalinas Oct 29, 2019
f009e5c
Mark scheduler stopped to avoid exception on shutdown
ssalinas Oct 29, 2019
eeb5786
Mark scheduler stopped to avoid exception on shutdown
ssalinas Oct 29, 2019
ee54958
run startup with request locks
ssalinas Oct 29, 2019
ef1d0b6
update last offer time immediately
ssalinas Oct 30, 2019
c74216c
expire status update delats out after 10s
ssalinas Oct 30, 2019
d2ff84a
Add KeeperException as cause of multi-get timeout
ssalinas Oct 31, 2019
8781eba
Subscribe on separate threads in mesos client
ssalinas Oct 30, 2019
da47781
Only delay for status updates when many requests are delayed
ssalinas Oct 31, 2019
ce703e9
fix iterator
ssalinas Oct 31, 2019
0031717
merge status update delta changes
ssalinas Oct 31, 2019
3fa117b
comment
ssalinas Oct 31, 2019
f54471e
Make the update delta poller just a histogram
ssalinas Oct 31, 2019
8c386fc
run startup updates in parallel
ssalinas Nov 1, 2019
0f10596
short circuit offer checks and launch if taking too long, parallel st…
ssalinas Nov 1, 2019
fcd043b
fix tests
ssalinas Nov 1, 2019
9ebd48d
missing config
ssalinas Nov 1, 2019
5d9a659
more test fixing
ssalinas Nov 1, 2019
cd8a6ac
retune default concurrencies
ssalinas Nov 1, 2019
9e09288
retune default concurrencies
ssalinas Nov 1, 2019
8dbc54f
merge conflicts
ssalinas Nov 1, 2019
ac640e8
different thread strategy
ssalinas Nov 1, 2019
b9285b1
log delay
ssalinas Nov 1, 2019
1b118ad
don't timeout for lock in offers
ssalinas Nov 1, 2019
5253094
unused method
ssalinas Nov 1, 2019
4b9da86
fix lots of tests now that this is async
ssalinas Nov 1, 2019
d41ac97
auto decline offers while we process startup and leader cache loading
ssalinas Nov 1, 2019
748dddb
return type
ssalinas Nov 1, 2019
831dc09
log level change
ssalinas Nov 1, 2019
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions SingularityService/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,11 @@
<artifactId>okhttp</artifactId>
</dependency>

<dependency>
<groupId>com.squareup.tape2</groupId>
<artifactId>tape</artifactId>
</dependency>

<dependency>
<groupId>com.sun.mail</groupId>
<artifactId>javax.mail</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,6 @@
import javax.inject.Named;
import javax.inject.Singleton;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.framework.state.ConnectionStateListener;
import org.eclipse.jetty.server.Server;
import org.slf4j.ILoggerFactory;
import org.slf4j.Logger;
Expand All @@ -32,7 +29,7 @@
import ch.qos.logback.classic.LoggerContext;

@Singleton
public class SingularityAbort implements ConnectionStateListener {
public class SingularityAbort {

private static final Logger LOG = LoggerFactory.getLogger(SingularityAbort.class);

Expand Down Expand Up @@ -60,14 +57,6 @@ public SingularityAbort(SingularitySmtpSender smtpSender,
this.hostAndPort = hostAndPort;
}

@Override
public void stateChanged(CuratorFramework client, ConnectionState newState) {
if (newState == ConnectionState.LOST) {
LOG.error("Aborting due to new connection state received from ZooKeeper: {}", newState);
abort(AbortReason.LOST_ZK_CONNECTION, Optional.empty());
}
}

public enum AbortReason {
LOST_ZK_CONNECTION, LOST_LEADERSHIP, UNRECOVERABLE_ERROR, ERROR_IN_LEADER_ONLY_POLLER, TEST_ABORT, MESOS_ERROR, LOST_MESOS_CONNECTION;
}
Expand Down Expand Up @@ -137,7 +126,7 @@ private void sendAbortMail(final String message, final Optional<Throwable> throw
StringWriter sw = new StringWriter();
PrintWriter pw = new PrintWriter(sw);
throwable.get().printStackTrace(pw);
body = throwable.get().getMessage() + "\n" + sw.toString();
body = "<pre>\n" + throwable.get().getMessage() + "\n" + sw.toString() + "\n</pre>";
} else {
body = "(no stack trace)";
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,12 @@

import static com.google.common.base.Preconditions.checkNotNull;

import java.util.Set;

import javax.inject.Inject;
import javax.inject.Provider;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.curator.framework.state.SessionConnectionStateErrorPolicy;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -23,24 +21,20 @@ public class SingularityCuratorProvider implements Provider<CuratorFramework> {
private final CuratorFramework curatorFramework;

@Inject
public SingularityCuratorProvider(final SingularityConfiguration configuration, final Set<ConnectionStateListener> connectionStateListeners) {
public SingularityCuratorProvider(final SingularityConfiguration configuration) {

checkNotNull(configuration, "configuration is null");
checkNotNull(connectionStateListeners, "connectionStateListeners is null");

ZooKeeperConfiguration zookeeperConfig = configuration.getZooKeeperConfiguration();

this.curatorFramework = CuratorFrameworkFactory.builder()
.defaultData(null)
.connectionStateErrorPolicy(new SessionConnectionStateErrorPolicy())
.sessionTimeoutMs(zookeeperConfig.getSessionTimeoutMillis())
.connectionTimeoutMs(zookeeperConfig.getConnectTimeoutMillis())
.connectString(zookeeperConfig.getQuorum())
.retryPolicy(new ExponentialBackoffRetry(zookeeperConfig.getRetryBaseSleepTimeMilliseconds(), zookeeperConfig.getRetryMaxTries()))
.namespace(zookeeperConfig.getZkNamespace()).build();

for (ConnectionStateListener connectionStateListener : connectionStateListeners) {
curatorFramework.getConnectionStateListenable().addListener(connectionStateListener);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,19 @@
import java.lang.management.ManagementFactory;
import java.lang.management.RuntimeMXBean;
import java.util.Optional;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import javax.inject.Singleton;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.leader.LeaderLatchListener;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.mesos.v1.Protos.MasterInfo;
import org.apache.mesos.v1.Protos.Offer;
import org.slf4j.Logger;
Expand All @@ -28,9 +33,10 @@
import com.hubspot.singularity.sentry.SingularityExceptionNotifier;

@Singleton
public class SingularityLeaderController implements LeaderLatchListener {
public class SingularityLeaderController implements LeaderLatchListener, ConnectionStateListener {

private static final Logger LOG = LoggerFactory.getLogger(SingularityLeaderController.class);
private static final Timer TIMER = new Timer();

private final StateManager stateManager;
private final SingularityAbort abort;
Expand All @@ -40,7 +46,10 @@ public class SingularityLeaderController implements LeaderLatchListener {
private final StatePoller statePoller;
private final SingularityMesosScheduler scheduler;
private final OfferCache offerCache;
private final SingularityConfiguration configuration;
private final ReentrantLock stateHandlerLock;

private volatile TimerTask lostConnectionStateChecker;
private volatile boolean master;

@Inject
Expand All @@ -59,18 +68,64 @@ public SingularityLeaderController(StateManager stateManager,
this.saveStateEveryMs = TimeUnit.SECONDS.toMillis(configuration.getSaveStateEverySeconds());
this.statePoller = new StatePoller();
this.scheduler = scheduler;

this.configuration = configuration;
this.offerCache = offerCache;

this.master = false;
this.stateHandlerLock = new ReentrantLock();
}

public void start() {
statePoller.start();
}

public void stop() {
scheduler.notifyStopping();
statePoller.finish();
TIMER.cancel();
}

@Override
public void stateChanged(CuratorFramework client, ConnectionState newState) {
LOG.info("Received update for new zk connection state {}", newState);
stateHandlerLock.lock();
try {
// If the new state is not connected `setZkConnectionState` will effectively pause all pollers from
// continuing to process events. An explicit call to pauseForDatastoreReconnect is not needed here
scheduler.setZkConnectionState(newState);
if (!newState.isConnected()) {
LOG.info("No longer connected to zk, pausing scheduler actions and waiting up to {}ms for reconnect",
configuration.getZooKeeperConfiguration().getAbortAfterConnectionLostForMillis());
if (lostConnectionStateChecker != null) {
LOG.warn("Already started connection state check, due in {}ms", lostConnectionStateChecker.scheduledExecutionTime() - System.currentTimeMillis());
return;
}
lostConnectionStateChecker = new TimerTask() {
@Override
public void run() {
stateHandlerLock.lock();
if (scheduler.getState().getZkConnectionState().isConnected()) {
LOG.debug("Reconnected to zk, will not abort");
return;
}
try {
LOG.error("Aborting due to loss of zookeeper connection for {}ms. Current connection state {}",
configuration.getZooKeeperConfiguration().getAbortAfterConnectionLostForMillis(), newState);
abort.abort(AbortReason.LOST_ZK_CONNECTION, Optional.empty());
} finally {
stateHandlerLock.unlock();
}
}
};
TIMER.schedule(lostConnectionStateChecker, configuration.getZooKeeperConfiguration().getAbortAfterConnectionLostForMillis());
} else if (lostConnectionStateChecker != null) {
LOG.info("Reconnected to zk, scheduler actions resumed");
lostConnectionStateChecker.cancel();
lostConnectionStateChecker = null;
}
} finally {
stateHandlerLock.unlock();
}
}

protected boolean isTestMode() {
Expand All @@ -79,23 +134,28 @@ protected boolean isTestMode() {

@Override
public void isLeader() {
LOG.info("We are now the leader! Current state {}", scheduler.getState());

master = true;
try {
if (!isTestMode()) {
scheduler.start();
statePoller.wake();
}
} catch (Throwable t) {
LOG.error("While starting driver", t);
exceptionNotifier.notify(String.format("Error starting driver (%s)", t.getMessage()), t);
stateHandlerLock.lock();
try {
LOG.info("We are now the leader! Current state {}", scheduler.getState());

master = true;
try {
scheduler.notifyStopping();
} catch (Throwable th) {
LOG.warn("While stopping scheduler due to bad initial start({})", th.getMessage());
if (!isTestMode()) {
scheduler.start();
statePoller.wake();
}
} catch (Throwable t) {
LOG.error("While starting driver", t);
exceptionNotifier.notify(String.format("Error starting driver (%s)", t.getMessage()), t);
try {
scheduler.notifyStopping();
} catch (Throwable th) {
LOG.warn("While stopping scheduler due to bad initial start({})", th.getMessage());
}
abort.abort(AbortReason.UNRECOVERABLE_ERROR, Optional.of(t));
}
abort.abort(AbortReason.UNRECOVERABLE_ERROR, Optional.of(t));
} finally {
stateHandlerLock.unlock();
}
}

Expand All @@ -113,20 +173,34 @@ public Optional<Long> getLastOfferTimestamp() {

@Override
public void notLeader() {
LOG.info("We are not the leader! Current state {}", scheduler.getState());

master = false;

if (scheduler.isRunning() && !isTestMode()) {
try {
scheduler.notifyStopping();
statePoller.wake();
} catch (Throwable t) {
LOG.error("While stopping driver", t);
exceptionNotifier.notify(String.format("Error while stopping driver (%s)", t.getMessage()), t);
} finally {
abort.abort(AbortReason.LOST_LEADERSHIP, Optional.<Throwable>empty());
stateHandlerLock.lock();
try {
LOG.info("We are not the leader! Current state {}", scheduler.getState());
master = false;
if (!isTestMode()) {
LOG.info("Might not be the leader, pausing scheduler actions");
scheduler.pauseForDatastoreReconnect();
// Check again if we are still not leader in a few seconds. LeaderLatch.reset can often get called on reconnect, which
// will call notLeader/isLeader is quick succession.
TIMER.schedule(new TimerTask() {
@Override
public void run() {
stateHandlerLock.lock();
try {
if (master) {
LOG.debug("Reconnected as leader before shutdown timeout");
} else {
LOG.info("No longer the leader, stopping scheduler actions");
scheduler.notLeader();
}
} finally {
stateHandlerLock.unlock();
}
}
}, 5000);
}
} finally {
stateHandlerLock.unlock();
}
}

Expand Down Expand Up @@ -157,7 +231,7 @@ private SingularityHostState getHostState() {
numCachedOffers++;
}

return new SingularityHostState(master, uptime, scheduler.getState().name(), millisSinceLastOfferTimestamp, hostAndPort.getHost(), hostAndPort.getHost(), mesosMaster, scheduler.isRunning(),
return new SingularityHostState(master, uptime, scheduler.getState().getMesosSchedulerState().name(), millisSinceLastOfferTimestamp, hostAndPort.getHost(), hostAndPort.getHost(), mesosMaster, scheduler.isRunning(),
numCachedOffers, cachedCpus, cachedMemoryBytes);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

import javax.inject.Inject;
Expand All @@ -21,10 +22,10 @@
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.leader.LeaderLatch;
import org.apache.curator.framework.recipes.leader.LeaderLatchListener;
import org.apache.curator.framework.state.ConnectionStateListener;

import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.services.s3.AmazonS3Client;
import com.codahale.metrics.Histogram;
import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;
import com.fasterxml.jackson.databind.ObjectMapper;
Expand Down Expand Up @@ -69,6 +70,7 @@
import com.hubspot.singularity.mesos.SingularityMesosStatusUpdateHandler;
import com.hubspot.singularity.mesos.SingularityNoOfferCache;
import com.hubspot.singularity.mesos.SingularityOfferCache;
import com.hubspot.singularity.mesos.StatusUpdateQueue;
import com.hubspot.singularity.metrics.SingularityGraphiteReporter;
import com.hubspot.singularity.resources.SingularityServiceUIModule;
import com.hubspot.singularity.scheduler.SingularityLeaderOnlyPoller;
Expand Down Expand Up @@ -113,7 +115,6 @@ public class SingularityMainModule implements Module {

public static final String LOST_TASKS_METER = "singularity.lost.tasks.meter";

public static final String STATUS_UPDATE_DELTA_30S_AVERAGE = "singularity.status.update.delta.minute.average";
public static final String STATUS_UPDATE_DELTAS = "singularity.status.update.deltas";
public static final String LAST_MESOS_MASTER_HEARTBEAT_TIME = "singularity.last.mesos.master.heartbeat.time";

Expand All @@ -130,9 +131,6 @@ public void configure(Binder binder) {
binder.bind(LeaderLatch.class).to(SingularityLeaderLatch.class).in(Scopes.SINGLETON);
binder.bind(CuratorFramework.class).toProvider(SingularityCuratorProvider.class).in(Scopes.SINGLETON);

Multibinder<ConnectionStateListener> connectionStateListeners = Multibinder.newSetBinder(binder, ConnectionStateListener.class);
connectionStateListeners.addBinding().to(SingularityAbort.class).in(Scopes.SINGLETON);

Multibinder<LeaderLatchListener> leaderLatchListeners = Multibinder.newSetBinder(binder, LeaderLatchListener.class);
leaderLatchListeners.addBinding().to(SingularityLeaderController.class).in(Scopes.SINGLETON);

Expand All @@ -147,7 +145,7 @@ public void configure(Binder binder) {
binder.bind(SingularityExceptionNotifier.class).in(Scopes.SINGLETON);
binder.bind(LoadBalancerClient.class).to(LoadBalancerClientImpl.class).in(Scopes.SINGLETON);
binder.bind(SingularityMailRecordCleaner.class).in(Scopes.SINGLETON);

binder.bind(StatusUpdateQueue.class).in(Scopes.SINGLETON);
binder.bind(SingularityWebhookPoller.class).in(Scopes.SINGLETON);

binder.bind(SingularityAbort.class).in(Scopes.SINGLETON);
Expand All @@ -173,7 +171,7 @@ public void configure(Binder binder) {
binder.bindConstant().annotatedWith(Names.named(SERVER_ID_PROPERTY)).to(UUID.randomUUID().toString());

binder.bind(SingularityManagedScheduledExecutorServiceFactory.class).in(Scopes.SINGLETON);
binder.bind(SingularityManagedCachedThreadPoolFactory.class).in(Scopes.SINGLETON);
binder.bind(SingularityManagedThreadPoolFactory.class).in(Scopes.SINGLETON);

binder.bind(SingularityGraphiteReporter.class).in(Scopes.SINGLETON);

Expand Down Expand Up @@ -393,18 +391,11 @@ public Meter providesLostTasksMeter(MetricRegistry registry) {
return registry.meter("com.hubspot.singularity.lostTasks");
}

@Provides
@Singleton
@Named(STATUS_UPDATE_DELTA_30S_AVERAGE)
public AtomicLong provideDeltasMap() {
return new AtomicLong(0);
}

@Provides
@Singleton
@Named(STATUS_UPDATE_DELTAS)
public ConcurrentHashMap<Long, Long> provideUpdateDeltasMap() {
return new ConcurrentHashMap<>();
public Histogram provideUpdateDeltasMap(MetricRegistry registry) {
return registry.histogram("status update delta");
}

@Provides
Expand Down
Loading