Update readme v1.0.0 mk #59
Conversation
README.md
Outdated
Some use cases can be optimized by dealing with batches rather than single messages. This handler will feed your function batches and some utility functions to give your code more flexibility. Be aware that using `eachBatch` is considered a more advanced use case since you will have to understand how session timeouts and heartbeats are connected. All resolved offsets will be automatically committed after the function is executed.

In order to process huge volumes of messages in a responsive manner, you need to consider the `eachBatch` API. Dealing with batches rather than single messages reduces the network traffic and the communication overhead with the broker, allowing your consumer group to eat away at your partition lag orders of magnitude faster than `eachMessage`.
This isn't really true though. `eachMessage` is essentially a default implementation of `eachBatch`. We still consume a batch at a time from the brokers, and the offset is only resolved, not committed, on each message.

Situations where you might want to use `eachBatch` are where you need to do something with all messages at once. For example, maybe you want to send some data from each message to a remote batch API, so that for N messages you call the remote API once.
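A minimal sketch of that pattern, assuming an already created `kafka` client and a hypothetical `sendBatchToRemoteApi` helper and topic name (neither is part of this PR):

```javascript
const consumer = kafka.consumer({ groupId: 'batch-consumer' })

await consumer.connect()
await consumer.subscribe({ topic: 'events' })

await consumer.run({
  eachBatch: async ({ batch, resolveOffset, heartbeat }) => {
    // One remote call for the N messages in the batch instead of N calls
    await sendBatchToRemoteApi(batch.messages)

    // Mark every message as processed and keep the session alive
    for (const message of batch.messages) {
      resolveOffset(message.offset)
    }
    await heartbeat()
  },
})
```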
We actually do something similar, where we expose an HTTP endpoint to pause/resume. When I wrote the example, I just didn't want to make assumptions about how the pause/resume would be invoked, so invoking it via a Kafka message seemed appropriate. 😅 If you have an idea for a clearer example, I'm all for it. I just can't think of one that wouldn't involve more non-KafkaJS related cruft. For example, if we take the example of your dependency responding with 429:

```javascript
await consumer.connect()
await consumer.subscribe({ topic: 'jobs' })
await consumer.run({
  eachMessage: async ({ topic, message }) => {
    try {
      await sendToDependency(message)
    } catch (e) {
      if (e instanceof TooManyRequestsError) {
        consumer.pause([{ topic }])
        setTimeout(() => consumer.resume([{ topic }]), e.retryAfter * 1000)
      }
      throw e
    }
  },
})
```
@@ -116,9 +115,37 @@ new Kafka({
})
```

#### <a name="setup-client-default-retry"></a> Default Retry
### <a name="configuration-default-retry"></a> Default Retry
I like the detailed explanation, but can we move it to the bottom? I think the beginning of the readme should provide a quick setup and the most common use case; detailed information such as the custom logger or how the retry mechanism works could live further down. WDYT?
Definitely, good point, it clutters up too much
In order to pause and resume consuming from one or more topics, the `Consumer` provides the methods `pause` and `resume`. Note that pausing a topic means that it won't be fetched in the next cycle. You may still receive messages for the topic within the current batch.
Calling `pause` with a topic that the consumer is not subscribed to is a no-op; calling `resume` with a topic that is not paused is also a no-op.
Example: A situation where this could be useful is when an external dependency used by the consumer is under too much load. Here we want to `pause` consumption from a topic when this happens, and after a predefined interval we `resume` again:
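A minimal sketch of the pause/resume calls themselves (the `jobs` topic and the 30-second interval are just placeholders); the 429 example quoted earlier in this thread shows the same idea wired into `eachMessage`:

```javascript
// Stop fetching from the topic while the external dependency recovers
consumer.pause([{ topic: 'jobs' }])

// Resume after a predefined interval
setTimeout(() => consumer.resume([{ topic: 'jobs' }]), 30 * 1000)
```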
❤️
The codec works for both consumers and producers; the consumer will detect that the message is compressed with some codec and it will use the appropriate codec to decompress it.
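For illustration, a minimal producer-side sketch, assuming the `CompressionTypes` export and the GZIP codec (topic, broker and message contents are placeholders):

```javascript
const { Kafka, CompressionTypes } = require('kafkajs')

const kafka = new Kafka({ clientId: 'my-app', brokers: ['localhost:9092'] })
const producer = kafka.producer()

const send = async () => {
  await producer.connect()
  await producer.send({
    topic: 'events',
    compression: CompressionTypes.GZIP,
    messages: [{ key: 'key1', value: 'hello compressed world' }],
  })
  // The consumer side needs no extra configuration: the codec is read from
  // the fetched message set and the matching decompression is applied.
}
```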
When we shut down the consumer, it will wait for the full batch to be processed before it exits if you use `eachBatch`.
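A sketch of the shutdown hook this implies (the signal handling is an assumption, not part of the PR):

```javascript
const shutdown = async () => {
  // disconnect() lets the consumer finish the in-flight work before exiting
  await consumer.disconnect()
  process.exit(0)
}

process.on('SIGTERM', shutdown)
process.on('SIGINT', shutdown)
```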
The reason for all of that is that we need to initialize the consumer before we invoke the seek operation.
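A sketch of the ordering this implies (topic, partition and offset are placeholders): `run` starts and initializes the consumer, and `seek` is invoked right after without awaiting `run`:

```javascript
await consumer.connect()
await consumer.subscribe({ topic: 'jobs' })

// run() is not awaited; it initializes the consumer and keeps consuming
consumer.run({
  eachMessage: async ({ message }) => console.log(message.offset),
})

// seek() can only be invoked once the consumer has been initialized by run()
consumer.seek({ topic: 'jobs', partition: 0, offset: '100' })
```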
Comments for @tulios
- `eachBatch` / `resolveOffset`: I understand that the default behaviour is that processed messages within a batch will be committed in case of errors, while the rest will remain unprocessed (sketched below). But it is not clear to me why I would like to change this. What is the use case? It is only adding confusion for me.
- "you don't need to await `consumer#run`" - can you elaborate why this is the case?
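For reference, a sketch of the default behaviour the first question refers to (`processMessage` is a hypothetical handler):

```javascript
await consumer.run({
  eachBatch: async ({ batch, resolveOffset, heartbeat }) => {
    for (const message of batch.messages) {
      await processMessage(message) // may throw halfway through the batch

      // Resolving after each message means that on an error only the
      // already-processed offsets are committed; the remaining messages
      // are delivered again on the next fetch.
      resolveOffset(message.offset)
      await heartbeat()
    }
  },
})
```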