Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kafka.clients.admin;

/**
* Callback invoked periodically by {@link ClusterEventPublisher} to emit accumulated events.
*/
@FunctionalInterface
public interface ClusterEventEmitter {

/**
* Emit accumulated data as cluster events via the given publisher.
*
* @param publisher the publisher to send events through
*/
void emit(IClusterEventPublisher publisher);
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,12 @@
import java.net.URI;
import java.time.OffsetDateTime;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

Expand All @@ -45,12 +49,18 @@
* a warning is logged and the event is dropped. Events must never block or fail the critical path
* (rebalancing, failover, request handling).
*
* <p>Registered {@link ClusterEventEmitter}s are invoked periodically by an internal scheduler
* to flush accumulated data as events.
*
* <p>Example usage:
* <pre>{@code
* // At broker startup
* ClusterEventPublisher.setup(Map.of("bootstrap.servers", "localhost:9092"));
*
* // From any code path
* // Register emitters
* ClusterEventPublisher.registerEmitter(requestErrorAccumulator);
*
* // From any code path (direct publish)
* ClusterEventPublisher.publish(
* "com.automq.risk.request_error",
* "/automq/broker/0",
Expand All @@ -69,6 +79,19 @@ public class ClusterEventPublisher implements IClusterEventPublisher {
private static final AtomicReference<IClusterEventPublisher> INSTANCE =
new AtomicReference<>(NoopClusterEventPublisher.INSTANCE);

private static final List<ClusterEventEmitter> EMITTERS = new CopyOnWriteArrayList<>();

private static final long FLUSH_INTERVAL_MS = 30_000L;

static {
Executors.newSingleThreadScheduledExecutor(r -> {
Thread t = new Thread(r, "cluster-event-publisher-flush");
t.setDaemon(true);
return t;
}).scheduleAtFixedRate(ClusterEventPublisher::flushEmitters,
FLUSH_INTERVAL_MS, FLUSH_INTERVAL_MS, TimeUnit.MILLISECONDS);
}

private final KafkaProducer<String, CloudEvent> producer;
private final AtomicBoolean closed = new AtomicBoolean(false);

Expand All @@ -95,8 +118,8 @@ private ClusterEventPublisher(Map<String, Object> config) {
// ---- Global singleton ----

/**
* Initialize the global singleton publisher. If already set up, the previous instance
* is closed and replaced.
* Initialize the global singleton publisher.
* If already set up, the previous instance is closed and replaced.
*
* @param config producer configuration map
*/
Expand All @@ -108,15 +131,30 @@ public static void setup(Map<String, Object> config) {
}

/**
* Shut down the global singleton publisher, reverting to the no-op instance.
* Shut down the global singleton publisher and clear all emitters.
*/
public static void shutdown() {
EMITTERS.clear();
IClusterEventPublisher prev = INSTANCE.getAndSet(NoopClusterEventPublisher.INSTANCE);
if (prev != null) {
prev.close();
}
}

/**
* Register an emitter to be invoked on each flush cycle.
*/
public static void registerEmitter(ClusterEventEmitter emitter) {
EMITTERS.add(emitter);
}

/**
* Remove a previously registered emitter.
*/
public static void removeEmitter(ClusterEventEmitter emitter) {
EMITTERS.remove(emitter);
}

/**
* Publish an event via the global singleton. If {@link #setup(Map)} has not been called,
* this is a no-op.
Expand All @@ -125,6 +163,19 @@ public static void publish(String type, String source, String subject, String da
INSTANCE.get().publishEvent(type, source, subject, dataSchema, data);
}

// ---- Flush logic ----

private static void flushEmitters() {
IClusterEventPublisher publisher = INSTANCE.get();
for (ClusterEventEmitter emitter : EMITTERS) {
try {
emitter.emit(publisher);
} catch (Throwable e) {
log.warn("Error invoking cluster event emitter {}", emitter.getClass().getSimpleName(), e);
}
}
}

// ---- Instance methods ----

@Override
Expand Down
43 changes: 11 additions & 32 deletions core/src/main/java/kafka/server/RequestErrorAccumulator.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@
*/
package kafka.server;

import org.apache.kafka.clients.admin.ClusterEventPublisher;
import org.apache.kafka.clients.admin.ClusterEventEmitter;
import org.apache.kafka.clients.admin.IClusterEventPublisher;
import org.apache.kafka.clients.admin.RequestErrorEventData;
import org.apache.kafka.common.protocol.ApiKeys;

Expand All @@ -28,12 +29,9 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAdder;

public class RequestErrorAccumulator implements AutoCloseable {
public class RequestErrorAccumulator implements ClusterEventEmitter {

private static final Logger log = LoggerFactory.getLogger(RequestErrorAccumulator.class);

Expand All @@ -49,19 +47,10 @@ static class ErrorBucket {

private final ConcurrentHashMap<ErrorKey, ErrorBucket> buckets = new ConcurrentHashMap<>();
private final int brokerId;
private final long flushIntervalMs;
private final ScheduledExecutorService scheduler;
private volatile long lastEmitMs = System.currentTimeMillis();

public RequestErrorAccumulator(int brokerId, long flushIntervalMs) {
public RequestErrorAccumulator(int brokerId) {
this.brokerId = brokerId;
this.flushIntervalMs = flushIntervalMs;
this.scheduler = Executors.newSingleThreadScheduledExecutor(r -> {
Thread t = new Thread(r, "request-error-accumulator-flush");
t.setDaemon(true);
return t;
});
this.scheduler.scheduleAtFixedRate(this::flush,
flushIntervalMs, flushIntervalMs, TimeUnit.MILLISECONDS);
}

public static boolean isRecordable(short errorCode) {
Expand All @@ -81,16 +70,11 @@ public void record(short apiKey, short errorCode, String resource,
}
}

void flush() {
try {
doFlush();
} catch (Throwable e) {
log.warn("Error flushing request error events", e);
}
}

private void doFlush() {
double intervalSec = flushIntervalMs / 1000.0;
@Override
public void emit(IClusterEventPublisher publisher) {
long now = System.currentTimeMillis();
double intervalSec = Math.max(now - lastEmitMs, 1) / 1000.0;
lastEmitMs = now;
Iterator<Map.Entry<ErrorKey, ErrorBucket>> it = buckets.entrySet().iterator();
while (it.hasNext()) {
Map.Entry<ErrorKey, ErrorBucket> entry = it.next();
Expand Down Expand Up @@ -119,17 +103,12 @@ private void doFlush() {
String subject = key.resource().isEmpty()
? apiName : apiName + ":" + key.resource();

ClusterEventPublisher.publish(
publisher.publishEvent(
RequestErrorEventData.TYPE,
"/automq/broker/" + brokerId,
subject,
RequestErrorEventData.DATA_SCHEMA,
data.toByteArray());
}
}

@Override
public void close() {
scheduler.shutdownNow();
}
}
12 changes: 6 additions & 6 deletions core/src/main/scala/kafka/server/BrokerServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import kafka.raft.KafkaRaftManager
import kafka.server.metadata.{AclPublisher, BrokerMetadataPublisher, ClientQuotaMetadataManager, DelegationTokenPublisher, DynamicClientQuotaPublisher, DynamicConfigPublisher, KRaftMetadataCache, ScramPublisher}
import kafka.server.streamaspect.{ElasticKafkaApis, ElasticReplicaManager, PartitionLifecycleListener}
import kafka.utils.CoreUtils
import org.apache.kafka.clients.admin.ClusterEventPublisher
import org.apache.kafka.common.config.ConfigException
import org.apache.kafka.common.message.ApiMessageType.ListenerType
import org.apache.kafka.common.metrics.Metrics
Expand Down Expand Up @@ -470,8 +471,9 @@ class BrokerServer(
publisherConfig.put(k.toString, v)
}
config.rack.foreach(rack => publisherConfig.put("client.id", "automq_az=" + rack))
org.apache.kafka.clients.admin.ClusterEventPublisher.setup(publisherConfig)
requestErrorAccumulator = new RequestErrorAccumulator(config.nodeId, 30000L)
ClusterEventPublisher.setup(publisherConfig)
requestErrorAccumulator = new RequestErrorAccumulator(config.nodeId)
ClusterEventPublisher.registerEmitter(requestErrorAccumulator)
socketServer.dataPlaneRequestChannel.requestErrorAccumulator = requestErrorAccumulator
} catch {
case e: Throwable =>
Expand Down Expand Up @@ -814,10 +816,8 @@ class BrokerServer(
if (quotaManagers != null)
CoreUtils.swallow(quotaManagers.shutdown(), this)

// AutoMQ inject start - shutdown request error accumulator and cluster event publisher
if (requestErrorAccumulator != null)
CoreUtils.swallow(requestErrorAccumulator.close(), this)
CoreUtils.swallow(org.apache.kafka.clients.admin.ClusterEventPublisher.shutdown(), this)
// AutoMQ inject start - shutdown cluster event publisher (also stops emitter scheduler)
CoreUtils.swallow(ClusterEventPublisher.shutdown(), this)
// AutoMQ inject end

if (socketServer != null)
Expand Down
Loading