Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 27 additions & 33 deletions example/CAsyncProducer.c
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,15 @@
*/

#include <stdio.h>

#include "CProducer.h"
#include "CCommon.h"
#include "CMessage.h"
#include "CSendResult.h"

#ifdef _WIN32
#include <windows.h>
#else

#include <unistd.h>
#include <memory.h>

#endif

void thread_sleep(unsigned milliseconds) {
Expand All @@ -39,48 +35,47 @@ void thread_sleep(unsigned milliseconds) {
#endif
}

void sendSuccessCallback(CSendResult result){
printf("Msg Send ID:%s\n", result.msgId);
void SendSuccessCallback(CSendResult result){
printf("async send success, msgid:%s\n", result.msgId);
}

void sendExceptionCallback(CMQException e){
printf("asyn send exception error : %d\n" , e.error);
printf("asyn send exception msg : %s\n" , e.msg);
printf("asyn send exception file : %s\n" , e.file);
printf("asyn send exception line : %d\n" , e.line);
void SendExceptionCallback(CMQException e){
char msg[1024];
snprintf(msg, sizeof(msg), "error:%d, msg:%s, file:%s:%d", e.error, e.msg, e.file, e.line);
printf("async send exception %s\n", msg);
}

void startSendMessage(CProducer *producer) {
void StartSendMessage(CProducer *producer) {
int i = 0;
char DestMsg[256];
int ret_code = 0;
char body[128];
CMessage *msg = CreateMessage("T_TestTopic");
SetMessageTags(msg, "Test_Tag");
SetMessageKeys(msg, "Test_Keys");
CSendResult result;
for (i = 0; i < 10; i++) {
printf("send one message : %d\n", i);
memset(DestMsg, 0, sizeof(DestMsg));
snprintf(DestMsg, sizeof(DestMsg), "New message body: index %d", i);
SetMessageBody(msg, DestMsg);
int code = SendMessageAsync(producer, msg, sendSuccessCallback , sendExceptionCallback);
printf("Async send return code: %d\n", code);
memset(body, 0, sizeof(body));
snprintf(body, sizeof(body), "new message body, index %d", i);
SetMessageBody(msg, body);
ret_code = SendMessageAsync(producer, msg, SendSuccessCallback , SendExceptionCallback);
printf("async send message[%d] return code: %d\n", i, ret_code);
thread_sleep(1000);
}
DestroyMessage(msg);
}

void CreateProducerAndStartSendMessage(int i){
printf("Producer Initializing.....\n");
CProducer *producer = CreateProducer("Group_producer");
SetProducerNameServerAddress(producer, "127.0.0.1:9876");
if(i == 1){
SetProducerSendMsgTimeout(producer , 3);
}
StartProducer(producer);
printf("Producer start.....\n");
startSendMessage(producer);
ShutdownProducer(producer);
DestroyProducer(producer);
printf("Producer Shutdown!\n");
printf("Producer Initializing.....\n");
CProducer *producer = CreateProducer("Group_producer");
SetProducerNameServerAddress(producer, "127.0.0.1:9876");
if(i == 1){
SetProducerSendMsgTimeout(producer , 3);
}
StartProducer(producer);
printf("Producer start.....\n");
StartSendMessage(producer);
ShutdownProducer(producer);
DestroyProducer(producer);
printf("Producer Shutdown!\n");
}

int main(int argc, char *argv[]) {
Expand All @@ -89,7 +84,6 @@ int main(int argc, char *argv[]) {

printf("Send Async exceptionCallback.....\n");
CreateProducerAndStartSendMessage(1);

return 0;
}

21 changes: 7 additions & 14 deletions example/Producer.c
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,15 @@
*/

#include <stdio.h>

#include "CProducer.h"
#include "CCommon.h"
#include "CMessage.h"
#include "CSendResult.h"

#ifdef _WIN32
#include <windows.h>
#else

#include <unistd.h>
#include <memory.h>

#endif

void thread_sleep(unsigned milliseconds) {
Expand All @@ -39,33 +35,30 @@ void thread_sleep(unsigned milliseconds) {
#endif
}

void startSendMessage(CProducer *producer) {
void StartSendMessage(CProducer *producer) {
int i = 0;
char DestMsg[256];
char body[256];
CMessage *msg = CreateMessage("T_TestTopic");
SetMessageTags(msg, "Test_Tag");
SetMessageKeys(msg, "Test_Keys");
CSendResult result;
for (i = 0; i < 10; i++) {
printf("send one message : %d\n", i);
memset(DestMsg, 0, sizeof(DestMsg));
snprintf(DestMsg, 255, "New message body: index %d", i);
SetMessageBody(msg, DestMsg);
memset(body, 0, sizeof(body));
SetMessageBody(msg, body);
SendMessageSync(producer, msg, &result);
printf("Msg Send ID:%s\n", result.msgId);
printf("send message[%d] result status:%d, msgId:%s\n", i, (int)result.sendStatus, result.msgId);
thread_sleep(1000);
}
DestroyMessage(msg);
}


int main(int argc, char *argv[]) {
printf("Producer Initializing.....\n");

CProducer *producer = CreateProducer("Group_producer");
SetProducerNameServerAddress(producer, "127.0.0.1:9876");
StartProducer(producer);
printf("Producer start.....\n");
startSendMessage(producer);
StartSendMessage(producer);
ShutdownProducer(producer);
DestroyProducer(producer);
printf("Producer Shutdown!\n");
Expand Down
4 changes: 0 additions & 4 deletions example/PullConsumeMessage.c
Original file line number Diff line number Diff line change
Expand Up @@ -16,20 +16,16 @@
*/

#include <stdio.h>

#include "CPullConsumer.h"
#include "CCommon.h"
#include "CMessageExt.h"
#include "CPullResult.h"
#include "CMessageQueue.h"

#ifdef _WIN32
#include <windows.h>
#else

#include <unistd.h>
#include <memory.h>

#endif

void thread_sleep(unsigned milliseconds) {
Expand Down
5 changes: 0 additions & 5 deletions example/PushConsumeMessage.c
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,14 @@
*/

#include <stdio.h>

#include "CPushConsumer.h"
#include "CCommon.h"
#include "CMessageExt.h"

#ifdef _WIN32
#include <windows.h>
#else

#include <unistd.h>
#include <memory.h>

#endif

void thread_sleep(unsigned milliseconds) {
Expand All @@ -47,7 +43,6 @@ int doConsumeMessage(struct CPushConsumer *consumer, CMessageExt *msgExt) {
return E_CONSUME_SUCCESS;
}


int main(int argc, char *argv[]) {
int i = 0;
printf("PushConsumer Initializing....\n");
Expand Down
18 changes: 10 additions & 8 deletions example/common.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,20 +65,20 @@ class TpsReportService {
public:
TpsReportService() : tps_interval_(1), quit_flag_(false), tps_count_(0) {}
void start() {
if (tps_thread_ == NULL) {
std::cout << "tps_thread_ is null" << std::endl;
return;
}
if (tps_thread_ == NULL) {
std::cout << "tps_thread_ is null" << std::endl;
return;
}
tps_thread_.reset(
new boost::thread(boost::bind(&TpsReportService::TpsReport, this)));
}

~TpsReportService() {
quit_flag_.store(true);
if (tps_thread_ == NULL) {
std::cout << "tps_thread_ is null" << std::endl;
return;
}
if (tps_thread_ == NULL) {
std::cout << "tps_thread_ is null" << std::endl;
return;
}
if (tps_thread_->joinable()) tps_thread_->join();
}

Expand All @@ -99,12 +99,14 @@ class TpsReportService {
boost::atomic<long> tps_count_;
};

/*
static void PrintResult(rocketmq::SendResult* result) {
std::cout << "sendresult = " << result->getSendStatus()
<< ", msgid = " << result->getMsgId()
<< ", queueOffset = " << result->getQueueOffset() << ","
<< result->getMessageQueue().toString() << endl;
}
*/

void PrintPullResult(rocketmq::PullResult* result) {
std::cout << result->toString() << std::endl;
Expand Down
3 changes: 1 addition & 2 deletions include/CCommon.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ typedef enum _CStatus_{
PRODUCER_SEND_SYNC_FAILED = 11,
PRODUCER_SEND_ONEWAY_FAILED = 12,
PRODUCER_SEND_ORDERLY_FAILED = 13,
PRODUCER_SEND_ASYNC_FAILED = 14,
PRODUCER_SEND_ASYNC_FAILED = 14,

PUSHCONSUMER_ERROR_CODE_START = 20,
PUSHCONSUMER_START_FAILED = 20,
Expand All @@ -57,7 +57,6 @@ typedef enum _CLogLevel_{
E_LOG_LEVEL_LEVEL_NUM = 7
} CLogLevel;


#ifdef WIN32
#ifdef ROCKETMQCLIENT_EXPORTS
#ifdef _WINDLL
Expand Down
17 changes: 8 additions & 9 deletions include/CMQException.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/


#ifndef __C_MQEXCPTION_H__
#define __C_MQEXCPTION_H__
#include "CCommon.h"
Expand All @@ -24,14 +22,15 @@
extern "C" {
#endif

#define MAX_EXEPTION_CHAR_LENGTH 512

#define MAX_EXEPTION_MSG_LENGTH 512
#define MAX_EXEPTION_FILE_LENGTH 256
#define MAX_EXEPTION_TYPE_LENGTH 128
typedef struct _CMQException_{
int error;
int line;
char file[MAX_EXEPTION_CHAR_LENGTH];
char msg[MAX_EXEPTION_CHAR_LENGTH];
char type[MAX_EXEPTION_CHAR_LENGTH];
int error;
int line;
char file[MAX_EXEPTION_FILE_LENGTH];
char msg[MAX_EXEPTION_MSG_LENGTH];
char type[MAX_EXEPTION_TYPE_LENGTH];

} CMQException;

Expand Down
11 changes: 1 addition & 10 deletions include/MQClientException.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,10 @@
#include <ostream>
#include <sstream>
#include <string>

#include <string.h>
#include "RocketMQClient.h"
#include "CCommon.h"



namespace rocketmq {
//<!***************************************************************************
class ROCKETMQCLIENT_API MQException : public std::exception {
Expand Down Expand Up @@ -56,17 +53,12 @@ class ROCKETMQCLIENT_API MQException : public std::exception {
} catch (...) {
}
}

virtual ~MQException() throw() {}

const char* what() const throw() { return m_msg.c_str(); }

int GetError() const throw() { return m_error; }

virtual const char* GetType() const throw() { return m_type.c_str(); }

int GetLine() { return m_line;}

const char* GetFile() { return m_file.c_str(); }

protected:
Expand All @@ -77,7 +69,6 @@ class ROCKETMQCLIENT_API MQException : public std::exception {
std::string m_type;
};


inline std::ostream& operator<<(std::ostream& os, const MQException& e) {
os << "Type: " << e.GetType() << " , " << e.what();
return os;
Expand Down
Loading