Skip to content

Commit cfdf001

Browse files
paul8263xintongsong
authored andcommitted
[FLINK-22618][runtime] Fix incorrect free resource metrics of task managers
This closes #15887
1 parent c32022b commit cfdf001

File tree

2 files changed

+82
-1
lines changed

2 files changed

+82
-1
lines changed

flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManager.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -644,7 +644,7 @@ public ResourceProfile getFreeResource() {
644644

645645
@Override
646646
public ResourceProfile getFreeResourceOf(InstanceID instanceID) {
647-
return taskManagerTracker.getRegisteredResourceOf(instanceID);
647+
return taskManagerTracker.getFreeResourceOf(instanceID);
648648
}
649649

650650
@Override

flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManagerTest.java

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import org.apache.flink.runtime.slots.ResourceRequirement;
3333
import org.apache.flink.runtime.slots.ResourceRequirements;
3434
import org.apache.flink.runtime.taskexecutor.SlotReport;
35+
import org.apache.flink.runtime.taskexecutor.SlotStatus;
3536
import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
3637
import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder;
3738

@@ -737,6 +738,86 @@ public void testMaxTotalResourceCpuExceeded() throws Exception {
737738
testMaxTotalResourceExceededRegisterResource(maxTotalResourceSetter);
738739
}
739740

741+
@Test
742+
public void testGetResourceOverview() throws Exception {
743+
final TaskExecutorConnection taskExecutorConnection1 = createTaskExecutorConnection();
744+
final TaskExecutorConnection taskExecutorConnection2 = createTaskExecutorConnection();
745+
final ResourceID resourceId1 = ResourceID.generate();
746+
final ResourceID resourceId2 = ResourceID.generate();
747+
748+
final SlotID slotId1 = new SlotID(resourceId1, 0);
749+
final SlotID slotId2 = new SlotID(resourceId2, 0);
750+
final ResourceProfile resourceProfile1 = ResourceProfile.fromResources(1, 10);
751+
final ResourceProfile resourceProfile2 = ResourceProfile.fromResources(2, 20);
752+
final SlotStatus slotStatus1 =
753+
new SlotStatus(slotId1, resourceProfile1, new JobID(), new AllocationID());
754+
final SlotStatus slotStatus2 =
755+
new SlotStatus(slotId2, resourceProfile2, new JobID(), new AllocationID());
756+
final SlotReport slotReport1 = new SlotReport(slotStatus1);
757+
final SlotReport slotReport2 = new SlotReport(slotStatus2);
758+
759+
new Context() {
760+
{
761+
runTest(
762+
() -> {
763+
final CompletableFuture<Boolean> registerTaskManagerFuture1 =
764+
new CompletableFuture<>();
765+
final CompletableFuture<Boolean> registerTaskManagerFuture2 =
766+
new CompletableFuture<>();
767+
runInMainThread(
768+
() -> {
769+
registerTaskManagerFuture1.complete(
770+
getSlotManager()
771+
.registerTaskManager(
772+
taskExecutorConnection1,
773+
slotReport1,
774+
resourceProfile1.multiply(2),
775+
resourceProfile1));
776+
registerTaskManagerFuture2.complete(
777+
getSlotManager()
778+
.registerTaskManager(
779+
taskExecutorConnection2,
780+
slotReport2,
781+
resourceProfile2.multiply(2),
782+
resourceProfile2));
783+
});
784+
assertThat(
785+
assertFutureCompleteAndReturn(registerTaskManagerFuture1),
786+
is(true));
787+
assertThat(
788+
assertFutureCompleteAndReturn(registerTaskManagerFuture2),
789+
is(true));
790+
assertThat(
791+
getSlotManager().getFreeResource(),
792+
equalTo(resourceProfile1.merge(resourceProfile2)));
793+
assertThat(
794+
getSlotManager()
795+
.getFreeResourceOf(
796+
taskExecutorConnection1.getInstanceID()),
797+
equalTo(resourceProfile1));
798+
assertThat(
799+
getSlotManager()
800+
.getFreeResourceOf(
801+
taskExecutorConnection2.getInstanceID()),
802+
equalTo(resourceProfile2));
803+
assertThat(
804+
getSlotManager().getRegisteredResource(),
805+
equalTo(resourceProfile1.merge(resourceProfile2).multiply(2)));
806+
assertThat(
807+
getSlotManager()
808+
.getRegisteredResourceOf(
809+
taskExecutorConnection1.getInstanceID()),
810+
equalTo(resourceProfile1.multiply(2)));
811+
assertThat(
812+
getSlotManager()
813+
.getRegisteredResourceOf(
814+
taskExecutorConnection2.getInstanceID()),
815+
equalTo(resourceProfile2.multiply(2)));
816+
});
817+
}
818+
};
819+
}
820+
740821
@Test
741822
public void testMaxTotalResourceMemoryExceeded() throws Exception {
742823
Consumer<SlotManagerConfigurationBuilder> maxTotalResourceSetter =

0 commit comments

Comments
 (0)