Skip to content

Commit b50d99c

Browse files
authored
fix(send): try to use command v2 to send messages (#225)
[fix #189] * fix(send): try to use command v2 to send messages * fix(send): try to use command v2 to send messages * fix(send): try to use command v2 to send messages * fix(send): try to use command v2 to send messages * fix(send): try to use command v2 to send messages * fix(send): try to use command v2 to send messages * fix(send): try to use command v2 to send messages
1 parent 54a3878 commit b50d99c

14 files changed

+1581
-47
lines changed

src/MQClientAPIImpl.cpp

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -230,7 +230,11 @@ SendResult MQClientAPIImpl::sendMessage(const string& addr,
230230
int communicationMode,
231231
SendCallback* pSendCallback,
232232
const SessionCredentials& sessionCredentials) {
233-
RemotingCommand request(SEND_MESSAGE, pRequestHeader);
233+
// RemotingCommand request(SEND_MESSAGE, pRequestHeader);
234+
// Using MQ V2 Protocol to end messages.
235+
SendMessageRequestHeaderV2* pRequestHeaderV2 = new SendMessageRequestHeaderV2(*pRequestHeader);
236+
RemotingCommand request(SEND_MESSAGE_V2, pRequestHeaderV2);
237+
delete pRequestHeader; // delete to avoid memory leak.
234238
string body = msg.getBody();
235239
request.SetBody(body.c_str(), body.length());
236240
request.setMsgBody(body);

src/MQClientFactory.cpp

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
#include "ConsumerRunningInfo.h"
1919
#include "Logging.h"
2020
#include "MQClientManager.h"
21+
#include "MQVersion.h"
2122
#include "PullRequest.h"
2223
#include "Rebalance.h"
2324
#include "TopicPublishInfo.h"
@@ -1161,7 +1162,9 @@ ConsumerRunningInfo* MQClientFactory::consumerRunningInfo(const string& consumer
11611162
} else {
11621163
runningInfo->setProperty(ConsumerRunningInfo::PROP_CONSUME_TYPE, "CONSUME_ACTIVELY");
11631164
}
1164-
runningInfo->setProperty(ConsumerRunningInfo::PROP_CLIENT_VERSION, "V3_1_8"); // MQVersion::s_CurrentVersion ));
1165+
runningInfo->setProperty(
1166+
ConsumerRunningInfo::PROP_CLIENT_VERSION,
1167+
MQVersion::GetVersionDesc(MQVersion::s_CurrentVersion)); // MQVersion::s_CurrentVersion ));
11651168

11661169
return runningInfo;
11671170
}

src/common/AsyncCallbackWrap.cpp

100755100644
Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
#include "MQClientAPIImpl.h"
2121
#include "MQDecoder.h"
2222
#include "MQMessageQueue.h"
23-
#include "MQProtos.h"
2423
#include "PullAPIWrapper.h"
2524
#include "PullResultExt.h"
2625
#include "ResponseFuture.h"

src/common/AsyncCallbackWrap.h

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,15 +20,14 @@
2020
#include "AsyncArg.h"
2121
#include "AsyncCallback.h"
2222
#include "MQMessage.h"
23-
#include "UtilAll.h"
2423
#include "RemotingCommand.h"
24+
#include "UtilAll.h"
2525

2626
namespace rocketmq {
2727

2828
class ResponseFuture;
2929
class MQClientAPIImpl;
3030
class DefaultMQProducer;
31-
class SendMessageRequestHeader;
3231
//<!***************************************************************************
3332
enum asyncCallBackType { asyncCallbackWrap = 0, sendCallbackWrap = 1, pullCallbackWarp = 2 };
3433

@@ -77,5 +76,5 @@ class PullCallbackWarp : public AsyncCallbackWrap {
7776
};
7877

7978
//<!***************************************************************************
80-
} //<!end namespace;
81-
#endif //<! _AsyncCallbackWrap_H_
79+
} // namespace rocketmq
80+
#endif // __ASYNCCALLBACKWRAP_H__

src/common/MQVersion.cpp

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,15 +17,19 @@
1717
#include "MQVersion.h"
1818

1919
namespace rocketmq {
20-
int MQVersion::s_CurrentVersion = MQVersion::V3_1_8;
20+
int MQVersion::s_CurrentVersion = MQVersion::V4_6_0;
21+
std::string MQVersion::s_CurrentLanguage = "CPP";
2122

2223
//<!************************************************************************
23-
const char* MQVersion::getVersionDesc(int value) {
24-
switch (value) {
25-
// case V1_0_0:
26-
// return "V1_0_0";
24+
const char* MQVersion::GetVersionDesc(int value) {
25+
int currentVersion = value;
26+
if (value <= V3_0_0_SNAPSHOT) {
27+
currentVersion = V3_0_0_SNAPSHOT;
2728
}
28-
return "";
29+
if (value >= HIGHER_VERSION) {
30+
currentVersion = HIGHER_VERSION;
31+
}
32+
return RocketMQCPPClientVersion[currentVersion];
2933
}
3034
//<!***************************************************************************
31-
} //<!end namespace;
35+
} // namespace rocketmq

0 commit comments

Comments
 (0)