From 53d30940ed84638a59da066397234143407df907 Mon Sep 17 00:00:00 2001 From: PoAn Yang Date: Wed, 19 Feb 2025 21:40:30 +0800 Subject: [PATCH] KAFKA-15900, KAFKA-18310: fix flaky test testOutdatedCoordinatorAssignment and AbstractCoordinatorTest Signed-off-by: PoAn Yang --- .../internals/AbstractCoordinator.java | 70 ++++++++----------- .../internals/AbstractHeartbeatThread.java | 64 +++++++++++++++++ .../internals/ClassicKafkaConsumer.java | 7 +- .../internals/ConsumerCoordinator.java | 7 +- .../internals/AbstractCoordinatorTest.java | 55 ++++++--------- .../internals/ConsumerCoordinatorTest.java | 32 +++++++-- .../internals/MockHeartbeatThread.java | 23 ++++++ 7 files changed, 177 insertions(+), 81 deletions(-) create mode 100644 clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractHeartbeatThread.java create mode 100644 clients/src/test/java/org/apache/kafka/clients/consumer/internals/MockHeartbeatThread.java diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java index b530ca562b981..e7bfd4fdcb898 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java @@ -67,7 +67,6 @@ import org.apache.kafka.common.telemetry.internals.ClientTelemetryProvider; import org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter; import org.apache.kafka.common.utils.ExponentialBackoff; -import org.apache.kafka.common.utils.KafkaThread; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Timer; @@ -84,7 +83,7 @@ import java.util.Objects; import java.util.Optional; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Supplier; /** * AbstractCoordinator implements group management for a single group member by interacting with @@ -135,6 +134,7 @@ public boolean hasNotJoinedGroup() { private final GroupCoordinatorMetrics sensors; private final GroupRebalanceConfig rebalanceConfig; private final Optional clientTelemetryReporter; + private final Supplier heartbeatThreadSupplier; protected final Time time; protected final ConsumerNetworkClient client; @@ -144,7 +144,7 @@ public boolean hasNotJoinedGroup() { private String rejoinReason = ""; private boolean rejoinNeeded = true; private boolean needsJoinPrepare = true; - private HeartbeatThread heartbeatThread = null; + private AbstractHeartbeatThread heartbeatThread = null; private RequestFuture joinFuture = null; private RequestFuture findCoordinatorFuture = null; private volatile RuntimeException fatalFindCoordinatorException = null; @@ -165,7 +165,7 @@ public AbstractCoordinator(GroupRebalanceConfig rebalanceConfig, Metrics metrics, String metricGrpPrefix, Time time) { - this(rebalanceConfig, logContext, client, metrics, metricGrpPrefix, time, Optional.empty()); + this(rebalanceConfig, logContext, client, metrics, metricGrpPrefix, time, Optional.empty(), Optional.empty()); } public AbstractCoordinator(GroupRebalanceConfig rebalanceConfig, @@ -174,7 +174,8 @@ public AbstractCoordinator(GroupRebalanceConfig rebalanceConfig, Metrics metrics, String metricGrpPrefix, Time time, - Optional clientTelemetryReporter) { + Optional clientTelemetryReporter, + Optional> heartbeatThreadSupplier) { Objects.requireNonNull(rebalanceConfig.groupId, "Expected a non-null group id for coordinator construction"); this.rebalanceConfig = rebalanceConfig; @@ -189,6 +190,7 @@ public AbstractCoordinator(GroupRebalanceConfig rebalanceConfig, this.heartbeat = new Heartbeat(rebalanceConfig, time); this.sensors = new GroupCoordinatorMetrics(metrics, metricGrpPrefix); this.clientTelemetryReporter = clientTelemetryReporter; + this.heartbeatThreadSupplier = heartbeatThreadSupplier.orElseGet(() -> HeartbeatThread::new); } /** @@ -417,13 +419,13 @@ boolean ensureActiveGroup(final Timer timer) { private synchronized void startHeartbeatThreadIfNeeded() { if (heartbeatThread == null) { - heartbeatThread = new HeartbeatThread(); + heartbeatThread = heartbeatThreadSupplier.get(); heartbeatThread.start(); } } private void closeHeartbeatThread() { - HeartbeatThread thread; + AbstractHeartbeatThread thread; synchronized (this) { if (heartbeatThread == null) return; @@ -1330,6 +1332,13 @@ protected final Meter createMeter(Metrics metrics, String groupName, String base String.format("The total number of %s", descriptiveName))); } + /** + * Visible for testing. + */ + protected boolean isHeartbeatThreadEnabled() { + return heartbeatThread != null && heartbeatThread.isEnabled(); + } + private class GroupCoordinatorMetrics { public final String metricGrpName; @@ -1436,56 +1445,40 @@ public GroupCoordinatorMetrics(Metrics metrics, String metricGrpPrefix) { } } - private class HeartbeatThread extends KafkaThread implements AutoCloseable { - private boolean enabled = false; - private boolean closed = false; - private final AtomicReference failed = new AtomicReference<>(null); + private class HeartbeatThread extends AbstractHeartbeatThread { private HeartbeatThread() { super(HEARTBEAT_THREAD_PREFIX + (rebalanceConfig.groupId.isEmpty() ? "" : " | " + rebalanceConfig.groupId), true); } + @Override public void enable() { synchronized (AbstractCoordinator.this) { log.debug("Enabling heartbeat thread"); - this.enabled = true; + super.enable(); heartbeat.resetTimeouts(); AbstractCoordinator.this.notify(); } } - public void disable() { - synchronized (AbstractCoordinator.this) { - log.debug("Disabling heartbeat thread"); - this.enabled = false; - } - } - + @Override public void close() { synchronized (AbstractCoordinator.this) { - this.closed = true; + super.close(); AbstractCoordinator.this.notify(); } } - private boolean hasFailed() { - return failed.get() != null; - } - - private RuntimeException failureCause() { - return failed.get(); - } - @Override public void run() { try { log.debug("Heartbeat thread started"); while (true) { synchronized (AbstractCoordinator.this) { - if (closed) + if (isClosed()) return; - if (!enabled) { + if (!isEnabled()) { AbstractCoordinator.this.wait(); continue; } @@ -1547,7 +1540,7 @@ public void onFailure(RuntimeException e) { heartbeat.receiveHeartbeat(); } else if (e instanceof FencedInstanceIdException) { log.error("Caught fenced group.instance.id {} error in heartbeat thread", rebalanceConfig.groupInstanceId); - heartbeatThread.failed.set(e); + setFailed(e); } else { heartbeat.failHeartbeat(); // wake up the thread if it's sleeping to reschedule the heartbeat @@ -1561,28 +1554,25 @@ public void onFailure(RuntimeException e) { } } catch (AuthenticationException e) { log.error("An authentication error occurred in the heartbeat thread", e); - this.failed.set(e); + setFailed(e); } catch (GroupAuthorizationException e) { log.error("A group authorization error occurred in the heartbeat thread", e); - this.failed.set(e); + setFailed(e); } catch (InterruptedException | InterruptException e) { Thread.interrupted(); log.error("Unexpected interrupt received in heartbeat thread", e); - this.failed.set(new RuntimeException(e)); + setFailed(new RuntimeException(e)); } catch (Throwable e) { log.error("Heartbeat thread failed due to unexpected error", e); if (e instanceof RuntimeException) - this.failed.set((RuntimeException) e); + setFailed((RuntimeException) e); else - this.failed.set(new RuntimeException(e)); + setFailed(new RuntimeException(e)); } finally { log.debug("Heartbeat thread has closed"); - synchronized (AbstractCoordinator.this) { - this.closed = true; - } + close(); } } - } protected static class Generation { diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractHeartbeatThread.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractHeartbeatThread.java new file mode 100644 index 0000000000000..03f7ad1e388a2 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractHeartbeatThread.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.common.utils.KafkaThread; + +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; + +public abstract class AbstractHeartbeatThread extends KafkaThread implements AutoCloseable { + private final AtomicBoolean enabled = new AtomicBoolean(false); + private final AtomicBoolean closed = new AtomicBoolean(false); + private final AtomicReference failed = new AtomicReference<>(null); + + public AbstractHeartbeatThread(String name, boolean daemon) { + super(name, daemon); + } + + public void enable() { + enabled.set(true); + } + + public void disable() { + enabled.set(false); + } + + public boolean isEnabled() { + return enabled.get(); + } + + public void setFailed(RuntimeException e) { + failed.set(e); + } + + public boolean hasFailed() { + return failed.get() != null; + } + + public RuntimeException failureCause() { + return failed.get(); + } + + public void close() { + closed.set(true); + } + + public boolean isClosed() { + return closed.get(); + } +} diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java index 9a8c637d77857..e654e7bd04315 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java @@ -230,7 +230,8 @@ public class ClassicKafkaConsumer implements ConsumerDelegate { this.interceptors, config.getBoolean(THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED), config.getString(ConsumerConfig.CLIENT_RACK_CONFIG), - clientTelemetryReporter); + clientTelemetryReporter, + Optional.empty()); } this.fetcher = new Fetcher<>( logContext, @@ -345,8 +346,8 @@ public class ClassicKafkaConsumer implements ConsumerDelegate { interceptors, throwOnStableOffsetNotSupported, rackId, - clientTelemetryReporter - ); + clientTelemetryReporter, + Optional.empty()); } else { this.coordinator = null; } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java index 584a03736f97f..2938f44489c40 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java @@ -86,6 +86,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Supplier; import java.util.stream.Collectors; import static org.apache.kafka.clients.consumer.ConsumerConfig.ASSIGN_FROM_SUBSCRIBED_ASSIGNORS; @@ -175,14 +176,16 @@ public ConsumerCoordinator(GroupRebalanceConfig rebalanceConfig, ConsumerInterceptors interceptors, boolean throwOnFetchStableOffsetsUnsupported, String rackId, - Optional clientTelemetryReporter) { + Optional clientTelemetryReporter, + Optional> heartbeatThreadSupplier) { super(rebalanceConfig, logContext, client, metrics, metricGrpPrefix, time, - clientTelemetryReporter); + clientTelemetryReporter, + heartbeatThreadSupplier); this.rebalanceConfig = rebalanceConfig; this.log = logContext.logger(ConsumerCoordinator.class); this.metadata = metadata; diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java index f144ccf5061c5..3d70eae479d0f 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java @@ -51,7 +51,6 @@ import org.apache.kafka.common.requests.RequestTestUtils; import org.apache.kafka.common.requests.SyncGroupRequest; import org.apache.kafka.common.requests.SyncGroupResponse; -import org.apache.kafka.common.test.api.Flaky; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.common.utils.Time; @@ -75,6 +74,7 @@ import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Supplier; import static java.util.Collections.emptyMap; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -120,15 +120,15 @@ public void closeCoordinator() { private void setupCoordinator() { setupCoordinator(RETRY_BACKOFF_MS, RETRY_BACKOFF_MAX_MS, REBALANCE_TIMEOUT_MS, - Optional.empty()); + Optional.empty(), Optional.empty()); } private void setupCoordinator(int retryBackoffMs, int retryBackoffMaxMs) { setupCoordinator(retryBackoffMs, retryBackoffMaxMs, REBALANCE_TIMEOUT_MS, - Optional.empty()); + Optional.empty(), Optional.empty()); } - private void setupCoordinator(int retryBackoffMs, int retryBackoffMaxMs, int rebalanceTimeoutMs, Optional groupInstanceId) { + private void setupCoordinator(int retryBackoffMs, int retryBackoffMaxMs, int rebalanceTimeoutMs, Optional groupInstanceId, Optional> heartbeatThreadSupplier) { LogContext logContext = new LogContext(); this.mockTime = new MockTime(); ConsumerMetadata metadata = new ConsumerMetadata(retryBackoffMs, retryBackoffMaxMs, 60 * 60 * 1000L, @@ -160,7 +160,8 @@ false, false, new SubscriptionState(logContext, AutoOffsetResetStrategy.EARLIEST this.coordinator = new DummyCoordinator(rebalanceConfig, consumerClient, metrics, - mockTime); + mockTime, + heartbeatThreadSupplier); } private void joinGroup() { @@ -349,8 +350,7 @@ public void testGroupMaxSizeExceptionIsFatal() { @Test public void testJoinGroupRequestTimeout() { - setupCoordinator(RETRY_BACKOFF_MS, RETRY_BACKOFF_MAX_MS, REBALANCE_TIMEOUT_MS, - Optional.empty()); + setupCoordinator(); mockClient.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); coordinator.ensureCoordinatorReady(mockTime.timer(0)); @@ -367,7 +367,7 @@ public void testJoinGroupRequestTimeout() { @Test public void testJoinGroupRequestTimeoutLowerBoundedByDefaultRequestTimeout() { int rebalanceTimeoutMs = REQUEST_TIMEOUT_MS - 10000; - setupCoordinator(RETRY_BACKOFF_MS, RETRY_BACKOFF_MAX_MS, rebalanceTimeoutMs, Optional.empty()); + setupCoordinator(RETRY_BACKOFF_MS, RETRY_BACKOFF_MAX_MS, rebalanceTimeoutMs, Optional.empty(), Optional.empty()); mockClient.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); coordinator.ensureCoordinatorReady(mockTime.timer(0)); @@ -387,7 +387,7 @@ public void testJoinGroupRequestMaxTimeout() { // Ensure we can handle the maximum allowed rebalance timeout setupCoordinator(RETRY_BACKOFF_MS, RETRY_BACKOFF_MAX_MS, Integer.MAX_VALUE, - Optional.empty()); + Optional.empty(), Optional.empty()); mockClient.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); coordinator.ensureCoordinatorReady(mockTime.timer(0)); @@ -1094,7 +1094,7 @@ public void testLeaveGroupSentWithGroupInstanceIdUnSet() { } private void checkLeaveGroupRequestSent(Optional groupInstanceId) { - setupCoordinator(RETRY_BACKOFF_MS, RETRY_BACKOFF_MAX_MS, Integer.MAX_VALUE, groupInstanceId); + setupCoordinator(RETRY_BACKOFF_MS, RETRY_BACKOFF_MAX_MS, Integer.MAX_VALUE, groupInstanceId, Optional.empty()); mockClient.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); mockClient.prepareResponse(joinGroupFollowerResponse(1, memberId, leaderId, Errors.NONE)); @@ -1189,7 +1189,7 @@ private RequestFuture setupLeaveGroup(LeaveGroupResponse leaveGroupRespons private RequestFuture setupLeaveGroup(LeaveGroupResponse leaveGroupResponse, String leaveReason, String expectedLeaveReason) { - setupCoordinator(RETRY_BACKOFF_MS, RETRY_BACKOFF_MAX_MS, Integer.MAX_VALUE, Optional.empty()); + setupCoordinator(RETRY_BACKOFF_MS, RETRY_BACKOFF_MAX_MS, Integer.MAX_VALUE, Optional.empty(), Optional.empty()); mockClient.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); mockClient.prepareResponse(joinGroupFollowerResponse(1, memberId, leaderId, Errors.NONE)); @@ -1435,10 +1435,10 @@ public void testWakeupAfterJoinGroupReceivedExternalCompletion() throws Exceptio awaitFirstHeartbeat(heartbeatReceived); } - @Flaky("KAFKA-18310") @Test public void testWakeupAfterSyncGroupSentExternalCompletion() throws Exception { - setupCoordinator(); + setupCoordinator(RETRY_BACKOFF_MS, RETRY_BACKOFF_MAX_MS, REBALANCE_TIMEOUT_MS, + Optional.empty(), Optional.of(MockHeartbeatThread::new)); mockClient.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); mockClient.prepareResponse(joinGroupFollowerResponse(1, memberId, leaderId, Errors.NONE)); @@ -1454,13 +1454,12 @@ public boolean matches(AbstractRequest body) { return isSyncGroupRequest; } }, syncGroupResponse(Errors.NONE)); - AtomicBoolean heartbeatReceived = prepareFirstHeartbeat(); assertThrows(WakeupException.class, () -> coordinator.ensureActiveGroup(), "Should have woken up from ensureActiveGroup()"); assertEquals(1, coordinator.onJoinPrepareInvokes); assertEquals(0, coordinator.onJoinCompleteInvokes); - assertFalse(heartbeatReceived.get()); + assertTrue(coordinator.isHeartbeatThreadEnabled()); // the join group completes in this poll() consumerClient.poll(mockTime.timer(0)); @@ -1468,14 +1467,12 @@ public boolean matches(AbstractRequest body) { assertEquals(1, coordinator.onJoinPrepareInvokes); assertEquals(1, coordinator.onJoinCompleteInvokes); - - awaitFirstHeartbeat(heartbeatReceived); } - @Flaky("KAFKA-18310") @Test public void testWakeupAfterSyncGroupReceived() throws Exception { - setupCoordinator(); + setupCoordinator(RETRY_BACKOFF_MS, RETRY_BACKOFF_MAX_MS, REBALANCE_TIMEOUT_MS, + Optional.empty(), Optional.of(MockHeartbeatThread::new)); mockClient.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); mockClient.prepareResponse(joinGroupFollowerResponse(1, memberId, leaderId, Errors.NONE)); @@ -1486,7 +1483,6 @@ public void testWakeupAfterSyncGroupReceived() throws Exception { consumerClient.wakeup(); return isSyncGroupRequest; }, syncGroupResponse(Errors.NONE)); - AtomicBoolean heartbeatReceived = prepareFirstHeartbeat(); try { coordinator.ensureActiveGroup(); @@ -1496,20 +1492,18 @@ public void testWakeupAfterSyncGroupReceived() throws Exception { assertEquals(1, coordinator.onJoinPrepareInvokes); assertEquals(0, coordinator.onJoinCompleteInvokes); - assertFalse(heartbeatReceived.get()); + assertTrue(coordinator.isHeartbeatThreadEnabled()); coordinator.ensureActiveGroup(); assertEquals(1, coordinator.onJoinPrepareInvokes); assertEquals(1, coordinator.onJoinCompleteInvokes); - - awaitFirstHeartbeat(heartbeatReceived); } - @Flaky("KAFKA-15474,KAFKA-18310") @Test public void testWakeupAfterSyncGroupReceivedExternalCompletion() throws Exception { - setupCoordinator(); + setupCoordinator(RETRY_BACKOFF_MS, RETRY_BACKOFF_MAX_MS, REBALANCE_TIMEOUT_MS, + Optional.empty(), Optional.of(MockHeartbeatThread::new)); mockClient.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); mockClient.prepareResponse(joinGroupFollowerResponse(1, memberId, leaderId, Errors.NONE)); @@ -1520,20 +1514,17 @@ public void testWakeupAfterSyncGroupReceivedExternalCompletion() throws Exceptio consumerClient.wakeup(); return isSyncGroupRequest; }, syncGroupResponse(Errors.NONE)); - AtomicBoolean heartbeatReceived = prepareFirstHeartbeat(); assertThrows(WakeupException.class, () -> coordinator.ensureActiveGroup(), "Should have woken up from ensureActiveGroup()"); assertEquals(1, coordinator.onJoinPrepareInvokes); assertEquals(0, coordinator.onJoinCompleteInvokes); - assertFalse(heartbeatReceived.get()); + assertTrue(coordinator.isHeartbeatThreadEnabled()); coordinator.ensureActiveGroup(); assertEquals(1, coordinator.onJoinPrepareInvokes); assertEquals(1, coordinator.onJoinCompleteInvokes); - - awaitFirstHeartbeat(heartbeatReceived); } @Test @@ -1707,8 +1698,9 @@ public static class DummyCoordinator extends AbstractCoordinator { DummyCoordinator(GroupRebalanceConfig rebalanceConfig, ConsumerNetworkClient client, Metrics metrics, - Time time) { - super(rebalanceConfig, new LogContext(), client, metrics, METRIC_GROUP_PREFIX, time); + Time time, + Optional> heartbeatThreadSupplier) { + super(rebalanceConfig, new LogContext(), client, metrics, METRIC_GROUP_PREFIX, time, Optional.empty(), heartbeatThreadSupplier); } @Override @@ -1750,5 +1742,4 @@ protected void onJoinComplete(int generation, String memberId, String protocol, onJoinCompleteInvokes++; } } - } diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java index 3dd91dc7639f5..36578da620a3c 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java @@ -76,7 +76,6 @@ import org.apache.kafka.common.requests.RequestTestUtils; import org.apache.kafka.common.requests.SyncGroupRequest; import org.apache.kafka.common.requests.SyncGroupResponse; -import org.apache.kafka.common.test.api.Flaky; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.common.utils.Time; @@ -1010,10 +1009,9 @@ public void testNormalJoinGroupLeader() { assertEquals(getAdded(owned, assigned), rebalanceListener.assigned); } - - @Flaky("KAFKA-15900") @Test public void testOutdatedCoordinatorAssignment() { + createMockHeartbeatThreadCoordinator(); final String consumerId = "outdated_assignment"; final List owned = Collections.emptyList(); final List oldSubscription = singletonList(topic2); @@ -3696,6 +3694,7 @@ private void supportStableFlag(final short upperVersion, final boolean expectThr null, true, null, + Optional.empty(), Optional.empty()); client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); @@ -3860,6 +3859,7 @@ private ConsumerCoordinator buildCoordinator(final GroupRebalanceConfig rebalanc null, false, null, + Optional.empty(), Optional.empty()); } @@ -4086,7 +4086,7 @@ private void createRackAwareCoordinator(String rackId, MockPartitionAssignor ass coordinator = new ConsumerCoordinator(rebalanceConfig, new LogContext(), consumerClient, Collections.singletonList(assignor), metadata, subscriptions, - metrics, consumerId + groupId, time, false, autoCommitIntervalMs, null, false, rackId, Optional.empty()); + metrics, consumerId + groupId, time, false, autoCommitIntervalMs, null, false, rackId, Optional.empty(), Optional.empty()); } private static MetadataResponse rackAwareMetadata(int numNodes, @@ -4145,4 +4145,28 @@ public Map> assign(Map partitionsP return super.assign(partitionsPerTopic, subscriptions); } } + + private void createMockHeartbeatThreadCoordinator() { + metrics.close(); + coordinator.close(time.timer(0)); + + metrics = new Metrics(time); + coordinator = new ConsumerCoordinator( + rebalanceConfig, + new LogContext(), + consumerClient, + assignors, + metadata, + subscriptions, + metrics, + consumerId + groupId, + time, + false, + autoCommitIntervalMs, + null, + false, + null, + Optional.empty(), + Optional.of(MockHeartbeatThread::new)); + } } diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/MockHeartbeatThread.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/MockHeartbeatThread.java new file mode 100644 index 0000000000000..bca2d62fea8ce --- /dev/null +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/MockHeartbeatThread.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +public class MockHeartbeatThread extends AbstractHeartbeatThread { + MockHeartbeatThread() { + super("MockHeartbeatThread", true); + } +}