Add producer purge method with optional blocking argument #548
Conversation
It looks like @peteryin21 hasn't signed our Contributor License Agreement, yet.
You can read and sign our full Contributor License Agreement here. Once you've signed, reply with [clabot:check]. Appreciation of efforts, clabot
confluent_kafka/src/Producer.c
Outdated
if (!PyArg_ParseTupleAndKeywords(args, kwargs, "|p", kws, &blocking))
        return NULL;
if (blocking == 1)
        rd_kafka_purge(rk, RD_KAFKA_PURGE_F_QUEUE);
Make sure to handle any errors which may be raised from the execution of rd_kafka_purge
https://github.com/edenhill/librdkafka/blob/master/src/rdkafka.h#L3478-L3482
Overall it looks like you are on the right track. We need to add error handling, and we should expose the full purge implementation, including the ability to purge in-flight requests.
[clabot:check]
@confluentinc It looks like @peteryin21 just signed our Contributor License Agreement. 👍 Always at your service, clabot
Thanks for the review @rnpridgeon! I've made the requested changes and fixed the build errors. Let me know how it looks. Also, I think it would be worth adding some tests, but wanted to get your thoughts. I was thinking of adding a
Sorry I didn't get back to this in time. At any rate I believe the Transactional Producer API is better suited for this. I'll close this for now but if we deem the Transactional API to not be sufficient we can revisit.
purge() is useful for other purposes, for instance when terminating and the application wants to know the exact status of each message delivery without having to wait the full message.timeout.ms on flush. E.g., something like this:
# terminate
p.flush(5)  # wait for messages to be delivered, but much shorter than message.timeout.ms
# purge any remaining messages from queues/inflight
p.purge(in_queue=True, in_flight=True)
p.flush(0)  # get delivery reports for the purged messages
sys.exit(0)
I think we should reconsider adding this functionality since Peter has put the effort in to implement it.
@edenhill @rnpridgeon It's been a while! I will look more into the transactional producer api. In the meantime, if
Good work!
I think we can simplify the implementation and the API by not exposing the enums/flags and simply having bool kwargs.
confluent_kafka/admin/__init__.py
Outdated
@@ -35,6 +38,15 @@ class ConfigSource(Enum):
    DEFAULT_CONFIG = CONFIG_SOURCE_DEFAULT_CONFIG  #:

class PurgeStrategy(Enum):
While this technically maps nicely to librdkafka, I think from a user-experience standpoint it might be better to simply go with optional boolean kwargs: purge(in_queue=True, in_flight=True, blocking=True)
(those are the defaults)
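For illustration, a call under that suggested signature which purges only queued messages and returns immediately might look like the sketch below; the broker address and topic are placeholders, not part of this PR:
from confluent_kafka import Producer

# Placeholder setup; any producer with undelivered messages would do.
p = Producer({"bootstrap.servers": "localhost:9092"})
p.produce("mytopic", b"payload")

# Purge only messages still sitting in the internal queue, leave
# in-flight requests alone, and return without waiting for the
# background thread to finish purging.
p.purge(in_queue=True, in_flight=False, blocking=False)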
@@ -398,6 +398,34 @@ static PyObject *Producer_flush (Handle *self, PyObject *args,
        return cfl_PyInt_FromInt(qlen);
}

static void *Producer_purge (Handle *self, PyObject *args,
I know the source is messed up tab/space-wise, but please try to use 8-space indents in new code, no tabs.
Thank you!
confluent_kafka/src/Producer.c
Outdated
if (err == RD_KAFKA_RESP_ERR_NO_ERROR) {
        Py_RETURN_NONE;
}
if (err == RD_KAFKA_RESP_ERR__INVALID_ARG)
I don't think the specific error code handling is needed here; it is very unlikely to happen, so to keep it simple, simply do cfl_PyErr_format(err, "%s", rd_kafka_err2str(err)).
It also needs a unit test (tests/test_Producer.py).
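As a rough illustration (not the PR's actual test), a purge test in tests/test_Producer.py could be sketched along these lines, assuming the bool-kwarg purge() signature suggested above; the unreachable broker address, topic name, and timeouts are placeholders:
from confluent_kafka import Producer


def test_purge():
    delivery_errors = []

    def on_delivery(err, msg):
        # Purged messages are expected to fail delivery, so err should be set.
        delivery_errors.append(err)

    # Point at an unreachable broker so the message stays in the internal queue.
    p = Producer({"bootstrap.servers": "127.0.0.1:1",
                  "message.timeout.ms": 30000})
    p.produce("sometopic", b"payload", on_delivery=on_delivery)

    p.purge(in_queue=True, in_flight=True, blocking=True)
    p.flush(1)  # serve the delivery callback for the purged message

    assert len(delivery_errors) == 1
    assert delivery_errors[0] is not None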
Force-pushed from 0d008a1 to 8dcad18
@edenhill Thanks for the comments. I've addressed them and also added a new test for purge in
@edenhill Ping in case this fell off your radar :)
Looks like there are some issues with prepare-osx.sh that are also being experienced in recent PRs (#909). I can rebase this branch on top of the fix once that's figured out.
LGTM!
Let's get #909 merged first, then we'll restart your CI jobs to make sure they pass before merging.
Hey @edenhill, would it be possible to restart the CI jobs and then merge if it looks good?
@edenhill Can we get this PR merged?
Thanks for this, @peteryin21!
Background
Currently, if a producer times out (i.e. when a broker goes down), messages are added to an internal queue to be published. The number of remaining messages is returned via the flush() command. In use cases where duplicate messages are highly undesirable (e.g. sending task requests to a factory), it would be useful to purge failed messages from the internal queue so they are not unintentionally sent the next time flush() is called with the brokers back online.

Description
Luckily, librdkafka exposes a purge API that purges messages in internal queues. This PR adds a binding for that functionality so it can be accessed via the Python Producer class. There are three purge strategies, defined by the PurgeStrategy enum, which can be passed in as the first positional argument:
- PURGE_INTERNAL_QUEUES: purge messages from internal queues.
- PURGE_INFLIGHT: purge messages in flight to or from the broker.
- PURGE_ALL: purge messages from internal queues and those in flight.
By default the method blocks, but if the optional argument blocking is set to False, it will not wait on the background thread queue. In addition, the user will need to call poll() or flush() after purge() to serve the delivery report callbacks of the purged messages. I conservatively decided to leave that call out of purge() itself, but am open to discussion on how best to make this intuitive to the user.
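For completeness, here is the shutdown flow sketched in the review discussion above, written against the bool-kwarg form of purge() suggested there rather than the PurgeStrategy enum described in this draft; the broker address, topic, and timeouts are placeholders:
import sys
from confluent_kafka import Producer

p = Producer({"bootstrap.servers": "localhost:9092",
              "message.timeout.ms": 60000})
p.produce("mytopic", b"payload",
          on_delivery=lambda err, msg: print("delivery:", err))

p.flush(5)  # give outstanding messages a short window, much shorter than message.timeout.ms

# Drop anything still queued or in flight instead of waiting out the
# full message timeout.
p.purge(in_queue=True, in_flight=True, blocking=True)

# purge() itself does not serve callbacks; this flush delivers the
# failure reports for the purged messages.
p.flush(0)
sys.exit(0)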