Skip to content

Commit

Permalink
KAFKA-15900, KAFKA-18310: fix flaky test testOutdatedCoordinatorAssig…
Browse files Browse the repository at this point in the history
…nment and AbstractCoordinatorTest

Signed-off-by: PoAn Yang <payang@apache.org>
  • Loading branch information
FrankYang0529 committed Feb 19, 2025
1 parent 900d81b commit 53d3094
Show file tree
Hide file tree
Showing 7 changed files with 177 additions and 81 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,6 @@
import org.apache.kafka.common.telemetry.internals.ClientTelemetryProvider;
import org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter;
import org.apache.kafka.common.utils.ExponentialBackoff;
import org.apache.kafka.common.utils.KafkaThread;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Timer;
Expand All @@ -84,7 +83,7 @@
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

/**
* AbstractCoordinator implements group management for a single group member by interacting with
Expand Down Expand Up @@ -135,6 +134,7 @@ public boolean hasNotJoinedGroup() {
private final GroupCoordinatorMetrics sensors;
private final GroupRebalanceConfig rebalanceConfig;
private final Optional<ClientTelemetryReporter> clientTelemetryReporter;
private final Supplier<AbstractHeartbeatThread> heartbeatThreadSupplier;

protected final Time time;
protected final ConsumerNetworkClient client;
Expand All @@ -144,7 +144,7 @@ public boolean hasNotJoinedGroup() {
private String rejoinReason = "";
private boolean rejoinNeeded = true;
private boolean needsJoinPrepare = true;
private HeartbeatThread heartbeatThread = null;
private AbstractHeartbeatThread heartbeatThread = null;
private RequestFuture<ByteBuffer> joinFuture = null;
private RequestFuture<Void> findCoordinatorFuture = null;
private volatile RuntimeException fatalFindCoordinatorException = null;
Expand All @@ -165,7 +165,7 @@ public AbstractCoordinator(GroupRebalanceConfig rebalanceConfig,
Metrics metrics,
String metricGrpPrefix,
Time time) {
this(rebalanceConfig, logContext, client, metrics, metricGrpPrefix, time, Optional.empty());
this(rebalanceConfig, logContext, client, metrics, metricGrpPrefix, time, Optional.empty(), Optional.empty());
}

public AbstractCoordinator(GroupRebalanceConfig rebalanceConfig,
Expand All @@ -174,7 +174,8 @@ public AbstractCoordinator(GroupRebalanceConfig rebalanceConfig,
Metrics metrics,
String metricGrpPrefix,
Time time,
Optional<ClientTelemetryReporter> clientTelemetryReporter) {
Optional<ClientTelemetryReporter> clientTelemetryReporter,
Optional<Supplier<AbstractHeartbeatThread>> heartbeatThreadSupplier) {
Objects.requireNonNull(rebalanceConfig.groupId,
"Expected a non-null group id for coordinator construction");
this.rebalanceConfig = rebalanceConfig;
Expand All @@ -189,6 +190,7 @@ public AbstractCoordinator(GroupRebalanceConfig rebalanceConfig,
this.heartbeat = new Heartbeat(rebalanceConfig, time);
this.sensors = new GroupCoordinatorMetrics(metrics, metricGrpPrefix);
this.clientTelemetryReporter = clientTelemetryReporter;
this.heartbeatThreadSupplier = heartbeatThreadSupplier.orElseGet(() -> HeartbeatThread::new);
}

/**
Expand Down Expand Up @@ -417,13 +419,13 @@ boolean ensureActiveGroup(final Timer timer) {

private synchronized void startHeartbeatThreadIfNeeded() {
if (heartbeatThread == null) {
heartbeatThread = new HeartbeatThread();
heartbeatThread = heartbeatThreadSupplier.get();
heartbeatThread.start();
}
}

private void closeHeartbeatThread() {
HeartbeatThread thread;
AbstractHeartbeatThread thread;
synchronized (this) {
if (heartbeatThread == null)
return;
Expand Down Expand Up @@ -1330,6 +1332,13 @@ protected final Meter createMeter(Metrics metrics, String groupName, String base
String.format("The total number of %s", descriptiveName)));
}

/**
* Visible for testing.
*/
protected boolean isHeartbeatThreadEnabled() {
return heartbeatThread != null && heartbeatThread.isEnabled();
}

private class GroupCoordinatorMetrics {
public final String metricGrpName;

Expand Down Expand Up @@ -1436,56 +1445,40 @@ public GroupCoordinatorMetrics(Metrics metrics, String metricGrpPrefix) {
}
}

private class HeartbeatThread extends KafkaThread implements AutoCloseable {
private boolean enabled = false;
private boolean closed = false;
private final AtomicReference<RuntimeException> failed = new AtomicReference<>(null);
private class HeartbeatThread extends AbstractHeartbeatThread {

private HeartbeatThread() {
super(HEARTBEAT_THREAD_PREFIX + (rebalanceConfig.groupId.isEmpty() ? "" : " | " + rebalanceConfig.groupId), true);
}

@Override
public void enable() {
synchronized (AbstractCoordinator.this) {
log.debug("Enabling heartbeat thread");
this.enabled = true;
super.enable();
heartbeat.resetTimeouts();
AbstractCoordinator.this.notify();
}
}

public void disable() {
synchronized (AbstractCoordinator.this) {
log.debug("Disabling heartbeat thread");
this.enabled = false;
}
}

@Override
public void close() {
synchronized (AbstractCoordinator.this) {
this.closed = true;
super.close();
AbstractCoordinator.this.notify();
}
}

private boolean hasFailed() {
return failed.get() != null;
}

private RuntimeException failureCause() {
return failed.get();
}

@Override
public void run() {
try {
log.debug("Heartbeat thread started");
while (true) {
synchronized (AbstractCoordinator.this) {
if (closed)
if (isClosed())
return;

if (!enabled) {
if (!isEnabled()) {
AbstractCoordinator.this.wait();
continue;
}
Expand Down Expand Up @@ -1547,7 +1540,7 @@ public void onFailure(RuntimeException e) {
heartbeat.receiveHeartbeat();
} else if (e instanceof FencedInstanceIdException) {
log.error("Caught fenced group.instance.id {} error in heartbeat thread", rebalanceConfig.groupInstanceId);
heartbeatThread.failed.set(e);
setFailed(e);
} else {
heartbeat.failHeartbeat();
// wake up the thread if it's sleeping to reschedule the heartbeat
Expand All @@ -1561,28 +1554,25 @@ public void onFailure(RuntimeException e) {
}
} catch (AuthenticationException e) {
log.error("An authentication error occurred in the heartbeat thread", e);
this.failed.set(e);
setFailed(e);
} catch (GroupAuthorizationException e) {
log.error("A group authorization error occurred in the heartbeat thread", e);
this.failed.set(e);
setFailed(e);
} catch (InterruptedException | InterruptException e) {
Thread.interrupted();
log.error("Unexpected interrupt received in heartbeat thread", e);
this.failed.set(new RuntimeException(e));
setFailed(new RuntimeException(e));
} catch (Throwable e) {
log.error("Heartbeat thread failed due to unexpected error", e);
if (e instanceof RuntimeException)
this.failed.set((RuntimeException) e);
setFailed((RuntimeException) e);
else
this.failed.set(new RuntimeException(e));
setFailed(new RuntimeException(e));
} finally {
log.debug("Heartbeat thread has closed");
synchronized (AbstractCoordinator.this) {
this.closed = true;
}
close();
}
}

}

protected static class Generation {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.consumer.internals;

import org.apache.kafka.common.utils.KafkaThread;

import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

public abstract class AbstractHeartbeatThread extends KafkaThread implements AutoCloseable {
private final AtomicBoolean enabled = new AtomicBoolean(false);
private final AtomicBoolean closed = new AtomicBoolean(false);
private final AtomicReference<RuntimeException> failed = new AtomicReference<>(null);

public AbstractHeartbeatThread(String name, boolean daemon) {
super(name, daemon);
}

public void enable() {
enabled.set(true);
}

public void disable() {
enabled.set(false);
}

public boolean isEnabled() {
return enabled.get();
}

public void setFailed(RuntimeException e) {
failed.set(e);
}

public boolean hasFailed() {
return failed.get() != null;
}

public RuntimeException failureCause() {
return failed.get();
}

public void close() {
closed.set(true);
}

public boolean isClosed() {
return closed.get();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,8 @@ public class ClassicKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
this.interceptors,
config.getBoolean(THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED),
config.getString(ConsumerConfig.CLIENT_RACK_CONFIG),
clientTelemetryReporter);
clientTelemetryReporter,
Optional.empty());
}
this.fetcher = new Fetcher<>(
logContext,
Expand Down Expand Up @@ -345,8 +346,8 @@ public class ClassicKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
interceptors,
throwOnStableOffsetNotSupported,
rackId,
clientTelemetryReporter
);
clientTelemetryReporter,
Optional.empty());
} else {
this.coordinator = null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import java.util.stream.Collectors;

import static org.apache.kafka.clients.consumer.ConsumerConfig.ASSIGN_FROM_SUBSCRIBED_ASSIGNORS;
Expand Down Expand Up @@ -175,14 +176,16 @@ public ConsumerCoordinator(GroupRebalanceConfig rebalanceConfig,
ConsumerInterceptors<?, ?> interceptors,
boolean throwOnFetchStableOffsetsUnsupported,
String rackId,
Optional<ClientTelemetryReporter> clientTelemetryReporter) {
Optional<ClientTelemetryReporter> clientTelemetryReporter,
Optional<Supplier<AbstractHeartbeatThread>> heartbeatThreadSupplier) {
super(rebalanceConfig,
logContext,
client,
metrics,
metricGrpPrefix,
time,
clientTelemetryReporter);
clientTelemetryReporter,
heartbeatThreadSupplier);
this.rebalanceConfig = rebalanceConfig;
this.log = logContext.logger(ConsumerCoordinator.class);
this.metadata = metadata;
Expand Down
Loading

0 comments on commit 53d3094

Please sign in to comment.