此代码完成于2019年,可能存在过期可能,作者正在与RocketMQ官方人员沟通进行新版代码构建,如果遇到问题或者其他想法请联系本人邮箱fyqnankai@gmail.com
新版地址:敬请期待!
2021.4.19
.
├── rocketmq-client-dotnet RocketMQ .NET Client 文件
└── example
└── demo
├── ConsumerDemo PushConsumer Demo
├── ProducerDemo Producer Demo
└── PullConsumerDemo PullConsumer Demo
└── nugetTest
└── nugetTest Nuget package test.
└── src
└── RocketMQ.NETClient NET Client
├── Consumer Consumer API
├── Interop Const Value
├── Message Message API
└── Producer Producer API
- 使用说明 QuickStart
- 使用要求
.NET FrameWork >=4.6.1
or
.NET Standard >=2.0
功能 | C | .NET |
---|---|---|
同步消息发送 | Y | Y |
顺序消息发送 | Y | Y |
单向消息发送 | Y | Y |
拉取消息消费 | Y | Y |
推送消息消费 | Y | Y |
延时消息 | Y | Y |
消息压缩 | Y | Y |
消息过滤 | Y | Y |
字符串消息体 | Y | Y |
字节流消息体 | N | N |
Topic设置 | Y | Y |
- 创建
DefaultProducerBuilder
对象producerBuilder
- 使用
producerBuilder
设置想要生成的producerBuilder
参数 - 调用
producerBuilder
中的Builder
函数返回一个IProducer
实例producer
- 使用
producer
示例代码:
class MainClass
{
private static ProducerWrap.QueueSelectorCallback _queueSelectorCallback = new ProducerWrap.QueueSelectorCallback(
(size, message, args) =>
{
Console.WriteLine($"size: {size}, message: {message}, ptr: {args}");
return 0;
});
public static void Main(string[] args)
{
Console.Title = "Producer";
Console.WriteLine("Start create producer.");
var producerPtr = ProducerWrap.CreateProducer("GID_test");
if (producerPtr == IntPtr.Zero)
{
Console.WriteLine("zero. Oops.");
}
Console.WriteLine(producerPtr.ToString());
Console.WriteLine("end create producer.");
var p = new MainClass();
var producer = new HandleRef(p, producerPtr);
try
{
var setNameServerAddressResult = ProducerWrap.SetProducerNameServerAddress(producer, "127.0.0.1:9876");
Console.WriteLine("set name server address result:" + setNameServerAddressResult);
var setProducerLogPathResult = ProducerWrap.SetProducerLogPath(producer, "C:/rocketmq_log.txt");
Console.WriteLine("set producer log path result:" + setProducerLogPathResult);
var setLogLevelResult = ProducerWrap.SetProducerLogLevel(producer, CLogLevel.E_LOG_LEVEL_TRACE);
Console.WriteLine("set producer log level result:" + setLogLevelResult);
var startResult = ProducerWrap.StartProducer(producer);
Console.WriteLine("start result:" + startResult);
while (true)
{
// message
var message = MessageWrap.CreateMessage("Test");
Console.WriteLine("message intPtr:" + message);
var p1 = new MainClass();
var messageIntPtr = new HandleRef(p1, message);
var setMessageBodyResult = MessageWrap.SetMessageBody(messageIntPtr, "hello" + Guid.NewGuid());
Console.WriteLine("set message body result:" + setMessageBodyResult);
var setTagResult = MessageWrap.SetMessageTags(messageIntPtr, "tag_test11");
Console.WriteLine("set message tag result:" + setTagResult);
var setPropertyResult = MessageWrap.SetMessageProperty(messageIntPtr, "key1", "value1");
Console.WriteLine("set message property result:" + setPropertyResult);
//SendMessageSync
var sendResult = ProducerWrap.SendMessageSync(producer, messageIntPtr, out CSendResult sendResultStruct);
Console.WriteLine("send result:" + sendResult + ", msgId: " + sendResultStruct.msgId.ToString());
{sendResultStruct.sendStatus}, offset: {sendResultStruct.offset}");
Thread.Sleep(500);
}
var shutdownResult = ProducerWrap.ShutdownProducer(producer);
Console.WriteLine("shutdown result:" + shutdownResult);
var destoryResult = ProducerWrap.DestroyProducer(producer);
Console.WriteLine("destory result:" + destoryResult);
}
catch (Exception e)
{
Console.WriteLine(e.ToString());
}
Console.ReadKey(true);
}
}
- 创建producer对象(多种构造函数)
- 使用producer发送消息
//创建一个 producer
MQProducer producer = new MQProducer("GroupA", "127.0.0.1:9876");
producer.StartProducer();
// 创建一个消息 message
MQMessage message = new MQMessage("test");
// 使用producer发送消息
// SendMessageSync
var sendResult = producer.SendMessageSync(message);
Console.WriteLine("send result:" + sendResult + ", msgId: " + sendResult.MessageId);
- 创建一个Push消费者
- 订阅一个
topic
- 注册回调函数
- 启动消费者
// 创建一个Push消费者
var consumer = new MQPushConsumer("xx", "127.0.0.1:9876");
Console.WriteLine($"consumer: {consumer}");
// 设置日志目录和级别
consumer.SetPushConsumerLogPath(".\\consumer_log.txt");
consumer.SetPushConsumerLogLevel(LogLevel.Trace);
// 获取消费者组号
var groupId = consumer.GetPushConsumerGroupID();
Console.WriteLine($"groupId: {groupId}");
// 订阅一个`topic`
consumer.Subscribe("test", "*");
//注册回调函数
consumer.RegisterMessageCallback(_callback);
//启动消费者
var result=consumer.StartPushConsumer();
Console.WriteLine($"start push consumer ptr: {result}");
- 创建一个PullConsumer
- 开启消费者
- 填充消息队列
- 主动拉取消费
//创建一个PullConsumer
MQPullConsumer consumer = new MQPullConsumer("xxx", "127.0.0.1:9876", ".\\log.txt", LogLevel.Trace);
//开启消费者
var result = consumer.StartPullConsumer();
Console.WriteLine($"start Pull consumer ? : {result}");
//填充消息队列
CMessageQueue[] msgs = consumer.FetchSubscriptionMessageQueues("test");
for (int j = 0; j < msgs.Length; j++)
{
int flag = 0;
Console.WriteLine("msg topic : " + new string(msgs[j].topic));
MessageQueue mq = new MessageQueue { topic = new string(msgs[j].topic), brokeName = new string(msgs[j].brokerName), queueId = msgs[j].queueId };
while (true)
{
try
{
//主动拉取消费
CPullResult cPullResult = consumer.Pull(mq,msgs[j], "", MQPullConsumer.getMessageQueueOffset(mq), 32);
Console.WriteLine(new string(msgs[j].topic) + " status : " + cPullResult.pullStatus +"Max offset "+ cPullResult.maxOffset + " offset: " + cPullResult.nextBeginOffset + " Quene Id" + msgs[j].queueId);
//Console.WriteLine(" " + msg.topic);
long a = cPullResult.nextBeginOffset;
MQPullConsumer.putMessageQueueOffset(mq, a);
switch (cPullResult.pullStatus)
{
case CPullStatus.E_FOUND:
break;
case CPullStatus.E_NO_MATCHED_MSG:
break;
case CPullStatus.E_NO_NEW_MSG:
flag = 1;
break;
case CPullStatus.E_OFFSET_ILLEGAL:
flag = 2;
break;
default:
break;
}
if(flag == 1|| cPullResult.nextBeginOffset == cPullResult.maxOffset)
{
break;
}
if (flag == 2)
{
break;
}
}
catch (Exception e)
{
Console.WriteLine(e.Message);
}
}
}