-
Notifications
You must be signed in to change notification settings - Fork 725
feat(core): add fetch listener callbacks and async inter-broker requests #3248
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from all commits
Commits
Show all changes
22 commits
Select commit
Hold shift + click to select a range
31611cd
feat(server-common): add inter-broker async sender API and implementa…
Gezi-lzq 464825f
feat(core): add fetch listener hooks for fetch and session close
Gezi-lzq 540a055
feat(core): dispatch fetch listener callbacks asynchronously
Gezi-lzq 9e4e14b
chore: mark AutoMQ inject blocks for fetch listener
Gezi-lzq 404439a
feat(extension): add methods to retrieve topic name and partition in …
Gezi-lzq 26bd0e1
refactor: use AsyncFetchListener to avoid blocking in synchronized se…
Gezi-lzq c512b22
refactor(lag): sample leo index from unified-log latest append state
Gezi-lzq 346926c
fix: resolve PR review issues
Gezi-lzq 8b7c8cd
refactor: use bounded queue for fetch listener executor
Gezi-lzq bf71388
fix: add closed guard to AsyncSender and awaitTermination on shutdown
Gezi-lzq 032dc42
feat(enterprise): add broker extension facade hook for enterprise loo…
Gezi-lzq 9e52d9f
refactor: simplify fetch listener dispatch and skip NOOP overhead
Gezi-lzq e81f120
refactor: use AsyncFetchListener.onSessionClosedBatch in BrokerServer
Gezi-lzq feecfef
refactor(enterprise): keep enterprise offset timestamp protocol out o…
Gezi-lzq bc47a2f
feat: implement AsyncFetchListener for asynchronous fetch handling
Gezi-lzq ac4cf94
refactor: remove AsyncFetchListener and its associated test cases
Gezi-lzq c2f6b8f
refactor: remove unused fetchListenerExecutor and related code from B…
Gezi-lzq 2533d55
refactor: remove unused ExecutorService import from BrokerServer
Gezi-lzq f212bdd
refactor: reorder import statements in FetchListener.java
Gezi-lzq bf5f33d
feat(core): add non-blocking contract Javadoc to FetchListener
Gezi-lzq 5f14d70
refactor: update FetchListener to use TopicIdPartition instead of Top…
Gezi-lzq d3a40d4
feat(consumer-lag): enhance request handling during close to wait for…
Gezi-lzq File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
63 changes: 63 additions & 0 deletions
63
core/src/main/scala/kafka/server/streamaspect/FetchListener.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,63 @@ | ||
| /* | ||
| * Copyright 2026, AutoMQ HK Limited. | ||
| * | ||
| * Licensed to the Apache Software Foundation (ASF) under one or more | ||
| * contributor license agreements. See the NOTICE file distributed with | ||
| * this work for additional information regarding copyright ownership. | ||
| * The ASF licenses this file to You under the Apache License, Version 2.0 | ||
| * (the "License"); you may not use this file except in compliance with | ||
| * the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
|
|
||
| package kafka.server.streamaspect; | ||
|
|
||
| import kafka.server.CachedPartition; | ||
|
|
||
| import org.apache.kafka.common.TopicIdPartition; | ||
| import org.apache.kafka.common.TopicPartition; | ||
| import org.apache.kafka.common.requests.FetchMetadata; | ||
| import org.apache.kafka.common.utils.ImplicitLinkedHashCollection; | ||
|
|
||
| /** | ||
| * All callbacks are invoked on the request-handler I/O path; implementations must not block. | ||
| */ | ||
| public interface FetchListener { | ||
| int NONE_SESSION_ID = FetchMetadata.INVALID_SESSION_ID; | ||
| FetchListener NOOP = new FetchListener() { | ||
| @Override | ||
| public void onFetch(TopicIdPartition topicIdPartition, int sessionId, long fetchOffset, long timestamp) { | ||
| } | ||
|
|
||
| @Override | ||
| public void onSessionClosed(TopicIdPartition topicIdPartition, int sessionId) { | ||
| } | ||
| }; | ||
|
|
||
| /** | ||
| * Reports one fetched partition result. | ||
| * <p> | ||
| * For now, {@code fetchOffset} and {@code timestamp} are derived from the last batch in returned records: | ||
| * {@code lastBatch.lastOffset()} and {@code lastBatch.maxTimestamp()}. | ||
| */ | ||
| void onFetch(TopicIdPartition topicIdPartition, int sessionId, long fetchOffset, long timestamp); | ||
|
|
||
| void onSessionClosed(TopicIdPartition topicIdPartition, int sessionId); | ||
|
|
||
| default void onSessionClosedBatch(int sessionId, ImplicitLinkedHashCollection<CachedPartition> partitions) { | ||
| for (CachedPartition partition : partitions) { | ||
| onSessionClosed( | ||
| new TopicIdPartition( | ||
| partition.topicId(), | ||
| new TopicPartition(partition.topic(), partition.partition())), | ||
| sessionId); | ||
| } | ||
| } | ||
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.