Skip to content

Commit

Permalink
feat: Support dead letter topic. (#335)
Browse files Browse the repository at this point in the history
  • Loading branch information
shibd authored Jun 26, 2023
1 parent 1e51f5a commit 6457aef
Show file tree
Hide file tree
Showing 3 changed files with 101 additions and 0 deletions.
7 changes: 7 additions & 0 deletions index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ export interface ConsumerConfig {
schema?: SchemaInfo;
batchIndexAckEnabled?: boolean;
regexSubscriptionMode?: RegexSubscriptionMode;
deadLetterPolicy?: DeadLetterPolicy;
}

export class Consumer {
Expand Down Expand Up @@ -174,6 +175,12 @@ export interface SchemaInfo {
properties?: Record<string, string>;
}

export interface DeadLetterPolicy {
deadLetterTopic: string;
maxRedeliverCount?: number;
initialSubscriptionName?: string;
}

export class AuthenticationTls {
constructor(params: { certificatePath: string, privateKeyPath: string });
}
Expand Down
26 changes: 26 additions & 0 deletions src/ConsumerConfig.cc
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,10 @@ static const std::string CFG_MAX_PENDING_CHUNKED_MESSAGE = "maxPendingChunkedMes
static const std::string CFG_AUTO_ACK_OLDEST_CHUNKED_MESSAGE_ON_QUEUE_FULL =
"autoAckOldestChunkedMessageOnQueueFull";
static const std::string CFG_BATCH_INDEX_ACK_ENABLED = "batchIndexAckEnabled";
static const std::string CFG_DEAD_LETTER_POLICY = "deadLetterPolicy";
static const std::string CFG_DLQ_POLICY_TOPIC = "deadLetterTopic";
static const std::string CFG_DLQ_POLICY_MAX_REDELIVER_COUNT = "maxRedeliverCount";
static const std::string CFG_DLQ_POLICY_INIT_SUB_NAME = "initialSubscriptionName";

static const std::map<std::string, pulsar_consumer_type> SUBSCRIPTION_TYPE = {
{"Exclusive", pulsar_ConsumerExclusive},
Expand Down Expand Up @@ -239,6 +243,28 @@ ConsumerConfig::ConsumerConfig(const Napi::Object &consumerConfig, pulsar_messag
pulsar_consumer_configuration_set_batch_index_ack_enabled(this->cConsumerConfig.get(),
batchIndexAckEnabled);
}

if (consumerConfig.Has(CFG_DEAD_LETTER_POLICY) && consumerConfig.Get(CFG_DEAD_LETTER_POLICY).IsObject()) {
pulsar_consumer_config_dead_letter_policy_t dlq_policy{};
Napi::Object dlqPolicyObject = consumerConfig.Get(CFG_DEAD_LETTER_POLICY).ToObject();
std::string dlq_topic_str;
std::string init_subscription_name;
if (dlqPolicyObject.Has(CFG_DLQ_POLICY_TOPIC) && dlqPolicyObject.Get(CFG_DLQ_POLICY_TOPIC).IsString()) {
dlq_topic_str = dlqPolicyObject.Get(CFG_DLQ_POLICY_TOPIC).ToString().Utf8Value();
dlq_policy.dead_letter_topic = dlq_topic_str.c_str();
}
if (dlqPolicyObject.Has(CFG_DLQ_POLICY_MAX_REDELIVER_COUNT) &&
dlqPolicyObject.Get(CFG_DLQ_POLICY_MAX_REDELIVER_COUNT).IsNumber()) {
dlq_policy.max_redeliver_count =
dlqPolicyObject.Get(CFG_DLQ_POLICY_MAX_REDELIVER_COUNT).ToNumber().Int32Value();
}
if (dlqPolicyObject.Has(CFG_DLQ_POLICY_INIT_SUB_NAME) &&
dlqPolicyObject.Get(CFG_DLQ_POLICY_INIT_SUB_NAME).IsString()) {
init_subscription_name = dlqPolicyObject.Get(CFG_DLQ_POLICY_INIT_SUB_NAME).ToString().Utf8Value();
dlq_policy.initial_subscription_name = init_subscription_name.c_str();
}
pulsar_consumer_configuration_set_dlq_policy(this->cConsumerConfig.get(), &dlq_policy);
}
}

ConsumerConfig::~ConsumerConfig() {
Expand Down
68 changes: 68 additions & 0 deletions tests/consumer.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,74 @@ const Pulsar = require('../index.js');
await producer4.close();
await consumer.close();
});

test('Dead Letter topic', async () => {
const topicName = 'test-dead_letter_topic';
const dlqTopicName = 'test-dead_letter_topic_customize';
const producer = await client.createProducer({
topic: topicName,
});

const maxRedeliverCountNum = 3;
const consumer = await client.subscribe({
topic: topicName,
subscription: 'sub-1',
subscriptionType: 'Shared',
deadLetterPolicy: {
deadLetterTopic: dlqTopicName,
maxRedeliverCount: maxRedeliverCountNum,
initialSubscriptionName: 'init-sub-1-dlq',
},
nAckRedeliverTimeoutMs: 50,
});

// Send messages.
const sendNum = 5;
const messages = [];
for (let i = 0; i < sendNum; i += 1) {
const msg = `my-message-${i}`;
await producer.send({ data: Buffer.from(msg) });
messages.push(msg);
}

// Redelivery all messages maxRedeliverCountNum time.
let results = [];
for (let i = 1; i <= maxRedeliverCountNum * sendNum + sendNum; i += 1) {
const msg = await consumer.receive();
results.push(msg);
if (i % sendNum === 0) {
results.forEach((message) => {
console.log(`Redeliver message ${message.getData().toString()} ${i} times ${message.getRedeliveryCount()} redeliver Count`);
consumer.negativeAcknowledge(message);
});
results = [];
}
}
// assert no more msgs.
await expect(consumer.receive(100)).rejects.toThrow(
'Failed to receive message: TimeOut',
);

const dlqConsumer = await client.subscribe({
topic: dlqTopicName,
subscription: 'sub-1',
});
const dlqResult = [];
for (let i = 0; i < sendNum; i += 1) {
const msg = await dlqConsumer.receive();
dlqResult.push(msg.getData().toString());
}
expect(dlqResult).toEqual(messages);

// assert no more msgs.
await expect(dlqConsumer.receive(500)).rejects.toThrow(
'Failed to receive message: TimeOut',
);

producer.close();
consumer.close();
dlqConsumer.close();
});
});
});
})();

0 comments on commit 6457aef

Please sign in to comment.