Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,13 @@ public Integer call() throws Exception {
GetNodesResult rst = admin.getNodes(Collections.emptyList(), new GetNodesOptions());
List<NodeMetadata> nodeMetadataList = rst.nodes().get();
nodeMetadataList.sort(Comparator.comparingInt(NodeMetadata::getNodeId));
System.out.println("Nodes:");
for (NodeMetadata nodeMetadata : nodeMetadataList) {
System.out.println(nodeMetadata);
System.out.println(" " + nodeMetadata);
}
GetNodesResult.RouterChannelEpoch epoch = rst.routerChannelEpoch().get();
System.out.println("RouterChannelEpoch:");
System.out.println(" " + epoch);
}
return 0;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,75 @@
import java.util.List;

public class GetNodesResult {
private final KafkaFuture<List<NodeMetadata>> future;
private final KafkaFuture<Response> future;

public GetNodesResult(KafkaFuture<List<NodeMetadata>> future) {
public GetNodesResult(KafkaFuture<Response> future) {
this.future = future;
}

public KafkaFuture<List<NodeMetadata>> nodes() {
return future;
return future.thenApply(Response::nodes);
}

public KafkaFuture<RouterChannelEpoch> routerChannelEpoch() {
return future.thenApply(Response::routerChannelEpoch);
}

static class Response {
private final List<NodeMetadata> nodes;
private final RouterChannelEpoch routerChannelEpoch;

Response(List<NodeMetadata> nodes, RouterChannelEpoch routerChannelEpoch) {
this.nodes = nodes;
this.routerChannelEpoch = routerChannelEpoch;
}

List<NodeMetadata> nodes() {
return nodes;
}

RouterChannelEpoch routerChannelEpoch() {
return routerChannelEpoch;
}
}

public static class RouterChannelEpoch {
private final long committed;
private final long fenced;
private final long current;
private final long lastBumpUpTimestamp;

public RouterChannelEpoch(long committed, long fenced, long current, long lastBumpUpTimestamp) {
this.committed = committed;
this.fenced = fenced;
this.current = current;
this.lastBumpUpTimestamp = lastBumpUpTimestamp;
}

public long getCommitted() {
return committed;
}

public long getFenced() {
return fenced;
}

public long getCurrent() {
return current;
}

public long getLastBumpUpTimestamp() {
return lastBumpUpTimestamp;
}

@Override
public String toString() {
return "RouterChannelEpoch{" +
"committed=" + committed +
", fenced=" + fenced +
", current=" + current +
", lastBumpUpTimestamp=" + lastBumpUpTimestamp +
'}';
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@
import org.apache.kafka.common.message.ApiVersionsResponseData.FinalizedFeatureKey;
import org.apache.kafka.common.message.ApiVersionsResponseData.SupportedFeatureKey;
import org.apache.kafka.common.message.AutomqGetNodesRequestData;
import org.apache.kafka.common.message.AutomqGetNodesResponseData;
import org.apache.kafka.common.message.CreateAclsRequestData;
import org.apache.kafka.common.message.CreateAclsRequestData.AclCreation;
import org.apache.kafka.common.message.CreateAclsResponseData.AclCreationResult;
Expand Down Expand Up @@ -4963,13 +4964,21 @@ void handleFailure(Throwable throwable) {

@Override
public GetNodesResult getNodes(Collection<Integer> nodeIdList, GetNodesOptions options) {
Comment thread
allenzhu marked this conversation as resolved.
final KafkaFutureImpl<List<NodeMetadata>> future = new KafkaFutureImpl<>();
final KafkaFutureImpl<GetNodesResult.Response> future = new KafkaFutureImpl<>();
final long now = time.milliseconds();
final Call call = new Call(
"getNodes", calcDeadlineMs(now, options.timeoutMs()), new LeastLoadedBrokerOrActiveKController()) {

private List<NodeMetadata> createResult(final AutomqGetNodesResponse response) {
return response.data().nodes().stream().map(NodeMetadata::new).collect(Collectors.toList());
private GetNodesResult.Response createResult(final AutomqGetNodesResponse response) {
List<NodeMetadata> nodes = response.data().nodes().stream().map(NodeMetadata::new).collect(Collectors.toList());
AutomqGetNodesResponseData.RouterChannelEpoch epochData = response.data().routerChannelEpoch();
GetNodesResult.RouterChannelEpoch routerChannelEpoch = new GetNodesResult.RouterChannelEpoch(
epochData.committed(),
epochData.fenced(),
epochData.current(),
epochData.lastBumpUpTimestamp()
);
return new GetNodesResult.Response(nodes, routerChannelEpoch);
}

@Override
Expand All @@ -4989,7 +4998,7 @@ void handleResponse(AbstractResponse response) {

@Override
void handleFailure(Throwable throwable) {
completeAllExceptionally(Collections.singletonList(future), throwable);
future.completeExceptionally(throwable);
}
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,20 @@
"about": "The tag value."
}
]}
]}
]},
{
"name": "RouterChannelEpoch",
"type": "RouterChannelEpoch",
"versions": "0+",
"taggedVersions": "0+",
Comment thread
allenzhu marked this conversation as resolved.
"tag": 0,
"about": "The cluster-level router channel epoch",
"fields": [
{ "name": "Committed", "type": "int64", "versions": "0+", "about": "The committed epoch" },
{ "name": "Fenced", "type": "int64", "versions": "0+", "about": "The fenced epoch" },
{ "name": "Current", "type": "int64", "versions": "0+", "about": "The current epoch" },
{ "name": "LastBumpUpTimestamp", "type": "int64", "versions": "0+", "about": "The last bump up timestamp in ms" }
Comment thread
allenzhu marked this conversation as resolved.
]
}
]
}
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@
import org.apache.kafka.controller.stream.DefaultNodeRuntimeInfoManager;
import org.apache.kafka.controller.stream.KVControlManager;
import org.apache.kafka.controller.stream.NodeControlManager;
import org.apache.kafka.controller.stream.RouterChannelEpoch;
import org.apache.kafka.controller.stream.RouterChannelEpochControlManager;
import org.apache.kafka.controller.stream.S3ObjectControlManager;
import org.apache.kafka.controller.stream.StreamClient;
Expand Down Expand Up @@ -2817,7 +2818,19 @@ public CompletableFuture<AutomqRegisterNodeResponseData> registerNode(Controller

@Override
public CompletableFuture<AutomqGetNodesResponseData> getNodes(ControllerRequestContext context, AutomqGetNodesRequest req) {
return appendWriteEvent("getNodes", context.deadlineNs(), () -> nodeControlManager.getMetadata(req));
return appendWriteEvent("getNodes", context.deadlineNs(), () -> {
ControllerResult<AutomqGetNodesResponseData> result = nodeControlManager.getMetadata(req);
AutomqGetNodesResponseData resp = result.response();
RouterChannelEpoch epoch = routerChannelEpochControlManager.getRouterChannelEpoch();
resp.setRouterChannelEpoch(
new AutomqGetNodesResponseData.RouterChannelEpoch()
.setCommitted(epoch.getCommitted())
.setFenced(epoch.getFenced())
.setCurrent(epoch.getCurrent())
.setLastBumpUpTimestamp(epoch.getLastBumpUpTimestamp())
);
return result;
});
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,10 @@ public RouterChannelEpochControlManager(SnapshotRegistry registry, QuorumControl
scheduler.scheduleWithFixedDelay(this::run, 1, 1, TimeUnit.SECONDS);
}

public RouterChannelEpoch getRouterChannelEpoch() {
return routerChannelEpoch.get();
}

Comment thread
allenzhu marked this conversation as resolved.
private void run() {
if (!quorumController.isActive()) {
return;
Expand Down
Loading