Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -445,6 +445,10 @@ public SingularityCreateResult savePendingTask(SingularityPendingTask task) {
return save(pendingPath, task, pendingTaskTranscoder);
}

public List<String> getRequestIdsInTaskHistory() {
return getChildren(HISTORY_PATH_ROOT);
}

public List<SingularityTaskId> getAllTaskIds() {
final List<String> requestIds = getChildren(HISTORY_PATH_ROOT);
final List<String> paths = Lists.newArrayListWithCapacity(requestIds.size());
Expand Down Expand Up @@ -652,7 +656,7 @@ public Map<SingularityTaskId, List<SingularityTaskHistoryUpdate>> getTaskHistory
}

public Map<SingularityTaskId, List<SingularityTaskHistoryUpdate>> getAllTaskHistoryUpdates() {
return getTaskHistoryUpdates(getAllTaskIds());
return getTaskHistoryUpdates(getActiveTaskIds());
}

public int getNumNonstartupHealthchecks(SingularityTaskId taskId) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,8 @@
import com.hubspot.singularity.config.SingularityConfiguration;
import com.hubspot.singularity.data.DeployManager;
import com.hubspot.singularity.data.TaskManager;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -61,46 +58,48 @@ public void runActionOnPoll() {
LOG.info("Checking inactive task ids for task history persistence");

final long start = System.currentTimeMillis();
Map<String, List<SingularityTaskId>> inactiveTaskIdsByRequest = getInactiveTaskIdsByRequest();
for (Map.Entry<String, List<SingularityTaskId>> entry : inactiveTaskIdsByRequest.entrySet()) {
int forRequest = 0;
int transferred = 0;
for (SingularityTaskId taskId : entry.getValue()) {
if (moveToHistoryOrCheckForPurge(taskId, forRequest)) {
LOG.debug("Transferred task {}", taskId);
transferred++;
for (String requestId : taskManager.getRequestIdsInTaskHistory()) {
try {
LOG.info("Checking request {}", requestId);
List<SingularityTaskId> taskIds = taskManager.getTaskIdsForRequest(requestId);
taskIds.removeAll(taskManager.getActiveTaskIdsForRequest(requestId));
taskIds.removeAll(taskManager.getLBCleanupTasks());
List<SingularityPendingDeploy> pendingDeploys = deployManager.getPendingDeploys();
taskIds =
taskIds
.stream()
.filter(
taskId ->
!isPartOfPendingDeploy(pendingDeploys, taskId) &&
!couldReturnWithRecoveredAgent(taskId)
)
.sorted(SingularityTaskId.STARTED_AT_COMPARATOR_DESC)
.collect(Collectors.toList());
int forRequest = 0;
int transferred = 0;
for (SingularityTaskId taskId : taskIds) {
if (moveToHistoryOrCheckForPurge(taskId, forRequest)) {
LOG.debug("Transferred task {}", taskId);
transferred++;
}

forRequest++;
}

forRequest++;
LOG.info(
"Transferred {} out of {} inactive task ids in {}",
transferred,
taskIds.size(),
JavaUtils.duration(start)
);
} catch (Exception e) {
LOG.error("Could not persist", e);
}
LOG.info(
"Transferred {} out of {} inactive task ids in {}",
transferred,
entry.getValue().size(),
JavaUtils.duration(start)
);
}
} finally {
persisterLock.unlock();
}
}

private Map<String, List<SingularityTaskId>> getInactiveTaskIdsByRequest() {
final Set<SingularityTaskId> taskIds = new HashSet<>(taskManager.getAllTaskIds());
taskIds.removeAll(taskManager.getActiveTaskIds());
taskIds.removeAll(taskManager.getLBCleanupTasks());
List<SingularityPendingDeploy> pendingDeploys = deployManager.getPendingDeploys();
return taskIds
.stream()
.filter(
taskId ->
!isPartOfPendingDeploy(pendingDeploys, taskId) &&
!couldReturnWithRecoveredAgent(taskId)
)
.sorted(SingularityTaskId.STARTED_AT_COMPARATOR_DESC)
.collect(Collectors.groupingBy(SingularityTaskId::getRequestId));
}

private boolean isPartOfPendingDeploy(
List<SingularityPendingDeploy> pendingDeploys,
SingularityTaskId taskId
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,10 @@
import com.hubspot.singularity.config.ApiPaths;
import com.hubspot.singularity.config.SingularityConfiguration;
import com.hubspot.singularity.data.history.HistoryManager;
import com.hubspot.singularity.data.history.SingularityDeployHistoryPersister;
import com.hubspot.singularity.data.history.SingularityHistoryPurger;
import com.hubspot.singularity.data.history.SingularityRequestHistoryPersister;
import com.hubspot.singularity.data.history.SingularityTaskHistoryPersister;
import com.hubspot.singularity.mesos.SingularityMesosScheduler;
import com.hubspot.singularity.scheduler.SingularityTaskReconciliation;
import io.swagger.v3.oas.annotations.Operation;
Expand All @@ -19,6 +22,7 @@
import io.swagger.v3.oas.annotations.tags.Tag;
import io.swagger.v3.oas.annotations.tags.Tags;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import javax.ws.rs.DefaultValue;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
Expand All @@ -39,6 +43,9 @@ public class TestResource {
private final SingularityTaskReconciliation taskReconciliation;
private final SingularityHistoryPurger historyPurger;
private final HistoryManager historyManager;
private final SingularityTaskHistoryPersister taskHistoryPersister;
private final SingularityDeployHistoryPersister deployHistoryPersister;
private final SingularityRequestHistoryPersister requestHistoryPersister;

@Inject
public TestResource(
Expand All @@ -48,7 +55,10 @@ public TestResource(
final SingularityMesosScheduler scheduler,
SingularityTaskReconciliation taskReconciliation,
SingularityHistoryPurger historyPurger,
HistoryManager historyManager
HistoryManager historyManager,
SingularityTaskHistoryPersister taskHistoryPersister,
SingularityDeployHistoryPersister deployHistoryPersister,
SingularityRequestHistoryPersister requestHistoryPersister
) {
this.configuration = configuration;
this.managed = managed;
Expand All @@ -57,6 +67,9 @@ public TestResource(
this.taskReconciliation = taskReconciliation;
this.historyPurger = historyPurger;
this.historyManager = historyManager;
this.taskHistoryPersister = taskHistoryPersister;
this.deployHistoryPersister = deployHistoryPersister;
this.requestHistoryPersister = requestHistoryPersister;
}

@POST
Expand Down Expand Up @@ -306,4 +319,61 @@ public void reconnectMesos() {
);
scheduler.reconnectMesos();
}

@POST
@Path("/persist-task-history")
@Operation(
summary = "Trigger a task history persister run",
responses = {
@ApiResponse(
responseCode = "403",
description = "Test resource calls are currently not enabled, set `allowTestResourceCalls` to `true` in config yaml to enable"
)
}
)
public void persistTaskHistory() {
checkForbidden(
configuration.isAllowTestResourceCalls(),
"Test resource calls are disabled (set isAllowTestResourceCalls to true in configuration)"
);
CompletableFuture.runAsync(taskHistoryPersister::runActionOnPoll);
}

@POST
@Path("/persist-deploy-history")
@Operation(
summary = "Trigger a deploy history persister run",
responses = {
@ApiResponse(
responseCode = "403",
description = "Test resource calls are currently not enabled, set `allowTestResourceCalls` to `true` in config yaml to enable"
)
}
)
public void persistDeployHistory() {
checkForbidden(
configuration.isAllowTestResourceCalls(),
"Test resource calls are disabled (set isAllowTestResourceCalls to true in configuration)"
);
CompletableFuture.runAsync(deployHistoryPersister::runActionOnPoll);
}

@POST
@Path("/persist-request-history")
@Operation(
summary = "Trigger a request history persister run",
responses = {
@ApiResponse(
responseCode = "403",
description = "Test resource calls are currently not enabled, set `allowTestResourceCalls` to `true` in config yaml to enable"
)
}
)
public void persistRequestHistory() {
checkForbidden(
configuration.isAllowTestResourceCalls(),
"Test resource calls are disabled (set isAllowTestResourceCalls to true in configuration)"
);
CompletableFuture.runAsync(requestHistoryPersister::runActionOnPoll);
}
}