Reuse Kafka consumer for polling next batch #225

skmatti opened this issue Feb 28, 2019 · 4 comments

@skmatti
Contributor

skmatti commented Feb 28, 2019

I would like to propose caching consumers in the FromKafkaBatched class and reusing them when polling the next batch.

The current FromKafkaBatched class creates a new Kafka consumer for every new batch, which may introduce a lot of overhead. I have looked into Spark's integration with Kafka and want to quote the following text from its documentation:

The new Kafka consumer API will pre-fetch messages into buffers. Therefore it is important for performance reasons that the Spark integration keep cached consumers on executors (rather than recreating them for each batch), and prefer to schedule partitions on the host locations that have the appropriate consumers.

This means the FromKafkaBatched class would hold references to the consumers created initially and reuse them to poll the next batch. This also makes sense from Kafka's point of view, since a topic can support as many concurrent consumers as it has partitions.

But I am not sure how this can be handled when the stream is running in dask mode. The dask scheduler should be able to redirect requests for new batches to the appropriate consumer on the dask workers.
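
A minimal sketch of the caching idea, using confluent_kafka (which the streamz Kafka source builds on). The get_batch helper, its signature, and the module-level cache are illustrative, not the actual FromKafkaBatched internals:

```python
from confluent_kafka import Consumer, TopicPartition

_consumer_cache = {}  # (topic, partition) -> cached Consumer

def get_batch(config, topic, partition, low, high):
    """Poll messages with offsets in [low, high), reusing a cached
    consumer for this partition if one exists (hypothetical helper)."""
    key = (topic, partition)
    consumer = _consumer_cache.get(key)
    if consumer is None:
        # the creation/assignment cost is paid only once per partition
        consumer = Consumer(config)
        consumer.assign([TopicPartition(topic, partition, low)])
        _consumer_cache[key] = consumer
    else:
        # cached consumer: just reposition to the start of this batch
        consumer.seek(TopicPartition(topic, partition, low))
    out = []
    while len(out) < high - low:
        msg = consumer.poll(timeout=1.0)
        if msg is None or msg.error():
            break
        out.append((msg.key(), msg.value()))
    return out
```

Note that this cache lives per process, which is exactly the dask complication above: a batch for a partition only benefits if its task lands on the worker that already holds that partition's consumer.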

@mrocklin
Collaborator

mrocklin commented Feb 28, 2019 via email

@skmatti
Contributor Author

skmatti commented Feb 28, 2019

Creating a consumer itself may not account for much of this performance cost; assigning it to a particular partition and seeking to an offset may. I have not tested this in a streamz setup, but when I tested outside of it, polling the first message took a few seconds while polling subsequent messages took only a few milliseconds. The trick here is to schedule jobs for a given partition on the same dask worker, because the consumer on that worker is already assigned and positioned at the last polled offset.
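
With dask.distributed, one way to express that scheduling constraint is the workers= restriction on Client.submit. A hedged sketch, where the scheduler and worker addresses and the partition-to-worker mapping are hypothetical, and get_batch is the cached-consumer helper sketched earlier in the thread:

```python
from distributed import Client

client = Client('tcp://scheduler:8786')  # hypothetical scheduler address

# hypothetical mapping: each partition sticks to the worker that holds
# its cached, already-positioned consumer
partition_to_worker = {
    0: 'tcp://worker-a:40001',
    1: 'tcp://worker-b:40002',
}

config = {'bootstrap.servers': 'broker:9092', 'group.id': 'streamz'}

def submit_batch(topic, partition, low, high):
    # workers=[...] asks the scheduler to run the task only on that
    # worker, so the partition always reuses its cached consumer
    return client.submit(get_batch, config, topic, partition, low, high,
                         workers=[partition_to_worker[partition]])
```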

@skmatti
Contributor Author

skmatti commented Feb 28, 2019

@jsmaupin I would be happy to hear your feedback on this.

@jsmaupin
Contributor

jsmaupin commented Mar 1, 2019

Every time a client is created, be it a producer or a consumer, it goes through a process where it fetches the topic metadata and then initiates new connections to each node using the information in that metadata. If this is what we're talking about here, then creating a consumer does come with a fair amount of overhead.
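
A rough way to observe that overhead in isolation, assuming confluent_kafka and a broker reachable at localhost:9092 (the address and group id are illustrative):

```python
import time
from confluent_kafka import Consumer

t0 = time.time()
consumer = Consumer({'bootstrap.servers': 'localhost:9092',
                     'group.id': 'overhead-check'})
# list_topics() forces the initial metadata fetch and broker connections
consumer.list_topics(timeout=10)
print('consumer ready after %.3f s' % (time.time() - t0))
consumer.close()
```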
