Skip to content

Commit f3a06cc

Browse files
authored
Subscription: unsubscribe completed topics when remove consumer config (#15660) (#15696)
1 parent a34add4 commit f3a06cc

File tree

1 file changed

+28
-0
lines changed

1 file changed

+28
-0
lines changed

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/receiver/SubscriptionReceiverV1.java

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,7 @@
8484
import java.nio.ByteBuffer;
8585
import java.util.Collections;
8686
import java.util.HashMap;
87+
import java.util.HashSet;
8788
import java.util.List;
8889
import java.util.Map;
8990
import java.util.Objects;
@@ -160,7 +161,12 @@ public void handleExit() {
160161
LOGGER.info(
161162
"Subscription: remove consumer config {} when handling exit",
162163
consumerConfigThreadLocal.get());
164+
// we should not close the consumer here because it might reuse the previous consumption
165+
// progress to continue consuming
163166
// closeConsumer(consumerConfig);
167+
// when handling exit, unsubscribe from topics that have already been completed as much as
168+
// possible to release some resources (such as the underlying pipe) in a timely manner
169+
unsubscribeCompleteTopics(consumerConfig);
164170
consumerConfigThreadLocal.remove();
165171
}
166172
}
@@ -309,6 +315,8 @@ private TPipeSubscribeResp handlePipeSubscribeHeartbeatInternal(
309315
// fetch topics should be unsubscribed
310316
final List<String> topicNamesToUnsubscribe =
311317
SubscriptionAgent.broker().fetchTopicNamesToUnsubscribe(consumerConfig, topics.keySet());
318+
// here we did not immediately unsubscribe from topics in order to allow the client to perceive
319+
// completed topics
312320

313321
return PipeSubscribeHeartbeatResp.toTPipeSubscribeResp(
314322
RpcUtils.SUCCESS_STATUS, topics, endPoints, topicNamesToUnsubscribe);
@@ -682,6 +690,26 @@ private void closeConsumer(final ConsumerConfig consumerConfig) {
682690
LOGGER.info("Subscription: consumer {} close successfully", consumerConfig);
683691
}
684692

693+
private void unsubscribeCompleteTopics(final ConsumerConfig consumerConfig) {
694+
// fetch subscribed topics
695+
final Map<String, TopicConfig> topics =
696+
SubscriptionAgent.topic()
697+
.getTopicConfigs(
698+
SubscriptionAgent.consumer()
699+
.getTopicNamesSubscribedByConsumer(
700+
consumerConfig.getConsumerGroupId(), consumerConfig.getConsumerId()));
701+
702+
// fetch topics should be unsubscribed
703+
final List<String> topicNamesToUnsubscribe =
704+
SubscriptionAgent.broker().fetchTopicNamesToUnsubscribe(consumerConfig, topics.keySet());
705+
706+
unsubscribe(consumerConfig, new HashSet<>(topicNamesToUnsubscribe));
707+
LOGGER.info(
708+
"Subscription: consumer {} unsubscribe {} (completed topics) successfully",
709+
consumerConfig,
710+
topicNamesToUnsubscribe);
711+
}
712+
685713
//////////////////////////// consumer operations ////////////////////////////
686714

687715
private void createConsumer(final ConsumerConfig consumerConfig) throws SubscriptionException {

0 commit comments

Comments
 (0)