Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion include/CCommon.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,15 @@ typedef enum {
// Failed, null pointer value
NULL_POINTER = 1,
} CStatus;

typedef enum {
E_LOG_LEVEL_FATAL = 1,
E_LOG_LEVEL_ERROR = 2,
E_LOG_LEVEL_WARN = 3,
E_LOG_LEVEL_INFO = 4,
E_LOG_LEVEL_DEBUG = 5,
E_LOG_LEVEL_TRACE = 6,
E_LOG_LEVEL_LEVEL_NUM = 7
} CLogLevel;
#ifdef __cplusplus
};
#endif
Expand Down
1 change: 1 addition & 0 deletions include/CMessage.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ int SetMessageKeys(CMessage *msg, const char *keys);
int SetMessageBody(CMessage *msg, const char *body);
int SetByteMessageBody(CMessage *msg, const char *body, int len);
int SetMessageProperty(CMessage *msg, const char *key, const char *value);
int SetDelayTimeLevel(CMessage *msg, int level);

#ifdef __cplusplus
};
Expand Down
9 changes: 9 additions & 0 deletions include/CMessageExt.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,15 @@ const char *GetMessageKeys(CMessageExt *msgExt);
const char *GetMessageBody(CMessageExt *msgExt);
const char *GetMessageProperty(CMessageExt *msgExt, const char *key);
const char *GetMessageId(CMessageExt *msgExt);
int GetMessageDelayTimeLevel(CMessageExt *msgExt);
int GetMessageQueueId(CMessageExt *msgExt);
int GetMessageReconsumeTimes(CMessageExt *msgExt);
int GetMessageStoreSize(CMessageExt *msgExt);
long long GetMessageBornTimestamp(CMessageExt *msgExt);
long long GetMessageStoreTimestamp(CMessageExt *msgExt);
long long GetMessageQueueOffset(CMessageExt *msgExt);
long long GetMessageCommitLogOffset(CMessageExt *msgExt);
long long GetMessagePreparedTransactionOffset(CMessageExt *msgExt);

#ifdef __cplusplus
};
Expand Down
10 changes: 10 additions & 0 deletions include/CProducer.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,16 @@ int StartProducer(CProducer *producer);
int ShutdownProducer(CProducer *producer);

int SetProducerNameServerAddress(CProducer *producer, const char *namesrv);
int SetProducerGroupName(CProducer *producer, const char *groupName);
int SetProducerInstanceName(CProducer *producer, const char *instanceName);
int SetProducerSessionCredentials(CProducer *producer, const char *accessKey, const char *secretKey,
const char *onsChannel);
int SetProducerLogPath(CProducer *producer, const char *logPath);
int SetProducerLogFileNumAndSize(CProducer *producer, int fileNum, long fileSize);
int SetProducerLogLevel(CProducer *producer, CLogLevel level);
int SetProducerSendMsgTimeout(CProducer *producer, int timeout);
int SetProducerCompressLevel(CProducer *producer, int level);
int SetProducerMaxMessageSize(CProducer *producer, int size);

int SendMessageSync(CProducer *producer, CMessage *msg, CSendResult *result);

Expand Down
8 changes: 7 additions & 1 deletion include/CPullConsumer.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,13 @@ int ShutdownPullConsumer(CPullConsumer *consumer);
int SetPullConsumerGroupID(CPullConsumer *consumer, const char *groupId);
const char *GetPullConsumerGroupID(CPullConsumer *consumer);
int SetPullConsumerNameServerAddress(CPullConsumer *consumer, const char *namesrv);
int fetchSubscribeMessageQueues(CPullConsumer *consumer, const char *topic, vector<CMessageQueue> &mqs);
int SetPullConsumerSessionCredentials(CPullConsumer *consumer, const char *accessKey, const char *secretKey,
const char *channel);
int SetPullConsumerLogPath(CPullConsumer *consumer, const char *logPath);
int SetPullConsumerLogFileNumAndSize(CPullConsumer *consumer, int fileNum, long fileSize);
int SetPullConsumerLogLevel(CPullConsumer *consumer, CLogLevel level);

int fetchSubscribeMessageQueues(CPullConsumer *consumer, const char *topic, CMessageQueue *mqs , int size);
CPullResult pull(const CMessageQueue *mq, const char *subExpression, long long offset, int maxNums);

#ifdef __cplusplus
Expand Down
4 changes: 2 additions & 2 deletions include/CPullResult.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@

#include "CCommon.h"
#include "CMessageExt.h"
using namespace std;

#ifdef __cplusplus
extern "C" {
Expand All @@ -39,7 +38,8 @@ typedef struct _CPullResult_ {
long long nextBeginOffset;
long long minOffset;
long long maxOffset;
vector<CMessageExt> msgFoundList;
CMessageExt* msgFoundList;
int size;
} CPullResult;

#ifdef __cplusplus
Expand Down
12 changes: 9 additions & 3 deletions include/CPushConsumer.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#define __C_PUSH_CONSUMER_H__

#include "CMessageExt.h"
#include "CCommon.h"

#ifdef __cplusplus
extern "C" {
Expand All @@ -44,9 +45,14 @@ const char *GetPushConsumerGroupID(CPushConsumer *consumer);
int SetPushConsumerNameServerAddress(CPushConsumer *consumer, const char *namesrv);
int Subscribe(CPushConsumer *consumer, const char *topic, const char *expression);
int RegisterMessageCallback(CPushConsumer *consumer, MessageCallBack pCallback);
int SetPushConsumeThreadCount(CPushConsumer *consumer, int threadCount);
int SetPushConsumeMessageBatchMaxSize(CPushConsumer *consumer, int batchSize);

int SetPushConsumerThreadCount(CPushConsumer *consumer, int threadCount);
int SetPushConsumerMessageBatchMaxSize(CPushConsumer *consumer, int batchSize);
int SetPushConsumerInstanceName(CPushConsumer *consumer, const char *instanceName);
int SetPushConsumerSessionCredentials(CPushConsumer *consumer, const char *accessKey, const char *secretKey,
const char *channel);
int SetPushConsumerLogPath(CPushConsumer *consumer, const char *logPath);
int SetPushConsumerLogFileNumAndSize(CPushConsumer *consumer, int fileNum, long fileSize);
int SetPushConsumerLogLevel(CPushConsumer *consumer, CLogLevel level);
#ifdef __cplusplus
};
#endif
Expand Down
7 changes: 7 additions & 0 deletions src/extern/CMessage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,13 @@ int SetMessageProperty(CMessage *msg, const char *key, const char *value) {
((MQMessage *)msg)->setProperty(key,value);
return OK;
}
int SetDelayTimeLevel(CMessage *msg, int level){
if (msg == NULL) {
return NULL_POINTER;
}
((MQMessage *)msg)->setDelayTimeLevel(level);
return OK;
}

#ifdef __cplusplus
};
Expand Down
64 changes: 64 additions & 0 deletions src/extern/CMessageExt.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

#include "MQMessageExt.h"
#include "CMessageExt.h"
#include "CCommon.h"

#ifdef __cplusplus
extern "C" {
Expand Down Expand Up @@ -58,6 +59,69 @@ const char *GetMessageId(CMessageExt *msg) {
}
return ((MQMessageExt *) msg)->getMsgId().c_str();
}

int GetMessageDelayTimeLevel(CMessageExt *msg){
if (msg == NULL) {
return NULL_POINTER;
}
return ((MQMessageExt *) msg)->getDelayTimeLevel();
}

int GetMessageQueueId(CMessageExt *msg){
if (msg == NULL) {
return NULL_POINTER;
}
return ((MQMessageExt *) msg)->getQueueId();
}

int GetMessageReconsumeTimes(CMessageExt *msg){
if (msg == NULL) {
return NULL_POINTER;
}
return ((MQMessageExt *) msg)->getReconsumeTimes();
}

int GetMessageStoreSize(CMessageExt *msg){
if (msg == NULL) {
return NULL_POINTER;
}
return ((MQMessageExt *) msg)->getStoreSize();
}

long long GetMessageBornTimestamp(CMessageExt *msg) {
if (msg == NULL) {
return NULL_POINTER;
}
return ((MQMessageExt *) msg)->getBornTimestamp();
}

long long GetMessageStoreTimestamp(CMessageExt *msg){
if (msg == NULL) {
return NULL_POINTER;
}
return ((MQMessageExt *) msg)->getBornTimestamp();
}

long long GetMessageQueueOffset(CMessageExt *msg){
if (msg == NULL) {
return NULL_POINTER;
}
return ((MQMessageExt *) msg)->getQueueOffset();
}

long long GetMessageCommitLogOffset(CMessageExt *msg){
if (msg == NULL) {
return NULL_POINTER;
}
return ((MQMessageExt *) msg)->getCommitLogOffset();
}

long long GetMessagePreparedTransactionOffset(CMessageExt *msg){
if (msg == NULL) {
return NULL_POINTER;
}
return ((MQMessageExt *) msg)->getPreparedTransactionOffset();
}
#ifdef __cplusplus
};
#endif
86 changes: 82 additions & 4 deletions src/extern/CProducer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,29 +39,29 @@ int DestroyProducer(CProducer *pProducer) {
delete reinterpret_cast<DefaultMQProducer * >(pProducer);
return OK;
}
int StartProducer( CProducer *producer) {
int StartProducer(CProducer *producer) {
if (producer == NULL) {
return NULL_POINTER;
}
((DefaultMQProducer *) producer)->start();
return OK;
}
int ShutdownProducer( CProducer *producer) {
int ShutdownProducer(CProducer *producer) {
if (producer == NULL) {
return NULL_POINTER;
}
((DefaultMQProducer *) producer)->shutdown();
return OK;
}
int SetProducerNameServerAddress( CProducer *producer, const char *namesrv) {
int SetProducerNameServerAddress(CProducer *producer, const char *namesrv) {
if (producer == NULL) {
return NULL_POINTER;
}
((DefaultMQProducer *) producer)->setNamesrvAddr(namesrv);
return OK;
}

int SendMessageSync( CProducer *producer, CMessage *msg, CSendResult *result) {
int SendMessageSync(CProducer *producer, CMessage *msg, CSendResult *result) {
//CSendResult sendResult;
if (producer == NULL || msg == NULL || result == NULL) {
return NULL_POINTER;
Expand Down Expand Up @@ -93,6 +93,84 @@ int SendMessageSync( CProducer *producer, CMessage *msg, CSendResult *result) {
return OK;
}

int SetProducerGroupName(CProducer *producer, const char *groupName) {
if (producer == NULL) {
return NULL_POINTER;
}
((DefaultMQProducer *) producer)->setGroupName(groupName);
return OK;
}
int SetProducerInstanceName(CProducer *producer, const char *instanceName) {
if (producer == NULL) {
return NULL_POINTER;
}
((DefaultMQProducer *) producer)->setGroupName(instanceName);
return OK;
}
int SetProducerSessionCredentials(CProducer *producer, const char *accessKey, const char *secretKey,
const char *onsChannel) {
if (producer == NULL) {
return NULL_POINTER;
}
((DefaultMQProducer *) producer)->setSessionCredentials(accessKey, secretKey, onsChannel);
return OK;
}
int SetProducerLogPath(CProducer *producer, const char *logPath) {
if (producer == NULL) {
return NULL_POINTER;
}
//Todo, This api should be implemented by core api.
//((DefaultMQProducer *) producer)->setLogFileSizeAndNum(3, 102400000);
return OK;
}

int SetProducerLogFileNumAndSize(CProducer *producer, int fileNum, long fileSize) {
if (producer == NULL) {
return NULL_POINTER;
}
((DefaultMQProducer *) producer)->setLogFileSizeAndNum(fileNum, fileSize);
return OK;
}

int SetProducerLogLevel(CProducer *producer, CLogLevel level) {
if (producer == NULL) {
return NULL_POINTER;
}
((DefaultMQProducer *) producer)->setLogLevel((elogLevel) level);
return OK;
}

int SetProducerSendMsgTimeout(CProducer *producer, int timeout) {
if (producer == NULL) {
return NULL_POINTER;
}
((DefaultMQProducer *) producer)->setSendMsgTimeout(timeout);
return OK;
}

int SetProducerCompressMsgBodyOverHowmuch(CProducer *producer, int howmuch) {
if (producer == NULL) {
return NULL_POINTER;
}
((DefaultMQProducer *) producer)->setCompressMsgBodyOverHowmuch(howmuch);
return OK;
}

int SetProducerCompressLevel(CProducer *producer, int level) {
if (producer == NULL) {
return NULL_POINTER;
}
((DefaultMQProducer *) producer)->setCompressLevel(level);
return OK;
}

int SetProducerMaxMessageSize(CProducer *producer, int size) {
if (producer == NULL) {
return NULL_POINTER;
}
((DefaultMQProducer *) producer)->setMaxMessageSize(size);
return OK;
}
#ifdef __cplusplus
};
#endif
48 changes: 47 additions & 1 deletion src/extern/CPullConsumer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
#include "CMessageExt.h"
#include "CPullConsumer.h"
#include "CCommon.h"
#include <map>

using namespace rocketmq;

Expand Down Expand Up @@ -76,6 +75,53 @@ int SetPullConsumerNameServerAddress(CPullConsumer *consumer, const char *namesr
((DefaultMQPullConsumer *) consumer)->setNamesrvAddr(namesrv);
return OK;
}
int SetPullConsumerSessionCredentials(CPullConsumer *consumer, const char *accessKey, const char *secretKey,
const char *channel) {
if (consumer == NULL) {
return NULL_POINTER;
}
((DefaultMQPullConsumer *) consumer)->setSessionCredentials(accessKey, secretKey, channel);
return OK;
}

int SetPullConsumerLogPath(CPullConsumer *consumer, const char *logPath) {
if (consumer == NULL) {
return NULL_POINTER;
}
//Todo, This api should be implemented by core api.
//((DefaultMQPullConsumer *) consumer)->setInstanceName(instanceName);
return OK;
}

int SetPullConsumerLogFileNumAndSize(CPullConsumer *consumer, int fileNum, long fileSize) {
if (consumer == NULL) {
return NULL_POINTER;
}
((DefaultMQPullConsumer *) consumer)->setLogFileSizeAndNum(fileNum,fileSize);
return OK;
}

int SetPullConsumerLogLevel(CPullConsumer *consumer, CLogLevel level) {
if (consumer == NULL) {
return NULL_POINTER;
}
((DefaultMQPullConsumer *) consumer)->setLogLevel((elogLevel)level);
return OK;
}

int fetchSubscribeMessageQueues(CPullConsumer *consumer, const char *topic, CMessageQueue *mqs , int size){
if (consumer == NULL) {
return NULL_POINTER;
}
//ToDo, Add implement
return OK;
}
CPullResult pull(const CMessageQueue *mq, const char *subExpression, long long offset, int maxNums){
CPullResult pullResult ;
memset(&pullResult,0, sizeof(CPullResult));
//ToDo, Add implement
return pullResult;
}

#ifdef __cplusplus
};
Expand Down
Loading