# FAQs

- What is error_code X?
- What is offsets_not_available_at_group_coordinator?
- How do I prevent data loss?
- How should I configure Kafunk to run across a WAN?
- How do I consume faster?
- How do I produce faster?
- How does Kafunk handle failures?
## What is offsets_not_available_at_group_coordinator?

The error event offsets_not_available_at_group_coordinator indicates that the Kafka broker responsible for storing a consumer's offsets doesn't have offsets for some or all of the partitions of a given topic/consumer-group combination. This can happen if the consumer's autoOffsetReset strategy is set to AutoOffsetReset.Halt or AutoOffsetReset.TryStartFromCommittedOffsets and a) the consumer is running for the first time, or b) the consumer hasn't committed offsets for longer than the offset retention period.
In either case, the consumer should be pointed at offsets explicitly using either Consumer.commitOffsets or Consumer.commitOffsetsToTime, or the consumer should use AutoOffsetReset.StartFromTime. Note that with the latter there is a potential for message loss.
Additionally, for scenario b), if it is possible that a consumer will be offline for longer than the offset retention period, then the offset retention period can be extended, either by updating the server-side configuration offsets.retention.minutes or by overriding it via ConsumerConfig.offsetRetentionTime.
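As a sketch (broker, topic, and group names are placeholders, and the exact signatures of ConsumerConfig.create and Consumer.commitOffsetsToTime should be confirmed against the Kafunk API), pointing a consumer at explicit offsets might look like this:

```fsharp
open Kafunk

let conn = Kafka.connHost "localhost" // placeholder broker host

let consumerConfig =
  ConsumerConfig.create (
    "my-group", "my-topic",
    // Fall back to a starting time when no committed offsets are found.
    // Note the message-loss caveat above when starting from a time.
    autoOffsetReset = AutoOffsetReset.StartFromTime Time.EarliestOffset)

let consumer = Consumer.create conn consumerConfig

// Alternatively, seed the group's offsets explicitly before consuming
// (assumed signature: consumer, then a time to resolve offsets for).
Consumer.commitOffsetsToTime consumer Time.EarliestOffset
|> Async.RunSynchronously
```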
## How do I prevent data loss?

By default, Kafunk is configured either to prevent data loss or to fail fast when a potential loss of data has been detected. The applicable Kafunk configuration points are:
- ProducerConfig.requiredAcks = RequiredAcks.AllInSync: this setting works in conjunction with the server-side min.insync.replicas setting to determine the number of replicas which must acknowledge a produce request. By default, Kafunk requires all in-sync replicas (ISR) to acknowledge a produce request. By default, Kafka configures min.insync.replicas = 1, so you may want to increase this setting. Note that this comes at the cost of increased produce latency, since more synchronization is required.
- ConsumerConfig.autoOffsetReset = AutoOffsetReset.TryStartFromCommittedOffsets: this setting controls the behavior of the consumer when it doesn't have offsets to start from and when it detects an out-of-range offset from a fetch request. The former can happen if the consumer is running for the first time or if the consumer's offsets have been lost due to offset retention. The latter can happen if the consumer is falling behind the topic retention window, or if a lagging broker has been elected as the leader for a partition.
- ConsumerConfig.offsetRetentionTime: this setting overrides the server-side offset retention setting. It can be used to extend the offset retention window to allow consumers to be offline for longer periods.
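As a sketch (assuming the optional-parameter names match the configuration points above, and that offsetRetentionTime is a TimeSpan), a producer and consumer using these settings might look like:

```fsharp
open System
open Kafunk

// Require acknowledgement from all in-sync replicas (the Kafunk default).
let producerConfig =
  ProducerConfig.create (
    "my-topic", Partitioner.roundRobin,
    requiredAcks = RequiredAcks.AllInSync)

// Start from committed offsets, and extend offset retention to 7 days
// so the consumer can be offline for up to a week without losing its place.
let consumerConfig =
  ConsumerConfig.create (
    "my-group", "my-topic",
    autoOffsetReset = AutoOffsetReset.TryStartFromCommittedOffsets,
    offsetRetentionTime = TimeSpan.FromDays 7.0)
```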
The applicable server side configuration points are:
- min.insync.replicas
- offsets.retention.minutes
- offsets.topic.replication.factor
- unclean.leader.election.enable
- log.retention.*
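On the broker side, these live in server.properties; for example (values are illustrative only):

```properties
# Replicas that must acknowledge a write when acks=all
min.insync.replicas=2
# Keep committed offsets for 7 days
offsets.retention.minutes=10080
# Replication factor for the internal offsets topic
offsets.topic.replication.factor=3
# Never elect an out-of-sync replica as leader
unclean.leader.election.enable=false
# Keep log segments for 7 days
log.retention.hours=168
```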
## How should I configure Kafunk to run across a WAN?

Increased latencies across a WAN connection can warrant larger TCP window sizes. The applicable configuration points are:
- ChanConfig.receiveBufferSize controls the TCP receive buffer size.
- ChanConfig.sendBufferSize controls the TCP send buffer size.
The following consumer and producer configuration points are also applicable:
- ConsumerConfig.fetchMaxBytes controls the maximum amount of data Kafka will send for each partition of a fetch request. Increasing this value will increase throughput. Note however that setting this value too high may cause heartbeat requests to slow down beyond the session timeout period, causing the consumer instance to be ejected from the group.
- ProducerConfig.batchSizeBytes controls the maximum size of a batch of client side produce requests before being sent to the server. Increasing this value will increase throughput at the cost of latency.
- ProducerConfig.batchLingerMs controls the maximum amount of time to wait before sending a batch of client side produce requests to the server. Increasing this value will increase throughput at the cost of latency.
Note that the tradeoff being made is that of throughput versus latency. For some rules of thumb on tuning the TCP window with respect to network latency, look here.
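Putting these together, a WAN-tuned setup might look like the following sketch (sizes are illustrative, and the parameter names are assumed to match the configuration points above; the channel config is supplied when the connection is created):

```fsharp
open Kafunk

// Larger TCP buffers keep a high-latency pipe full.
let chanConfig =
  ChanConfig.create (
    receiveBufferSize = 1048576,  // 1 MiB TCP receive buffer
    sendBufferSize = 1048576)     // 1 MiB TCP send buffer

// Larger fetches amortize round trips on the consumer side; keep an eye on
// the session timeout as noted above.
let consumerConfig =
  ConsumerConfig.create (
    "my-group", "my-topic",
    fetchMaxBytes = 1048576)

// Bigger, longer-lived batches amortize round trips on the producer side,
// trading latency for throughput.
let producerConfig =
  ProducerConfig.create (
    "my-topic", Partitioner.roundRobin,
    batchSizeBytes = 1048576,
    batchLingerMs = 200)
```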
## How do I consume faster?

See How should I configure Kafunk to run across a WAN?

## How do I produce faster?

See How should I configure Kafunk to run across a WAN?
## How does Kafunk handle failures?

In general, Kafunk handles failed operations by retrying them according to a RetryPolicy. The applicable configuration points are:
- ChanConfig.requestRetryPolicy controls the retry policy for requests to an individual broker. This is helpful for recovering from transient network failures or timeouts, but it can't recover from cluster topology changes. See KafkaConfig.requestRetryPolicy below.
- ChanConfig.requestTimeout controls the maximum duration of requests to individual brokers after which the request is cancelled, making it available for retry.
- ChanConfig.connectRetryPolicy controls the retry policy for connection attempts to an individual Kafka broker. This is helpful for recovering from transient network failures or timeouts, but it can't recover from cluster topology changes. See KafkaConfig.bootstrapConnectRetryPolicy below.
- ChanConfig.connectTimeout controls the maximum duration of a TCP connection attempt to an individual broker.
- KafkaConfig.requestRetryPolicy controls the retry policy for all requests. This setting works in conjunction with ChanConfig.requestRetryPolicy to determine how a request is retried. The latter controls the retry policy with respect to an individual broker and defines the first tier of retries. These are meant to address transient network issues or timeouts, but they aren't able to address issues having to do with cluster topology changes. This is where KafkaConfig.requestRetryPolicy comes into play: it controls retries which consist of re-discovering the state of the cluster and retrying the operation on a potentially new broker.
- KafkaConfig.bootstrapConnectRetryPolicy controls the retry policy for connecting to a bootstrap broker. This setting works in conjunction with ChanConfig.connectRetryPolicy to determine how connections are retried. The latter controls the connection retry policy with respect to an individual broker; the former controls connection retries with respect to the bootstrap broker list.
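As a sketch of the two tiers (the RetryPolicy constructors and the KafkaConfig.create parameters shown here are assumptions; consult the RetryPolicy module and KafkaConfig for the actual names, and note that the bootstrap broker list argument is omitted):

```fsharp
open System
open Kafunk

// First tier: per-broker retries for transient network failures and timeouts.
let chanConfig =
  ChanConfig.create (
    requestTimeout = TimeSpan.FromSeconds 30.0,
    requestRetryPolicy = RetryPolicy.constantMs 1000,   // assumed: retry every 1s
    connectTimeout = TimeSpan.FromSeconds 10.0,
    connectRetryPolicy = RetryPolicy.constantMs 2000)   // assumed constructor

// Second tier: cluster-level retries, which re-discover cluster state and may
// route the operation to a different broker.
let kafkaConfig =
  KafkaConfig.create (
    tcpConfig = chanConfig,                                     // assumed parameter
    requestRetryPolicy = RetryPolicy.expBackoffMs 1000,         // assumed constructor
    bootstrapConnectRetryPolicy = RetryPolicy.constantMs 5000)  // assumed constructor
```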
A Kafunk consumer handles consumer group failures by rejoining the group. The applicable configuration points are:
- ConsumerConfig.sessionTimeout controls the time window within which a consumer must send heartbeats to the group coordinator; failure to do so results in a group rebalance.
- ConsumerConfig.heartbeatFrequency controls the number of times heartbeats are sent during the sessionTimeout window.
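For example (a sketch; the session timeout is assumed to be in milliseconds):

```fsharp
open Kafunk

// Send several heartbeats per session window so one delayed heartbeat
// doesn't trigger a rebalance.
let consumerConfig =
  ConsumerConfig.create (
    "my-group", "my-topic",
    sessionTimeout = 30000,   // assumed milliseconds: 30s heartbeat window
    heartbeatFrequency = 3)   // ~3 heartbeats per session window
```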