Add producer purge method with optional blocking argument #548
Conversation
It looks like Peter Yin (@peteryin21) hasn't signed our Contributor License Agreement yet.
You can read and sign our full Contributor License Agreement here. Once you've signed, reply with [clabot:check]. Appreciation of efforts, clabot
confluent_kafka/src/Producer.c
Outdated
if (!PyArg_ParseTupleAndKeywords(args, kwargs, "|p", kws, &blocking))
        return NULL;
if (blocking==1)
        rd_kafka_purge(rk, RD_KAFKA_PURGE_F_QUEUE)
Make sure to handle any errors that may be raised by the call to rd_kafka_purge:
https://github.com/edenhill/librdkafka/blob/master/src/rdkafka.h#L3478-L3482
Ryan P (rnpridgeon)
left a comment
Overall it looks like you are on the right track. We need to add error handling, and we should expose the full purge implementation, including the ability to purge in-flight requests.
[clabot:check]

Confluent Inc. (@confluentinc) It looks like Peter Yin (@peteryin21) just signed our Contributor License Agreement. 👍 Always at your service, clabot

Thanks for the review Ryan P (@rnpridgeon)! I've made the requested changes and fixed the build errors. Let me know how it looks. Also, I think it would be worth adding some tests, but wanted to get your thoughts. I was thinking of adding a …
Sorry I didn't get back to this in time. At any rate, I believe the Transactional Producer API is better suited for this. I'll close this for now, but if we deem the Transactional API insufficient we can revisit.
purge() is useful for other purposes, for instance when terminating and the application wants to know the exact status of each message delivery without having to wait the full message.timeout.ms on flush. E.g., something like this:

# terminate
p.flush(5)  # wait for messages to be delivered, but much shorter than message.timeout.ms
# purge any remaining messages from queues/inflight
p.purge(in_queue=True, in_flight=True)
p.flush(0)  # get delivery reports for the purged messages
sys.exit(0)

I think we should reconsider adding this functionality since Peter has put the effort in to implement it.
Magnus Edenhill (@edenhill) Ryan P (@rnpridgeon) It's been a while! I will look more into the transactional producer API. In the meantime, if …
Magnus Edenhill (edenhill)
left a comment
Good work!
I think we can simplify the implementation and the API by not exposing the enums/flags and simply have bool kwargs.
confluent_kafka/admin/__init__.py
Outdated
DEFAULT_CONFIG = CONFIG_SOURCE_DEFAULT_CONFIG  #:

class PurgeStrategy(Enum):
While this technically maps nicely to librdkafka, I think from a user-experience it might be better to simply go with optional boolean kwargs: purge(in_queue=True, in_flight=True, blocking=True) (those are the defaults)
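A minimal sketch of what that kwargs-to-flags translation could look like on the Python side. The numeric values mirror the RD_KAFKA_PURGE_F_* constants in rdkafka.h; the purge_flags helper itself is hypothetical, for illustration only:

```python
# Bit-flag values as defined for rd_kafka_purge() in rdkafka.h
RD_KAFKA_PURGE_F_QUEUE = 0x1         # purge messages in internal queues
RD_KAFKA_PURGE_F_INFLIGHT = 0x2      # purge messages in flight to the broker
RD_KAFKA_PURGE_F_NON_BLOCKING = 0x4  # don't wait for background queue purging


def purge_flags(in_queue=True, in_flight=True, blocking=True):
    """Hypothetical helper: translate the proposed boolean kwargs
    into the flag bitmask passed down to rd_kafka_purge()."""
    flags = 0
    if in_queue:
        flags |= RD_KAFKA_PURGE_F_QUEUE
    if in_flight:
        flags |= RD_KAFKA_PURGE_F_INFLIGHT
    if not blocking:
        flags |= RD_KAFKA_PURGE_F_NON_BLOCKING
    return flags
```

With the suggested defaults, purge(in_queue=True, in_flight=True, blocking=True) would map to 0x3 (queue and in-flight, blocking).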
}

static void *Producer_purge (Handle *self, PyObject *args,
I know the source is messed up tab/space-wise, but please try to use 8-space indents in new code, no tabs.
Thank you!
confluent_kafka/src/Producer.c
Outdated
if (err == RD_KAFKA_RESP_ERR_NO_ERROR) {
        Py_RETURN_NONE;
}
if (err == RD_KAFKA_RESP_ERR__INVALID_ARG) {
I don't think the specific error code handling is needed here, it is very unlikely to happen, so to keep it simple simply do cfl_PyErr_format(err, "%s", rd_kafka_err2str(err))
It also needs a unit-test (tests/test_Producer.py).
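A rough sketch of the shape such a test could take. StubProducer is a hypothetical stand-in for confluent_kafka.Producer (a real test in tests/test_Producer.py would exercise the actual binding); it models only the behavior under test here: purged messages leave the queue and fail their delivery callbacks.

```python
class StubProducer:
    """Hypothetical stand-in for confluent_kafka.Producer, illustration only."""

    def __init__(self):
        self._queue = []  # (value, on_delivery) pairs awaiting delivery

    def produce(self, topic, value=None, on_delivery=None):
        self._queue.append((value, on_delivery))

    def purge(self, in_queue=True, in_flight=True, blocking=True):
        # Purged messages fail delivery with a purge-specific error marker
        if in_queue:
            purged, self._queue = self._queue, []
            for value, callback in purged:
                if callback is not None:
                    callback("_PURGE_QUEUE", value)

    def flush(self, timeout=None):
        return len(self._queue)  # number of messages still queued


def test_purge_empties_queue_and_fails_delivery():
    errors = []
    p = StubProducer()
    p.produce("mytopic", b"payload",
              on_delivery=lambda err, msg: errors.append(err))
    p.purge(in_queue=True, in_flight=True)
    assert p.flush(0) == 0             # queue is empty after purge
    assert errors == ["_PURGE_QUEUE"]  # callback saw the purge error
```

A real test would also cover the non-blocking variant and the delivery-report flow after a subsequent flush(0).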
(force-pushed from 0d008a1 to 8dcad18)
Magnus Edenhill (@edenhill) Thanks for the comments. I've addressed them and also added a new test for purge in …
Magnus Edenhill (@edenhill) Ping in case this fell off your radar :)

Looks like there are some issues with prepare-osx.sh that are also being experienced in recent PRs (#909). I can rebase this branch on top of the fix once that's figured out.
Magnus Edenhill (edenhill)
left a comment
LGTM!
Let's get #909 merged first, then we'll restart your CI jobs to make sure they pass before merging.

Magnus Edenhill (@edenhill) It looks like #909 is merged -- would you be able to restart the CI jobs and then merge? I'm also curious what the release schedule looks like, and what the timeline for releasing this change might be.

Hey Magnus Edenhill (@edenhill), would it be possible to restart the CI jobs and then merge if it looks good?

Magnus Edenhill (@edenhill) Can we get this PR merged?

Thanks for this, Peter Yin (@peteryin21)!
Background

Currently, if a producer times out (i.e. when a broker goes down), messages are added to an internal queue to be published. The number of remaining messages is returned via the flush() command. In use cases where duplicate messages are highly undesirable (e.g. sending task requests to a factory), it would be useful to purge failed messages from the internal queue so they are not unintentionally sent the next time flush() is called with the brokers back online.

Description

Luckily, librdkafka exposes a purge API that purges messages in internal queues. This PR adds a binding for this functionality to this library so we can access it via the Python Producer class. There are three purge strategies, defined by the PurgeStrategy enum, which can be passed in as the first positional argument:

PURGE_INTERNAL_QUEUES: Purge messages from internal queues.
PURGE_INFLIGHT: Purge messages in flight to or from the broker.
PURGE_ALL: Purge messages from internal queues and those in flight.

By default the method blocks, but if the optional argument blocking is set to False, we will not wait on the background thread queue. In addition, the user will need to call poll() or flush() after purge() to serve the delivery report callbacks of the purged messages. I conservatively decided to leave that out of purge() itself, but am open to discussion about how best to make this intuitive to the user.
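To make the strategy semantics concrete, here is a sketch of how the PurgeStrategy values described above could map onto librdkafka's purge flag bitmask. The numeric values mirror RD_KAFKA_PURGE_F_QUEUE, RD_KAFKA_PURGE_F_INFLIGHT and RD_KAFKA_PURGE_F_NON_BLOCKING in rdkafka.h; the purge_flags helper is illustrative, not the PR's actual code:

```python
from enum import IntEnum


class PurgeStrategy(IntEnum):
    # Values mirror the RD_KAFKA_PURGE_F_* flags in rdkafka.h
    PURGE_INTERNAL_QUEUES = 0x1  # purge messages from internal queues
    PURGE_INFLIGHT = 0x2         # purge messages in flight to/from the broker
    PURGE_ALL = 0x3              # union of the two flags above


RD_KAFKA_PURGE_F_NON_BLOCKING = 0x4  # OR-ed in when blocking=False


def purge_flags(strategy, blocking=True):
    """Illustrative helper: combine a strategy with the blocking option
    into the bitmask that would be handed to rd_kafka_purge()."""
    flags = int(strategy)
    if not blocking:
        flags |= RD_KAFKA_PURGE_F_NON_BLOCKING
    return flags
```

Note that PURGE_ALL is exactly the bitwise union of PURGE_INTERNAL_QUEUES and PURGE_INFLIGHT, which is why the flat boolean-kwargs API suggested in review expresses the same space of options.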