From 9adfafd2eded08b09305d1c4f0cc5853b08bcc06 Mon Sep 17 00:00:00 2001
From: Valentin Troadec
Date: Mon, 20 Nov 2023 20:36:28 +0100
Subject: [PATCH] fix(RecordRepository): fix copy infinite loop when record
 added during copy

copy() polled in a loop until a poll came back empty, so a topic that
keeps receiving writes while the copy runs never drains and the loop
never exits. Snapshot the last offset of every source partition before
polling starts, and drop any polled record at or beyond that offset:
the loop now terminates once the snapshot offsets are reached.
---
 .../akhq/repositories/RecordRepository.java   | 32 ++++++++++++++++-
 .../repositories/RecordRepositoryTest.java    | 34 +++++++++++++++++++
 2 files changed, 65 insertions(+), 1 deletion(-)

diff --git a/src/main/java/org/akhq/repositories/RecordRepository.java b/src/main/java/org/akhq/repositories/RecordRepository.java
index f8833cd73..bd95b301a 100644
--- a/src/main/java/org/akhq/repositories/RecordRepository.java
+++ b/src/main/java/org/akhq/repositories/RecordRepository.java
@@ -1035,12 +1035,17 @@ public CopyResult copy(Topic fromTopic, String toClusterId, Topic toTopic, List<
             );
         }
 
+        Map<Partition, Long> partitionsLastOffsetMap = fromTopic.getPartitions()
+            .stream()
+            .collect(Collectors.toMap(Function.identity(), Partition::getLastOffset));
+
         boolean samePartition = toTopic.getPartitions().size() == fromTopic.getPartitions().size();
 
         KafkaProducer<byte[], byte[]> producer = kafkaModule.getProducer(toClusterId);
 
         ConsumerRecords<byte[], byte[]> records;
         do {
-            records = this.poll(consumer);
+            records = this.pollAndFilter(consumer, partitionsLastOffsetMap);
+
             for (ConsumerRecord<byte[], byte[]> record : records) {
                 System.out.println(record.offset() + "-" + record.partition());
@@ -1064,6 +1069,31 @@ public CopyResult copy(Topic fromTopic, String toClusterId, Topic toTopic, List<
         return new CopyResult(counter);
     }
 
+    /**
+     * Polls records and filters them against a maximum offset per partition
+     *
+     * @param consumer                the consumer to poll from
+     * @param partitionsLastOffsetMap key: partition, value: the maximum offset we want to reach
+     * @return the polled records, minus any record at or beyond its partition's maximum
+     *         offset; empty if nothing was polled or every polled record was filtered out
+     */
+    private ConsumerRecords<byte[], byte[]> pollAndFilter(KafkaConsumer<byte[], byte[]> consumer, Map<Partition, Long> partitionsLastOffsetMap) {
+        ConsumerRecords<byte[], byte[]> records = this.poll(consumer);
+        return new ConsumerRecords<>(partitionsLastOffsetMap.entrySet()
+            .stream()
+            .map(entry ->
+            {
+                // Keep only this partition's records that sit below the snapshot offset
+                TopicPartition topicPartition = new TopicPartition(entry.getKey().getTopic(), entry.getKey().getId());
+                return Map.entry(topicPartition, records.records(topicPartition)
+                    .stream()
+                    .filter(consumerRecord -> consumerRecord.offset() < entry.getValue())
+                    .collect(Collectors.toList()));
+            }
+            ).filter(entry -> !entry.getValue().isEmpty())
+            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)));
+    }
+
 
     @ToString
     @EqualsAndHashCode
     @AllArgsConstructor
diff --git a/src/test/java/org/akhq/repositories/RecordRepositoryTest.java b/src/test/java/org/akhq/repositories/RecordRepositoryTest.java
index 8e4d37c0f..def92d0d2 100644
--- a/src/test/java/org/akhq/repositories/RecordRepositoryTest.java
+++ b/src/test/java/org/akhq/repositories/RecordRepositoryTest.java
@@ -6,6 +6,7 @@
 import lombok.extern.slf4j.Slf4j;
 import org.akhq.AbstractTest;
 import org.akhq.KafkaTestCluster;
+import org.akhq.controllers.TopicController;
 import org.akhq.models.Record;
 import org.akhq.models.Schema;
 import org.akhq.models.Topic;
@@ -23,6 +24,7 @@
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
 
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.containsString;
@@ -322,4 +324,36 @@ private int searchAll(RecordRepository.Options options) throws ExecutionExceptio
         return size.get();
     }
 
+
+    @Test
+    void copy() throws ExecutionException, InterruptedException, RestClientException, IOException {
+
+        RecordRepository.Options optionsFromAndTo = new RecordRepository.Options(environment, KafkaTestCluster.CLUSTER_ID, KafkaTestCluster.TOPIC_RANDOM);
+
+        Topic topicFromAndTo = topicRepository.findByName(optionsFromAndTo.getClusterId(), optionsFromAndTo.getTopic());
+
+        List<TopicController.OffsetCopy> offsets = topicFromAndTo.getPartitions()
+            .stream()
+            .map(partition -> new TopicController.OffsetCopy(partition.getId(), partition.getLastOffset()))
+            .collect(Collectors.toList());
+
+        // Simulate a record produced after copy() has snapshotted the partitions' last offsets
+        this.repository.produce(
+            KafkaTestCluster.CLUSTER_ID,
+            KafkaTestCluster.TOPIC_RANDOM,
+            Optional.of("value"),
+            Collections.emptyList(),
+            Optional.of("key"),
+            Optional.empty(),
+            Optional.empty(),
+            Optional.empty(),
+            Optional.empty()
+        );
+
+        RecordRepository.CopyResult copyResult = this.repository.copy(topicFromAndTo, KafkaTestCluster.CLUSTER_ID, topicFromAndTo, offsets, optionsFromAndTo);
+
+        log.info("Copied {} records", copyResult.records);
+
+        assertEquals(300, copyResult.records);
+    }
 }
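
Note: the same bounding idea, sketched standalone against the plain Kafka
consumer API rather than AKHQ's repositories. This is a minimal illustration
under stated assumptions, not AKHQ code: the topic name "source-topic", the
bootstrap address, and the class name are placeholders, and the producer side
of the copy is elided. Manual assignment is used instead of subscribe() so the
snapshot can be taken deterministically before the first poll.

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;

    import java.time.Duration;
    import java.util.List;
    import java.util.Map;
    import java.util.Properties;
    import java.util.stream.Collectors;

    public class BoundedCopySketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092"); // assumption: local broker
            props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");

            try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
                // Manual assignment: read every partition of the source topic from the start
                List<TopicPartition> partitions = consumer.partitionsFor("source-topic").stream()
                    .map(info -> new TopicPartition(info.topic(), info.partition()))
                    .collect(Collectors.toList());
                consumer.assign(partitions);
                consumer.seekToBeginning(partitions);

                // Snapshot the end offsets once, before copying: records produced
                // after this point must not keep the loop alive.
                Map<TopicPartition, Long> endOffsets = consumer.endOffsets(partitions);

                long copied = 0;
                boolean moreToCopy = true;
                while (moreToCopy) {
                    ConsumerRecords<byte[], byte[]> polled = consumer.poll(Duration.ofSeconds(1));
                    moreToCopy = false;
                    for (ConsumerRecord<byte[], byte[]> record : polled) {
                        TopicPartition tp = new TopicPartition(record.topic(), record.partition());
                        if (record.offset() < endOffsets.getOrDefault(tp, 0L)) {
                            moreToCopy = true; // still below the snapshot on some partition
                            copied++;          // a real copy would producer.send(...) here
                        }
                    }
                }
                System.out.println("Copied " + copied + " records");
            }
        }
    }

As in the patch, the loop exits either when a poll returns nothing or when
every polled record is at or beyond its partition's snapshotted end offset.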