Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -824,6 +824,10 @@ public void subscribe(SubscriptionPattern pattern) {
/**
* Unsubscribe from topics currently subscribed with {@link #subscribe(Collection)} or {@link #subscribe(Pattern)}.
* This also clears any partitions directly assigned through {@link #assign(Collection)}.
* <p>
* <b>Note:</b> Unlike {@link #close()}, this method does not guarantee that pending offsets are committed before
* unsubscribing, even if {@code enable.auto.commit} is enabled. To avoid duplicate processing upon re-joining,
* it is recommended to explicitly call {@link #commitSync()} before invoking this method.
*
* @throws org.apache.kafka.common.KafkaException for any other unrecoverable errors (e.g. rebalance callback errors)
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1669,6 +1669,68 @@ public void testSubscriptionChangesWithAutoCommitDisabled(GroupProtocol groupPro
client.requests().clear();
}

/**
* Verify that unsubscribe() does not commit offsets even when auto-commit is enabled.
* This ensures users are aware that they need to explicitly call commitSync() before
* unsubscribing to avoid duplicate processing upon re-joining the group.
*/
@ParameterizedTest
@EnumSource(value = GroupProtocol.class, names = "CLASSIC")
@SuppressWarnings("unchecked")
public void testUnsubscribeDoesNotCommitOffsetsEvenWithAutoCommitEnabled(GroupProtocol groupProtocol) {
ConsumerMetadata metadata = createMetadata(subscription);
MockClient client = new MockClient(time, metadata);

Map<String, Integer> tpCounts = new HashMap<>();
tpCounts.put(topic, 1);
initMetadata(client, tpCounts);
Node node = metadata.fetch().nodes().get(0);

ConsumerPartitionAssignor assignor = new RangeAssignor();

// Create consumer with auto-commit enabled
consumer = newConsumer(groupProtocol, time, client, subscription, metadata, assignor, true, groupInstanceId);

initializeSubscriptionWithSingleTopic(consumer, getConsumerRebalanceListener(consumer));

// Mock rebalance responses
prepareRebalance(client, node, assignor, List.of(tp0), null);

consumer.updateAssignmentMetadataIfNeeded(time.timer(Long.MAX_VALUE));
consumer.poll(Duration.ZERO);

// Verify that subscription are set up correctly
assertEquals(Set.of(topic), consumer.subscription());

// Mock a fetch response so that we have consumed some data
Map<TopicPartition, FetchInfo> fetches = new HashMap<>();
fetches.put(tp0, new FetchInfo(0, 10));
client.respondFrom(fetchResponse(fetches), node);
client.poll(0, time.milliseconds());

ConsumerRecords<String, String> records = (ConsumerRecords<String, String>) consumer.poll(Duration.ofMillis(1));
assertEquals(10, records.count());
assertEquals(10L, consumer.position(tp0));

// Clear previous requests to focus on unsubscribe behavior
client.requests().clear();

// Call unsubscribe - this should NOT commit offsets even though auto-commit is enabled
consumer.unsubscribe();

// Verify that subscription and assignment are both cleared
assertEquals(Collections.emptySet(), consumer.subscription());
assertEquals(Collections.emptySet(), consumer.assignment());

// Verify that no offset commit request was sent despite auto-commit being enabled
for (ClientRequest req : client.requests()) {
assertNotSame(ApiKeys.OFFSET_COMMIT, req.requestBuilder().apiKey(),
"unsubscribe() should not commit offsets even when auto-commit is enabled");
}

client.requests().clear();
}

// TODO: this test requires rebalance logic which is not yet implemented in the CONSUMER group protocol.
// Once it is implemented, this should use both group protocols.
@ParameterizedTest
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1814,6 +1814,38 @@ public void testUnsubscribeWithoutGroupId() {
verify(applicationEventHandler).add(ArgumentMatchers.isA(UnsubscribeEvent.class));
}

/**
* Verify that unsubscribe() does not commit offsets even when auto-commit is enabled.
* This ensures users are aware that they need to explicitly call commitSync() before
* unsubscribing to avoid duplicate processing upon re-joining the group.
*/
@Test
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you consider adding a integration tests instead?

public void testUnsubscribeDoesNotCommitOffsetsEvenWithAutoCommitEnabled() {
Properties props = requiredConsumerConfigAndGroupId("test-group");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);

consumer = newConsumer(props);

// Subscribe to a topic
completeTopicSubscriptionChangeEventSuccessfully();
consumer.subscribe(singleton("topic"));

// Clear any previous invocations to focus on unsubscribe behavior
clearInvocations(applicationEventHandler);

// Call unsubscribe - this should NOT commit offsets even though auto-commit is enabled
completeUnsubscribeApplicationEventSuccessfully();
consumer.unsubscribe();

// Verify that UnsubscribeEvent was sent
verify(applicationEventHandler).add(ArgumentMatchers.isA(UnsubscribeEvent.class));

// Verify that no commit event (sync or async) was sent despite auto-commit being enabled
verify(applicationEventHandler, never()).add(ArgumentMatchers.isA(SyncCommitEvent.class));
verify(applicationEventHandler, never()).add(ArgumentMatchers.isA(AsyncCommitEvent.class));
verify(applicationEventHandler, never()).add(ArgumentMatchers.isA(CommitOnCloseEvent.class));
}

@Test
public void testSeekToBeginning() {
Collection<TopicPartition> topics = Collections.singleton(new TopicPartition("test", 0));
Expand Down