@@ -19,6 +19,7 @@
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

import java.util.Collection;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

@@ -44,12 +45,18 @@ boolean undoCommitAhead(CommittableBatch committableBatch) {
boolean undoRequired = false;
for (Map.Entry<TopicPartition, Long> entry : committedOffsets.entrySet()) {
TopicPartition topicPartition = entry.getKey();
long offsetToCommit = dispatchedOffsets.get(entry.getKey()) + 1;
long offsetToCommit = dispatchedOffsets.get(entry.getKey());
if (entry.getValue() > offsetToCommit) {
committableBatch.updateOffset(topicPartition, offsetToCommit);
undoRequired = true;
}
}
return undoRequired;
}

// The consumer shouldn't commit offsets for partitions that aren't assigned to it
// as this might break processing guarantees for other consumers.
void partitionsRevoked(Collection<TopicPartition> partitions) {
partitions.forEach(committedOffsets::remove);
}
}
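The new partitionsRevoked hook makes the commit-ahead bookkeeping forget partitions the consumer no longer owns, so a later undoCommitAhead only rewinds offsets on partitions that are still assigned. A minimal sketch of that bookkeeping follows; only committedOffsets, dispatchedOffsets, undoCommitAhead and partitionsRevoked appear in the hunk above, while the onCommit/onDispatch hook names are assumptions and a plain Map stands in for CommittableBatch.

import java.util.Collection;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.kafka.common.TopicPartition;

// Sketch only: per-partition record of the highest offset committed ahead and the
// highest offset actually dispatched downstream, so commit-ahead can be undone safely.
class CommitAheadTrackerSketch {
    private final Map<TopicPartition, Long> committedOffsets = new ConcurrentHashMap<>();
    private final Map<TopicPartition, Long> dispatchedOffsets = new ConcurrentHashMap<>();

    void onCommit(TopicPartition partition, long offset) {      // assumed hook name
        committedOffsets.put(partition, offset);
    }

    void onDispatch(TopicPartition partition, long offset) {    // assumed hook name
        dispatchedOffsets.put(partition, offset);
    }

    // Rewind any partition whose committed offset ran ahead of what was dispatched.
    boolean undoCommitAhead(Map<TopicPartition, Long> offsetsToCommit) {
        boolean undoRequired = false;
        for (Map.Entry<TopicPartition, Long> entry : committedOffsets.entrySet()) {
            Long dispatched = dispatchedOffsets.get(entry.getKey());
            if (dispatched != null && entry.getValue() > dispatched) {
                offsetsToCommit.put(entry.getKey(), dispatched);
                undoRequired = true;
            }
        }
        return undoRequired;
    }

    // Forget revoked partitions so neither a rewind nor a further commit touches them.
    void partitionsRevoked(Collection<TopicPartition> partitions) {
        partitions.forEach(committedOffsets::remove);
    }
}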
@@ -295,6 +295,9 @@ public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
ConsumerEventLoop.this.onPartitionsRevoked(partitions);
ConsumerEventLoop.this.pollEvent.commitBatch.partitionsRevoked(partitions);
if (ConsumerEventLoop.this.atmostOnceOffsets != null) {
ConsumerEventLoop.this.atmostOnceOffsets.partitionsRevoked(partitions);
}
}
})
.accept(consumer);
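For readers wiring the same cleanup outside reactor-kafka, here is a hedged sketch with a plain KafkaConsumer and a ConsumerRebalanceListener. The class name and the committedAhead map are illustrative; only the subscribe overload and the listener callbacks come from the Kafka client API.

import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

class RevocationAwareSubscriber {
    // Illustrative stand-in for the commit-ahead tracker above.
    private final Map<TopicPartition, Long> committedAhead = new ConcurrentHashMap<>();

    void subscribe(KafkaConsumer<Integer, String> consumer, String topic) {
        consumer.subscribe(Collections.singleton(topic), new ConsumerRebalanceListener() {
            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                // Nothing to clean up on assignment in this sketch.
            }

            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                // Same idea as the change above: stop tracking (and committing) revoked partitions.
                partitions.forEach(committedAhead::remove);
            }
        });
    }
}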
1 change: 1 addition & 0 deletions src/test/java/reactor/kafka/mock/MockConsumer.java
@@ -162,6 +162,7 @@ public void subscribe(Collection<String> topics, ConsumerRebalanceListener callb
}
}
assignmentPending = true;
super.subscribe(topics, callback);
} finally {
release();
}
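Forwarding the subscription to the base class presumably registers the rebalance listener with the mock, so a later rebalance() call in a test can actually drive onPartitionsRevoked. A minimal sketch of the pattern such a test double needs; names and internals here are illustrative, not the actual MockConsumer implementation.

import java.util.Collection;
import java.util.Collections;

import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.common.TopicPartition;

// Illustrative test double: remembers the listener so rebalance() can drive it.
class ListenerAwareMockConsumer {
    private ConsumerRebalanceListener listener;
    private Collection<TopicPartition> assignment = Collections.emptySet();

    void subscribe(Collection<String> topics, ConsumerRebalanceListener callback) {
        this.listener = callback;   // without this, revocation callbacks are silently lost
    }

    void rebalance(Collection<TopicPartition> newAssignment) {
        if (listener != null) {
            listener.onPartitionsRevoked(assignment);      // old ownership is given up first
            listener.onPartitionsAssigned(newAssignment);  // then the new assignment arrives
        }
        assignment = newAssignment;
    }
}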
@@ -66,6 +66,7 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.regex.Pattern;
@@ -368,7 +369,7 @@ public void atmostOnce() {
* Tests {@link KafkaReceiver#receiveAtmostOnce()} with commit-ahead.
*/
@Test
public void atmostOnceCommitAheadSize() {
public void atmostOnceCommitAheadSize() throws InterruptedException {
int commitAhead = 5;
receiverOptions = receiverOptions
.atmostOnceCommitAheadSize(commitAhead)
@@ -377,31 +378,83 @@
Map<TopicPartition, Long> consumedOffsets = new HashMap<>();
Flux<ConsumerRecord<Integer, String>> inboundFlux = new DefaultKafkaReceiver<>(consumerFactory, receiverOptions)
.receiveAtmostOnce()
.filter(r -> {
long committed = cluster.committedOffset(groupId, topicPartition(r));
return committed >= r.offset() && committed <= r.offset() + commitAhead + 1;
})
.doOnNext(r -> consumedOffsets.put(new TopicPartition(r.topic(), r.partition()), r.offset()));
int consumeCount = 17;
StepVerifier.create(inboundFlux, consumeCount)
.recordWith(() -> receivedMessages)
.expectNextCount(consumeCount)
.expectRecordedMatches(ignored -> verifyCommittedAhead(consumedOffsets, commitAhead))
.thenCancel()
.verify(Duration.ofMillis(DEFAULT_TEST_TIMEOUT));
verifyMessages(consumeCount);
for (int i = 0; i < cluster.partitions(topic).size(); i++) {
TopicPartition topicPartition = new TopicPartition(topic, i);
waitForConsumerToClose();
verifyUndoCommitAhead(consumedOffsets);
}

private boolean verifyCommittedAhead(Map<TopicPartition, Long> consumedOffsets, int commitAhead) {
return cluster.partitions(topic).stream().allMatch(topicPartition -> {
long consumed = consumedOffsets.get(topicPartition);
long committedOffset = cluster.committedOffset(groupId, topicPartition);
return committedOffset > consumed && committedOffset <= consumed + commitAhead + 1;
});
}

private void verifyUndoCommitAhead(Map<TopicPartition, Long> consumedOffsets) {
cluster.partitions(topic).forEach(topicPartition -> {
long consumed = consumedOffsets.get(topicPartition);
consumerFactory.addConsumer(new MockConsumer(cluster));
receiverOptions = receiverOptions.assignment(Collections.singleton(topicPartition));
inboundFlux = new DefaultKafkaReceiver<>(consumerFactory, receiverOptions)
Flux<ConsumerRecord<Integer, String>> inboundFlux = new DefaultKafkaReceiver<>(consumerFactory, receiverOptions)
.receiveAtmostOnce();
StepVerifier.create(inboundFlux, 1)
.expectNextMatches(r -> r.offset() > consumed && r.offset() <= consumed + commitAhead + 1)
.expectNextMatches(r -> r.offset() == consumed + 1)
.thenCancel()
.verify(Duration.ofMillis(DEFAULT_TEST_TIMEOUT));
}
});
}
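The bounds asserted above follow directly from the commit-ahead size: the committed offset may run past the last record delivered downstream, but only by up to commitAhead + 1 (the extra 1 because a committed offset points at the next record to consume, one past the last record acknowledged). A small worked check with the test's commitAhead of 5 and assumed example offsets:

// Worked check of the commit-ahead window, with assumed example values.
class CommitAheadWindowExample {
    public static void main(String[] args) {
        long consumed = 10;   // offset of the last record seen downstream (assumed)
        int commitAhead = 5;  // receiverOptions.atmostOnceCommitAheadSize(commitAhead)
        long committed = 14;  // example committed offset read back for the partition
        // Valid window: strictly above the consumed offset, at most consumed + commitAhead + 1.
        boolean withinWindow = committed > consumed && committed <= consumed + commitAhead + 1;
        System.out.println(withinWindow);   // true: 10 < 14 <= 16
    }
}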

/**
* Tests that commit-ahead does not break offsets for partitions that have been revoked from the consumer.
*/
@Test
public void atmostOnceUndoCommitAhead() throws InterruptedException {
int commitAhead = 5;
List<TopicPartition> initialPartitions = new ArrayList<>(cluster.partitions(topic));
receiverOptions = receiverOptions
.atmostOnceCommitAheadSize(commitAhead)
.subscription(Collections.singleton(topic));
sendMessages(topic, 0, 2);
Flux<ConsumerRecord<Integer, String>> inboundFlux = new DefaultKafkaReceiver<>(consumerFactory, receiverOptions)
.receiveAtmostOnce();

int consumeCount = 2;
AtomicReference<PartitionWithOffset> partition = new AtomicReference<>();
StepVerifier.create(inboundFlux, consumeCount)
.recordWith(() -> receivedMessages)
.then(() -> consumer.rebalance(initialPartitions))
.expectNextCount(consumeCount)
.then(() -> revokeAndRegisterPartition(initialPartitions, partition))
.thenCancel()
.verify(Duration.ofMillis(DEFAULT_TEST_TIMEOUT));
verifyMessages(consumeCount);
waitForConsumerToClose();
verifyNoMoreCommitsOn(partition.get());
}

private void revokeAndRegisterPartition(
List<TopicPartition> initialPartitions,
AtomicReference<PartitionWithOffset> revokedPartitionRef
) {
List<TopicPartition> newPartitions = initialPartitions.subList(1, initialPartitions.size());
TopicPartition revokedPartition = initialPartitions.get(0);
long commitAheadOffset = cluster.committedOffset(consumer.groupMetadata().groupId(), revokedPartition);
revokedPartitionRef.set(new PartitionWithOffset(revokedPartition, commitAheadOffset));
consumer.rebalance(newPartitions);
}

private void verifyNoMoreCommitsOn(PartitionWithOffset revokedPartition) {
long lastCommittedOffset = cluster.committedOffset(consumer.groupMetadata().groupId(), revokedPartition.partition);
assertEquals("Consumer committed on partition that was revoked", lastCommittedOffset, revokedPartition.offset);
}
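On a real broker, the same post-revocation check could read the group's committed offset back through the consumer instead of the in-memory cluster stub. A hedged sketch, assuming a Kafka clients version that provides Consumer#committed(Set) (2.4+); the helper class and method names are illustrative.

import java.util.Collections;
import java.util.Map;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

class CommittedOffsetCheck {
    // Sketch: read the group's committed offset back through the consumer rather than a cluster stub.
    static long lastCommittedOffset(Consumer<?, ?> consumer, TopicPartition partition) {
        Map<TopicPartition, OffsetAndMetadata> committed =
                consumer.committed(Collections.singleton(partition));
        OffsetAndMetadata metadata = committed.get(partition);
        return metadata == null ? -1L : metadata.offset();
    }
}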

/**
@@ -1516,4 +1569,20 @@ private Disposable asyncReceive(CountDownLatch latch) {
private TopicPartition topicPartition(ConsumerRecord<?, ?> record) {
return new TopicPartition(record.topic(), record.partition());
}

private void waitForConsumerToClose() throws InterruptedException {
while (!consumer.closed()) {
Thread.sleep(100);
}
}

private static class PartitionWithOffset {
private final TopicPartition partition;
private final long offset;

public PartitionWithOffset(TopicPartition partition, long offset) {
this.partition = partition;
this.offset = offset;
}
}
}