Skip to content

Commit c3376e5

Browse files
authored
[transactions] Implement KIP-664 DescribeProducers (streamnative#78)
1 parent 1f2fe99 commit c3376e5

File tree

7 files changed

+420
-28
lines changed

7 files changed

+420
-28
lines changed

kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaCommandDecoder.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -320,6 +320,9 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
320320
case DESCRIBE_GROUPS:
321321
handleDescribeGroupRequest(kafkaHeaderAndRequest, responseFuture);
322322
break;
323+
case DESCRIBE_PRODUCERS:
324+
handleDescribeProducersRequest(kafkaHeaderAndRequest, responseFuture);
325+
break;
323326
case LIST_GROUPS:
324327
handleListGroupsRequest(kafkaHeaderAndRequest, responseFuture);
325328
break;
@@ -572,7 +575,12 @@ protected void handleError(KafkaHeaderAndRequest kafkaHeaderAndRequest,
572575
handleLeaveGroupRequest(KafkaHeaderAndRequest leaveGroup, CompletableFuture<AbstractResponse> response);
573576

574577
protected abstract void
575-
handleDescribeGroupRequest(KafkaHeaderAndRequest describeGroup, CompletableFuture<AbstractResponse> response);
578+
handleDescribeGroupRequest(KafkaHeaderAndRequest kafkaHeaderAndRequest,
579+
CompletableFuture<AbstractResponse> response);
580+
581+
protected abstract void
582+
handleDescribeProducersRequest(KafkaHeaderAndRequest kafkaHeaderAndRequest,
583+
CompletableFuture<AbstractResponse> response);
576584

577585
protected abstract void
578586
handleListGroupsRequest(KafkaHeaderAndRequest listGroups, CompletableFuture<AbstractResponse> response);

kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java

Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,7 @@
117117
import org.apache.kafka.common.message.DescribeClusterResponseData;
118118
import org.apache.kafka.common.message.DescribeConfigsRequestData;
119119
import org.apache.kafka.common.message.DescribeConfigsResponseData;
120+
import org.apache.kafka.common.message.DescribeProducersResponseData;
120121
import org.apache.kafka.common.message.DescribeTransactionsResponseData;
121122
import org.apache.kafka.common.message.EndTxnRequestData;
122123
import org.apache.kafka.common.message.EndTxnResponseData;
@@ -162,6 +163,8 @@
162163
import org.apache.kafka.common.requests.DescribeConfigsRequest;
163164
import org.apache.kafka.common.requests.DescribeConfigsResponse;
164165
import org.apache.kafka.common.requests.DescribeGroupsRequest;
166+
import org.apache.kafka.common.requests.DescribeProducersRequest;
167+
import org.apache.kafka.common.requests.DescribeProducersResponse;
165168
import org.apache.kafka.common.requests.DescribeTransactionsRequest;
166169
import org.apache.kafka.common.requests.DescribeTransactionsResponse;
167170
import org.apache.kafka.common.requests.EndTxnRequest;
@@ -2020,6 +2023,99 @@ protected void handleDescribeGroupRequest(KafkaHeaderAndRequest describeGroup,
20202023
));
20212024
}
20222025

2026+
@Override
2027+
protected void handleDescribeProducersRequest(KafkaHeaderAndRequest describeGroup,
2028+
CompletableFuture<AbstractResponse> responseFuture) {
2029+
// https://github.com/apache/kafka/blob/79c19da68d6a93a729a07dfdd37f238246653a46/
2030+
// core/src/main/scala/kafka/server/KafkaApis.scala#L3397
2031+
checkArgument(describeGroup.getRequest() instanceof DescribeProducersRequest);
2032+
DescribeProducersRequest request = (DescribeProducersRequest) describeGroup.getRequest();
2033+
Map<TopicPartition, DescribeProducersResponseData.PartitionResponse> allResponses = Maps.newConcurrentMap();
2034+
Map<TopicPartition, Errors> errors = Maps.newConcurrentMap();
2035+
String namespacePrefix = currentNamespacePrefix();
2036+
final int numPartitions = request.data().topics().stream()
2037+
.mapToInt(t->t.partitionIndexes().size())
2038+
.sum();
2039+
Runnable completeOne = () -> {
2040+
if (errors.size() + allResponses.size() != numPartitions) {
2041+
// not enough responses
2042+
return;
2043+
}
2044+
errors.forEach((topicPartition, tpErrors) -> {
2045+
DescribeProducersResponseData.PartitionResponse topicResponse =
2046+
new DescribeProducersResponseData.PartitionResponse()
2047+
.setPartitionIndex(topicPartition.partition())
2048+
.setErrorCode(tpErrors.code())
2049+
.setErrorMessage(tpErrors.message());
2050+
allResponses.put(topicPartition, topicResponse);
2051+
});
2052+
DescribeProducersResponseData response = new DescribeProducersResponseData();
2053+
allResponses
2054+
.entrySet()
2055+
.stream()
2056+
.collect(Collectors.groupingBy(
2057+
entry -> entry.getKey().topic(),
2058+
Collectors.mapping(
2059+
entry -> entry.getValue(),
2060+
Collectors.toList()
2061+
)
2062+
))
2063+
.forEach((topic, partitionResponses) -> {
2064+
DescribeProducersResponseData.TopicResponse topicResponse =
2065+
new DescribeProducersResponseData.TopicResponse()
2066+
.setName(topic)
2067+
.setPartitions(partitionResponses);
2068+
response.topics().add(topicResponse);
2069+
});
2070+
responseFuture.complete(new DescribeProducersResponse(response));
2071+
};
2072+
2073+
request.data().topics().forEach ((topicRequest) -> {
2074+
topicRequest.partitionIndexes().forEach(partition -> {
2075+
TopicPartition tp = new TopicPartition(topicRequest.name(), partition);
2076+
String fullPartitionName;
2077+
try {
2078+
fullPartitionName = KopTopic.toString(tp, namespacePrefix);
2079+
} catch (KoPTopicException e) {
2080+
log.warn("Invalid topic name: {}", tp.topic(), e);
2081+
errors.put(tp, Errors.UNKNOWN_TOPIC_OR_PARTITION);
2082+
completeOne.run();
2083+
return;
2084+
}
2085+
authorize(AclOperation.WRITE, Resource.of(ResourceType.TOPIC, fullPartitionName))
2086+
.whenComplete((isAuthorized, ex) -> {
2087+
if (ex != null) {
2088+
log.error("AddPartitionsToTxn topic authorize failed, topic - {}. {}",
2089+
fullPartitionName, ex.getMessage());
2090+
errors.put(tp, Errors.TOPIC_AUTHORIZATION_FAILED);
2091+
completeOne.run();
2092+
return;
2093+
}
2094+
if (!isAuthorized) {
2095+
errors.put(tp, Errors.TOPIC_AUTHORIZATION_FAILED);
2096+
completeOne.run();
2097+
return;
2098+
}
2099+
CompletableFuture<DescribeProducersResponseData.PartitionResponse> topicResponse =
2100+
replicaManager.activeProducerState(tp, namespacePrefix);
2101+
topicResponse.whenComplete((response, throwable) -> {
2102+
if (throwable != null) {
2103+
log.error("DescribeProducersRequest failed, topic - {}. {}",
2104+
fullPartitionName, throwable.getMessage());
2105+
errors.put(tp, Errors.UNKNOWN_TOPIC_OR_PARTITION);
2106+
} else {
2107+
allResponses.put(tp, response);
2108+
}
2109+
completeOne.run();
2110+
});
2111+
2112+
});
2113+
});
2114+
});
2115+
2116+
2117+
}
2118+
20232119
@Override
20242120
protected void handleListGroupsRequest(KafkaHeaderAndRequest listGroups,
20252121
CompletableFuture<AbstractResponse> resultFuture) {

kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/PartitionLog.java

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,7 @@
7878
import org.apache.kafka.common.errors.NotLeaderOrFollowerException;
7979
import org.apache.kafka.common.errors.RecordTooLargeException;
8080
import org.apache.kafka.common.errors.UnknownServerException;
81+
import org.apache.kafka.common.message.DescribeProducersResponseData;
8182
import org.apache.kafka.common.message.FetchRequestData;
8283
import org.apache.kafka.common.message.FetchResponseData;
8384
import org.apache.kafka.common.protocol.Errors;
@@ -153,7 +154,7 @@ public CompletableFuture<KeyValueSchemaIds> getSchemaIds(String topic, BytesSche
153154

154155
private volatile EntryFormatter entryFormatter;
155156

156-
private volatile AtomicBoolean unloaded = new AtomicBoolean();
157+
private final AtomicBoolean unloaded = new AtomicBoolean();
157158

158159
public PartitionLog(KafkaServiceConfiguration kafkaConfig,
159160
RequestStats requestStats,
@@ -1187,6 +1188,29 @@ public CompletableFuture<Long> forcePurgeAbortTx() {
11871188
});
11881189
}
11891190

1191+
public DescribeProducersResponseData.PartitionResponse activeProducerState() {
1192+
DescribeProducersResponseData.PartitionResponse producerState =
1193+
new DescribeProducersResponseData.PartitionResponse()
1194+
.setPartitionIndex(topicPartition.partition())
1195+
.setErrorCode(Errors.NONE.code())
1196+
.setActiveProducers(new ArrayList<>());
1197+
1198+
// this utility is only for monitoring, it is fine to access this structure directly from any thread
1199+
Map<Long, ProducerStateEntry> producers = producerStateManager.getProducers();
1200+
producers.values().forEach(producerStateEntry -> {
1201+
producerState.activeProducers().add(new DescribeProducersResponseData.ProducerState()
1202+
.setProducerId(producerStateEntry.producerId())
1203+
.setLastSequence(-1) // NOT HANDLED YET
1204+
.setProducerEpoch(producerStateEntry.producerEpoch() != null
1205+
? producerStateEntry.producerEpoch().intValue() : -1)
1206+
.setLastTimestamp(producerStateEntry.lastTimestamp() != null
1207+
? producerStateEntry.lastTimestamp().longValue() : -1)
1208+
.setCoordinatorEpoch(producerStateEntry.coordinatorEpoch())
1209+
.setCurrentTxnStartOffset(producerStateEntry.currentTxnFirstOffset().orElse(-1L)));
1210+
});
1211+
return producerState;
1212+
}
1213+
11901214

11911215
public CompletableFuture<Long> recoverTxEntries(
11921216
long offset,

kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/ProducerStateManager.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -389,4 +389,8 @@ public void handleMissingDataBeforeRecovery(long minOffset, long snapshotOffset)
389389
mapEndOffset = -1;
390390
}
391391
}
392+
393+
public Map<Long, ProducerStateEntry> getProducers() {
394+
return producers;
395+
}
392396
}

kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/ReplicaManager.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
import org.apache.kafka.common.TopicPartition;
4444
import org.apache.kafka.common.errors.InvalidTopicException;
4545
import org.apache.kafka.common.errors.NotLeaderOrFollowerException;
46+
import org.apache.kafka.common.message.DescribeProducersResponseData;
4647
import org.apache.kafka.common.message.FetchRequestData;
4748
import org.apache.kafka.common.protocol.Errors;
4849
import org.apache.kafka.common.record.MemoryRecords;
@@ -338,6 +339,14 @@ public CompletableFuture<?> updatePurgeAbortedTxnsOffsets() {
338339
return logManager.updatePurgeAbortedTxnsOffsets();
339340
}
340341

341-
342+
public CompletableFuture<DescribeProducersResponseData.PartitionResponse> activeProducerState(
343+
TopicPartition topicPartition,
344+
String namespacePrefix) {
345+
PartitionLog partitionLog = getPartitionLog(topicPartition, namespacePrefix);
346+
// https://github.com/apache/kafka/blob/5514f372b3e12db1df35b257068f6bb5083111c7/
347+
// core/src/main/scala/kafka/server/ReplicaManager.scala#L535
348+
return partitionLog.awaitInitialisation()
349+
.thenApply(log -> log.activeProducerState());
350+
}
342351

343352
}

0 commit comments

Comments
 (0)