Add producer purge method with optional blocking argument #548


Merged
merged 14 commits into confluentinc:master from add_purge_to_producer on Oct 20, 2020

Conversation

@peteryin21 (Contributor) commented Feb 27, 2019

Background

Currently, if a producer times out (i.e., when a broker goes down), messages are added to an internal queue to be published later; the number of remaining messages is returned by flush(). In use cases where duplicate messages are highly undesirable (e.g., sending task requests to a factory), it is useful to be able to purge failed messages from the internal queue so they are not unintentionally re-sent the next time flush() is called with the brokers back online.

Description

Luckily, librdkafka exposes a purge API that purges messages from its internal queues. This PR adds a binding for that functionality so it can be accessed via the Python Producer class. The purging strategy is selected by passing a PurgeStrategy enum value as the first positional argument; three strategies are defined:

  • PURGE_INTERNAL_QUEUES: Purge messages from internal queues.
  • PURGE_INFLIGHT: Purge messages in flight to or from the broker.
  • PURGE_ALL: Purge messages from internal queues and those in flight.

By default the method blocks; if the optional blocking argument is set to False, it does not wait for background thread queue purging to finish.

In addition, the user needs to call poll() or flush() after purge() to serve the delivery report callbacks of the purged messages. I conservatively decided to leave an automatic callback flush out of purge() itself, but am open to discussion on how to make this most intuitive for the user.
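The produce / purge / poll flow described above can be sketched with a toy in-memory producer. This is an illustration of the semantics only, not the real confluent_kafka API; the class and the "_PURGE_QUEUE" marker are made up for this sketch:

```python
# Toy model of the purge semantics: purge() queues delivery reports for the
# purged messages, and poll() (or flush()) serves them, mirroring librdkafka.

class ToyProducer:
    def __init__(self):
        self._queue = []   # (message, callback) pairs awaiting delivery
        self._events = []  # delivery reports waiting to be served

    def produce(self, msg, on_delivery=None):
        self._queue.append((msg, on_delivery))

    def purge(self):
        # Purged messages fail with a "purged" pseudo-error; their delivery
        # callbacks are queued here, not invoked immediately.
        for msg, cb in self._queue:
            if cb is not None:
                self._events.append((cb, "_PURGE_QUEUE", msg))
        self._queue.clear()

    def poll(self):
        # Serve the queued delivery reports, as the real poll()/flush() would.
        for cb, err, msg in self._events:
            cb(err, msg)
        self._events.clear()


reports = []
p = ToyProducer()
p.produce("task-1", on_delivery=lambda err, msg: reports.append((err, msg)))
p.purge()
assert reports == []  # purge() does not fire callbacks itself...
p.poll()              # ...they are served on the next poll()/flush()
assert reports == [("_PURGE_QUEUE", "task-1")]
```

The point of the model is the last two assertions: the delivery reports of purged messages only arrive once the application serves callbacks again.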

@ghost commented Feb 27, 2019

It looks like @peteryin21 hasn't signed our Contributor License Agreement, yet.

The purpose of a CLA is to ensure that the guardian of a project's outputs has the necessary ownership or grants of rights over all contributions to allow them to distribute under the chosen licence.
Wikipedia

You can read and sign our full Contributor License Agreement here.

Once you've signed, reply with [clabot:check] to prove it.

Appreciation of efforts,

clabot

if (!PyArg_ParseTupleAndKeywords(args, kwargs, "|p", kws, &blocking))
        return NULL;
if (blocking == 1)
        rd_kafka_purge(rk, RD_KAFKA_PURGE_F_QUEUE);

Make sure to handle any errors that may be returned by rd_kafka_purge:

https://github.com/edenhill/librdkafka/blob/master/src/rdkafka.h#L3478-L3482
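The pattern the review asks for, checking the return code of rd_kafka_purge() and raising on anything but NO_ERROR, can be illustrated in Python. The numeric error value, the ERR2STR table, and the helper name are placeholders for this sketch, not the real library API:

```python
# Sketch of the requested error handling: success returns None, any other
# error code is translated to a string and raised as an exception.

NO_ERROR = 0
INVALID_ARG = -186  # illustrative value only

ERR2STR = {
    NO_ERROR: "Success",
    INVALID_ARG: "Invalid argument or configuration",
}

class KafkaException(Exception):
    """Stand-in for the library's exception type."""

def check_purge_result(err):
    """Return None on success, raise KafkaException otherwise."""
    if err == NO_ERROR:
        return None
    raise KafkaException(ERR2STR.get(err, "Unknown error %d" % err))

check_purge_result(NO_ERROR)  # no exception on success
try:
    check_purge_result(INVALID_ARG)
except KafkaException as e:
    assert "Invalid argument" in str(e)
```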

@rnpridgeon (Contributor) left a comment

Overall it looks like you are on the right track. We need to add error handling, and we should expose the full purge implementation, including the ability to purge in-flight requests.

@peteryin21 (Author)

[clabot:check]

@ghost commented Mar 6, 2019

@confluentinc It looks like @peteryin21 just signed our Contributor License Agreement. 👍

Always at your service,

clabot

@peteryin21 (Author) commented Mar 6, 2019

Thanks for the review @rnpridgeon! I've made the requested changes and fixed the build errors. Let me know how it looks.

Also, I think it would be worth adding some tests, but wanted to get your thoughts. I was thinking of adding a def test_purge to test_Producer.py and adding purge() usage to this test in integration_test.py. What do you think?

@rnpridgeon (Contributor)

Sorry I didn't get back to this in time. At any rate, I believe the Transactional Producer API is better suited for this use case. I'll close this for now, but if we deem the Transactional API insufficient we can revisit.

@rnpridgeon rnpridgeon closed this Apr 7, 2020
@edenhill (Contributor) commented Apr 8, 2020

purge() is useful for other purposes too, for instance when terminating: the application may want to know the exact status of each message delivery without having to wait the full message.timeout.ms in flush().

E.g., something like this:

# terminate

p.flush(5)  # wait for messages to be delivered, but much shorter than message.timeout.ms

# purge any remaining messages from queues/inflight
p.purge(in_queue=True, in_flight=True)

p.flush(0)  # get delivery reports for the purged messages

sys.exit(0)

I think we should reconsider adding this functionality since Peter has put the effort in to implement it.

@peteryin21 (Author)

@edenhill @rnpridgeon It's been a while! I will look more into the Transactional Producer API. In the meantime, if purge() would still be a useful method to expose, I'm happy to try to get this across the finish line. Let me know what the remaining steps are before we can merge this.

@edenhill (Contributor) left a comment

Good work!

I think we can simplify the implementation and the API by not exposing the enums/flags and simply having bool kwargs.

@@ -35,6 +38,15 @@ class ConfigSource(Enum):
DEFAULT_CONFIG = CONFIG_SOURCE_DEFAULT_CONFIG #:


class PurgeStrategy(Enum):

While this technically maps nicely onto librdkafka, I think from a user-experience standpoint it might be better to simply go with optional boolean kwargs: purge(in_queue=True, in_flight=True, blocking=True) (those are the defaults)
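For reference, the suggested boolean kwargs translate directly into librdkafka's purge flag bits. A minimal sketch of that translation in Python, using the flag values as defined in rdkafka.h (the helper name purge_flags is made up for illustration):

```python
# Purge flag bits from librdkafka's rdkafka.h.
RD_KAFKA_PURGE_F_QUEUE = 0x1         # purge messages in internal queues
RD_KAFKA_PURGE_F_INFLIGHT = 0x2      # purge messages in flight to the broker
RD_KAFKA_PURGE_F_NON_BLOCKING = 0x4  # don't wait for background queue purging

def purge_flags(in_queue=True, in_flight=True, blocking=True):
    """Translate the proposed boolean kwargs into a librdkafka flag mask."""
    flags = 0
    if in_queue:
        flags |= RD_KAFKA_PURGE_F_QUEUE
    if in_flight:
        flags |= RD_KAFKA_PURGE_F_INFLIGHT
    if not blocking:
        flags |= RD_KAFKA_PURGE_F_NON_BLOCKING
    return flags

assert purge_flags() == 0x3                # defaults: queue + in-flight, blocking
assert purge_flags(in_flight=False) == 0x1
assert purge_flags(blocking=False) == 0x7
```

This keeps the enum out of the public API while losing none of the underlying expressiveness.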

@@ -398,6 +398,34 @@ static PyObject *Producer_flush (Handle *self, PyObject *args,
return cfl_PyInt_FromInt(qlen);
}


static PyObject *Producer_purge (Handle *self, PyObject *args,

I know the source is messed up tab/space-wise, but please try to use 8-space indents in new code, no tabs.
Thank you!

if (err == RD_KAFKA_RESP_ERR_NO_ERROR) {
        Py_RETURN_NONE;
}
if (err == RD_KAFKA_RESP_ERR__INVALID_ARG) {

I don't think the specific error-code handling is needed here; it is very unlikely to trigger, so keep it simple and just do cfl_PyErr_format(err, "%s", rd_kafka_err2str(err))

@edenhill (Contributor) commented May 8, 2020

It also needs a unit test (tests/test_Producer.py).

@rnpridgeon rnpridgeon reopened this May 14, 2020
@peteryin21 peteryin21 force-pushed the add_purge_to_producer branch from 0d008a1 to 8dcad18 Compare June 16, 2020 21:37
@peteryin21 (Author)

@edenhill Thanks for the comments. I've addressed them and also added a new test for purge in test_Producer.py. It seems to work! The things I don't think I can test functionally are the blocking and in_flight flags, as well as the error path, since these will probably require integration testing. Let me know what you think.

@peteryin21 (Author)

@edenhill Ping in case this fell off your radar :)

@peteryin21 peteryin21 requested a review from edenhill July 7, 2020 21:16
@peteryin21 (Author)

Looks like there are some issues with prepare-osx.sh that other recent PRs are also hitting (#909). I can rebase this branch on top of the fix once that's figured out.

@edenhill (Contributor) left a comment

LGTM!

@edenhill (Contributor) commented Jul 7, 2020

Let's get #909 merged first, then we'll restart your CI jobs to make sure they pass before merging.

@peteryin21 (Author)

@edenhill It looks like #909 is merged; would you be able to restart the CI jobs and then merge?

I'm also curious what the release schedule, and the timeline for releasing this change, might look like.

@peteryin21 (Author)

Hey @edenhill, would it be possible to restart the CI jobs and then merge if it looks good?

@peteryin21 (Author)

@edenhill Can we get this PR merged?

@edenhill edenhill merged commit baee8a6 into confluentinc:master Oct 20, 2020
@edenhill (Contributor)

Thanks for this, @peteryin21 !
