Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

Expand All @@ -20,6 +23,7 @@

import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.inject.Inject;
import com.hubspot.baragon.models.BaragonRequestState;
import com.hubspot.mesos.JavaUtils;
Expand Down Expand Up @@ -52,10 +56,12 @@
import com.hubspot.singularity.SingularityUpdatePendingDeployRequest;
import com.hubspot.singularity.TaskCleanupType;
import com.hubspot.singularity.api.SingularityRunNowRequest;
import com.hubspot.singularity.async.CompletableFutures;
import com.hubspot.singularity.config.SingularityConfiguration;
import com.hubspot.singularity.data.DeployManager;
import com.hubspot.singularity.data.RequestManager;
import com.hubspot.singularity.data.TaskManager;
import com.hubspot.singularity.data.usage.UsageManager;
import com.hubspot.singularity.expiring.SingularityExpiringPause;
import com.hubspot.singularity.expiring.SingularityExpiringScale;
import com.hubspot.singularity.hooks.LoadBalancerClient;
Expand All @@ -74,17 +80,21 @@ public class SingularityDeployChecker {
private final SingularityConfiguration configuration;
private final LoadBalancerClient lbClient;
private final SingularitySchedulerLock lock;
private final UsageManager usageManager;
private final ExecutorService deployCheckExecuotor;

@Inject
public SingularityDeployChecker(DeployManager deployManager, SingularityDeployHealthHelper deployHealthHelper, LoadBalancerClient lbClient, RequestManager requestManager, TaskManager taskManager,
SingularityConfiguration configuration, SingularitySchedulerLock lock) {
SingularityConfiguration configuration, SingularitySchedulerLock lock, UsageManager usageManager) {
this.configuration = configuration;
this.lbClient = lbClient;
this.deployHealthHelper = deployHealthHelper;
this.requestManager = requestManager;
this.deployManager = deployManager;
this.taskManager = taskManager;
this.lock = lock;
this.usageManager = usageManager;
this.deployCheckExecuotor = Executors.newCachedThreadPool(new ThreadFactoryBuilder().setNameFormat("deploy-checker-%d").build());
}

public int checkDeploys() {
Expand All @@ -99,11 +109,16 @@ public int checkDeploys() {
final Map<SingularityPendingDeploy, SingularityDeployKey> pendingDeployToKey = SingularityDeployKey.fromPendingDeploys(pendingDeploys);
final Map<SingularityDeployKey, SingularityDeploy> deployKeyToDeploy = deployManager.getDeploysForKeys(pendingDeployToKey.values());

pendingDeploys.parallelStream().forEach((pendingDeploy) -> {
lock.runWithRequestLock(() -> {
checkDeploy(pendingDeploy, cancelDeploys, pendingDeployToKey, deployKeyToDeploy, updateRequests);
}, pendingDeploy.getDeployMarker().getRequestId(), getClass().getSimpleName());
});
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 for switching away from the common forkjoin pool for this

CompletableFutures.allOf(pendingDeploys.stream()
.map((pendingDeploy) ->
CompletableFuture.runAsync(() ->
lock.runWithRequestLock(
() -> checkDeploy(pendingDeploy, cancelDeploys, pendingDeployToKey, deployKeyToDeploy, updateRequests),
pendingDeploy.getDeployMarker().getRequestId(),
getClass().getSimpleName()),
deployCheckExecuotor))
.collect(Collectors.toList()))
.join();

cancelDeploys.forEach(deployManager::deleteCancelDeployRequest);
updateRequests.forEach(deployManager::deleteUpdatePendingDeployRequest);
Expand Down Expand Up @@ -337,6 +352,13 @@ private void finishDeploy(SingularityRequestWithState requestWithState, Optional
deploy.isPresent() ? deploy.get().getUser() : Optional.empty(),
Optional.empty());
}
// Clear utilization since a new deploy will update usage patterns
// do this async so sql isn't on the main scheduling path for deploys
CompletableFuture.runAsync(() -> usageManager.deleteRequestUtilization(request.getId()), deployCheckExecuotor)
.exceptionally((t) -> {
LOG.error("Could not clear usage data after new deploy", t);
return null;
});
}

deployManager.saveDeployResult(pendingDeploy.getDeployMarker(), deploy, deployResult);
Expand Down