Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.hubspot.singularity;

import java.util.Collections;
import java.util.List;
import java.util.Objects;

Expand All @@ -14,16 +15,22 @@ public class SingularityTaskIdsByStatus {
private List<SingularityTaskId> notYetHealthy;
private List<SingularityPendingTaskId> pending;
private List<SingularityTaskId> cleaning;
private List<SingularityTaskId> loadBalanced;
private List<SingularityTaskId> killed;

@JsonCreator
public SingularityTaskIdsByStatus(@JsonProperty("healthy") List<SingularityTaskId> healthy,
@JsonProperty("notYetHealthy") List<SingularityTaskId> notYetHealthy,
@JsonProperty("pending") List<SingularityPendingTaskId> pending,
@JsonProperty("cleaning") List<SingularityTaskId> cleaning) {
@JsonProperty("cleaning") List<SingularityTaskId> cleaning,
@JsonProperty("loadBalanced") List<SingularityTaskId> loadBalanced,
@JsonProperty("killed") List<SingularityTaskId> killed) {
this.healthy = healthy;
this.notYetHealthy = notYetHealthy;
this.pending = pending;
this.cleaning = cleaning;
this.loadBalanced = loadBalanced != null ? loadBalanced : Collections.emptyList();
this.killed = killed != null ? killed : Collections.emptyList();
}

@Schema(description = "Active tasks whose healthchecks and load balancer updates (when applicable) have finished successfully")
Expand All @@ -46,24 +53,54 @@ public List<SingularityTaskId> getCleaning() {
return cleaning;
}

@Schema(description = "Tasks that are currently active in the load balancer")
public List<SingularityTaskId> getLoadBalanced() {
return loadBalanced;
}

@Schema(description = "Tasks which have been sent a kill signal")
public List<SingularityTaskId> getKilled() {
return killed;
}

@Override
public boolean equals(Object obj) {
if (this == obj) {
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (obj instanceof SingularityTaskIdsByStatus) {
final SingularityTaskIdsByStatus that = (SingularityTaskIdsByStatus) obj;
return Objects.equals(this.healthy, that.healthy) &&
Objects.equals(this.notYetHealthy, that.notYetHealthy) &&
Objects.equals(this.pending, that.pending) &&
Objects.equals(this.cleaning, that.cleaning);
if (o == null || getClass() != o.getClass()) {
return false;
}

SingularityTaskIdsByStatus that = (SingularityTaskIdsByStatus) o;

if (healthy != null ? !healthy.equals(that.healthy) : that.healthy != null) {
return false;
}
if (notYetHealthy != null ? !notYetHealthy.equals(that.notYetHealthy) : that.notYetHealthy != null) {
return false;
}
if (pending != null ? !pending.equals(that.pending) : that.pending != null) {
return false;
}
if (cleaning != null ? !cleaning.equals(that.cleaning) : that.cleaning != null) {
return false;
}
if (loadBalanced != null ? !loadBalanced.equals(that.loadBalanced) : that.loadBalanced != null) {
return false;
}
return false;
return killed != null ? killed.equals(that.killed) : that.killed == null;
}

@Override
public int hashCode() {
return Objects.hash(healthy, notYetHealthy, pending, cleaning);
int result = healthy != null ? healthy.hashCode() : 0;
result = 31 * result + (notYetHealthy != null ? notYetHealthy.hashCode() : 0);
result = 31 * result + (pending != null ? pending.hashCode() : 0);
result = 31 * result + (cleaning != null ? cleaning.hashCode() : 0);
result = 31 * result + (loadBalanced != null ? loadBalanced.hashCode() : 0);
result = 31 * result + (killed != null ? killed.hashCode() : 0);
return result;
}

@Override
Expand All @@ -73,6 +110,8 @@ public String toString() {
", notYetHealthy=" + notYetHealthy +
", pending=" + pending +
", cleaning=" + cleaning +
", loadBalanced=" + loadBalanced +
", killed=" + killed +
'}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -364,24 +364,6 @@ public List<SingularityTaskId> getAllTaskIds() {
return getChildrenAsIdsForParents("getAllTaskIds", paths, taskIdTranscoder);
}

private List<SingularityTaskId> getTaskIds(String root) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

were these methods unused?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah all unused

return getChildrenAsIds(root, taskIdTranscoder);
}

public List<String> getActiveTaskIdsAsStrings() {
if (leaderCache.active()) {
return leaderCache.getActiveTaskIdsAsStrings();
}

List<String> results = new ArrayList<>();

for (String requestId : getChildren(LAST_ACTIVE_TASK_STATUSES_PATH_ROOT)) {
results.addAll(getChildren(getLastActiveTaskParent(requestId)));
}

return results;
}

public List<SingularityTaskId> getActiveTaskIds() {
return getActiveTaskIds(false);
}
Expand Down Expand Up @@ -836,6 +818,13 @@ public Optional<SingularityLoadBalancerUpdate> getLoadBalancerState(SingularityT
return getData(getLoadBalancerStatePath(taskId, requestType), taskLoadBalancerUpdateTranscoder);
}

public boolean isInLoadBalancer(SingularityTaskId taskId) {
if (exists(getLoadBalancerStatePath(taskId, LoadBalancerRequestType.REMOVE))) {
return false;
}
return exists(getLoadBalancerStatePath(taskId, LoadBalancerRequestType.ADD));
}

public Optional<SingularityPendingTask> getPendingTask(SingularityPendingTaskId pendingTaskId) {
if (leaderCache.active()) {
return leaderCache.getPendingTask(pendingTaskId);
Expand Down Expand Up @@ -1025,6 +1014,13 @@ public SingularityCreateResult saveKilledRecord(SingularityKilledTaskIdRecord ki
return save(getKilledPath(killedTaskIdRecord.getTaskId()), killedTaskIdRecord, killedTaskIdRecordTranscoder);
}

public boolean isKilledTask(SingularityTaskId taskId) {
if (leaderCache.active()) {
return leaderCache.getKilledTaskRecord(taskId).isPresent();
}
return exists(getKilledPath(taskId));
}

public List<SingularityKilledTaskIdRecord> getKilledTaskIdRecords() {
if (leaderCache.active()) {
return leaderCache.getKilledTasks();
Expand Down Expand Up @@ -1183,10 +1179,6 @@ public void purgeStaleRequests(List<String> activeRequestIds, long deleteBeforeT
}
}

public SingularityDeleteResult deleteRequestId(String requestId) {
return delete(getRequestPath(requestId));
}

public long getTaskStatusBytes() {
return countBytes(getChildren(LAST_ACTIVE_TASK_STATUSES_PATH_ROOT));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,6 @@
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.inject.Inject;
import com.google.inject.Singleton;
import com.hubspot.mesos.JavaUtils;
Expand Down Expand Up @@ -41,7 +38,6 @@
import com.hubspot.singularity.data.SingularityValidator;
import com.hubspot.singularity.data.TaskManager;
import com.hubspot.singularity.data.UserManager;
import com.hubspot.singularity.data.history.TaskHistoryHelper;
import com.hubspot.singularity.expiring.SingularityExpiringBounce;
import com.hubspot.singularity.expiring.SingularityExpiringPause;
import com.hubspot.singularity.expiring.SingularityExpiringScale;
Expand All @@ -51,16 +47,13 @@

@Singleton
public class RequestHelper {
private static final Logger LOG = LoggerFactory.getLogger(RequestHelper.class);

private final RequestManager requestManager;
private final SingularityMailer mailer;
private final DeployManager deployManager;
private final SingularityValidator validator;
private final UserManager userManager;
private final TaskManager taskManager;
private final SingularityDeployHealthHelper deployHealthHelper;
private final TaskHistoryHelper taskHistoryHelper;

@Inject
public RequestHelper(RequestManager requestManager,
Expand All @@ -69,16 +62,14 @@ public RequestHelper(RequestManager requestManager,
SingularityValidator validator,
UserManager userManager,
TaskManager taskManager,
SingularityDeployHealthHelper deployHealthHelper,
TaskHistoryHelper taskHistoryHelper) {
SingularityDeployHealthHelper deployHealthHelper) {
this.requestManager = requestManager;
this.mailer = mailer;
this.deployManager = deployManager;
this.validator = validator;
this.userManager = userManager;
this.taskManager = taskManager;
this.deployHealthHelper = deployHealthHelper;
this.taskHistoryHelper = taskHistoryHelper;
}

public long unpause(SingularityRequest request, Optional<String> user, Optional<String> message, Optional<Boolean> skipHealthchecks) {
Expand Down Expand Up @@ -237,7 +228,6 @@ public List<SingularityRequestParent> fillDataForRequestsAndFilter(List<Singular
Long lastActionTime = null;
if (includeFullRequestData) {
lastActionTime = getLastActionTimeForRequest(
request.getRequest(),
requestIdToLastHistory.getOrDefault(request.getRequest().getId(), Optional.empty()),
Optional.ofNullable(deployStates.get(request.getRequest().getId()))
);
Expand Down Expand Up @@ -309,6 +299,7 @@ private Optional<SingularityTaskIdsByStatus> getTaskIdsByStatusForRequest(Singul
activeTaskIds.removeAll(cleaningTaskIds);

List<SingularityTaskId> healthyTaskIds = new ArrayList<>();
List<SingularityTaskId> killedTaskIds = new ArrayList<>();
List<SingularityTaskId> notYetHealthyTaskIds = new ArrayList<>();
Map<String, List<SingularityTaskId>> taskIdsByDeployId = activeTaskIds.stream().collect(Collectors.groupingBy(SingularityTaskId::getDeployId));
for (Map.Entry<String, List<SingularityTaskId>> entry : taskIdsByDeployId.entrySet()) {
Expand All @@ -319,15 +310,27 @@ private Optional<SingularityTaskIdsByStatus> getTaskIdsByStatusForRequest(Singul
entry.getValue(),
pendingDeploy.isPresent() && pendingDeploy.get().getDeployMarker().getDeployId().equals(entry.getKey()));
for (SingularityTaskId taskId : entry.getValue()) {
if (healthyTasksIdsForDeploy.contains(taskId)) {
if (taskManager.isKilledTask(taskId)) {
killedTaskIds.add(taskId);
} else if (healthyTasksIdsForDeploy.contains(taskId)) {
healthyTaskIds.add(taskId);
} else {
notYetHealthyTaskIds.add(taskId);
}
}
}

return Optional.of(new SingularityTaskIdsByStatus(healthyTaskIds, notYetHealthyTaskIds, pendingTaskIds, cleaningTaskIds));
List<SingularityTaskId> loadBalanced = new ArrayList<>();
if (requestWithState.getRequest().isLoadBalanced()) {
healthyTaskIds.stream()
.filter(taskManager::isInLoadBalancer)
.forEach(loadBalanced::add);
cleaningTaskIds.stream()
.filter(taskManager::isInLoadBalancer)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does the task manager automatically update itself when tasks go in and out of the load balancer?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, the isInLoadBalancer here is a live call to fetch the state (cached on the leader in memory)

.forEach(loadBalanced::add);
}

return Optional.of(new SingularityTaskIdsByStatus(healthyTaskIds, notYetHealthyTaskIds, pendingTaskIds, cleaningTaskIds, loadBalanced, killedTaskIds));
}

private boolean userAssociatedWithDeploy(Optional<SingularityRequestDeployState> deployState, SingularityUser user) {
Expand All @@ -345,7 +348,7 @@ private boolean userModifiedRequestLast(Optional<SingularityRequestHistory> last
return lastHistory.isPresent() && userMatches(lastHistory.get().getUser(), user);
}

private long getLastActionTimeForRequest(SingularityRequest request, Optional<SingularityRequestHistory> lastHistory, Optional<SingularityRequestDeployState> deployState) {
private long getLastActionTimeForRequest(Optional<SingularityRequestHistory> lastHistory, Optional<SingularityRequestDeployState> deployState) {
long lastUpdate = 0;
if (lastHistory.isPresent()) {
lastUpdate = lastHistory.get().getCreatedAt();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -395,6 +395,10 @@ public List<SingularityKilledTaskIdRecord> getKilledTasks() {
return new ArrayList<>(killedTasks.values());
}

public Optional<SingularityKilledTaskIdRecord> getKilledTaskRecord(SingularityTaskId taskId) {
return Optional.ofNullable(killedTasks.get(taskId));
}

public void addKilledTask(SingularityKilledTaskIdRecord killedTask) {
if (!active) {
LOG.warn("addKilledTask {}, but not active", killedTask.getTaskId().getId());
Expand Down
19 changes: 16 additions & 3 deletions SingularityUI/app/components/requestDetail/ActiveTasksTable.jsx
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import Utils from '../../utils';
import UITable from '../common/table/UITable';
import {
Health,
LoadBalancerState,
InstanceNumberWithHostname,
Host,
LastTaskState,
Expand All @@ -25,7 +26,7 @@ import { FetchTaskHistoryForRequest } from '../../actions/api/history';

import TaskStateBreakdown from './TaskStateBreakdown';

const ActiveTasksTable = ({request, requestId, tasksAPI, healthyTaskIds, cleaningTaskIds, fetchTaskHistoryForRequest}) => {
const ActiveTasksTable = ({request, requestId, tasksAPI, healthyTaskIds, cleaningTaskIds,loadBalancedTaskIds, killedTaskIds, fetchTaskHistoryForRequest}) => {
const tasks = tasksAPI ? tasksAPI.data : [];
const emptyTableMessage = (Utils.api.isFirstLoad(tasksAPI)
? <p>Loading...</p>
Expand All @@ -49,12 +50,15 @@ const ActiveTasksTable = ({request, requestId, tasksAPI, healthyTaskIds, cleanin
health = 'healthy';
} else if (_.contains(cleaningTaskIds, task.taskId.id)) {
health = 'cleaning';
} else if (_.contains(killedTaskIds, task.taskId.id)) {
health = 'terminating'
} else {
health = 'not yet healthy'
}
return {
...task,
health: health
health: health,
activeInLb: _.contains(loadBalancedTaskIds, task.taskId.id)
}
});
const title = <span>Running instances {maybeAggregateTailButton}</span>;
Expand All @@ -69,6 +73,7 @@ const ActiveTasksTable = ({request, requestId, tasksAPI, healthyTaskIds, cleanin
triggerOnDataSizeChange={fetchTaskHistoryForRequest}
>
{Health}
{request.request.loadBalanced && LoadBalancerState}
{InstanceNumberWithHostname}
{LastTaskState}
{DeployId}
Expand All @@ -86,6 +91,8 @@ ActiveTasksTable.propTypes = {
tasksAPI: PropTypes.object.isRequired,
healthyTaskIds: PropTypes.array.isRequired,
cleaningTaskIds: PropTypes.array.isRequired,
loadBalancedTaskIds: PropTypes.array.isRequired,
killedTaskIds: PropTypes.array.isRequired,
fetchTaskHistoryForRequest: PropTypes.func.isRequired
};

Expand All @@ -100,7 +107,13 @@ const mapStateToProps = (state, ownProps) => {
healthyTaskIds: _.map(Utils.maybe(request, ['taskIds', 'healthy'], []), (task) => {
return task.id;
}),
cleaningTaskIds: _.map(Utils.maybe(request, ['data', 'taskIds', 'cleaning'], []), (task) => {
cleaningTaskIds: _.map(Utils.maybe(request, ['taskIds', 'cleaning'], []), (task) => {
return task.id;
}),
loadBalancedTaskIds: _.map(Utils.maybe(request, ['taskIds', 'loadBalanced'], []), (task) => {
return task.id;
}),
killedTaskIds: _.map(Utils.maybe(request, ['taskIds', 'killed'], []), (task) => {
return task.id;
})
}};
Expand Down
Loading