diff --git a/index.d.ts b/index.d.ts index bd471469..4e610b25 100644 --- a/index.d.ts +++ b/index.d.ts @@ -97,6 +97,7 @@ export interface ConsumerConfig { schema?: SchemaInfo; batchIndexAckEnabled?: boolean; regexSubscriptionMode?: RegexSubscriptionMode; + deadLetterPolicy?: DeadLetterPolicy; } export class Consumer { @@ -174,6 +175,12 @@ export interface SchemaInfo { properties?: Record; } +export interface DeadLetterPolicy { + deadLetterTopic: string; + maxRedeliverCount?: number; + initialSubscriptionName?: string; +} + export class AuthenticationTls { constructor(params: { certificatePath: string, privateKeyPath: string }); } diff --git a/src/ConsumerConfig.cc b/src/ConsumerConfig.cc index be646aee..2758649d 100644 --- a/src/ConsumerConfig.cc +++ b/src/ConsumerConfig.cc @@ -47,6 +47,10 @@ static const std::string CFG_MAX_PENDING_CHUNKED_MESSAGE = "maxPendingChunkedMes static const std::string CFG_AUTO_ACK_OLDEST_CHUNKED_MESSAGE_ON_QUEUE_FULL = "autoAckOldestChunkedMessageOnQueueFull"; static const std::string CFG_BATCH_INDEX_ACK_ENABLED = "batchIndexAckEnabled"; +static const std::string CFG_DEAD_LETTER_POLICY = "deadLetterPolicy"; +static const std::string CFG_DLQ_POLICY_TOPIC = "deadLetterTopic"; +static const std::string CFG_DLQ_POLICY_MAX_REDELIVER_COUNT = "maxRedeliverCount"; +static const std::string CFG_DLQ_POLICY_INIT_SUB_NAME = "initialSubscriptionName"; static const std::map SUBSCRIPTION_TYPE = { {"Exclusive", pulsar_ConsumerExclusive}, @@ -239,6 +243,28 @@ ConsumerConfig::ConsumerConfig(const Napi::Object &consumerConfig, pulsar_messag pulsar_consumer_configuration_set_batch_index_ack_enabled(this->cConsumerConfig.get(), batchIndexAckEnabled); } + + if (consumerConfig.Has(CFG_DEAD_LETTER_POLICY) && consumerConfig.Get(CFG_DEAD_LETTER_POLICY).IsObject()) { + pulsar_consumer_config_dead_letter_policy_t dlq_policy{}; + Napi::Object dlqPolicyObject = consumerConfig.Get(CFG_DEAD_LETTER_POLICY).ToObject(); + std::string dlq_topic_str; + std::string init_subscription_name; + if (dlqPolicyObject.Has(CFG_DLQ_POLICY_TOPIC) && dlqPolicyObject.Get(CFG_DLQ_POLICY_TOPIC).IsString()) { + dlq_topic_str = dlqPolicyObject.Get(CFG_DLQ_POLICY_TOPIC).ToString().Utf8Value(); + dlq_policy.dead_letter_topic = dlq_topic_str.c_str(); + } + if (dlqPolicyObject.Has(CFG_DLQ_POLICY_MAX_REDELIVER_COUNT) && + dlqPolicyObject.Get(CFG_DLQ_POLICY_MAX_REDELIVER_COUNT).IsNumber()) { + dlq_policy.max_redeliver_count = + dlqPolicyObject.Get(CFG_DLQ_POLICY_MAX_REDELIVER_COUNT).ToNumber().Int32Value(); + } + if (dlqPolicyObject.Has(CFG_DLQ_POLICY_INIT_SUB_NAME) && + dlqPolicyObject.Get(CFG_DLQ_POLICY_INIT_SUB_NAME).IsString()) { + init_subscription_name = dlqPolicyObject.Get(CFG_DLQ_POLICY_INIT_SUB_NAME).ToString().Utf8Value(); + dlq_policy.initial_subscription_name = init_subscription_name.c_str(); + } + pulsar_consumer_configuration_set_dlq_policy(this->cConsumerConfig.get(), &dlq_policy); + } } ConsumerConfig::~ConsumerConfig() { diff --git a/tests/consumer.test.js b/tests/consumer.test.js index d976cb24..2579d1bd 100644 --- a/tests/consumer.test.js +++ b/tests/consumer.test.js @@ -247,6 +247,74 @@ const Pulsar = require('../index.js'); await producer4.close(); await consumer.close(); }); + + test('Dead Letter topic', async () => { + const topicName = 'test-dead_letter_topic'; + const dlqTopicName = 'test-dead_letter_topic_customize'; + const producer = await client.createProducer({ + topic: topicName, + }); + + const maxRedeliverCountNum = 3; + const consumer = await client.subscribe({ + topic: topicName, + subscription: 'sub-1', + subscriptionType: 'Shared', + deadLetterPolicy: { + deadLetterTopic: dlqTopicName, + maxRedeliverCount: maxRedeliverCountNum, + initialSubscriptionName: 'init-sub-1-dlq', + }, + nAckRedeliverTimeoutMs: 50, + }); + + // Send messages. + const sendNum = 5; + const messages = []; + for (let i = 0; i < sendNum; i += 1) { + const msg = `my-message-${i}`; + await producer.send({ data: Buffer.from(msg) }); + messages.push(msg); + } + + // Redelivery all messages maxRedeliverCountNum time. + let results = []; + for (let i = 1; i <= maxRedeliverCountNum * sendNum + sendNum; i += 1) { + const msg = await consumer.receive(); + results.push(msg); + if (i % sendNum === 0) { + results.forEach((message) => { + console.log(`Redeliver message ${message.getData().toString()} ${i} times ${message.getRedeliveryCount()} redeliver Count`); + consumer.negativeAcknowledge(message); + }); + results = []; + } + } + // assert no more msgs. + await expect(consumer.receive(100)).rejects.toThrow( + 'Failed to receive message: TimeOut', + ); + + const dlqConsumer = await client.subscribe({ + topic: dlqTopicName, + subscription: 'sub-1', + }); + const dlqResult = []; + for (let i = 0; i < sendNum; i += 1) { + const msg = await dlqConsumer.receive(); + dlqResult.push(msg.getData().toString()); + } + expect(dlqResult).toEqual(messages); + + // assert no more msgs. + await expect(dlqConsumer.receive(500)).rejects.toThrow( + 'Failed to receive message: TimeOut', + ); + + producer.close(); + consumer.close(); + dlqConsumer.close(); + }); }); }); })();