Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
Expand Down Expand Up @@ -160,7 +161,12 @@ public void handleExit() {
LOGGER.info(
"Subscription: remove consumer config {} when handling exit",
consumerConfigThreadLocal.get());
// we should not close the consumer here because it might reuse the previous consumption
// progress to continue consuming
// closeConsumer(consumerConfig);
// when handling exit, unsubscribe from topics that have already been completed as much as
// possible to release some resources (such as the underlying pipe) in a timely manner
unsubscribeCompleteTopics(consumerConfig);
consumerConfigThreadLocal.remove();
}
}
Expand Down Expand Up @@ -309,6 +315,8 @@ private TPipeSubscribeResp handlePipeSubscribeHeartbeatInternal(
// fetch topics should be unsubscribed
final List<String> topicNamesToUnsubscribe =
SubscriptionAgent.broker().fetchTopicNamesToUnsubscribe(consumerConfig, topics.keySet());
// here we did not immediately unsubscribe from topics in order to allow the client to perceive
// completed topics

return PipeSubscribeHeartbeatResp.toTPipeSubscribeResp(
RpcUtils.SUCCESS_STATUS, topics, endPoints, topicNamesToUnsubscribe);
Expand Down Expand Up @@ -682,6 +690,26 @@ private void closeConsumer(final ConsumerConfig consumerConfig) {
LOGGER.info("Subscription: consumer {} close successfully", consumerConfig);
}

private void unsubscribeCompleteTopics(final ConsumerConfig consumerConfig) {
// fetch subscribed topics
final Map<String, TopicConfig> topics =
SubscriptionAgent.topic()
.getTopicConfigs(
SubscriptionAgent.consumer()
.getTopicNamesSubscribedByConsumer(
consumerConfig.getConsumerGroupId(), consumerConfig.getConsumerId()));

// fetch topics should be unsubscribed
final List<String> topicNamesToUnsubscribe =
SubscriptionAgent.broker().fetchTopicNamesToUnsubscribe(consumerConfig, topics.keySet());

unsubscribe(consumerConfig, new HashSet<>(topicNamesToUnsubscribe));
LOGGER.info(
"Subscription: consumer {} unsubscribe {} (completed topics) successfully",
consumerConfig,
topicNamesToUnsubscribe);
}

//////////////////////////// consumer operations ////////////////////////////

private void createConsumer(final ConsumerConfig consumerConfig) throws SubscriptionException {
Expand Down
Loading