Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
31611cd
feat(server-common): add inter-broker async sender API and implementa…
Gezi-lzq Mar 10, 2026
464825f
feat(core): add fetch listener hooks for fetch and session close
Gezi-lzq Mar 11, 2026
540a055
feat(core): dispatch fetch listener callbacks asynchronously
Gezi-lzq Mar 11, 2026
9e4e14b
chore: mark AutoMQ inject blocks for fetch listener
Gezi-lzq Mar 11, 2026
404439a
feat(extension): add methods to retrieve topic name and partition in …
Gezi-lzq Mar 11, 2026
26bd0e1
refactor: use AsyncFetchListener to avoid blocking in synchronized se…
Gezi-lzq Mar 11, 2026
c512b22
refactor(lag): sample leo index from unified-log latest append state
Gezi-lzq Mar 24, 2026
346926c
fix: resolve PR review issues
Gezi-lzq Mar 24, 2026
8b7c8cd
refactor: use bounded queue for fetch listener executor
Gezi-lzq Mar 24, 2026
bf71388
fix: add closed guard to AsyncSender and awaitTermination on shutdown
Gezi-lzq Mar 24, 2026
032dc42
feat(enterprise): add broker extension facade hook for enterprise loo…
Gezi-lzq Mar 24, 2026
9e52d9f
refactor: simplify fetch listener dispatch and skip NOOP overhead
Gezi-lzq Mar 24, 2026
e81f120
refactor: use AsyncFetchListener.onSessionClosedBatch in BrokerServer
Gezi-lzq Mar 24, 2026
feecfef
refactor(enterprise): keep enterprise offset timestamp protocol out o…
Gezi-lzq Mar 24, 2026
bc47a2f
feat: implement AsyncFetchListener for asynchronous fetch handling
Gezi-lzq Apr 13, 2026
ac4cf94
refactor: remove AsyncFetchListener and its associated test cases
Gezi-lzq Apr 13, 2026
c2f6b8f
refactor: remove unused fetchListenerExecutor and related code from B…
Gezi-lzq Apr 13, 2026
2533d55
refactor: remove unused ExecutorService import from BrokerServer
Gezi-lzq Apr 13, 2026
f212bdd
refactor: reorder import statements in FetchListener.java
Gezi-lzq Apr 13, 2026
bf5f33d
feat(core): add non-blocking contract Javadoc to FetchListener
Gezi-lzq Apr 13, 2026
5f14d70
refactor: update FetchListener to use TopicIdPartition instead of Top…
Gezi-lzq Apr 24, 2026
d3a40d4
feat(consumer-lag): enhance request handling during close to wait for…
Gezi-lzq Apr 24, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 29 additions & 0 deletions core/src/main/scala/kafka/log/UnifiedLog.scala
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,10 @@ class UnifiedLog(@volatile var logStartOffset: Long,

def highestOffsetInRemoteStorage(): Long = _highestOffsetInRemoteStorage

// AutoMQ inject start
@volatile private var _latestAppendState: Option[LatestAppendState] = None
Comment thread
Gezi-lzq marked this conversation as resolved.
// AutoMQ inject end

locally {
def updateLocalLogStartOffset(offset: Long): Unit = {
_localLogStartOffset = offset
Expand Down Expand Up @@ -910,6 +914,11 @@ class UnifiedLog(@volatile var logStartOffset: Long,
// ProducerStateManager will not be updated and the last stable offset will not advance
// if the append to the transaction index fails.
localLog.append(appendInfo.lastOffset, validRecords)
// AutoMQ inject start
if (appendInfo.maxTimestamp >= 0 && appendInfo.lastOffset >= 0) {
_latestAppendState = Some(LatestAppendState(appendInfo.lastOffset, appendInfo.maxTimestamp))
}
// AutoMQ inject end
updateHighWatermarkWithLogEndOffset()

// update the producer state
Expand Down Expand Up @@ -1691,6 +1700,16 @@ class UnifiedLog(@volatile var logStartOffset: Long,
*/
def logEndOffsetMetadata: LogOffsetMetadata = localLog.logEndOffsetMetadata

/**
* The latest append-time sparse sample from the most recent successful append.
*
* maxTimestampOffset is sourced from LogAppendInfo.lastOffset.
* maxTimestamp is sourced from LogAppendInfo.maxTimestamp.
*/
// AutoMQ inject start
def latestAppendState: Option[LatestAppendState] = _latestAppendState
// AutoMQ inject end

/**
* Roll the log over to a new empty log segment if necessary.
* The segment will be rolled if one of the following conditions met:
Expand Down Expand Up @@ -1887,6 +1906,9 @@ class UnifiedLog(@volatile var logStartOffset: Long,
leaderEpochCache.foreach(_.truncateFromEndAsyncFlush(targetOffset))
logStartOffset = math.min(targetOffset, logStartOffset)
rebuildProducerState(targetOffset, producerStateManager)
// AutoMQ inject start
_latestAppendState = None
// AutoMQ inject end
if (highWatermark >= localLog.logEndOffset)
updateHighWatermark(localLog.logEndOffsetMetadata)
}
Expand All @@ -1913,6 +1935,9 @@ class UnifiedLog(@volatile var logStartOffset: Long,
logStartOffset = logStartOffsetOpt.getOrElse(newOffset)
if (remoteLogEnabled()) _localLogStartOffset = newOffset
rebuildProducerState(newOffset, producerStateManager)
// AutoMQ inject start
_latestAppendState = None
// AutoMQ inject end
updateHighWatermark(localLog.logEndOffsetMetadata)
}
}
Expand Down Expand Up @@ -2037,6 +2062,10 @@ class UnifiedLog(@volatile var logStartOffset: Long,
}

object UnifiedLog extends Logging {
Comment thread
Gezi-lzq marked this conversation as resolved.
// AutoMQ inject start
case class LatestAppendState(maxTimestampOffset: Long, maxTimestamp: Long)
// AutoMQ inject end

val LogFileSuffix: String = LogFileUtils.LOG_FILE_SUFFIX

val IndexFileSuffix: String = LogFileUtils.INDEX_FILE_SUFFIX
Expand Down
31 changes: 28 additions & 3 deletions core/src/main/scala/kafka/server/BrokerServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import kafka.log.streamaspect.ElasticLogManager
import kafka.network.{DataPlaneAcceptor, SocketServer}
import kafka.raft.KafkaRaftManager
import kafka.server.metadata.{AclPublisher, BrokerMetadataPublisher, ClientQuotaMetadataManager, DelegationTokenPublisher, DynamicClientQuotaPublisher, DynamicConfigPublisher, KRaftMetadataCache, ScramPublisher}
import kafka.server.streamaspect.{ElasticKafkaApis, ElasticReplicaManager, PartitionLifecycleListener}
import kafka.server.streamaspect.{ElasticKafkaApis, ElasticReplicaManager, FetchListener, PartitionLifecycleListener}
import kafka.utils.CoreUtils
import org.apache.kafka.clients.admin.ClusterEventPublisher
import org.apache.kafka.common.config.ConfigException
Expand Down Expand Up @@ -426,13 +426,25 @@ class BrokerServer(
// The FetchSessionCache is divided into config.numIoThreads shards, each responsible
// for Math.max(1, shardNum * sessionIdRange) <= sessionId < (shardNum + 1) * sessionIdRange
val sessionIdRange = Int.MaxValue / NumFetchSessionCacheShards
// AutoMQ inject start
val fetchListener = Option(newFetchListener()).getOrElse(FetchListener.NOOP)
val sessionRemovalListener = new FetchSessionCacheShard.SessionRemovalListener {
override def onRemove(sessionId: Int, partitions: FetchSession.CACHE_MAP): Unit = {
fetchListener.onSessionClosedBatch(sessionId, partitions)
}
}
// AutoMQ inject end
val fetchSessionCacheShards = (0 until NumFetchSessionCacheShards)
.map(shardNum => new FetchSessionCacheShard(
config.maxIncrementalFetchSessionCacheSlots / NumFetchSessionCacheShards,
KafkaServer.MIN_INCREMENTAL_FETCH_SESSION_EVICTION_MS,
sessionIdRange,
shardNum
shardNum,
// AutoMQ inject start
sessionRemovalListener
// AutoMQ inject end
))

val fetchManager = new FetchManager(Time.SYSTEM, new FetchSessionCache(fetchSessionCacheShards))

// Create the request processor objects.
Expand All @@ -458,6 +470,7 @@ class BrokerServer(
tokenManager = tokenManager,
apiVersionManager = apiVersionManager,
clientMetricsManager = Some(clientMetricsManager))
dataPlaneRequestProcessor.asInstanceOf[ElasticKafkaApis].setEnterpriseFacade(newEnterpriseBrokerFacade())

dataPlaneRequestHandlerPool = new KafkaRequestHandlerPool(config.nodeId,
socketServer.dataPlaneRequestChannel, dataPlaneRequestProcessor, time,
Expand Down Expand Up @@ -592,7 +605,9 @@ class BrokerServer(
}
ElasticLogManager.init(config, clusterId, this)
trafficInterceptor = newTrafficInterceptor()
dataPlaneRequestProcessor.asInstanceOf[ElasticKafkaApis].setTrafficInterceptor(trafficInterceptor)
val elasticKafkaApis = dataPlaneRequestProcessor.asInstanceOf[ElasticKafkaApis]
elasticKafkaApis.setTrafficInterceptor(trafficInterceptor)
elasticKafkaApis.setFetchListener(fetchListener)
replicaManager.setTrafficInterceptor(trafficInterceptor)
replicaManager.setS3StreamContext(com.automq.stream.Context.instance())

Expand Down Expand Up @@ -902,6 +917,16 @@ class BrokerServer(
trafficInterceptor
}

// AutoMQ inject start
protected def newFetchListener(): FetchListener = {
FetchListener.NOOP
}

protected def newEnterpriseBrokerFacade(): AnyRef = {
null
}
// AutoMQ inject end

protected def newPartitionLifecycleListeners(): util.List[PartitionLifecycleListener] = {
val list = new util.ArrayList[PartitionLifecycleListener]()
list.add(tableManager)
Expand Down
24 changes: 23 additions & 1 deletion core/src/main/scala/kafka/server/FetchSession.scala
Original file line number Diff line number Diff line change
Expand Up @@ -612,6 +612,13 @@ case class EvictableKey(privileged: Boolean, size: Int, id: Int) extends Compara
// AutoMQ inject end
}

// AutoMQ inject start
object FetchSessionCacheShard {
trait SessionRemovalListener {
def onRemove(sessionId: Int, partitions: FetchSession.CACHE_MAP): Unit
}
}
// AutoMQ inject end

/**
* Caches fetch sessions.
Expand All @@ -631,7 +638,11 @@ case class EvictableKey(privileged: Boolean, size: Int, id: Int) extends Compara
class FetchSessionCacheShard(private val maxEntries: Int,
private val evictionMs: Long,
val sessionIdRange: Int = Int.MaxValue,
private val shardNum: Int = 0) extends Logging {
private val shardNum: Int = 0,
// AutoMQ inject start
private val sessionRemovalListener: FetchSessionCacheShard.SessionRemovalListener = null
// AutoMQ inject end
) extends Logging {
Comment thread
Gezi-lzq marked this conversation as resolved.

this.logIdent = s"[Shard $shardNum] "

Expand Down Expand Up @@ -798,10 +809,21 @@ class FetchSessionCacheShard(private val maxEntries: Int,
val removeResult = sessions.remove(session.id)
if (removeResult.isDefined) {
numPartitions = numPartitions - session.cachedSize
// AutoMQ inject start
notifySessionRemoved(session.id, session.partitionMap)
// AutoMQ inject end
}
removeResult
}

// AutoMQ inject start
private def notifySessionRemoved(sessionId: Int, partitions: FetchSession.CACHE_MAP): Unit = {
if (sessionRemovalListener != null) {
sessionRemovalListener.onRemove(sessionId, partitions)
}
Comment thread
Gezi-lzq marked this conversation as resolved.
}
// AutoMQ inject end

/**
* Update a session's position in the lastUsed and evictable trees.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import org.apache.kafka.common.replica.ClientMetadata
import org.apache.kafka.common.replica.ClientMetadata.DefaultClientMetadata
import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
import org.apache.kafka.common.requests.s3.{AutomqGetPartitionSnapshotRequest, AutomqUpdateGroupRequest, AutomqUpdateGroupResponse, AutomqZoneRouterRequest}
import org.apache.kafka.common.requests.{AbstractResponse, DeleteTopicsRequest, DeleteTopicsResponse, FetchRequest, FetchResponse, ProduceRequest, ProduceResponse, RequestUtils}
import org.apache.kafka.common.requests.{AbstractResponse, DeleteTopicsRequest, DeleteTopicsResponse, FetchMetadata => JFetchMetadata, FetchRequest, FetchResponse, ProduceRequest, ProduceResponse, RequestUtils}
import org.apache.kafka.common.resource.Resource.CLUSTER_NAME
import org.apache.kafka.common.resource.ResourceType.{CLUSTER, TOPIC, TRANSACTIONAL_ID}
import org.apache.kafka.common.utils.Time
Expand Down Expand Up @@ -92,6 +92,7 @@ class ElasticKafkaApis(

private var trafficInterceptor: TrafficInterceptor = new NoopTrafficInterceptor(this, metadataCache)
private var snapshotAwaitReadySupplier: Supplier[CompletableFuture[Void]] = () => CompletableFuture.completedFuture(null)
@volatile private var enterpriseFacadeRef: AnyRef = null
private val brokerExtensionContext: BrokerExtensionContext = new ElasticBrokerExtensionContext
private val brokerExtensionHandleDispatcher: BrokerExtensionHandleDispatcher =
createBrokerExtensionHandleDispatcher(brokerExtensionContext)
Expand All @@ -100,6 +101,7 @@ class ElasticKafkaApis(
BrokerExtensionHandleDispatcher.load(context)

protected def isExtensionApi(apiKey: ApiKeys): Boolean = ApiKeys.isExtensionApi(apiKey)
@volatile private var fetchListener: FetchListener = FetchListener.NOOP

/**
* Generate a map of topic -> [(partitionId, epochId)] based on provided topicsRequestData.
Expand Down Expand Up @@ -639,13 +641,24 @@ class ElasticKafkaApis(
}
}

val requestSessionId = fetchRequest.metadata.sessionId
val requestFetchOffsets = interesting.map { case (tp, partitionData) =>
tp -> partitionData.fetchOffset
}.toMap

// the callback for process a fetch response, invoked before throttling
def processResponseCallback(responsePartitionData: Seq[(TopicIdPartition, FetchPartitionData)]): Unit = {
val partitions = new util.LinkedHashMap[TopicIdPartition, FetchResponseData.PartitionData]
val reassigningPartitions = mutable.Set[TopicIdPartition]()
val nodeEndpoints = new mutable.HashMap[Int, Node]
val clientIdMetadata = ClientIdMetadata.of(request.header.clientId(), request.context.clientAddress, request.context.connectionId)
responsePartitionData.foreach { case (tp, data) =>
notifyFetchListener(
tp,
if (requestSessionId == JFetchMetadata.INVALID_SESSION_ID) FetchListener.NONE_SESSION_ID else requestSessionId,
requestFetchOffsets.getOrElse(tp, -1L),
data.records
)
val abortedTransactions = data.abortedTransactions.orElse(null)
val lastStableOffset: Long = data.lastStableOffset.orElse(FetchResponse.INVALID_LAST_STABLE_OFFSET)
if (data.isReassignmentFetch) reassigningPartitions.add(tp)
Expand Down Expand Up @@ -885,6 +898,44 @@ class ElasticKafkaApis(
this.snapshotAwaitReadySupplier = supplier
}

def setFetchListener(fetchListener: FetchListener): Unit = {
this.fetchListener = if (fetchListener == null) FetchListener.NOOP else fetchListener
}
Comment thread
Gezi-lzq marked this conversation as resolved.

def setEnterpriseFacade(enterpriseFacade: AnyRef): Unit = {
this.enterpriseFacadeRef = enterpriseFacade
}

private[streamaspect] def enterpriseFacade(): AnyRef = {
enterpriseFacadeRef
}

private def notifyFetchListener(topicIdPartition: TopicIdPartition,
sessionId: Int,
fetchOffset: Long,
records: Records): Unit = {
if (fetchListener eq FetchListener.NOOP) return
val (reportedOffset, timestamp) = extractFetchOffsetAndTimestamp(fetchOffset, records)
fetchListener.onFetch(topicIdPartition, sessionId, reportedOffset, timestamp)
}

private def extractFetchOffsetAndTimestamp(defaultOffset: Long, records: Records): (Long, Long) = {
if (records == null) {
(defaultOffset, RecordBatch.NO_TIMESTAMP)
} else {
val batches = records.batches().iterator()
if (!batches.hasNext) {
(defaultOffset, RecordBatch.NO_TIMESTAMP)
} else {
var lastBatch = batches.next()
while (batches.hasNext) {
lastBatch = batches.next()
}
(lastBatch.lastOffset(), lastBatch.maxTimestamp())
}
}
}

private final class ElasticBrokerExtensionContext extends BrokerExtensionContext {
override def forwardToControllerOrFail(request: RequestChannel.Request): Unit =
ElasticKafkaApis.this.forwardToControllerOrFail(request)
Expand All @@ -906,6 +957,15 @@ class ElasticKafkaApis(

override def handleInvalidVersionsDuringForwarding(request: RequestChannel.Request): Unit =
ElasticKafkaApis.this.handleInvalidVersionsDuringForwarding(request)

override def getTopicName(topicId: Uuid): Optional[String] =
OptionConverters.toJava(metadataCache.getTopicName(topicId))

override def getPartition(topicPartition: TopicPartition): HostedPartition =
replicaManager.getPartition(topicPartition)

override def enterpriseFacade(): AnyRef =
ElasticKafkaApis.this.enterpriseFacade()
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Copyright 2026, AutoMQ HK Limited.
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package kafka.server.streamaspect;

import kafka.server.CachedPartition;

import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.requests.FetchMetadata;
import org.apache.kafka.common.utils.ImplicitLinkedHashCollection;

/**
* All callbacks are invoked on the request-handler I/O path; implementations must not block.
*/
public interface FetchListener {
int NONE_SESSION_ID = FetchMetadata.INVALID_SESSION_ID;
FetchListener NOOP = new FetchListener() {
@Override
public void onFetch(TopicIdPartition topicIdPartition, int sessionId, long fetchOffset, long timestamp) {
}

@Override
public void onSessionClosed(TopicIdPartition topicIdPartition, int sessionId) {
}
};

/**
* Reports one fetched partition result.
* <p>
* For now, {@code fetchOffset} and {@code timestamp} are derived from the last batch in returned records:
* {@code lastBatch.lastOffset()} and {@code lastBatch.maxTimestamp()}.
*/
void onFetch(TopicIdPartition topicIdPartition, int sessionId, long fetchOffset, long timestamp);

void onSessionClosed(TopicIdPartition topicIdPartition, int sessionId);

default void onSessionClosedBatch(int sessionId, ImplicitLinkedHashCollection<CachedPartition> partitions) {
for (CachedPartition partition : partitions) {
onSessionClosed(
new TopicIdPartition(
partition.topicId(),
new TopicPartition(partition.topic(), partition.partition())),
sessionId);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,12 @@
package kafka.server.streamaspect.extension

import kafka.network.RequestChannel
import kafka.server.RequestLocal
import kafka.server.{HostedPartition, RequestLocal}

import org.apache.kafka.common.{TopicPartition, Uuid}
import org.apache.kafka.common.requests.AbstractResponse

import java.util.Optional
import java.util.ServiceLoader

import scala.jdk.CollectionConverters.IteratorHasAsScala
Expand Down Expand Up @@ -60,6 +62,12 @@ trait BrokerExtensionContext {
def handleError(request: RequestChannel.Request, t: Throwable): Unit

def handleInvalidVersionsDuringForwarding(request: RequestChannel.Request): Unit

def getTopicName(topicId: Uuid): Optional[String]

def getPartition(topicPartition: TopicPartition): HostedPartition

def enterpriseFacade(): AnyRef
}

trait BrokerExtensionHandleDispatcher {
Expand Down
Loading
Loading