Reuse Kafka consumer for polling next batch #225
Sure, caching recent consumers seems reasonable to me. Is this a large performance cost today? Roughly how much of our time do we spend creating a new consumer?
On Wed, Feb 27, 2019 at 5:11 PM, Satish Kumar Matti wrote:
I would like to propose the idea of having cached consumers for the FromKafkaBatched class and reusing them to get the next batch. The current FromKafkaBatched class creates a new Kafka consumer for every new batch, which may introduce a lot of overhead. I have looked into Spark's integration with Kafka and want to reference the following text from there
<https://spark.apache.org/docs/2.2.0/streaming-kafka-0-10-integration.html#locationstrategies>:

The new Kafka consumer API will pre-fetch messages into buffers. Therefore it is important for performance reasons that the Spark integration keep cached consumers on executors (rather than recreating them for each batch), and prefer to schedule partitions on the host locations that have the appropriate consumers.

This means the FromKafkaBatched class would hold references to the consumers created initially and reuse them to poll the next batch. This also makes sense from Kafka's point of view, since a topic can have as many concurrent consumers as it has partitions.

But I am not sure how this can be handled when the stream is running in Dask mode. The Dask scheduler should be able to redirect requests for new batches to the appropriate consumer on the Dask workers.
Creating a consumer itself may not account for much of this cost, but assigning it to a particular partition and seeking to an offset may. I have not tested this in the streamz setup, but when I tested outside of it, polling the first message took a few seconds while polling subsequent messages took only a few milliseconds. The trick here is to schedule the jobs corresponding to a partition on the same Dask worker, because the consumer on that worker is already assigned and seeked to the last polled offset.
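One way to get that worker affinity (a hypothetical helper, not what streamz does today) is to map each partition deterministically to a worker address, which could then be passed to Dask's Client.submit(..., workers=[addr]) so every batch for a partition lands where its warm consumer lives:

```python
def worker_for_partition(partition, worker_addresses):
    """Deterministically pin a Kafka partition to one Dask worker.

    As long as the worker set is stable, every batch for `partition`
    is routed to the same address, so the cached consumer there is
    already assigned and positioned at the next offset.
    """
    if not worker_addresses:
        raise ValueError("no Dask workers available")
    # Sort so the mapping does not depend on the iteration order of
    # the scheduler's worker listing.
    return sorted(worker_addresses)[partition % len(worker_addresses)]
```

A batch task for partition p could then be submitted with client.submit(poll_batch, p, workers=[worker_for_partition(p, addrs)]). If workers join or leave, the mapping shifts and the affected partitions simply pay the first-poll cost once more.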
@jsmaupin I would be happy to hear your feedback on this.
Every time a client is created, be it producer or consumer, there is a process where it fetches the topic metadata and then initiates new connections to each node using the info in the metadata. If this is what we're talking about here, then creating a consumer comes with a fair amount of overhead.