
Update readme v1.0.0 mk #59

Merged (10 commits, Jun 4, 2018)

Conversation
Conversation

@klippx (Contributor) commented May 16, 2018

Comments for @tulios

  • How does one implement a custom decoder for consuming?
  • Elaborate on the statement "It will also provide more utility functions to give your code more flexibility" for eachBatch
  • In `resolveOffset` I understand that the default behaviour is that processed messages within a batch will be committed in case of errors, while the rest will remain unprocessed. But it is not clear to me why I would want to change this. What is the use case? It only adds confusion for me.
  • Seek: you don't need to await consumer#run - can you elaborate why this is the case?
  • Pause & Resume: I removed mentions of "unsupported by the library"
  • Pause & Resume: The example is very weird. Maybe it's fine, but it's not the way people use topics. Maybe use one topic and pause because a dependency is down, which seems like a more useful scenario?

README.md Outdated

Some use cases can be optimized by dealing with batches rather than single messages. This handler will feed your function batches and some utility functions to give your code more flexibility. Be aware that using `eachBatch` is considered a more advanced use case since you will have to understand how session timeouts and heartbeats are connected. All resolved offsets will be automatically committed after the function is executed.
In order to process huge volumes of messages in a responsive manner, you need to consider the `eachBatch` API. Dealing with batches rather than single messages reduces network traffic and the communication overhead with the broker, allowing your consumer group to eat away at your partition lag orders of magnitude faster than `eachMessage`.
Collaborator
This isn't really true though. eachMessage is essentially a default implementation of eachBatch. We still consume a batch at a time from the brokers, and the offset is only resolved, not committed, on each message.

A situation where you might want to use eachBatch is when you need to do something with all messages at once. For example, maybe you want to send some data from each message to a remote batch API, so that for N messages you call the remote API once.
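A minimal sketch of that scenario. The `consumer` is a KafkaJS consumer created elsewhere and `sendToBatchApi` is a hypothetical helper, not part of KafkaJS; both are injected as parameters so the handler can be exercised without a broker:

```javascript
// Sketch: forward a whole batch to a remote batch API in one call.
// `consumer` is a KafkaJS consumer created elsewhere; `sendToBatchApi`
// is a hypothetical helper that accepts an array of message values.
async function runBatchForwarder(consumer, sendToBatchApi) {
  await consumer.run({
    eachBatch: async ({ batch, resolveOffset, heartbeat }) => {
      // One remote call for N messages instead of N calls
      await sendToBatchApi(batch.messages.map((m) => m.value.toString()))

      // Mark every message as processed and keep the session alive
      for (const message of batch.messages) {
        resolveOffset(message.offset)
      }
      await heartbeat()
    },
  })
}
```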

@Nevon (Collaborator) commented May 16, 2018

The example is very weird. Maybe it's fine, but it's not the way people use topics. Maybe use one topic and pause because a dependency is down, which seems like a more useful scenario?

We actually do something similar, where we expose an HTTP endpoint to pause/resume. When I wrote the example, I just didn't want to make assumptions about how the pause/resume would be invoked, so invoking it via a Kafka message seemed appropriate. 😅

If you have an idea for a clearer example, I'm all for it. I just can't think of one that wouldn't involve more non-KafkaJS related cruft. For example, if we take the example of your dependency responding with 429:

```javascript
await consumer.connect()
await consumer.subscribe({ topic: 'jobs' })

await consumer.run({
  eachMessage: async ({ topic, message }) => {
    try {
      await sendToDependency(message)
    } catch (e) {
      if (e instanceof TooManyRequestsError) {
        consumer.pause([{ topic }])
        setTimeout(() => consumer.resume([{ topic }]), e.retryAfter * 1000)
      }

      throw e
    }
  },
})
```

````diff
@@ -116,9 +115,37 @@ new Kafka({
 })
 ```

-#### <a name="setup-client-default-retry"></a> Default Retry
+### <a name="configuration-default-retry"></a> Default Retry
````
Owner

I like the detailed explanation but can we move it to the bottom? I think the beginning of the readme should provide a quick setup and the most common use case, detailed information such as the custom logger or how the retry mechanism works could live further down. WDYT?

Contributor Author

definitely, good point, it clutters things up too much


In order to pause and resume consuming from one or more topics, the `Consumer` provides the methods `pause` and `resume`. Note that pausing a topic means that it won't be fetched in the next cycle. You may still receive messages for the topic within the current batch.

Calling `pause` with a topic that the consumer is not subscribed to is a no-op; calling `resume` with a topic that is not paused is also a no-op.

Example: A situation where this could be useful is when an external dependency used by the consumer is under too much load. Here we want to `pause` consumption from a topic when this happens, and after a predefined interval we `resume` again:
Owner

❤️

@klippx klippx force-pushed the update-readme-v1.0.0-mk branch from cd22f99 to d591e37 on May 31, 2018 at 07:42
@tulios (Owner) commented Jun 4, 2018

@klippx

How does one implement a custom decoder for consuming?

The codec works for both consumers and producers; the consumer will detect that the message is compressed with some codec and it will use the appropriate decompress function to decompress it. If the codec is implemented in KafkaJS, it'll just work.

Elaborate on the statement "It will also provide more utility functions to give your code more flexibility" for eachBatch

The `eachBatch` functions receive 3 helpers besides the batch: `resolveOffset`, `heartbeat`, and `isRunning`.

In resolveOffset I understand that the default behavior is that processed messages within a batch will be committed in case of errors, while the rest will remain unprocessed. But it is not clear to me why I would like to change this. What is the use case? It is only adding confusion for me.

When we shut down the consumer, it will wait for the full batch to be processed before it exits. If you use `isRunning` to stop processing messages, the consumer will automatically commit the last offset, skipping the messages you didn't consume. If you make sure you call `resolveOffset` and disable the auto-resolve, you can quickly shut down the consumer without losing or skipping any messages.
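A sketch of that shutdown-safe pattern, with the consumer injected so it can be exercised without a broker (`handleMessage` is a hypothetical per-message processor, not part of KafkaJS):

```javascript
// Shutdown-safe batch processing: disable auto-resolve and only mark
// offsets for messages that were actually processed. On shutdown,
// isRunning() turns false and unresolved messages are redelivered.
async function runWithManualResolve(consumer, handleMessage) {
  await consumer.run({
    eachBatchAutoResolve: false,
    eachBatch: async ({ batch, resolveOffset, heartbeat, isRunning }) => {
      for (const message of batch.messages) {
        if (!isRunning()) break // stop cleanly; the rest stays unresolved
        await handleMessage(message)
        resolveOffset(message.offset) // only mark what was processed
      }
      await heartbeat()
    },
  })
}
```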

Seek: you don't need to await consumer#run - can you elaborate why this is the case?

It's possible to optionally await `consumer.run`; this makes sure your promise resolves when the consumer successfully receives its first batch. This isn't necessary, since this operation will happen multiple times throughout the app lifecycle, but it gives you an extra check. If you call `seek` after awaiting `run`, it means the consumer received the first batch and then performed the seek operation. To make sure your consumers start from the seek definition, you have to call `run` without `await`.

The reason for all of that is that we need to initialize the consumer before we invoke the seek operation and run is the best place to initialize the consumer.
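The ordering above can be sketched as follows, with the consumer injected so the call order can be checked without a broker (`startFromOffset` is a hypothetical wrapper, not a KafkaJS function):

```javascript
// Start consuming from a specific offset. run() is deliberately NOT
// awaited: it initializes the consumer, and seek() then repositions it
// before the first batch is processed.
function startFromOffset(consumer, { topic, partition, offset }, eachMessage) {
  consumer.run({ eachMessage })
  consumer.seek({ topic, partition, offset })
}
```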

@tulios tulios merged commit 7be1ff0 into tulios:master Jun 4, 2018