Skip to content

Commit

Permalink
feat(topic): give the true first offset on compacted topics (#1817)
Browse files Browse the repository at this point in the history
  • Loading branch information
AlexisSouquiere authored Jun 27, 2024
1 parent ef8e8f4 commit 0ad541c
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 17 deletions.
11 changes: 9 additions & 2 deletions src/main/java/org/akhq/modules/AbstractKafkaWrapper.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import org.apache.kafka.clients.admin.*;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.acl.AclBinding;
Expand Down Expand Up @@ -165,8 +166,14 @@ public Map<String, List<Partition.Offsets>> describeTopicsOffsets(String cluster
)
.collect(Collectors.toList());

Map<TopicPartition, Long> startOffsetsToSearch = collect.stream().map(p ->
new AbstractMap.SimpleEntry<>(p, 0L))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));

KafkaConsumer<byte[], byte[]> consumer = kafkaModule.getConsumer(clusterId);
Map<TopicPartition, Long> begins = consumer.beginningOffsets(collect);
// KAFKA-7556: beginningOffsets always returns 0 on compacted topics. Use offsetsForTimes instead
Map<TopicPartition, OffsetAndTimestamp> begins = consumer.offsetsForTimes(startOffsetsToSearch);

Map<TopicPartition, Long> ends = consumer.endOffsets(collect);
consumer.close();

Expand All @@ -177,7 +184,7 @@ public Map<String, List<Partition.Offsets>> describeTopicsOffsets(String cluster
begin ->
new Partition.Offsets(
begin.getKey().partition(),
begin.getValue(),
begin.getValue() != null ? begin.getValue().offset() : ends.get(begin.getKey()),
ends.get(begin.getKey())
),
toList()
Expand Down
18 changes: 4 additions & 14 deletions src/test/java/org/akhq/controllers/TopicControllerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -117,20 +117,10 @@ void updateConfigApi() {
@Test
@Order(1)
void dataApi() {
ResultNextList<Record> result;
String after = TOPIC_URL + "/data";

int count = 0;
while (after != null) {
result = this.retrieveNextList(HttpRequest.GET(after), Record.class);
assertEquals(300, result.getSize());
if (result.getResults() != null) {
count = count + result.getResults().size();
}
after = result.getAfter();
}

assertEquals(153, count);
ResultNextList<Record> result = this.retrieveNextList(HttpRequest.GET(TOPIC_URL + "/data"), Record.class);
// 50 messages with the same key on each partition (1 remaining per partition after compaction): 3 messages
// 50 random messages on each partition: 150 messages
assertEquals(153, result.getSize());
}

@Test
Expand Down
4 changes: 3 additions & 1 deletion src/test/java/org/akhq/repositories/TopicRepositoryTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,9 @@ void offset() throws ExecutionException, InterruptedException {
.findFirst();

assertTrue(compacted.isPresent());
assertEquals(0, compacted.get().getFirstOffset());
// 50 messages with the same key (1 remaining after compaction): first offset is 49
assertEquals(49, compacted.get().getFirstOffset());
// 50 messages with the same key + 50 random messages: last offset is 100
assertEquals(100, compacted.get().getLastOffset());
}

Expand Down

0 comments on commit 0ad541c

Please sign in to comment.