Skip to content

Commit 249482a

Browse files
committed
refactor: functions about consumer_table
1 parent f49dac1 commit 249482a

5 files changed

+25
-64
lines changed

src/MQClientInstance.cpp

+16-50
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,7 @@ void MQClientInstance::start() {
124124
}
125125

126126
void MQClientInstance::shutdown() {
127-
if (getConsumerTableSize() != 0) {
127+
if (MapAccessor::Size(consumer_table_, consumer_table_mutex_) > 0) {
128128
return;
129129
}
130130

@@ -382,9 +382,12 @@ bool MQClientInstance::updateTopicRouteInfoFromNameServer(const std::string& top
382382
topic_publish_info_table_mutex_);
383383

384384
// update subscribe info
385-
if (getConsumerTableSize() > 0) {
385+
std::lock_guard<std::mutex> lock(consumer_table_mutex_);
386+
if (!consumer_table_.empty()) {
386387
std::vector<MessageQueue> subscribeInfo = MakeTopicSubscribeInfo(topic, *topicRouteData);
387-
updateConsumerTopicSubscribeInfo(topic, subscribeInfo);
388+
for (auto& it : consumer_table_) {
389+
it.second->updateTopicSubscribeInfo(topic, subscribeInfo);
390+
}
388391
}
389392

390393
MapAccessor::InsertOrAssign(topic_route_table_, topic, topicRouteData, topic_route_table_mutex_);
@@ -444,12 +447,12 @@ TopicRouteDataPtr MQClientInstance::GetTopicRouteData(const std::string& topic)
444447
return MapAccessor::GetOrDefault(topic_route_table_, topic, nullptr, topic_route_table_mutex_);
445448
}
446449

447-
bool MQClientInstance::registerConsumer(const std::string& group, MQConsumerInner* consumer) {
450+
bool MQClientInstance::RegisterConsumer(const std::string& group, MQConsumerInner* consumer) {
448451
if (group.empty()) {
449452
return false;
450453
}
451454

452-
if (!addConsumerToTable(group, consumer)) {
455+
if (!MapAccessor::Insert(consumer_table_, group, consumer, consumer_table_mutex_)) {
453456
LOG_WARN_NEW("the consumer group[{}] exist already.", group);
454457
return false;
455458
}
@@ -458,8 +461,8 @@ bool MQClientInstance::registerConsumer(const std::string& group, MQConsumerInne
458461
return true;
459462
}
460463

461-
void MQClientInstance::unregisterConsumer(const std::string& group) {
462-
eraseConsumerFromTable(group);
464+
void MQClientInstance::UnregisterConsumer(const std::string& group) {
465+
MapAccessor::Erase(consumer_table_, group, consumer_table_mutex_);
463466
unregisterClientWithLock(null, group);
464467
}
465468

@@ -521,8 +524,8 @@ void MQClientInstance::rebalanceImmediately() {
521524

522525
void MQClientInstance::doRebalance() {
523526
LOG_INFO_NEW("the client instance:{} start doRebalance", client_id_);
524-
if (getConsumerTableSize() > 0) {
525-
std::lock_guard<std::mutex> lock(consumer_table_mutex_);
527+
std::lock_guard<std::mutex> lock(consumer_table_mutex_);
528+
if (!consumer_table_.empty()) {
526529
for (auto& it : consumer_table_) {
527530
it.second->doRebalance();
528531
}
@@ -548,37 +551,8 @@ MQProducerInner* MQClientInstance::SelectProducer(const std::string& producer_gr
548551
return MapAccessor::GetOrDefault(producer_table_, producer_group, nullptr, producer_table_mutex_);
549552
}
550553

551-
MQConsumerInner* MQClientInstance::selectConsumer(const std::string& group) {
552-
std::lock_guard<std::mutex> lock(consumer_table_mutex_);
553-
const auto& it = consumer_table_.find(group);
554-
if (it != consumer_table_.end()) {
555-
return it->second;
556-
}
557-
return nullptr;
558-
}
559-
560-
bool MQClientInstance::addConsumerToTable(const std::string& consumerName, MQConsumerInner* consumer) {
561-
std::lock_guard<std::mutex> lock(consumer_table_mutex_);
562-
if (consumer_table_.find(consumerName) != consumer_table_.end()) {
563-
return false;
564-
}
565-
consumer_table_[consumerName] = consumer;
566-
return true;
567-
}
568-
569-
void MQClientInstance::eraseConsumerFromTable(const std::string& consumerName) {
570-
std::lock_guard<std::mutex> lock(consumer_table_mutex_);
571-
const auto& it = consumer_table_.find(consumerName);
572-
if (it != consumer_table_.end()) {
573-
consumer_table_.erase(it); // do not need free consumer, as it was allocated by user
574-
} else {
575-
LOG_WARN_NEW("could not find consumer:{} from table", consumerName);
576-
}
577-
}
578-
579-
int MQClientInstance::getConsumerTableSize() {
580-
std::lock_guard<std::mutex> lock(consumer_table_mutex_);
581-
return consumer_table_.size();
554+
MQConsumerInner* MQClientInstance::SelectConsumer(const std::string& consumer_group) {
555+
return MapAccessor::GetOrDefault(consumer_table_, consumer_group, nullptr, consumer_table_mutex_);
582556
}
583557

584558
void MQClientInstance::getTopicListFromConsumerSubscription(std::set<std::string>& topicList) {
@@ -591,14 +565,6 @@ void MQClientInstance::getTopicListFromConsumerSubscription(std::set<std::string
591565
}
592566
}
593567

594-
void MQClientInstance::updateConsumerTopicSubscribeInfo(const std::string& topic,
595-
std::vector<MessageQueue> subscribeInfo) {
596-
std::lock_guard<std::mutex> lock(consumer_table_mutex_);
597-
for (auto& it : consumer_table_) {
598-
it.second->updateTopicSubscribeInfo(topic, subscribeInfo);
599-
}
600-
}
601-
602568
TopicPublishInfoPtr MQClientInstance::tryToFindTopicPublishInfo(const std::string& topic) {
603569
auto topicPublishInfo =
604570
MapAccessor::GetOrDefault(topic_publish_info_table_, topic, nullptr, topic_publish_info_table_mutex_);
@@ -749,7 +715,7 @@ void MQClientInstance::resetOffset(const std::string& group,
749715
const std::map<MessageQueue, int64_t>& offsetTable) {
750716
DefaultMQPushConsumerImpl* consumer = nullptr;
751717
try {
752-
auto* impl = selectConsumer(group);
718+
auto* impl = SelectConsumer(group);
753719
if (impl != nullptr && std::type_index(typeid(*impl)) == std::type_index(typeid(DefaultMQPushConsumerImpl))) {
754720
consumer = static_cast<DefaultMQPushConsumerImpl*>(impl);
755721
} else {
@@ -792,7 +758,7 @@ void MQClientInstance::resetOffset(const std::string& group,
792758
}
793759

794760
std::unique_ptr<ConsumerRunningInfo> MQClientInstance::consumerRunningInfo(const std::string& consumerGroup) {
795-
auto* consumer = selectConsumer(consumerGroup);
761+
auto* consumer = SelectConsumer(consumerGroup);
796762
if (consumer != nullptr) {
797763
std::unique_ptr<ConsumerRunningInfo> runningInfo(consumer->consumerRunningInfo());
798764
if (runningInfo != nullptr) {

src/MQClientInstance.h

+4-9
Original file line numberDiff line numberDiff line change
@@ -65,8 +65,8 @@ class MQClientInstance {
6565
bool RegisterProducer(const std::string& group, MQProducerInner* producer);
6666
void UnregisterProducer(const std::string& group);
6767

68-
bool registerConsumer(const std::string& group, MQConsumerInner* consumer);
69-
void unregisterConsumer(const std::string& group);
68+
bool RegisterConsumer(const std::string& group, MQConsumerInner* consumer);
69+
void UnregisterConsumer(const std::string& group);
7070

7171
void updateTopicRouteInfoFromNameServer();
7272
bool updateTopicRouteInfoFromNameServer(const std::string& topic, bool isDefault = false);
@@ -77,7 +77,7 @@ class MQClientInstance {
7777
void doRebalance();
7878

7979
MQProducerInner* SelectProducer(const std::string& producer_group);
80-
MQConsumerInner* selectConsumer(const std::string& group);
80+
MQConsumerInner* SelectConsumer(const std::string& consumer_group);
8181

8282
FindBrokerResult FindBrokerAddressInAdmin(const std::string& broker_name);
8383
std::string FindBrokerAddressInPublish(const std::string& broker_name);
@@ -129,11 +129,7 @@ class MQClientInstance {
129129
void doRebalanceByConsumerGroup(const std::string& consumerGroup);
130130

131131
// consumer related operation
132-
bool addConsumerToTable(const std::string& consumerName, MQConsumerInner* consumer);
133-
void eraseConsumerFromTable(const std::string& consumerName);
134-
int getConsumerTableSize();
135132
void getTopicListFromConsumerSubscription(std::set<std::string>& topicList);
136-
void updateConsumerTopicSubscribeInfo(const std::string& topic, std::vector<MessageQueue> subscribeInfo);
137133

138134
private:
139135
std::string client_id_;
@@ -144,8 +140,7 @@ class MQClientInstance {
144140
std::mutex producer_table_mutex_;
145141

146142
// group -> MQConsumer
147-
using MQCMAP = std::map<std::string, MQConsumerInner*>;
148-
MQCMAP consumer_table_;
143+
std::map<std::string, MQConsumerInner*> consumer_table_;
149144
std::mutex consumer_table_mutex_;
150145

151146
// Topic -> TopicRouteData

src/consumer/DefaultLitePullConsumerImpl.cpp

+2-2
Original file line numberDiff line numberDiff line change
@@ -149,7 +149,7 @@ void DefaultLitePullConsumerImpl::start() {
149149
scheduled_executor_service_.startup();
150150

151151
// register consumer
152-
bool registerOK = client_instance_->registerConsumer(client_config_->group_name(), this);
152+
bool registerOK = client_instance_->RegisterConsumer(client_config_->group_name(), this);
153153
if (!registerOK) {
154154
service_state_ = ServiceState::kCreateJust;
155155
THROW_MQEXCEPTION(MQClientException,
@@ -291,7 +291,7 @@ void DefaultLitePullConsumerImpl::shutdown() {
291291
break;
292292
case ServiceState::kRunning:
293293
persistConsumerOffset();
294-
client_instance_->unregisterConsumer(client_config_->group_name());
294+
client_instance_->UnregisterConsumer(client_config_->group_name());
295295
scheduled_executor_service_.shutdown();
296296
client_instance_->shutdown();
297297
rebalance_impl_->shutdown();

src/consumer/DefaultMQPushConsumerImpl.cpp

+2-2
Original file line numberDiff line numberDiff line change
@@ -134,7 +134,7 @@ void DefaultMQPushConsumerImpl::start() {
134134
consume_service_->start();
135135

136136
// register consumer
137-
bool registerOK = client_instance_->registerConsumer(client_config_->group_name(), this);
137+
bool registerOK = client_instance_->RegisterConsumer(client_config_->group_name(), this);
138138
if (!registerOK) {
139139
service_state_ = ServiceState::kCreateJust;
140140
consume_service_->shutdown();
@@ -233,7 +233,7 @@ void DefaultMQPushConsumerImpl::shutdown() {
233233
case ServiceState::kRunning: {
234234
consume_service_->shutdown();
235235
persistConsumerOffset();
236-
client_instance_->unregisterConsumer(client_config_->group_name());
236+
client_instance_->UnregisterConsumer(client_config_->group_name());
237237
client_instance_->shutdown();
238238
rebalance_impl_->shutdown();
239239
service_state_ = ServiceState::kShutdownAlready;

src/consumer/PullMessageService.hpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ class PullMessageService {
6969

7070
private:
7171
void pullMessage(PullRequestPtr pull_request) {
72-
MQConsumerInner* consumer = client_instance_->selectConsumer(pull_request->consumer_group());
72+
MQConsumerInner* consumer = client_instance_->SelectConsumer(pull_request->consumer_group());
7373
if (consumer != nullptr) {
7474
consumer->pullMessage(std::move(pull_request));
7575
} else {

0 commit comments

Comments
 (0)