Skip to content

Commit 36d6fd6

Browse files
authored
feat: add individual negative acknowledgement for async consumer (#282)
1 parent 6f047ee commit 36d6fd6

File tree

3 files changed

+67
-0
lines changed

3 files changed

+67
-0
lines changed

pulsar/asyncio.py

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -320,6 +320,31 @@ async def acknowledge_cumulative(
320320
)
321321
await future
322322

323+
async def negative_acknowledge(
324+
self,
325+
message: Union[pulsar.Message, pulsar.MessageId, _pulsar.Message, _pulsar.MessageId]
326+
) -> None:
327+
"""
328+
Acknowledge the failure to process a single message asynchronously.
329+
330+
When a message is "negatively acked" it will be marked for redelivery after
331+
some fixed delay. The delay is configurable when constructing the consumer
332+
with {@link ConsumerConfiguration#setNegativeAckRedeliveryDelayMs}.
333+
This call is not blocking.
334+
335+
Parameters
336+
----------
337+
message:
338+
The received message or message id.
339+
"""
340+
if isinstance(message, pulsar.Message):
341+
msg = message._message
342+
elif isinstance(message, pulsar.MessageId):
343+
msg = message._msg_id
344+
else:
345+
msg = message
346+
await asyncio.to_thread(self._consumer.negative_acknowledge, msg)
347+
323348
async def unsubscribe(self) -> None:
324349
"""
325350
Unsubscribe the current consumer from the topic asynchronously.

src/consumer.cc

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,16 @@ void Consumer_acknowledgeCumulativeAsync_message_id(Consumer& consumer, const Me
133133
consumer.acknowledgeCumulativeAsync(msgId, callback);
134134
}
135135

136+
void Consumer_negative_acknowledgeAsync(Consumer& consumer, const Message& msg, ResultCallback callback) {
137+
py::gil_scoped_release release;
138+
consumer.negativeAcknowledge(msg);
139+
}
140+
141+
void Consumer_negative_acknowledgeAsync_message_id(Consumer& consumer, const MessageId& msgId, ResultCallback callback) {
142+
py::gil_scoped_release release;
143+
consumer.negativeAcknowledge(msgId);
144+
}
145+
136146
void Consumer_closeAsync(Consumer& consumer, ResultCallback callback) {
137147
py::gil_scoped_release release;
138148
consumer.closeAsync(callback);
@@ -183,6 +193,8 @@ void export_consumer(py::module_& m) {
183193
.def("acknowledge_async", &Consumer_acknowledgeAsync_message_id)
184194
.def("acknowledge_cumulative_async", &Consumer_acknowledgeCumulativeAsync)
185195
.def("acknowledge_cumulative_async", &Consumer_acknowledgeCumulativeAsync_message_id)
196+
.def("negative_acknowledge_async", &Consumer_negative_acknowledgeAsync)
197+
.def("negative_acknowledge_async", &Consumer_negative_acknowledgeAsync_message_id)
186198
.def("close_async", &Consumer_closeAsync)
187199
.def("unsubscribe_async", &Consumer_unsubscribeAsync)
188200
.def("seek_async", &Consumer_seekAsync)

tests/asyncio_test.py

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -203,6 +203,36 @@ async def test_consumer_individual_acknowledge(self):
203203
msg = await consumer.receive()
204204
self.assertEqual(msg.data(), b'msg-3')
205205

206+
async def test_consumer_negative_acknowledge(self):
207+
topic = f'asyncio-test-consumer-negative-ack-{time.time()}'
208+
sub = 'sub'
209+
consumer = await self._client.subscribe(topic, sub,
210+
consumer_type=pulsar.ConsumerType.Shared,
211+
negative_ack_redelivery_delay_ms=100)
212+
213+
producer = await self._client.create_producer(topic)
214+
await self._prepare_messages(producer)
215+
msgs = []
216+
for _ in range(5):
217+
msg = await consumer.receive()
218+
msgs.append(msg)
219+
220+
await consumer.acknowledge(msgs[1])
221+
await consumer.acknowledge(msgs[3])
222+
223+
await consumer.negative_acknowledge(msgs[0])
224+
await consumer.negative_acknowledge(msgs[2])
225+
await consumer.negative_acknowledge(msgs[4])
226+
await asyncio.sleep(0.2)
227+
228+
received = []
229+
for _ in range(3):
230+
msg = await consumer.receive()
231+
received.append(msg.data())
232+
233+
self.assertEqual(sorted(received), [b'msg-0', b'msg-2', b'msg-4'])
234+
await consumer.close()
235+
206236
async def test_multi_topic_consumer(self):
207237
topics = ['asyncio-test-multi-topic-1', 'asyncio-test-multi-topic-2']
208238
producers = []

0 commit comments

Comments
 (0)