Skip to content

Commit 56e9088

Browse files
committed
refactor: find broker address by message queue
1 parent 249482a commit 56e9088

6 files changed

+46
-44
lines changed

src/MQAdminImpl.cpp

+5-20
Original file line numberDiff line numberDiff line change
@@ -45,11 +45,7 @@ void MQAdminImpl::fetchSubscribeMessageQueues(const std::string& topic, std::vec
4545
}
4646

4747
int64_t MQAdminImpl::searchOffset(const MessageQueue& mq, int64_t timestamp) {
48-
std::string brokerAddr = client_instance_->FindBrokerAddressInPublish(mq.broker_name());
49-
if (brokerAddr.empty()) {
50-
client_instance_->updateTopicRouteInfoFromNameServer(mq.topic());
51-
brokerAddr = client_instance_->FindBrokerAddressInPublish(mq.broker_name());
52-
}
48+
auto brokerAddr = client_instance_->FindBrokerAddressInPublish(mq);
5349

5450
if (!brokerAddr.empty()) {
5551
try {
@@ -59,15 +55,12 @@ int64_t MQAdminImpl::searchOffset(const MessageQueue& mq, int64_t timestamp) {
5955
THROW_MQEXCEPTION(MQClientException, "Invoke Broker exception", -1);
6056
}
6157
}
58+
6259
THROW_MQEXCEPTION(MQClientException, "The broker is not exist", -1);
6360
}
6461

6562
int64_t MQAdminImpl::maxOffset(const MessageQueue& mq) {
66-
std::string brokerAddr = client_instance_->FindBrokerAddressInPublish(mq.broker_name());
67-
if (brokerAddr.empty()) {
68-
client_instance_->updateTopicRouteInfoFromNameServer(mq.topic());
69-
brokerAddr = client_instance_->FindBrokerAddressInPublish(mq.broker_name());
70-
}
63+
auto brokerAddr = client_instance_->FindBrokerAddressInPublish(mq);
7164

7265
if (!brokerAddr.empty()) {
7366
try {
@@ -80,11 +73,7 @@ int64_t MQAdminImpl::maxOffset(const MessageQueue& mq) {
8073
}
8174

8275
int64_t MQAdminImpl::minOffset(const MessageQueue& mq) {
83-
std::string brokerAddr = client_instance_->FindBrokerAddressInPublish(mq.broker_name());
84-
if (brokerAddr.empty()) {
85-
client_instance_->updateTopicRouteInfoFromNameServer(mq.topic());
86-
brokerAddr = client_instance_->FindBrokerAddressInPublish(mq.broker_name());
87-
}
76+
auto brokerAddr = client_instance_->FindBrokerAddressInPublish(mq);
8877

8978
if (!brokerAddr.empty()) {
9079
try {
@@ -98,11 +87,7 @@ int64_t MQAdminImpl::minOffset(const MessageQueue& mq) {
9887
}
9988

10089
int64_t MQAdminImpl::earliestMsgStoreTime(const MessageQueue& mq) {
101-
std::string brokerAddr = client_instance_->FindBrokerAddressInPublish(mq.broker_name());
102-
if (brokerAddr.empty()) {
103-
client_instance_->updateTopicRouteInfoFromNameServer(mq.topic());
104-
brokerAddr = client_instance_->FindBrokerAddressInPublish(mq.broker_name());
105-
}
90+
auto brokerAddr = client_instance_->FindBrokerAddressInPublish(mq);
10691

10792
if (!brokerAddr.empty()) {
10893
try {

src/MQClientInstance.cpp

+30
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
#include <utility> // std::move
2121

2222
#include "ClientRemotingProcessor.h"
23+
#include "FindBrokerResult.hpp"
2324
#include "Logging.h"
2425
#include "MQAdminImpl.h"
2526
#include "MQClientAPIImpl.h"
@@ -630,6 +631,35 @@ FindBrokerResult MQClientInstance::FindBrokerAddressInSubscribe(const std::strin
630631
return {};
631632
}
632633

634+
FindBrokerResult MQClientInstance::FindBrokerAddressInAdmin(const MessageQueue& message_queue) {
635+
auto find_broker_result = FindBrokerAddressInAdmin(message_queue.broker_name());
636+
if (!find_broker_result) {
637+
updateTopicRouteInfoFromNameServer(message_queue.topic());
638+
find_broker_result = FindBrokerAddressInAdmin(message_queue.broker_name());
639+
}
640+
return find_broker_result;
641+
}
642+
643+
std::string MQClientInstance::FindBrokerAddressInPublish(const MessageQueue& message_queue) {
644+
auto broker_address = FindBrokerAddressInPublish(message_queue.broker_name());
645+
if (broker_address.empty()) {
646+
updateTopicRouteInfoFromNameServer(message_queue.topic());
647+
broker_address = FindBrokerAddressInPublish(message_queue.broker_name());
648+
}
649+
return broker_address;
650+
}
651+
652+
FindBrokerResult MQClientInstance::FindBrokerAddressInSubscribe(const MessageQueue& message_queue,
653+
int broker_id,
654+
bool only_this_broker) {
655+
auto find_broker_result = FindBrokerAddressInSubscribe(message_queue.broker_name(), broker_id, only_this_broker);
656+
if (!find_broker_result) {
657+
updateTopicRouteInfoFromNameServer(message_queue.topic());
658+
find_broker_result = FindBrokerAddressInSubscribe(message_queue.broker_name(), broker_id, only_this_broker);
659+
}
660+
return find_broker_result;
661+
}
662+
633663
void MQClientInstance::findConsumerIds(const std::string& topic,
634664
const std::string& group,
635665
std::vector<std::string>& cids) {

src/MQClientInstance.h

+6
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,12 @@ class MQClientInstance {
8383
std::string FindBrokerAddressInPublish(const std::string& broker_name);
8484
FindBrokerResult FindBrokerAddressInSubscribe(const std::string& broker_name, int broker_id, bool only_this_broker);
8585

86+
FindBrokerResult FindBrokerAddressInAdmin(const MessageQueue& message_queue);
87+
std::string FindBrokerAddressInPublish(const MessageQueue& message_queue);
88+
FindBrokerResult FindBrokerAddressInSubscribe(const MessageQueue& message_queue,
89+
int broker_id,
90+
bool only_this_broker);
91+
8692
void findConsumerIds(const std::string& topic, const std::string& group, std::vector<std::string>& cids);
8793

8894
std::string findBrokerAddrByTopic(const std::string& topic);

src/consumer/PullAPIWrapper.cpp

+2-7
Original file line numberDiff line numberDiff line change
@@ -46,13 +46,8 @@ std::unique_ptr<PullResultExt> PullAPIWrapper::PullKernelImpl(const MessageQueue
4646
int timeout_millis,
4747
CommunicationMode communication_mode,
4848
PullCallback pull_callback) {
49-
auto find_broker_result = client_instance_->FindBrokerAddressInSubscribe(
50-
message_queue.broker_name(), RecalculatePullFromWhichNode(message_queue), false);
51-
if (!find_broker_result) {
52-
client_instance_->updateTopicRouteInfoFromNameServer(message_queue.topic());
53-
find_broker_result = client_instance_->FindBrokerAddressInSubscribe(
54-
message_queue.broker_name(), RecalculatePullFromWhichNode(message_queue), false);
55-
}
49+
auto find_broker_result =
50+
client_instance_->FindBrokerAddressInSubscribe(message_queue, RecalculatePullFromWhichNode(message_queue), false);
5651

5752
if (find_broker_result) {
5853
if (find_broker_result.slave) {

src/consumer/RemoteBrokerOffsetStore.cpp

+2-12
Original file line numberDiff line numberDiff line change
@@ -148,12 +148,7 @@ void RemoteBrokerOffsetStore::removeOffset(const MessageQueue& mq) {
148148
}
149149

150150
void RemoteBrokerOffsetStore::updateConsumeOffsetToBroker(const MessageQueue& mq, int64_t offset) {
151-
auto findBrokerResult = client_instance_->FindBrokerAddressInAdmin(mq.broker_name());
152-
153-
if (!findBrokerResult) {
154-
client_instance_->updateTopicRouteInfoFromNameServer(mq.topic());
155-
findBrokerResult = client_instance_->FindBrokerAddressInAdmin(mq.broker_name());
156-
}
151+
auto findBrokerResult = client_instance_->FindBrokerAddressInAdmin(mq);
157152

158153
if (findBrokerResult) {
159154
try {
@@ -168,12 +163,7 @@ void RemoteBrokerOffsetStore::updateConsumeOffsetToBroker(const MessageQueue& mq
168163
}
169164

170165
int64_t RemoteBrokerOffsetStore::fetchConsumeOffsetFromBroker(const MessageQueue& mq) {
171-
auto findBrokerResult = client_instance_->FindBrokerAddressInAdmin(mq.broker_name());
172-
173-
if (!findBrokerResult) {
174-
client_instance_->updateTopicRouteInfoFromNameServer(mq.topic());
175-
findBrokerResult = client_instance_->FindBrokerAddressInAdmin(mq.broker_name());
176-
}
166+
auto findBrokerResult = client_instance_->FindBrokerAddressInAdmin(mq);
177167

178168
if (findBrokerResult) {
179169
return client_instance_->getMQClientAPIImpl()->QueryConsumerOffset(findBrokerResult.broker_addr, group_name_,

src/producer/DefaultMQProducerImpl.cpp

+1-5
Original file line numberDiff line numberDiff line change
@@ -508,11 +508,7 @@ std::unique_ptr<SendResult> DefaultMQProducerImpl::SendKernelImpl(const MessageP
508508
int64_t timeout) {
509509
int64_t begin_time = UtilAll::currentTimeMillis();
510510

511-
std::string broker_addr = client_instance_->FindBrokerAddressInPublish(message_queue.broker_name());
512-
if (broker_addr.empty()) {
513-
client_instance_->tryToFindTopicPublishInfo(message_queue.topic());
514-
broker_addr = client_instance_->FindBrokerAddressInPublish(message_queue.broker_name());
515-
}
511+
auto broker_addr = client_instance_->FindBrokerAddressInPublish(message_queue);
516512

517513
if (!broker_addr.empty()) {
518514
try {

0 commit comments

Comments
 (0)