Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,9 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
/** All currently registered TaskExecutors with their framework specific worker information. */
private final Map<ResourceID, WorkerRegistration<WorkerType>> taskExecutors;

/** Index for fast lookup of WorkerRegistration by InstanceID. */
private final Map<InstanceID, WorkerRegistration<WorkerType>> taskExecutorsByInstanceId;

/** Ongoing registration of TaskExecutors per resource ID. */
private final Map<ResourceID, CompletableFuture<TaskExecutorGateway>>
taskExecutorGatewayFutures;
Expand Down Expand Up @@ -208,6 +211,7 @@ public ResourceManager(
this.jobManagerRegistrations = CollectionUtil.newHashMapWithExpectedSize(4);
this.jmResourceIdRegistrations = CollectionUtil.newHashMapWithExpectedSize(4);
this.taskExecutors = CollectionUtil.newHashMapWithExpectedSize(8);
this.taskExecutorsByInstanceId = CollectionUtil.newHashMapWithExpectedSize(8);
this.taskExecutorGatewayFutures = CollectionUtil.newHashMapWithExpectedSize(8);
this.blocklistHandler =
blocklistHandlerFactory.create(
Expand Down Expand Up @@ -1047,6 +1051,7 @@ private RegistrationResponse registerTaskExecutorInternal(
WorkerRegistration<WorkerType> oldRegistration =
taskExecutors.remove(taskExecutorResourceId);
if (oldRegistration != null) {
taskExecutorsByInstanceId.remove(oldRegistration.getInstanceID());
// TODO :: suggest old taskExecutor to stop itself
log.debug(
"Replacing old registration of TaskExecutor {}.",
Expand Down Expand Up @@ -1093,6 +1098,7 @@ private RegistrationResponse registerTaskExecutorInternal(
taskExecutorResourceId.getStringWithMetadata(),
taskExecutorAddress);
taskExecutors.put(taskExecutorResourceId, registration);
taskExecutorsByInstanceId.put(registration.getInstanceID(), registration);

taskManagerHeartbeatManager.monitorTarget(
taskExecutorResourceId, new TaskExecutorHeartbeatSender(taskExecutorGateway));
Expand All @@ -1114,6 +1120,7 @@ private void clearStateInternal() {
jobManagerRegistrations.clear();
jmResourceIdRegistrations.clear();
taskExecutors.clear();
taskExecutorsByInstanceId.clear();

try {
jobLeaderIdService.clear();
Expand Down Expand Up @@ -1180,6 +1187,7 @@ protected Optional<WorkerType> closeTaskManagerConnection(
WorkerRegistration<WorkerType> workerRegistration = taskExecutors.remove(resourceID);

if (workerRegistration != null) {
taskExecutorsByInstanceId.remove(workerRegistration.getInstanceID());
log.info(
"Closing TaskExecutor connection {} because: {}",
resourceID.getStringWithMetadata(),
Expand Down Expand Up @@ -1254,17 +1262,8 @@ public Optional<InstanceID> getInstanceIdByResourceId(ResourceID resourceID) {
}

protected WorkerType getWorkerByInstanceId(InstanceID instanceId) {
WorkerType worker = null;
// TODO: Improve performance by having an index on the instanceId
for (Map.Entry<ResourceID, WorkerRegistration<WorkerType>> entry :
taskExecutors.entrySet()) {
if (entry.getValue().getInstanceID().equals(instanceId)) {
worker = entry.getValue().getWorker();
break;
}
}

return worker;
WorkerRegistration<WorkerType> registration = taskExecutorsByInstanceId.get(instanceId);
return registration != null ? registration.getWorker() : null;
}

private enum ResourceRequirementHandling {
Expand Down