Skip to content

Commit a48ca49

Browse files
authored
KAFKA-15643: Fix error logged when unload is called on a broker that was never a coordinator. (apache#14657)
When a new leader is elected for a __consumer_offsets partition, the followers are notified to unload their state. However, only the former leader actually has coordinator state loaded for that partition. The remaining followers print the following error: `ERROR [GroupCoordinator id=1] Execution of UnloadCoordinator(tp=__consumer_offsets-1, epoch=0) failed due to This is not the correct coordinator.. (org.apache.kafka.coordinator.group.runtime.CoordinatorRuntime)` The error is technically correct and expected for the remaining followers; however, it can be misleading. This patch handles the case gracefully: when no coordinator context exists for the partition, the unload is skipped and an informational message is logged instead of an error. Reviewers: David Jacot <djacot@confluent.io>
1 parent 57fd8f4 commit a48ca49

File tree

2 files changed

+60
-11
lines changed

2 files changed

+60
-11
lines changed

group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java

Lines changed: 21 additions & 11 deletions
Original file line number | Diff line number | Diff line change
@@ -1359,18 +1359,28 @@ public void scheduleUnloadOperation(
13591359
log.info("Scheduling unloading of metadata for {} with epoch {}", tp, partitionEpoch);
13601360

13611361
scheduleInternalOperation("UnloadCoordinator(tp=" + tp + ", epoch=" + partitionEpoch + ")", tp, () -> {
1362-
withContextOrThrow(tp, context -> {
1363-
if (context.epoch < partitionEpoch) {
1364-
log.info("Started unloading metadata for {} with epoch {}.", tp, partitionEpoch);
1365-
context.transitionTo(CoordinatorState.CLOSED);
1366-
coordinators.remove(tp, context);
1367-
log.info("Finished unloading metadata for {} with epoch {}.", tp, partitionEpoch);
1368-
} else {
1369-
log.info("Ignored unloading metadata for {} in epoch {} since current epoch is {}.",
1370-
tp, partitionEpoch, context.epoch
1371-
);
1362+
CoordinatorContext context = coordinators.get(tp);
1363+
if (context != null) {
1364+
try {
1365+
context.lock.lock();
1366+
if (context.epoch < partitionEpoch) {
1367+
log.info("Started unloading metadata for {} with epoch {}.", tp, partitionEpoch);
1368+
context.transitionTo(CoordinatorState.CLOSED);
1369+
coordinators.remove(tp, context);
1370+
log.info("Finished unloading metadata for {} with epoch {}.", tp, partitionEpoch);
1371+
} else {
1372+
log.info("Ignored unloading metadata for {} in epoch {} since current epoch is {}.",
1373+
tp, partitionEpoch, context.epoch
1374+
);
1375+
}
1376+
} finally {
1377+
context.lock.unlock();
13721378
}
1373-
});
1379+
} else {
1380+
log.info("Ignored unloading metadata for {} in epoch {} since metadata was never loaded.",
1381+
tp, partitionEpoch
1382+
);
1383+
}
13741384
});
13751385
}
13761386

group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java

Lines changed: 39 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -554,6 +554,45 @@ public void testScheduleUnloading() {
554554
// Getting the coordinator context fails because it no longer exists.
555555
assertThrows(NotCoordinatorException.class, () -> runtime.contextOrThrow(TP));
556556
}
557+
@Test
558+
public void testScheduleUnloadingWhenContextDoesntExist() {
559+
MockTimer timer = new MockTimer();
560+
MockPartitionWriter writer = mock(MockPartitionWriter.class);
561+
MockCoordinatorShardBuilderSupplier supplier = mock(MockCoordinatorShardBuilderSupplier.class);
562+
MockCoordinatorShardBuilder builder = mock(MockCoordinatorShardBuilder.class);
563+
MockCoordinatorShard coordinator = mock(MockCoordinatorShard.class);
564+
565+
CoordinatorRuntime<MockCoordinatorShard, String> runtime =
566+
new CoordinatorRuntime.Builder<MockCoordinatorShard, String>()
567+
.withTime(timer.time())
568+
.withTimer(timer)
569+
.withLoader(new MockCoordinatorLoader())
570+
.withEventProcessor(new DirectEventProcessor())
571+
.withPartitionWriter(writer)
572+
.withCoordinatorShardBuilderSupplier(supplier)
573+
.withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class))
574+
.build();
575+
576+
when(builder.withSnapshotRegistry(any())).thenReturn(builder);
577+
when(builder.withLogContext(any())).thenReturn(builder);
578+
when(builder.withTime(any())).thenReturn(builder);
579+
when(builder.withTimer(any())).thenReturn(builder);
580+
when(builder.withTopicPartition(any())).thenReturn(builder);
581+
when(builder.build()).thenReturn(coordinator);
582+
when(supplier.get()).thenReturn(builder);
583+
584+
// No loading is scheduled. This is to check the case when a follower that was never a coordinator
585+
// is asked to unload its state. The unload event is skipped in this case.
586+
587+
// Schedule the unloading.
588+
runtime.scheduleUnloadOperation(TP, 11);
589+
590+
// Verify that onUnloaded is not called.
591+
verify(coordinator, times(0)).onUnloaded();
592+
593+
// Getting the coordinator context fails because it doesn't exist.
594+
assertThrows(NotCoordinatorException.class, () -> runtime.contextOrThrow(TP));
595+
}
557596

558597
@Test
559598
public void testScheduleUnloadingWithStalePartitionEpoch() {

0 commit comments

Comments
 (0)