Skip to content

Commit

Permalink
fix(RecordRepository): fix copy infinite loop when record added durin…
Browse files Browse the repository at this point in the history
…g copy
  • Loading branch information
valTroadec committed Nov 12, 2023
1 parent 8214a11 commit 5451c82
Show file tree
Hide file tree
Showing 3 changed files with 84 additions and 1 deletion.
32 changes: 31 additions & 1 deletion src/main/java/org/akhq/repositories/RecordRepository.java
Original file line number Diff line number Diff line change
Expand Up @@ -1035,12 +1035,17 @@ public CopyResult copy(Topic fromTopic, String toClusterId, Topic toTopic, List<
);
}

Map<Partition, Long> partitionsLastOffsetMap = fromTopic.getPartitions()
.stream()
.collect(Collectors.toMap(Function.identity(), Partition::getLastOffset));

boolean samePartition = toTopic.getPartitions().size() == fromTopic.getPartitions().size();

KafkaProducer<byte[], byte[]> producer = kafkaModule.getProducer(toClusterId);
ConsumerRecords<byte[], byte[]> records;
do {
records = this.poll(consumer);
records = this.pollAndFilter(consumer, partitionsLastOffsetMap);

for (ConsumerRecord<byte[], byte[]> record : records) {
System.out.println(record.offset() + "-" + record.partition());

Expand All @@ -1064,6 +1069,31 @@ public CopyResult copy(Topic fromTopic, String toClusterId, Topic toTopic, List<
return new CopyResult(counter);
}

/**
* Polls the records and filters them with a maximum offset
*
* @param consumer
* @param partitionsLastOffsetMap key : partition, value : the maximum offset we want to reach
* @return filtered records after polled. And an empty one if there are no records polled
* or if every record has been filtered
*/
private ConsumerRecords<byte[], byte[]> pollAndFilter(KafkaConsumer<byte[], byte[]> consumer, Map<Partition, Long> partitionsLastOffsetMap) {
ConsumerRecords<byte[], byte[]> records = this.poll(consumer);
return new ConsumerRecords<>(partitionsLastOffsetMap.entrySet()
.stream()
.map(entry ->
{
// We filter records by partition
TopicPartition topicPartition = new TopicPartition(entry.getKey().getTopic(), entry.getKey().getId());
return Map.entry(topicPartition, records.records(topicPartition)
.stream()
.filter(consumerRecord -> consumerRecord.offset() < entry.getValue())
.collect(Collectors.toList()));
}
).filter(entry -> !entry.getValue().isEmpty())
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)));
}

@ToString
@EqualsAndHashCode
@AllArgsConstructor
Expand Down
12 changes: 12 additions & 0 deletions src/test/java/org/akhq/KafkaTestCluster.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ public class KafkaTestCluster implements Runnable {
public static final String CLUSTER_ID = "test";

public static final String TOPIC_RANDOM = "random";
public static final String TOPIC_COPY = "copy";
public static final String TOPIC_COPY_DLT = "copy.dlt";
public static final String TOPIC_TOBE_EMPTIED = "emptied";
public static final String TOPIC_COMPACTED = "compacted";
public static final String TOPIC_EMPTY = "empty";
Expand Down Expand Up @@ -279,6 +281,16 @@ private void injectTestData() throws InterruptedException, ExecutionException {
}
log.debug("Random topic created");

// random data
testUtils.createTopic(TOPIC_COPY_DLT, 3, (short) 1);
for (int partition = 0; partition < 3; partition++) {
testUtils.produceRecords(randomDatas(50, 0), TOPIC_COPY_DLT, partition);
}
log.debug("DLT copy created");

testUtils.createTopic(TOPIC_COPY, 3, (short) 1);
log.debug("Copy created");

// random data to be emptied
testUtils.createTopic(TOPIC_TOBE_EMPTIED, 3, (short) 1);
for (int partition = 0; partition < 3; partition++) {
Expand Down
41 changes: 41 additions & 0 deletions src/test/java/org/akhq/repositories/RecordRepositoryTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import lombok.extern.slf4j.Slf4j;
import org.akhq.AbstractTest;
import org.akhq.KafkaTestCluster;
import org.akhq.controllers.TopicController;
import org.akhq.models.Record;
import org.akhq.models.Schema;
import org.akhq.models.Topic;
Expand All @@ -23,6 +24,7 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
Expand Down Expand Up @@ -322,4 +324,43 @@ private int searchAll(RecordRepository.Options options) throws ExecutionExceptio

return size.get();
}

@Test
void copy() throws ExecutionException, InterruptedException, RestClientException, IOException {

RecordRepository.Options optionsFrom = new RecordRepository.Options(environment, KafkaTestCluster.CLUSTER_ID, KafkaTestCluster.TOPIC_COPY_DLT);
RecordRepository.Options optionsTo = new RecordRepository.Options(environment, KafkaTestCluster.CLUSTER_ID, KafkaTestCluster.TOPIC_COPY);
optionsTo.setSize(500);

Topic topicFrom = topicRepository.findByName(optionsFrom.getClusterId(), optionsFrom.getTopic());
String toClusterId = KafkaTestCluster.CLUSTER_ID;
Topic topicTo = topicRepository.findByName(optionsTo.getClusterId(), optionsTo.getTopic());

List<TopicController.OffsetCopy> offsets = topicTo.getPartitions()
.stream()
.map(partition -> new TopicController.OffsetCopy(partition.getId(), partition.getLastOffset()))
.collect(Collectors.toList());

// We simulate the case a record has been added after the method copy has been used
this.repository.produce(
KafkaTestCluster.CLUSTER_ID,
KafkaTestCluster.TOPIC_COPY_DLT,
Optional.of("value"),
Collections.emptyList(),
Optional.of("key"),
Optional.empty(),
Optional.empty(),
Optional.empty(),
Optional.empty()
);

RecordRepository.CopyResult copyResult = this.repository.copy(topicFrom, toClusterId, topicTo, offsets, optionsFrom);

log.info("Copied " + copyResult.records + " records");

assertEquals(150, copyResult.records);

//We empty the topic to set it to his initialisation state
this.repository.emptyTopic(KafkaTestCluster.CLUSTER_ID, KafkaTestCluster.TOPIC_COPY);
}
}

0 comments on commit 5451c82

Please sign in to comment.