Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import com.hubspot.singularity.config.SMTPConfiguration;
import com.hubspot.singularity.config.SingularityConfiguration;
import com.hubspot.singularity.managed.SingularityLifecycleManaged;
import com.hubspot.singularity.managed.SingularityPreJettyLifecycle;
import com.hubspot.singularity.sentry.SingularityExceptionNotifier;
import com.hubspot.singularity.smtp.SingularitySmtpSender;
import java.io.PrintWriter;
Expand Down Expand Up @@ -72,7 +73,11 @@ public void abort(AbortReason abortReason, Optional<Throwable> throwable) {
SingularityLifecycleManaged lifecycle = injector.getInstance(
SingularityLifecycleManaged.class
);
SingularityPreJettyLifecycle preJettyLifecycle = injector.getInstance(
SingularityPreJettyLifecycle.class
);
try {
preJettyLifecycle.stop();
lifecycle.stop();
} catch (Throwable t) {
LOG.error("While shutting down", t);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import com.hubspot.singularity.hooks.SnsWebhookRetryer;
import com.hubspot.singularity.hooks.WebhookQueueType;
import com.hubspot.singularity.managed.SingularityLifecycleManaged;
import com.hubspot.singularity.managed.SingularityPreJettyLifecycle;
import com.hubspot.singularity.mesos.OfferCache;
import com.hubspot.singularity.mesos.SingularityMesosStatusUpdateHandler;
import com.hubspot.singularity.mesos.SingularityNoOfferCache;
Expand Down Expand Up @@ -209,6 +210,7 @@ public void configure(Binder binder) {
binder.bind(SingularityMesosStatusUpdateHandler.class).in(Scopes.SINGLETON);

binder.bind(SingularityLifecycleManaged.class).asEagerSingleton();
binder.bind(SingularityPreJettyLifecycle.class).asEagerSingleton();

if (configuration.isCacheOffers()) {
binder.bind(OfferCache.class).to(SingularityOfferCache.class).in(Scopes.SINGLETON);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,15 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Singleton
public class SingularityManagedScheduledExecutorServiceFactory {
private static final Logger LOG = LoggerFactory.getLogger(
SingularityManagedScheduledExecutorServiceFactory.class
);

private final AtomicBoolean stopped = new AtomicBoolean();
private final List<ScheduledExecutorService> executorPools = new ArrayList<>();

Expand Down Expand Up @@ -68,7 +74,9 @@ public void stop() throws Exception {
final long start = System.currentTimeMillis();

if (!service.awaitTermination(timeoutLeftInMillis, TimeUnit.MILLISECONDS)) {
return;
LOG.warn("Scheduled executor service task did not exit cleanly");
service.shutdownNow();
continue;
}

timeoutLeftInMillis -= (System.currentTimeMillis() - start);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,18 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Singleton
public class SingularityManagedThreadPoolFactory {
private static final Logger LOG = LoggerFactory.getLogger(
SingularityManagedThreadPoolFactory.class
);

private final AtomicBoolean stopped = new AtomicBoolean();
private final List<ExecutorService> executorPools = new ArrayList<>();

Expand Down Expand Up @@ -88,7 +93,8 @@ public void stop() throws Exception {
final long start = System.currentTimeMillis();

if (!service.awaitTermination(timeoutLeftInMillis, TimeUnit.MILLISECONDS)) {
return;
LOG.warn("Executor service tasks did not exit in time");
continue;
}

timeoutLeftInMillis -= (System.currentTimeMillis() - start);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,7 @@ public class SingularityConfiguration extends Configuration {

private int coreThreadpoolSize = 8;

private long threadpoolShutdownDelayInSeconds = 10;
private long threadpoolShutdownDelayInSeconds = 20;

@Valid
@JsonProperty("customExecutor")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,13 @@ public class SingularityLifecycleManaged implements Managed {
private final SingularityGraphiteReporter graphiteReporter;
private final ExecutorIdGenerator executorIdGenerator;
private final Set<SingularityLeaderOnlyPoller> leaderOnlyPollers;
private final SingularityPreJettyLifecycle preJettyLifecycle;
private final boolean readOnly;

private final CuratorFramework curatorFramework;
private final AtomicBoolean started = new AtomicBoolean(false);
private final AtomicBoolean stopped = new AtomicBoolean(false);
private final AtomicBoolean preJettyStopped = new AtomicBoolean(false);

@Inject
public SingularityLifecycleManaged(
Expand All @@ -60,7 +62,8 @@ public SingularityLifecycleManaged(
SingularityGraphiteReporter graphiteReporter,
ExecutorIdGenerator executorIdGenerator,
Set<SingularityLeaderOnlyPoller> leaderOnlyPollers,
SingularityConfiguration configuration
SingularityConfiguration configuration,
SingularityPreJettyLifecycle preJettyLifecycle
) {
this.cachedThreadPoolFactory = cachedThreadPoolFactory;
this.scheduledExecutorServiceFactory = scheduledExecutorServiceFactory;
Expand All @@ -73,6 +76,7 @@ public SingularityLifecycleManaged(
this.executorIdGenerator = executorIdGenerator;
this.leaderOnlyPollers = leaderOnlyPollers;
this.readOnly = configuration.isReadOnlyInstance();
this.preJettyLifecycle = preJettyLifecycle;
}

@Override
Expand All @@ -90,14 +94,15 @@ public void start() throws Exception {
if (startLeaderPollers()) {
leaderOnlyPollers.forEach(SingularityLeaderOnlyPoller::start);
}
preJettyLifecycle.registerShutdownHook(this::preJettyStop);
} else {
LOG.info("Already started, will not call again");
}
}

@Override
public void stop() throws Exception {
if (!stopped.getAndSet(true)) {
// This will run before the application stops listening on its designated port
private void preJettyStop() {
if (!preJettyStopped.getAndSet(true)) {
if (startLeaderPollers()) {
stopNewPolls(); // Marks a boolean that will short circuit new runs of any leader only pollers
}
Expand All @@ -106,6 +111,14 @@ public void stop() throws Exception {
stopHttpClients(); // Stops any additional async callbacks in healthcheck/new task check
stopExecutors(); // Shuts down the executors for pollers and async semaphores
stopLeaderLatch(); // let go of leadership
} else {
LOG.info("Already stopped pre-jetty operations");
}
}

@Override
public void stop() throws Exception {
if (!stopped.getAndSet(true)) {
stopCurator(); // disconnect from zk
stopGraphiteReporter();
} else {
Expand Down Expand Up @@ -171,10 +184,20 @@ private void stopGraphiteReporter() {

private void stopLeaderLatch() {
try {
String thisId = leaderLatch.getId();
if (!readOnly) {
LOG.info("Stopping leader latch");
leaderLatch.close();
}
// Wait until leader change has actually propagated
long start = System.currentTimeMillis();
while (
thisId.equals(leaderLatch.getLeader().getId()) &&
System.currentTimeMillis() - start < 15000
) {
LOG.warn("Instance still has leadership, waiting and checking again");
Thread.sleep(1000);
}
} catch (Throwable t) {
LOG.warn("Could not stop leader latch ({})}", t.getMessage());
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package com.hubspot.singularity.managed;

import com.google.inject.Inject;
import com.google.inject.Singleton;
import io.dropwizard.lifecycle.Managed;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import org.eclipse.jetty.util.MultiException;
import org.eclipse.jetty.util.component.AbstractLifeCycle;
import org.eclipse.jetty.util.thread.ShutdownThread;

/**
* This class runs shutdown hooks before the default Jetty server shutdown hook / lifecycle management.
* Actions will be run serially - any exceptions will be swallowed until after all have run.
*
* This class does not need to be created after the Jetty server.
*/
@Singleton
public class SingularityPreJettyLifecycle extends AbstractLifeCycle implements Managed {
private final List<Runnable> hooks;
private final AtomicBoolean hasRun;

@Inject
public SingularityPreJettyLifecycle() {
this.hooks = new CopyOnWriteArrayList<>();
this.hasRun = new AtomicBoolean();
}

@Override
protected void doStart() throws Exception {
// Registering at index 0 ensures this hook is run first during shutdown.
ShutdownThread.register(0, this);
}

@Override
protected synchronized void doStop() throws Exception {
if (hasRun.get()) {
return;
}

MultiException exceptions = new MultiException();
for (Runnable hook : hooks) {
try {
hook.run();
} catch (Exception e) {
exceptions.add(e);
}
}
hasRun.set(true);
exceptions.ifExceptionThrow();
}

public boolean registerShutdownHook(Runnable runnable) {
return hooks.add(runnable);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ public SingularityLifecycleManagedTest(
SingularityGraphiteReporter graphiteReporter,
ExecutorIdGenerator executorIdGenerator,
Set<SingularityLeaderOnlyPoller> leaderOnlyPollers,
SingularityConfiguration configuration
SingularityConfiguration configuration,
SingularityPreJettyLifecycle preJettyLifecycle
) {
super(
cachedThreadPoolFactory,
Expand All @@ -41,7 +42,8 @@ public SingularityLifecycleManagedTest(
graphiteReporter,
executorIdGenerator,
leaderOnlyPollers,
configuration
configuration,
preJettyLifecycle
);
}

Expand Down