Skip to content

Commit e628502

Browse files
githublaohuShannonDing
authored andcommitted
Realization C asynSend (#65)
Realization C asynSend
1 parent 6f0908e commit e628502

File tree

9 files changed

+228
-6
lines changed

9 files changed

+228
-6
lines changed

example/CAsyncProducer.c

Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
#include <stdio.h>
19+
20+
#include "CProducer.h"
21+
#include "CCommon.h"
22+
#include "CMessage.h"
23+
#include "CSendResult.h"
24+
25+
#ifdef _WIN32
26+
#include <windows.h>
27+
#else
28+
29+
#include <unistd.h>
30+
#include <memory.h>
31+
32+
#endif
33+
34+
void thread_sleep(unsigned milliseconds) {
35+
#ifdef _WIN32
36+
Sleep(milliseconds);
37+
#else
38+
usleep(milliseconds * 1000); // takes microseconds
39+
#endif
40+
}
41+
42+
void sendSuccessCallback(CSendResult result){
43+
printf("Msg Send ID:%s\n", result.msgId);
44+
}
45+
46+
void sendExceptionCallback(CMQException e){
47+
printf("asyn send exception error : %d\n" , e.error);
48+
printf("asyn send exception msg : %s\n" , e.msg);
49+
printf("asyn send exception file : %s\n" , e.file);
50+
printf("asyn send exception line : %d\n" , e.line);
51+
}
52+
53+
void startSendMessage(CProducer *producer) {
54+
int i = 0;
55+
char DestMsg[256];
56+
CMessage *msg = CreateMessage("T_TestTopic");
57+
SetMessageTags(msg, "Test_Tag");
58+
SetMessageKeys(msg, "Test_Keys");
59+
CSendResult result;
60+
for (i = 0; i < 10; i++) {
61+
printf("send one message : %d\n", i);
62+
memset(DestMsg, 0, sizeof(DestMsg));
63+
snprintf(DestMsg, sizeof(DestMsg), "New message body: index %d", i);
64+
SetMessageBody(msg, DestMsg);
65+
int code = SendMessageAsync(producer, msg, sendSuccessCallback , sendExceptionCallback);
66+
printf("Async send return code: %d\n", code);
67+
thread_sleep(1000);
68+
}
69+
}
70+
71+
void CreateProducerAndStartSendMessage(int i){
72+
printf("Producer Initializing.....\n");
73+
CProducer *producer = CreateProducer("Group_producer");
74+
SetProducerNameServerAddress(producer, "127.0.0.1:9876");
75+
if(i == 1){
76+
SetProducerSendMsgTimeout(producer , 3);
77+
}
78+
StartProducer(producer);
79+
printf("Producer start.....\n");
80+
startSendMessage(producer);
81+
ShutdownProducer(producer);
82+
DestroyProducer(producer);
83+
printf("Producer Shutdown!\n");
84+
}
85+
86+
int main(int argc, char *argv[]) {
87+
printf("Send Async successCallback.....\n");
88+
CreateProducerAndStartSendMessage(0);
89+
90+
printf("Send Async exceptionCallback.....\n");
91+
CreateProducerAndStartSendMessage(1);
92+
93+
return 0;
94+
}
95+

example/Producer.c

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -62,11 +62,10 @@ int main(int argc, char *argv[]) {
6262
printf("Producer Initializing.....\n");
6363

6464
CProducer *producer = CreateProducer("Group_producer");
65-
SetProducerNameServerAddress(producer, "172.17.0.2:9876");
65+
SetProducerNameServerAddress(producer, "127.0.0.1:9876");
6666
StartProducer(producer);
6767
printf("Producer start.....\n");
6868
startSendMessage(producer);
69-
7069
ShutdownProducer(producer);
7170
DestroyProducer(producer);
7271
printf("Producer Shutdown!\n");

example/PushConsumer.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ int main(int argc, char *argv[]) {
8181

8282
if (info.syncpush) consumer.setAsyncPull(false); // set sync pull
8383
if (info.broadcasting) {
84-
consumer.setMessageModel(BROADCASTING);
84+
consumer.setMessageModel(rocketmq::BROADCASTING);
8585
}
8686

8787
consumer.setInstanceName(info.groupname);

include/CCommon.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ typedef enum _CStatus_{
3636
PRODUCER_SEND_SYNC_FAILED = 11,
3737
PRODUCER_SEND_ONEWAY_FAILED = 12,
3838
PRODUCER_SEND_ORDERLY_FAILED = 13,
39+
PRODUCER_SEND_ASYNC_FAILED = 14,
3940

4041
PUSHCONSUMER_ERROR_CODE_START = 20,
4142
PUSHCONSUMER_START_FAILED = 20,

include/CMQException.h

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
19+
#ifndef __C_MQEXCPTION_H__
20+
#define __C_MQEXCPTION_H__
21+
#include "CCommon.h"
22+
23+
#ifdef __cplusplus
24+
extern "C" {
25+
#endif
26+
27+
#define MAX_EXEPTION_CHAR_LENGTH 512
28+
29+
typedef struct _CMQException_{
30+
int error;
31+
int line;
32+
char file[MAX_EXEPTION_CHAR_LENGTH];
33+
char msg[MAX_EXEPTION_CHAR_LENGTH];
34+
char type[MAX_EXEPTION_CHAR_LENGTH];
35+
36+
} CMQException;
37+
38+
#ifdef __cplusplus
39+
};
40+
#endif
41+
#endif

include/CProducer.h

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,9 @@
1818
#ifndef __C_PRODUCER_H__
1919
#define __C_PRODUCER_H__
2020

21-
#include "CCommon.h"
2221
#include "CMessage.h"
2322
#include "CSendResult.h"
23+
#include "CMQException.h"
2424

2525
#ifdef __cplusplus
2626
extern "C" {
@@ -29,6 +29,8 @@ extern "C" {
2929
//typedef struct _CProducer_ _CProducer;
3030
typedef struct CProducer CProducer;
3131
typedef int(*QueueSelectorCallback)(int size, CMessage *msg, void *arg);
32+
typedef void(*CSendSuccessCallback)(CSendResult result);
33+
typedef void(*CSendExceptionCallback)(CMQException e);
3234

3335
ROCKETMQCLIENT_API CProducer *CreateProducer(const char *groupId);
3436
ROCKETMQCLIENT_API int DestroyProducer(CProducer *producer);
@@ -49,6 +51,7 @@ ROCKETMQCLIENT_API int SetProducerCompressLevel(CProducer *producer, int level);
4951
ROCKETMQCLIENT_API int SetProducerMaxMessageSize(CProducer *producer, int size);
5052

5153
ROCKETMQCLIENT_API int SendMessageSync(CProducer *producer, CMessage *msg, CSendResult *result);
54+
ROCKETMQCLIENT_API int SendMessageAsync(CProducer *producer, CMessage *msg, CSendSuccessCallback cSendSuccessCallback , CSendExceptionCallback cSendExceptionCallback);
5255
ROCKETMQCLIENT_API int SendMessageOneway(CProducer *producer,CMessage *msg);
5356
ROCKETMQCLIENT_API int SendMessageOrderly(CProducer *producer, CMessage *msg, QueueSelectorCallback callback, void *arg, int autoRetryTimes, CSendResult *result);
5457
#ifdef __cplusplus

include/MQClientException.h

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,12 +21,17 @@
2121
#include <ostream>
2222
#include <sstream>
2323
#include <string>
24+
25+
#include <string.h>
2426
#include "RocketMQClient.h"
27+
#include "CCommon.h"
28+
2529

2630

2731
namespace rocketmq {
2832
//<!***************************************************************************
2933
class ROCKETMQCLIENT_API MQException : public std::exception {
34+
3035
public:
3136
MQException(const std::string& msg, int error, const char* file,
3237
int line) throw()
@@ -60,6 +65,10 @@ class ROCKETMQCLIENT_API MQException : public std::exception {
6065

6166
virtual const char* GetType() const throw() { return m_type.c_str(); }
6267

68+
int GetLine() { return m_line;}
69+
70+
const char* GetFile() { return m_file.c_str(); }
71+
6372
protected:
6473
int m_error;
6574
int m_line;
@@ -68,6 +77,7 @@ class ROCKETMQCLIENT_API MQException : public std::exception {
6877
std::string m_type;
6978
};
7079

80+
7181
inline std::ostream& operator<<(std::ostream& os, const MQException& e) {
7282
os << "Type: " << e.GetType() << " , " << e.what();
7383
return os;

src/extern/CProducer.cpp

Lines changed: 61 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,17 @@
1616
*/
1717

1818
#include "DefaultMQProducer.h"
19+
#include "AsyncCallback.h"
20+
1921
#include "CProducer.h"
2022
#include "CCommon.h"
21-
#include <string.h>
23+
#include "CSendResult.h"
2224
#include "CMessage.h"
25+
#include "CMQException.h"
26+
27+
#include <string.h>
28+
#include <typeinfo>
29+
2330

2431
#ifdef __cplusplus
2532
extern "C" {
@@ -45,6 +52,35 @@ class SelectMessageQueue : public MessageQueueSelector {
4552
QueueSelectorCallback m_pCallback;
4653
};
4754

55+
class CSendCallback : public AutoDeleteSendCallBack{
56+
public:
57+
CSendCallback(CSendSuccessCallback cSendSuccessCallback,CSendExceptionCallback cSendExceptionCallback){
58+
m_cSendSuccessCallback = cSendSuccessCallback;
59+
m_cSendExceptionCallback = cSendExceptionCallback;
60+
}
61+
virtual ~CSendCallback(){}
62+
virtual void onSuccess(SendResult& sendResult) {
63+
CSendResult result;
64+
result.sendStatus = CSendStatus((int) sendResult.getSendStatus());
65+
result.offset = sendResult.getQueueOffset();
66+
strncpy(result.msgId, sendResult.getMsgId().c_str(), MAX_MESSAGE_ID_LENGTH - 1);
67+
result.msgId[MAX_MESSAGE_ID_LENGTH - 1] = 0;
68+
m_cSendSuccessCallback(result);
69+
70+
}
71+
virtual void onException(MQException& e) {
72+
CMQException exception;
73+
exception.error = e.GetError();
74+
exception.line = e.GetLine();
75+
strncpy(exception.msg, e.what(), MAX_EXEPTION_CHAR_LENGTH - 1);
76+
strncpy(exception.file, e.GetFile(), MAX_EXEPTION_CHAR_LENGTH - 1);
77+
m_cSendExceptionCallback( exception );
78+
}
79+
private:
80+
CSendSuccessCallback m_cSendSuccessCallback;
81+
CSendExceptionCallback m_cSendExceptionCallback;
82+
};
83+
4884

4985
CProducer *CreateProducer(const char *groupId) {
5086
if (groupId == NULL) {
@@ -127,6 +163,30 @@ int SendMessageSync(CProducer *producer, CMessage *msg, CSendResult *result) {
127163
return OK;
128164
}
129165

166+
int SendMessageAsync(CProducer *producer, CMessage *msg, CSendSuccessCallback cSendSuccessCallback,CSendExceptionCallback cSendExceptionCallback){
167+
if (producer == NULL || msg == NULL || cSendSuccessCallback == NULL || cSendExceptionCallback == NULL) {
168+
return NULL_POINTER;
169+
}
170+
DefaultMQProducer *defaultMQProducer = (DefaultMQProducer *) producer;
171+
MQMessage *message = (MQMessage *) msg;
172+
CSendCallback* cSendCallback = new CSendCallback(cSendSuccessCallback , cSendExceptionCallback);
173+
174+
try {
175+
defaultMQProducer->send(*message ,cSendCallback);
176+
} catch (exception &e) {
177+
if(cSendCallback != NULL){
178+
if(typeid(e) == typeid( MQException )){
179+
MQException &mqe = (MQException &)e;
180+
cSendCallback->onException( mqe );
181+
}
182+
delete cSendCallback;
183+
cSendCallback = NULL;
184+
}
185+
return PRODUCER_SEND_ASYNC_FAILED;
186+
}
187+
return OK;
188+
}
189+
130190
int SendMessageOneway(CProducer *producer, CMessage *msg) {
131191
if (producer == NULL || msg == NULL) {
132192
return NULL_POINTER;

test/src/UrlTest.cpp

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,15 @@
2020
#include "gtest/gtest.h"
2121
#include "gmock/gmock.h"
2222

23+
#include <stdio.h>
24+
25+
#include "CProducer.h"
26+
#include "CCommon.h"
27+
#include "CMessage.h"
28+
#include "CSendResult.h"
29+
#include "CMQException.h"
30+
#include <unistd.h>
31+
2332
using namespace std;
2433
using ::testing::InitGoogleTest;
2534
using ::testing::InitGoogleMock;
@@ -53,9 +62,13 @@ TEST(Url, Url) {
5362

5463
}
5564

65+
66+
5667
int main(int argc, char* argv[]) {
5768
InitGoogleMock(&argc, argv);
5869

5970
testing::GTEST_FLAG(filter) = "Url.Url";
60-
return RUN_ALL_TESTS();
71+
int itestts = RUN_ALL_TESTS();
72+
printf("i %d" , itestts);
73+
return itestts;
6174
}

0 commit comments

Comments
 (0)