A single instance with multiple DefaultLitePullConsumers (same topic, different tags) doesn't poll messages correctly #6578
-
|
Beta Was this translation helpful? Give feedback.
Answered by
RongtongJin
Apr 12, 2023
Replies: 4 comments
-
generate DefaultLitePullConsumer instance
/**
 * Creates one lite pull consumer per priority tag ("high" first, then "low") and
 * publishes them through {@code PULL_CONSUMERS} as an unmodifiable list.
 *
 * @throws MQClientException if creating any of the consumers fails
 */
private void initRocketMQPushConsumer() throws MQClientException {
    List<DefaultLitePullConsumer> consumers = new ArrayList<>();
    // Order matters only in that it fixes the list order: "high" is created and stored before "low".
    for (String tag : new String[] {"high", "low"}) {
        consumers.add(getDefaultLitePullConsumer(tag));
    }
    PULL_CONSUMERS = Collections.unmodifiableList(consumers);
}
/**
 * Builds a lite pull consumer that subscribes to {@code TOPIC} filtered by the given tag.
 *
 * <p>Every tag gets its own consumer group. RocketMQ requires all consumers that share a
 * consumer group to have the same subscription (topic and tag expression). Registering
 * the "high" and the "low" consumer under one group makes their subscriptions
 * inconsistent, and messages are then not polled reliably.
 *
 * @param tag tag selector expression for the subscription; also used (sanitized) as the
 *            suffix of the consumer group name
 * @return the consumer produced by {@code buildPullConsumer}
 * @throws MQClientException if {@code buildPullConsumer} fails
 */
private DefaultLitePullConsumer getDefaultLitePullConsumer(String tag) throws MQClientException {
    // Keep the group name to letters, digits, '_' and '-' so that tag expressions
    // such as "a || b" or "*" cannot produce an illegal group name.
    String groupSuffix = String.valueOf(tag).replaceAll("[^a-zA-Z0-9_-]", "_");

    RocketMQProperties.Consumer consumerConfig = new RocketMQProperties.Consumer();
    // One group per tag keeps the subscription identical for every member of a group.
    consumerConfig.setGroup(MqConfig.CATEGORY_SMART_CAMPUS_APP_CENTER + "_priority_" + groupSuffix);
    consumerConfig.setTopic(TOPIC);
    consumerConfig.setPullBatchSize(50);
    consumerConfig.setSelectorExpression(tag);
    return buildPullConsumer(consumerConfig);
}
/**
 * Creates a {@link DefaultLitePullConsumer} from the shared {@code rocketMQProperties}
 * (name server, access channel) and the given per-consumer settings.
 *
 * <p>Modelled on
 * {@link org.apache.rocketmq.spring.autoconfigure.RocketMQAutoConfiguration#defaultLitePullConsumer(RocketMQProperties)}.
 *
 * @param consumerConfig group, topic, selector, credentials, batch size, TLS and trace settings
 * @return the created consumer with trace, namespace, allocation strategy and auto-commit applied
 * @throws MQClientException if the consumer cannot be created
 */
private DefaultLitePullConsumer buildPullConsumer(RocketMQProperties.Consumer consumerConfig) throws MQClientException {
    final String nameServer = rocketMQProperties.getNameServer();
    final String groupName = consumerConfig.getGroup();
    final String topicName = consumerConfig.getTopic();

    // The three mandatory settings are checked before anything else is read.
    Assert.hasText(nameServer, "[rocketmq.name-server] must not be null");
    Assert.hasText(groupName, "[rocketmq.pull-consumer.group] must not be null");
    Assert.hasText(topicName, "[rocketmq.pull-consumer.topic] must not be null");

    // Arguments are evaluated left to right, i.e. in the same order the settings are declared.
    DefaultLitePullConsumer consumer = RocketMQUtil.createDefaultLitePullConsumer(
        nameServer,
        rocketMQProperties.getAccessChannel(),
        groupName,
        topicName,
        MessageModel.valueOf(consumerConfig.getMessageModel()),
        SelectorType.valueOf(consumerConfig.getSelectorType()),
        consumerConfig.getSelectorExpression(),
        consumerConfig.getAccessKey(),
        consumerConfig.getSecretKey(),
        consumerConfig.getPullBatchSize(),
        consumerConfig.isTlsEnable());

    consumer.setEnableMsgTrace(consumerConfig.isEnableMsgTrace());
    consumer.setCustomizedTraceTopic(consumerConfig.getCustomizedTraceTopic());
    consumer.setNamespace(consumerConfig.getNamespace());
    consumer.setAllocateMessageQueueStrategy(new AllocateMessageQueueAveragely());
    consumer.setAutoCommit(true);
    return consumer;
}
invoke poll method
try {
// Nothing to start when no pull consumers were registered: flag this component as running and leave.
if (CollUtil.isEmpty(PULL_CONSUMERS)) {
this.setRunning(true);
return;
}
// Start every registered lite pull consumer, in list order.
for (DefaultLitePullConsumer pullConsumer : PULL_CONSUMERS) {
pullConsumer.start();
}
// NOTE(review): only start() is visible in this excerpt; no poll() call appears here,
// and consumers already started are not shut down if a later start() throws. Confirm against the full method.
} catch (MQClientException e) {
// Surface a start-up failure as an unchecked exception, keeping the original cause.
throw new IllegalStateException("start pull fail", e);
} |
Beta Was this translation helpful? Give feedback.
0 replies
-
rocketmq-dashboard: |
Beta Was this translation helpful? Give feedback.
0 replies
-
lite pull consumer和push consumer,需要保证订阅关系的一致 |
Beta Was this translation helpful? Give feedback.
0 replies
Answer selected by
odbozhou
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
lite pull consumer和push consumer,需要保证订阅关系的一致