feat(plugin-server): add KAFKA_CONSUMPTION_RDKAFKA_COOPERATIVE_REBALANCE which defaults to true, fix metrics for eager #17406

Merged: 2 commits, Sep 13, 2023
1 change: 1 addition & 0 deletions plugin-server/src/config/config.ts
@@ -44,6 +44,7 @@ export function getDefaultConfig(): PluginsServerConfig {
KAFKA_SASL_PASSWORD: undefined,
KAFKA_CLIENT_RACK: undefined,
KAFKA_CONSUMPTION_USE_RDKAFKA: false, // Transitional setting, ignored for consumers that only support one library
KAFKA_CONSUMPTION_RDKAFKA_COOPERATIVE_REBALANCE: true, // If true, use the cooperative rebalance strategy; otherwise use the default ('range,roundrobin')
KAFKA_CONSUMPTION_MAX_BYTES: 10_485_760, // Default value for kafkajs
KAFKA_CONSUMPTION_MAX_BYTES_PER_PARTITION: 1_048_576, // Default value for kafkajs, must be bigger than message size
KAFKA_CONSUMPTION_MAX_WAIT_MS: 1_000, // Down from the 5s default for kafkajs
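For context, a minimal sketch (assumed, not the plugin-server's actual config loader) of how this default could be overridden per deployment via an environment variable, and what it ultimately selects:

```ts
// Sketch only: assumes the flag arrives as an env var like the other KAFKA_* settings;
// the real config loader may parse booleans differently.
const cooperativeRebalance: boolean =
    process.env.KAFKA_CONSUMPTION_RDKAFKA_COOPERATIVE_REBALANCE !== 'false'

// The flag ultimately picks the librdkafka partition assignment strategy:
const assignmentStrategy = cooperativeRebalance ? 'cooperative-sticky' : 'range,roundrobin'
```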
6 changes: 4 additions & 2 deletions plugin-server/src/kafka/batch-consumer.ts
@@ -33,6 +33,7 @@ export const startBatchConsumer = async ({
topicCreationTimeoutMs,
eachBatch,
autoCommit = true,
cooperativeRebalance = true,
queuedMinMessages = 100000,
}: {
connectionConfig: GlobalConfig
@@ -48,6 +49,7 @@
topicCreationTimeoutMs: number
eachBatch: (messages: Message[]) => Promise<void>
autoCommit?: boolean
cooperativeRebalance?: boolean
queuedMinMessages?: number
}): Promise<BatchConsumer> => {
// Starts consuming from `topic` in batches of `fetchBatchSize` messages,
@@ -113,12 +115,12 @@
// https://www.confluent.io/en-gb/blog/incremental-cooperative-rebalancing-in-kafka/
// for details on the advantages of this rebalancing strategy as well as
// how it works.
- 'partition.assignment.strategy': 'cooperative-sticky',
+ 'partition.assignment.strategy': cooperativeRebalance ? 'cooperative-sticky' : 'range,roundrobin',
rebalance_cb: true,
offset_commit_cb: true,
})

- instrumentConsumerMetrics(consumer, groupId)
+ instrumentConsumerMetrics(consumer, groupId, cooperativeRebalance)

let isShuttingDown = false
let lastLoopTime = Date.now()
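To make the wiring concrete, here is a hedged sketch of the librdkafka consumer configuration the new option feeds into. The broker address and group id are placeholders, and the real startBatchConsumer sets many more options than shown here:

```ts
import { KafkaConsumer } from 'node-rdkafka-acosom'

// Sketch under assumptions: placeholder broker/group values, minimal config.
function buildConsumer(groupId: string, cooperativeRebalance: boolean): KafkaConsumer {
    return new KafkaConsumer(
        {
            'group.id': groupId,
            'metadata.broker.list': 'localhost:9092', // placeholder broker
            // 'cooperative-sticky' moves partitions incrementally during a rebalance,
            // while 'range,roundrobin' is the classic eager protocol that revokes
            // every partition before reassigning.
            'partition.assignment.strategy': cooperativeRebalance
                ? 'cooperative-sticky'
                : 'range,roundrobin',
            rebalance_cb: true,
            offset_commit_cb: true,
        },
        {}
    )
}
```

Worth noting as a general Kafka caveat (not something this diff states): members of one consumer group generally need compatible assignment protocols, so the flag is best switched across the whole group rather than mixing eager and cooperative members.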
26 changes: 19 additions & 7 deletions plugin-server/src/kafka/consumer.ts
@@ -59,17 +59,20 @@ export const createKafkaConsumer = async (config: ConsumerGlobalConfig) => {
export function countPartitionsPerTopic(assignments: Assignment[]): Map<string, number> {
const partitionsPerTopic = new Map()
for (const assignment of assignments) {
- if (assignment.topic in partitionsPerTopic) {
+ if (partitionsPerTopic.has(assignment.topic)) {
partitionsPerTopic.set(assignment.topic, partitionsPerTopic.get(assignment.topic) + 1)
} else {
partitionsPerTopic.set(assignment.topic, 1)
}
}

return partitionsPerTopic
}

- export const instrumentConsumerMetrics = (consumer: RdKafkaConsumer, groupId: string) => {
+ export const instrumentConsumerMetrics = (
+     consumer: RdKafkaConsumer,
+     groupId: string,
+     cooperativeRebalance: boolean
+ ) => {
// For each message consumed, we record the latest timestamp processed for
// each partition assigned to this consumer group member. This consumer
// should only provide metrics for the partitions that are assigned to it,
@@ -94,6 +97,7 @@ export const instrumentConsumerMetrics = (consumer: RdKafkaConsumer, groupId: st
//
// TODO: add other relevant metrics here
// TODO: expose the internal librdkafka metrics as well.
const strategyString = cooperativeRebalance ? 'cooperative' : 'eager'
consumer.on('rebalance', (error: LibrdKafkaError, assignments: TopicPartition[]) => {
/**
* see https://github.com/Blizzard/node-rdkafka#rebalancing errors are used to signal
@@ -103,14 +107,22 @@ export const instrumentConsumerMetrics = (consumer: RdKafkaConsumer, groupId: st
* And when the balancing is completed the new assignments are received with ERR__ASSIGN_PARTITIONS
*/
if (error.code === CODES.ERRORS.ERR__ASSIGN_PARTITIONS) {
- status.info('📝️', 'librdkafka rebalance, partitions assigned', { assignments })
+ status.info('📝️', `librdkafka ${strategyString} rebalance, partitions assigned`, { assignments })
for (const [topic, count] of countPartitionsPerTopic(assignments)) {
- kafkaRebalancePartitionCount.labels({ topic: topic }).inc(count)
+ if (cooperativeRebalance) {
+     kafkaRebalancePartitionCount.labels({ topic: topic }).inc(count)
+ } else {
+     kafkaRebalancePartitionCount.labels({ topic: topic }).set(count)
+ }
}
} else if (error.code === CODES.ERRORS.ERR__REVOKE_PARTITIONS) {
- status.info('📝️', 'librdkafka rebalance started, partitions revoked', { assignments })
+ status.info('📝️', `librdkafka ${strategyString} rebalance started, partitions revoked`, { assignments })
for (const [topic, count] of countPartitionsPerTopic(assignments)) {
- kafkaRebalancePartitionCount.labels({ topic: topic }).dec(count)
+ if (cooperativeRebalance) {
+     kafkaRebalancePartitionCount.labels({ topic: topic }).dec(count)
+ } else {
+     kafkaRebalancePartitionCount.labels({ topic: topic }).set(count)
+ }
}
} else {
// We had a "real" error
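Two notes on this file. First, the countPartitionsPerTopic hunk is a genuine bug fix: `assignment.topic in partitionsPerTopic` checks object properties rather than Map entries, so the branch never matched and every topic was reset to 1; `partitionsPerTopic.has(...)` is the correct Map lookup. Second, this is the "fix metrics for eager" part of the title: with cooperative rebalancing the rebalance callback reports only the partitions that changed hands, so the per-topic gauge is adjusted incrementally (inc/dec), whereas the eager strategies revoke and reassign the full assignment every time, so overwriting the gauge with set is correct. A minimal sketch, assuming a prom-client Gauge shaped like kafkaRebalancePartitionCount:

```ts
import { Gauge } from 'prom-client'

// Assumed metric definition; the real kafkaRebalancePartitionCount may differ.
const partitionGauge = new Gauge({
    name: 'kafka_rebalance_partition_count',
    help: 'Partitions currently assigned to this consumer, per topic',
    labelNames: ['topic'],
})

function onAssigned(topic: string, count: number, cooperative: boolean): void {
    if (cooperative) {
        partitionGauge.labels({ topic }).inc(count) // only the newly assigned delta is reported
    } else {
        partitionGauge.labels({ topic }).set(count) // the full assignment is reported each time
    }
}

function onRevoked(topic: string, count: number, cooperative: boolean): void {
    if (cooperative) {
        partitionGauge.labels({ topic }).dec(count) // subtract only what was revoked
    } else {
        partitionGauge.labels({ topic }).set(count) // eager revokes everything; the next assign overwrites this
    }
}
```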
1 change: 1 addition & 0 deletions plugin-server/src/main/ingestion-queues/kafka-queue.ts
@@ -255,6 +255,7 @@ export class IngestionConsumer {
consumerMaxWaitMs: this.pluginsServer.KAFKA_CONSUMPTION_MAX_WAIT_MS,
fetchBatchSize: 500,
topicCreationTimeoutMs: this.pluginsServer.KAFKA_TOPIC_CREATION_TIMEOUT_MS,
cooperativeRebalance: this.pluginsServer.KAFKA_CONSUMPTION_RDKAFKA_COOPERATIVE_REBALANCE,
eachBatch: (payload) => this.eachBatchConsumer(payload),
})
this.consumerReady = true
1 change: 1 addition & 0 deletions plugin-server/src/types.ts
@@ -129,6 +129,7 @@ export interface PluginsServerConfig {
KAFKA_SASL_PASSWORD: string | undefined
KAFKA_CLIENT_RACK: string | undefined
KAFKA_CONSUMPTION_USE_RDKAFKA: boolean
KAFKA_CONSUMPTION_RDKAFKA_COOPERATIVE_REBALANCE: boolean
KAFKA_CONSUMPTION_MAX_BYTES: number
KAFKA_CONSUMPTION_MAX_BYTES_PER_PARTITION: number
KAFKA_CONSUMPTION_MAX_WAIT_MS: number // fetch.wait.max.ms rdkafka parameter
22 changes: 22 additions & 0 deletions plugin-server/tests/main/ingestion-queues/kafka-queue.test.ts
@@ -1,4 +1,7 @@
import { Assignment } from 'node-rdkafka-acosom'

import { KAFKA_EVENTS_PLUGIN_INGESTION } from '../../../src/config/kafka-topics'
import { countPartitionsPerTopic } from '../../../src/kafka/consumer'
import { ServerInstance, startPluginsServer } from '../../../src/main/pluginsServer'
import { LogLevel, PluginsServerConfig } from '../../../src/types'
import { Hub } from '../../../src/types'
@@ -79,3 +82,22 @@ describe.skip('IngestionConsumer', () => {
expect(bufferCalls.length).toEqual(1)
})
})

describe('countPartitionsPerTopic', () => {
it('should correctly count the number of partitions per topic', () => {
const assignments: Assignment[] = [
{ topic: 'topic1', partition: 0 },
{ topic: 'topic1', partition: 1 },
{ topic: 'topic2', partition: 0 },
{ topic: 'topic2', partition: 1 },
{ topic: 'topic2', partition: 2 },
{ topic: 'topic3', partition: 0 },
]

const result = countPartitionsPerTopic(assignments)
expect(result.get('topic1')).toBe(2)
expect(result.get('topic2')).toBe(3)
expect(result.get('topic3')).toBe(1)
expect(result.size).toBe(3)
})
})