Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -597,12 +597,8 @@ Collection<SingularityOfferHolder> checkOffers(
scorePerOffer.get(bestOffer.getAgentId()),
bestOffer.getSanitizedHost()
);
SingularityMesosTaskHolder taskHolder = acceptTask(
bestOffer,
taskRequestHolder
);
acceptTask(bestOffer, taskRequestHolder);
tasksScheduled.getAndIncrement();
bestOffer.addMatchedTask(taskHolder);
updateAgentUsageScores(
taskRequestHolder,
currentUsagesById,
Expand Down Expand Up @@ -999,7 +995,11 @@ private double calculateScore(
return score;
}

private SingularityMesosTaskHolder acceptTask(
// This method is synchronized to avoid resource reuse within a single offer.
// Decisions about resources to use, specifically ports are made during the buildTask call, however
// these resources aren't subtracted from the offer until the addMatchedTask call, making this method
// not thread safe
private synchronized void acceptTask(
SingularityOfferHolder offerHolder,
SingularityTaskRequestHolder taskRequestHolder
) {
Expand All @@ -1023,7 +1023,7 @@ private SingularityMesosTaskHolder acceptTask(
);

taskManager.createTaskAndDeletePendingTask(zkTask);
return taskHolder;
offerHolder.addMatchedTask(taskHolder);
}

private boolean isTooManyInstancesForRequest(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ public class SingularityMesosStatusUpdateHandler {
MesosTaskState.TASK_STARTING,
MesosTaskState.TASK_RUNNING
);
private static final String RESOURCE_MISMATCH_ERR =
"required by task and its executor is more than available";

private final TaskManager taskManager;
private final DeployManager deployManager;
Expand Down Expand Up @@ -328,6 +330,23 @@ private StatusUpdateResult unsafeProcessStatusUpdate(
);
final ExtendedTaskState taskState = MesosUtils.fromTaskState(status.getState());

if (
taskState == ExtendedTaskState.TASK_ERROR &&
status.getMessage() != null &&
status.getMessage().contains(RESOURCE_MISMATCH_ERR)
) {
LOG.error(
"Possible duplicate resource allocation",
new IllegalStateException(
String.format(
"Duplicate resource allocation for %s: %s",
taskId,
status.getMessage()
)
)
);
}

if (
isRecoveryStatusUpdate(
previousTaskStatusHolder,
Expand Down