Skip to content

Commit 6f2832a

Browse files
authored
Fetch topic route with name server list attached (#15)
1 parent a06a84d commit 6f2832a

File tree

7 files changed

+69
-12
lines changed

7 files changed

+69
-12
lines changed

example/rocketmq/ExampleProducer.cpp

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -33,15 +33,19 @@ int main(int argc, char* argv[]) {
3333
logger.init();
3434

3535
DefaultMQProducer producer("TestGroup");
36-
producer.setNamesrvAddr("11.165.223.199:9876");
36+
37+
const char* topic = "cpp_sdk_standard";
38+
const char* name_server = "47.98.116.189:80";
39+
40+
producer.setNamesrvAddr(name_server);
3741
producer.compressBodyThreshold(256);
38-
const char* arn = "MQ_INST_1973281269661160_BXmPlOA6";
39-
producer.setRegion("cn-hangzhou");
42+
const char* arn = "MQ_INST_1080056302921134_BXuIbML7";
43+
producer.setRegion("cn-hangzhou-pre");
4044
producer.arn(arn);
4145
producer.setCredentialsProvider(std::make_shared<ConfigFileCredentialsProvider>());
4246

4347
MQMessage message;
44-
message.setTopic("yc001");
48+
message.setTopic(topic);
4549
message.setTags("TagA");
4650
message.setKey("Yuck! Why-plural?");
4751

src/main/cpp/base/MQMessage.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ const std::string& MQMessage::getMsgId() const { return impl_->system_attribute_
5151
std::string MQMessage::getBornHost() const { return impl_->system_attribute_.born_host; }
5252

5353
std::chrono::system_clock::time_point MQMessage::deliveryTimestamp() const {
54-
absl::ToChronoTime(impl_->system_attribute_.delivery_timestamp);
54+
return absl::ToChronoTime(impl_->system_attribute_.delivery_timestamp);
5555
}
5656

5757
void MQMessage::setProperty(const std::string& name, const std::string& value) {

src/main/cpp/base/MixAll.cpp

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ const uint32_t MixAll::DEFAULT_CONSUME_MESSAGE_BATCH_SIZE = 1;
3030
const int32_t MixAll::DEFAULT_MAX_DELIVERY_ATTEMPTS = 16;
3131

3232
const RE2 MixAll::TOPIC_REGEX("[a-zA-Z0-9\\-_]{3,64}");
33+
const RE2 MixAll::IP_REGEX("\\d+\\.\\d+\\.\\d+\\.\\d+");
3334

3435
const std::chrono::duration<long long> MixAll::DEFAULT_INVISIBLE_TIME_ = std::chrono::seconds(30);
3536

@@ -221,4 +222,8 @@ bool MixAll::homeDirectory(std::string& home_dir) {
221222
#endif
222223
}
223224

225+
bool MixAll::isIPv4(absl::string_view host) {
226+
return RE2::FullMatch(re2::StringPiece(host.data(), host.length()), IP_REGEX);
227+
}
228+
224229
ROCKETMQ_NAMESPACE_END

src/main/cpp/base/include/MixAll.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
#pragma once
22

3+
#include "absl/strings/string_view.h"
34
#include "re2/re2.h"
45
#include "rocketmq/MQMessage.h"
56
#include <chrono>
@@ -22,6 +23,7 @@ class MixAll {
2223
static const int32_t DEFAULT_MAX_DELIVERY_ATTEMPTS;
2324

2425
static const RE2 TOPIC_REGEX;
26+
static const RE2 IP_REGEX;
2527

2628
/**
2729
* The amount of time required before a popped message is eligible to be consumed again. By default, 30s.
@@ -100,6 +102,8 @@ class MixAll {
100102

101103
static bool homeDirectory(std::string& home);
102104

105+
static bool isIPv4(absl::string_view host);
106+
103107
private:
104108
static bool hexCharValue(char c, uint8_t& value);
105109
};

src/main/cpp/log/LoggerImpl.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,8 @@ void LoggerImpl::init0() {
6868
setLevel(console_sink_, console_level_);
6969
console_sink_->set_pattern(pattern_);
7070

71-
file_sink_ = std::make_shared<spdlog::sinks::rotating_file_sink_mt>(log_home_, file_size_, file_count_, true);
71+
file_sink_ = std::make_shared<spdlog::sinks::rotating_file_sink_mt>(log_home_, file_size_, file_count_,
72+
/*rotate_on_open=*/false);
7273
file_sink_->set_pattern(pattern_);
7374
setLevel(file_sink_, level_);
7475

src/main/cpp/rocketmq/ClientImpl.cpp

Lines changed: 41 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,16 @@
66
#include "MessageAccessor.h"
77
#include "Signature.h"
88
#include "absl/strings/str_join.h"
9+
#include "absl/strings/str_split.h"
910
#include "apache/rocketmq/v1/definition.pb.h"
1011
#include "rocketmq/MQMessageExt.h"
12+
#include <algorithm>
1113
#include <chrono>
14+
#include <cstdint>
15+
#include <iterator>
1216
#include <memory>
1317
#include <string>
18+
#include <utility>
1419

1520
ROCKETMQ_NAMESPACE_BEGIN
1621

@@ -53,7 +58,7 @@ void ClientImpl::start() {
5358
};
5459
name_server_update_handle_ =
5560
client_manager_->getScheduler().schedule(name_server_update_functor, UPDATE_NAME_SERVER_LIST_TASK_NAME,
56-
std::chrono::milliseconds(0), std::chrono::seconds(30));
61+
std::chrono::milliseconds(0), std::chrono::seconds(30));
5762
}
5863

5964
auto route_update_functor = [ptr]() {
@@ -64,7 +69,7 @@ void ClientImpl::start() {
6469
};
6570

6671
route_update_handle_ = client_manager_->getScheduler().schedule(route_update_functor, UPDATE_ROUTE_TASK_NAME,
67-
std::chrono::seconds(10), std::chrono::seconds(30));
72+
std::chrono::seconds(10), std::chrono::seconds(30));
6873
state_.store(State::STARTED);
6974
}
7075

@@ -236,6 +241,37 @@ void ClientImpl::fetchRouteFor(const std::string& topic, const std::function<voi
236241
QueryRouteRequest request;
237242
request.mutable_topic()->set_arn(arn_);
238243
request.mutable_topic()->set_name(topic);
244+
auto endpoints = request.mutable_endpoints()->mutable_addresses();
245+
std::vector<std::pair<std::string, std::uint16_t>> pairs;
246+
{
247+
absl::MutexLock lk(&name_server_list_mtx_);
248+
for (const auto& name_server_item : name_server_list_) {
249+
std::string::size_type pos = name_server_item.rfind(":");
250+
if (std::string::npos == pos) {
251+
continue;
252+
}
253+
std::string host(name_server_item.substr(0, pos));
254+
std::string port(name_server_item.substr(pos + 1));
255+
pairs.emplace_back(std::make_pair(host, std::stoi(port)));
256+
}
257+
}
258+
259+
if (!pairs.empty()) {
260+
for (const auto& host_port : pairs) {
261+
auto address = new rmq::Address();
262+
address->set_port(host_port.second);
263+
address->set_host(host_port.first);
264+
endpoints->AddAllocated(address);
265+
}
266+
267+
if (MixAll::isIPv4(pairs.begin()->first)) {
268+
request.mutable_endpoints()->set_scheme(rmq::AddressScheme::IPv4);
269+
} else if (absl::StrContains(pairs.begin()->first, ':')) {
270+
request.mutable_endpoints()->set_scheme(rmq::AddressScheme::IPv6);
271+
} else {
272+
request.mutable_endpoints()->set_scheme(rmq::AddressScheme::DOMAIN_NAME);
273+
}
274+
}
239275

240276
absl::flat_hash_map<std::string, std::string> metadata;
241277
Signature::sign(this, metadata);
@@ -346,7 +382,7 @@ void ClientImpl::multiplexing(const std::string& target, const MultiplexingReque
346382
absl::flat_hash_map<std::string, std::string> metadata;
347383
Signature::sign(this, metadata);
348384
client_manager_->multiplexingCall(target, metadata, request, absl::ToChronoMilliseconds(long_polling_timeout_),
349-
std::bind(&ClientImpl::onMultiplexingResponse, this, std::placeholders::_1));
385+
std::bind(&ClientImpl::onMultiplexingResponse, this, std::placeholders::_1));
350386
}
351387

352388
void ClientImpl::onMultiplexingResponse(const InvocationContext<MultiplexingResponse>* ctx) {
@@ -359,7 +395,7 @@ void ClientImpl::onMultiplexingResponse(const InvocationContext<MultiplexingResp
359395
};
360396
static std::string task_name = "Initiate multiplex request later";
361397
client_manager_->getScheduler().schedule(multiplexingLater, task_name, std::chrono::seconds(3),
362-
std::chrono::seconds(0));
398+
std::chrono::seconds(0));
363399
return;
364400
}
365401

@@ -373,7 +409,7 @@ void ClientImpl::onMultiplexingResponse(const InvocationContext<MultiplexingResp
373409
absl::flat_hash_map<std::string, std::string> metadata;
374410
Signature::sign(this, metadata);
375411
client_manager_->multiplexingCall(ctx->remote_address, metadata, request, absl::ToChronoMilliseconds(io_timeout_),
376-
std::bind(&ClientImpl::onMultiplexingResponse, this, std::placeholders::_1));
412+
std::bind(&ClientImpl::onMultiplexingResponse, this, std::placeholders::_1));
377413
break;
378414
}
379415

src/test/cpp/ut/base/MixAllTest.cpp

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ class MixAllTest : public testing::Test {
88
public:
99
static std::string toUpperCase(const std::string& s) {
1010
std::string result;
11-
for (const char & c : s) {
11+
for (const char& c : s) {
1212
if ('a' <= c && 'z' >= c) {
1313
result.push_back(static_cast<char>('A' + (c - 'a')));
1414
} else {
@@ -94,4 +94,11 @@ TEST_F(MixAllTest, testCrc32) {
9494
std::cout << digest << std::endl;
9595
}
9696

97+
TEST_F(MixAllTest, testIsIpv4) {
98+
const char* ip = "8.8.8.8";
99+
const char* host = "www.taobao.com";
100+
EXPECT_TRUE(MixAll::isIPv4(ip));
101+
EXPECT_FALSE(MixAll::isIPv4(host));
102+
}
103+
97104
ROCKETMQ_NAMESPACE_END

0 commit comments

Comments
 (0)