From d745707eb5c1522ec8beec972fb713929a8068ca Mon Sep 17 00:00:00 2001 From: Jinli Liang Date: Wed, 8 Dec 2021 19:41:59 +1100 Subject: [PATCH] allow kafka scaler to use lag for all topics within a consumer group Signed-off-by: Jinli Liang --- CHANGELOG.md | 2 +- pkg/scalers/kafka_scaler.go | 164 ++++++++++++++++++------- pkg/scalers/kafka_scaler_test.go | 6 +- tests/scalers/kafka.test.ts | 156 +++++++++++++++++++++++++++-- 4 files changed, 248 insertions(+), 80 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 1cf9803c9f1..2143d189ed8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -30,7 +30,7 @@ - Graphite Scaler: use the latest datapoint returned, not the earliest ([#2365](https://github.com/kedacore/keda/pull/2365)) - Kubernetes Workload Scaler: ignore terminated pods ([#2384](https://github.com/kedacore/keda/pull/2384)) - `keda-operator` Cluster Role: add `list` and `watch` access to service accounts ([#2406](https://github.com/kedacore/keda/pull/2406))|([#2410](https://github.com/kedacore/keda/pull/2410)) - +- Apache Kafka Scaler: allow the `topic` flag to be optional; when it is omitted, the lag of all topics within the consumer group is used for scaling ([#2409](https://github.com/kedacore/keda/pull/2409)) - TODO ([#XXX](https://github.com/kedacore/keda/pull/XXX)) ### Breaking Changes diff --git a/pkg/scalers/kafka_scaler.go b/pkg/scalers/kafka_scaler.go index 0007da4fc9d..2e491cc10c1 100644 --- a/pkg/scalers/kafka_scaler.go +++ b/pkg/scalers/kafka_scaler.go @@ -119,7 +119,9 @@ func parseKafkaMetadata(config *ScalerConfig) (kafkaMetadata, error) { case config.TriggerMetadata["topic"] != "": meta.topic = config.TriggerMetadata["topic"] default: - return meta, errors.New("no topic given") + meta.topic = "" + kafkaLog.V(1).Info(fmt.Sprintf("consumer group %s has no topic specified, "+ "will use all topics subscribed by the consumer group for scaling", meta.group)) } meta.offsetResetPolicy = defaultOffsetResetPolicy @@ -209,34 +211,35 @@ func parseKafkaMetadata(config *ScalerConfig) (kafkaMetadata, error) { // IsActive determines if we need to scale from zero func (s *kafkaScaler) IsActive(ctx context.Context) (bool, error) { - partitions, err := s.getPartitions() + topicPartitions, err := s.getTopicPartitions() if err != nil { return false, err } - offsets, err := s.getOffsets(partitions) + offsets, err := s.getOffsets(topicPartitions) if err != nil { return false, err } - topicOffsets, err := s.getTopicOffsets(partitions) + topicPartitionsOffsets, err := s.getTopicPartitionOffsets(topicPartitions) if err != nil { return false, err } - for _, partition := range partitions { - lag, err := s.getLagForPartition(partition, offsets, topicOffsets) - if err != nil && lag == invalidOffset { - return true, nil - } - kafkaLog.V(1).Info(fmt.Sprintf("Group %s has a lag of %d for topic %s and partition %d\n", s.metadata.group, lag, s.metadata.topic, partition)) + for topic, partitionsOffsets := range topicPartitionsOffsets { + for partition := range partitionsOffsets { + lag, err := s.getLagForPartition(topic, partition, offsets, partitionsOffsets) + if err != nil && lag == invalidOffset { + return true, nil + } + kafkaLog.V(1).Info(fmt.Sprintf("Group %s has a lag of %d for topic %s and partition %d\n", s.metadata.group, lag, topic, partition)) - // Return as soon as a lag was detected for any partition - if lag > 0 { - return true, nil + // Return as soon as a lag was detected for any partition + if lag > 0 { + return true, nil + } } } - return false, nil } @@ -289,29 +292,41 @@ func 
getKafkaClients(metadata kafkaMetadata) (sarama.Client, sarama.ClusterAdmin return client, admin, nil } -func (s *kafkaScaler) getPartitions() ([]int32, error) { - topicsMetadata, err := s.admin.DescribeTopics([]string{s.metadata.topic}) +func (s *kafkaScaler) getTopicPartitions() (map[string][]int32, error) { + var topicsToDescribe = make([]string, 0) + + // when no topic is specified, query the consumer group to fetch all subscribed topics + if s.metadata.topic == "" { + listCGOffsetResponse, err := s.admin.ListConsumerGroupOffsets(s.metadata.group, nil) + if err != nil { + return nil, fmt.Errorf("error listing consumer group offsets: %s", err) + } + for topicName := range listCGOffsetResponse.Blocks { + topicsToDescribe = append(topicsToDescribe, topicName) + } + } else { + topicsToDescribe = []string{s.metadata.topic} + } + + topicsMetadata, err := s.admin.DescribeTopics(topicsToDescribe) if err != nil { return nil, fmt.Errorf("error describing topics: %s", err) } - if len(topicsMetadata) != 1 { - return nil, fmt.Errorf("expected only 1 topic metadata, got %d", len(topicsMetadata)) - } - partitionMetadata := topicsMetadata[0].Partitions - partitions := make([]int32, len(partitionMetadata)) - for i, p := range partitionMetadata { - partitions[i] = p.ID + topicPartitions := make(map[string][]int32, len(topicsMetadata)) + for _, topicMetadata := range topicsMetadata { + partitionMetadata := topicMetadata.Partitions + partitions := make([]int32, len(partitionMetadata)) + for i, p := range partitionMetadata { + partitions[i] = p.ID + } + topicPartitions[topicMetadata.Name] = partitions } - - return partitions, nil + return topicPartitions, nil } -func (s *kafkaScaler) getOffsets(partitions []int32) (*sarama.OffsetFetchResponse, error) { - offsets, err := s.admin.ListConsumerGroupOffsets(s.metadata.group, map[string][]int32{ - s.metadata.topic: partitions, - }) - +func (s *kafkaScaler) getOffsets(topicPartitions map[string][]int32) (*sarama.OffsetFetchResponse, error) { + offsets, err := s.admin.ListConsumerGroupOffsets(s.metadata.group, topicPartitions) if err != nil { return nil, fmt.Errorf("error listing consumer group offsets: %s", err) } @@ -319,16 +334,16 @@ func (s *kafkaScaler) getOffsets(partitions []int32) (*sarama.OffsetFetchRespons return offsets, nil } -func (s *kafkaScaler) getLagForPartition(partition int32, offsets *sarama.OffsetFetchResponse, topicOffsets map[int32]int64) (int64, error) { - block := offsets.GetBlock(s.metadata.topic, partition) +func (s *kafkaScaler) getLagForPartition(topic string, partition int32, offsets *sarama.OffsetFetchResponse, topicOffsets map[int32]int64) (int64, error) { + block := offsets.GetBlock(topic, partition) if block == nil { - kafkaLog.Error(fmt.Errorf("error finding offset block for topic %s and partition %d", s.metadata.topic, partition), "") - return 0, fmt.Errorf("error finding offset block for topic %s and partition %d", s.metadata.topic, partition) + kafkaLog.Error(fmt.Errorf("error finding offset block for topic %s and partition %d", topic, partition), "") + return 0, fmt.Errorf("error finding offset block for topic %s and partition %d", topic, partition) } consumerOffset := block.Offset if consumerOffset == invalidOffset && s.metadata.offsetResetPolicy == latest { - kafkaLog.V(0).Info(fmt.Sprintf("invalid offset found for topic %s in group %s and partition %d, probably no offset is committed yet", s.metadata.topic, s.metadata.group, partition)) - return invalidOffset, fmt.Errorf("invalid offset found for topic %s in group %s and partition %d, 
probably no offset is committed yet", s.metadata.topic, s.metadata.group, partition) + kafkaLog.V(0).Info(fmt.Sprintf("invalid offset found for topic %s in group %s and partition %d, probably no offset is committed yet", topic, s.metadata.group, partition)) + return invalidOffset, fmt.Errorf("invalid offset found for topic %s in group %s and partition %d, probably no offset is committed yet", topic, s.metadata.group, partition) } latestOffset := topicOffsets[partition] @@ -351,9 +366,17 @@ func (s *kafkaScaler) Close(context.Context) error { func (s *kafkaScaler) GetMetricSpecForScaling(context.Context) []v2beta2.MetricSpec { targetMetricValue := resource.NewQuantity(s.metadata.lagThreshold, resource.DecimalSI) + + var metricName string + if s.metadata.topic != "" { + metricName = fmt.Sprintf("kafka-%s", s.metadata.topic) + } else { + metricName = fmt.Sprintf("kafka-%s-topics", s.metadata.group) + } + externalMetric := &v2beta2.ExternalMetricSource{ Metric: v2beta2.MetricIdentifier{ - Name: GenerateMetricNameWithIndex(s.metadata.scalerIndex, kedautil.NormalizeString(fmt.Sprintf("kafka-%s", s.metadata.topic))), + Name: GenerateMetricNameWithIndex(s.metadata.scalerIndex, kedautil.NormalizeString(metricName)), }, Target: v2beta2.MetricTarget{ Type: v2beta2.AverageValueMetricType, @@ -366,34 +389,36 @@ func (s *kafkaScaler) GetMetricSpecForScaling(context.Context) []v2beta2.MetricS // GetMetrics returns value for a supported metric and an error if there is a problem getting the metric func (s *kafkaScaler) GetMetrics(ctx context.Context, metricName string, metricSelector labels.Selector) ([]external_metrics.ExternalMetricValue, error) { - partitions, err := s.getPartitions() + topicPartitions, err := s.getTopicPartitions() if err != nil { return []external_metrics.ExternalMetricValue{}, err } - offsets, err := s.getOffsets(partitions) + offsets, err := s.getOffsets(topicPartitions) if err != nil { return []external_metrics.ExternalMetricValue{}, err } - topicOffsets, err := s.getTopicOffsets(partitions) + topicsPartitionsOffsets, err := s.getTopicPartitionOffsets(topicPartitions) if err != nil { return []external_metrics.ExternalMetricValue{}, err } totalLag := int64(0) - for _, partition := range partitions { - lag, _ := s.getLagForPartition(partition, offsets, topicOffsets) - - totalLag += lag + var totalTopicPartitions int64 + for topic, partitionsOffsets := range topicsPartitionsOffsets { + for partition := range partitionsOffsets { + lag, _ := s.getLagForPartition(topic, partition, offsets, partitionsOffsets) + totalLag += lag + } + totalTopicPartitions += (int64)(len(partitionsOffsets)) } - - kafkaLog.V(1).Info(fmt.Sprintf("Kafka scaler: Providing metrics based on totalLag %v, partitions %v, threshold %v", totalLag, len(partitions), s.metadata.lagThreshold)) + kafkaLog.V(1).Info(fmt.Sprintf("Kafka scaler: Providing metrics based on totalLag %v, topicPartitions %v, threshold %v", totalLag, len(topicPartitions), s.metadata.lagThreshold)) if !s.metadata.allowIdleConsumers { - // don't scale out beyond the number of partitions - if (totalLag / s.metadata.lagThreshold) > int64(len(partitions)) { - totalLag = int64(len(partitions)) * s.metadata.lagThreshold + // don't scale out beyond the number of topicPartitions + if (totalLag / s.metadata.lagThreshold) > totalTopicPartitions { + totalLag = totalTopicPartitions * s.metadata.lagThreshold } } @@ -406,7 +431,7 @@ func (s *kafkaScaler) GetMetrics(ctx context.Context, metricName string, metricS return 
append([]external_metrics.ExternalMetricValue{}, metric), nil } -func (s *kafkaScaler) getTopicOffsets(partitions []int32) (map[int32]int64, error) { +func (s *kafkaScaler) getTopicPartitionOffsets(topicPartitions map[string][]int32) (map[string]map[int32]int64, error) { version := int16(0) if s.client.Config().Version.IsAtLeast(sarama.V0_10_1_0) { version = 1 @@ -415,41 +440,42 @@ func (s *kafkaScaler) getTopicOffsets(partitions []int32) (map[int32]int64, erro // Step 1: build one OffsetRequest instance per broker. requests := make(map[*sarama.Broker]*sarama.OffsetRequest) - for _, partitionID := range partitions { - broker, err := s.client.Leader(s.metadata.topic, partitionID) - if err != nil { - return nil, err - } - - request, ok := requests[broker] - if !ok { - request = &sarama.OffsetRequest{Version: version} - requests[broker] = request + for topic, partitions := range topicPartitions { + for _, partitionID := range partitions { + broker, err := s.client.Leader(topic, partitionID) + if err != nil { + return nil, err + } + request, ok := requests[broker] + if !ok { + request = &sarama.OffsetRequest{Version: version} + requests[broker] = request + } + request.AddBlock(topic, partitionID, sarama.OffsetNewest, 1) } - - request.AddBlock(s.metadata.topic, partitionID, sarama.OffsetNewest, 1) } - offsets := make(map[int32]int64) + topicPartitionsOffsets := make(map[string]map[int32]int64) - // Step 2: send requests, one per broker, and collect offsets + // Step 2: send requests, one per broker, and collect topicPartitionsOffsets for broker, request := range requests { response, err := broker.GetAvailableOffsets(request) - if err != nil { return nil, err } - for _, blocks := range response.Blocks { + for topic, blocks := range response.Blocks { + if _, found := topicPartitionsOffsets[topic]; !found { + topicPartitionsOffsets[topic] = make(map[int32]int64) + } for partitionID, block := range blocks { if block.Err != sarama.ErrNoError { return nil, block.Err } - - offsets[partitionID] = block.Offset + topicPartitionsOffsets[topic][partitionID] = block.Offset } } } - return offsets, nil + return topicPartitionsOffsets, nil } diff --git a/pkg/scalers/kafka_scaler_test.go b/pkg/scalers/kafka_scaler_test.go index d5f4647bc0f..176779b1af1 100644 --- a/pkg/scalers/kafka_scaler_test.go +++ b/pkg/scalers/kafka_scaler_test.go @@ -52,8 +52,8 @@ var parseKafkaMetadataTestDataset = []parseKafkaMetadataTestData{ {map[string]string{}, true, 0, nil, "", "", "", false}, // failure, no consumer group {map[string]string{"bootstrapServers": "foobar:9092"}, true, 1, []string{"foobar:9092"}, "", "", "latest", false}, - // failure, no topic - {map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group"}, true, 1, []string{"foobar:9092"}, "my-group", "", offsetResetPolicy("latest"), false}, + // success, no topic + {map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group"}, false, 1, []string{"foobar:9092"}, "my-group", "", offsetResetPolicy("latest"), false}, // failure, version not supported {map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "version": "1.2.3.4"}, true, 1, []string{"foobar:9092"}, "my-group", "my-topic", offsetResetPolicy("latest"), false}, // success @@ -118,6 +118,7 @@ var parseKafkaAuthParamsTestDataset = []parseKafkaAuthParamsTestData{ var kafkaMetricIdentifiers = []kafkaMetricIdentifier{ {&parseKafkaMetadataTestDataset[4], 0, "s0-kafka-my-topic"}, {&parseKafkaMetadataTestDataset[4], 1, 
"s1-kafka-my-topic"}, + {&parseKafkaMetadataTestDataset[2], 1, "s1-kafka-my-group-topics"}, } func TestGetBrokers(t *testing.T) { @@ -190,6 +191,7 @@ func TestKafkaAuthParams(t *testing.T) { } } } + func TestKafkaGetMetricSpecForScaling(t *testing.T) { for _, testData := range kafkaMetricIdentifiers { meta, err := parseKafkaMetadata(&ScalerConfig{TriggerMetadata: testData.metadataTestData.metadata, AuthParams: validWithAuthParams, ScalerIndex: testData.scalerIndex}) diff --git a/tests/scalers/kafka.test.ts b/tests/scalers/kafka.test.ts index 61413775ab9..e96f642f5de 100644 --- a/tests/scalers/kafka.test.ts +++ b/tests/scalers/kafka.test.ts @@ -7,6 +7,7 @@ const defaultNamespace = 'kafka-test' const defaultCluster = 'kafka-cluster' const timeToWait = 300 const defaultTopic = 'kafka-topic' +const defaultTopic2 = 'kafka-topic-2' const defaultKafkaClient = 'kafka-client' const strimziOperatorVersion = '0.18.0' const commandToCheckReplicas = `kubectl get deployments/kafka-consumer --namespace ${defaultNamespace} -o jsonpath="{.spec.replicas}"` @@ -17,11 +18,13 @@ const kafkaTopicYamlFile = tmp.fileSync() const kafkaClientYamlFile = tmp.fileSync() const kafkaApplicationLatestYamlFile = tmp.fileSync() const kafkaApplicationEarliestYamlFile = tmp.fileSync() +const kafkaApplicationMultipleTopicsYamlFile = tmp.fileSync() const scaledObjectEarliestYamlFile = tmp.fileSync() const scaledObjectLatestYamlFile = tmp.fileSync() +const scaledObjectMultipleTopicsYamlFile = tmp.fileSync() test.before('Set up, create necessary resources.', t => { - sh.config.silent = true + sh.config.silent = true sh.exec(`kubectl create namespace ${defaultNamespace}`) const strimziOperatorYaml = sh.exec(`curl -L https://github.com/strimzi/strimzi-kafka-operator/releases/download/${strimziOperatorVersion}/strimzi-cluster-operator-${strimziOperatorVersion}.yaml`).stdout @@ -42,9 +45,9 @@ test.before('Set up, create necessary resources.', t => { 0, sh.exec(`kubectl wait kafka/${defaultCluster} --for=condition=Ready --timeout=${timeToWait}s --namespace ${defaultNamespace}`).code, 'Kafka instance should be ready within given time limit.' - ) + ) - fs.writeFileSync(kafkaTopicYamlFile.name, kafkaTopicYaml) + fs.writeFileSync(kafkaTopicYamlFile.name, kafkaTopicsYaml) t.is( 0, sh.exec(`kubectl apply -f ${kafkaTopicYamlFile.name} --namespace ${defaultNamespace}`).code, @@ -53,8 +56,13 @@ test.before('Set up, create necessary resources.', t => { t.is( 0, sh.exec(`kubectl wait kafkatopic/${defaultTopic} --for=condition=Ready --timeout=${timeToWait}s --namespace ${defaultNamespace}`).code, - 'Kafka topic should be ready within given time limit.' - ) + 'Kafka topic should be ready withlanguage-mattersin given time limit.' + ) + t.is( + 0, + sh.exec(`kubectl wait kafkatopic/${defaultTopic2} --for=condition=Ready --timeout=${timeToWait}s --namespace ${defaultNamespace}`).code, + 'Kafka topic2 should be ready within given time limit.' + ) fs.writeFileSync(kafkaClientYamlFile.name, kafkaClientYaml) t.is( @@ -66,7 +74,7 @@ test.before('Set up, create necessary resources.', t => { 0, sh.exec(`kubectl wait pod/${defaultKafkaClient} --for=condition=Ready --timeout=${timeToWait}s --namespace ${defaultNamespace}`).code, 'Kafka client should be ready within given time limit.' 
- ) + ) fs.writeFileSync(kafkaApplicationEarliestYamlFile.name, kafkaApplicationEarliestYaml) @@ -170,7 +178,6 @@ test.serial('Applying ScaledObject latest policy should not scale up pods', t => sh.exec(`sleep 5s`) waitForReplicaCount(1, commandToCheckReplicas) t.is('0', sh.exec(commandToCheckReplicas).stdout, 'Replica count should be 0.') - }) @@ -187,11 +194,80 @@ test.serial('Latest Scale object should scale with new messages', t => { } }) +test.serial('Cleanup after latest policy test', t => { + t.is( + 0, + sh.exec(`kubectl delete -f ${scaledObjectLatestYamlFile.name} --namespace ${defaultNamespace}`).code, + 'Deleting Scaled Object should work.' + ) + t.is( + 0, + sh.exec(`kubectl delete -f ${kafkaApplicationLatestYamlFile.name} --namespace ${defaultNamespace}`).code, + 'Deleting kafka application should work.' + ) + sh.exec(`sleep 10s`) +}) + +test.serial('Applying ScaledObject with multiple topics should scale up pods', t => { + // Make the consumer commit the offsets for all topics in the group + sh.exec(`kubectl exec --namespace ${defaultNamespace} ${defaultKafkaClient} -- sh -c 'kafka-console-consumer --bootstrap-server "${defaultCluster}-kafka-bootstrap.${defaultNamespace}:9092" --topic ${defaultTopic} --group multiTopic --from-beginning --consumer-property enable.auto.commit=true --timeout-ms 15000'`) + sh.exec(`kubectl exec --namespace ${defaultNamespace} ${defaultKafkaClient} -- sh -c 'kafka-console-consumer --bootstrap-server "${defaultCluster}-kafka-bootstrap.${defaultNamespace}:9092" --topic ${defaultTopic2} --group multiTopic --from-beginning --consumer-property enable.auto.commit=true --timeout-ms 15000'`) + + fs.writeFileSync(kafkaApplicationMultipleTopicsYamlFile.name, kafkaApplicationMultipleTopicsYaml) + t.is( + 0, + sh.exec(`kubectl apply -f ${kafkaApplicationMultipleTopicsYamlFile.name} --namespace ${defaultNamespace}`).code, + 'Deploying Kafka application should work.' + ) + sh.exec(`sleep 5s`) + fs.writeFileSync(scaledObjectMultipleTopicsYamlFile.name, scaledObjectMultipleTopicsYaml) + + t.is( + 0, + sh.exec(`kubectl apply -f ${scaledObjectMultipleTopicsYamlFile.name} --namespace ${defaultNamespace}`).code, + 'Deploying Scaled Object should work.' + ) + sh.exec(`sleep 5s`) + waitForReplicaCount(1, commandToCheckReplicas) + t.is('0', sh.exec(commandToCheckReplicas).stdout, 'Replica count should be 0.') + + // producing a single msg to the default topic should not scale up the workload + sh.exec(`kubectl exec --namespace ${defaultNamespace} ${defaultKafkaClient} -- sh -exc 'echo "{\"text\": \"foo\"}" | kafka-console-producer --broker-list ${defaultCluster}-kafka-bootstrap.${defaultNamespace}:9092 --topic ${defaultTopic}'`) + sh.exec(`sleep 20s`) + waitForReplicaCount(0, commandToCheckReplicas) + t.is('0', sh.exec(commandToCheckReplicas).stdout, 'Replica count should be 0.') + + // producing one more msg to the other topic should trigger scaling + sh.exec(`kubectl exec --namespace ${defaultNamespace} ${defaultKafkaClient} -- sh -exc 'echo "{\"text\": \"foo\"}" | kafka-console-producer --broker-list ${defaultCluster}-kafka-bootstrap.${defaultNamespace}:9092 --topic ${defaultTopic2}'`) + sh.exec(`sleep 20s`) + waitForReplicaCount(1, commandToCheckReplicas) + t.is('1', sh.exec(commandToCheckReplicas).stdout, 'Replica count should be 1.') +}) + +test.serial('Cleanup after multiple topics test', t => { + t.is( + 0, + sh.exec(`kubectl delete -f ${scaledObjectMultipleTopicsYamlFile.name} --namespace ${defaultNamespace}`).code, + 'Deleting Scaled Object should work.' 
+ ) + t.is( + 0, + sh.exec(`kubectl delete -f ${kafkaApplicationMultipleTopicsYamlFile.name} --namespace ${defaultNamespace}`).code, + 'Deleting kafka application should work.' + ) +}) + + test.after.always('Clean up, delete created resources.', t => { const resources = [ `${scaledObjectEarliestYamlFile.name}`, `${scaledObjectLatestYamlFile.name}`, + `${scaledObjectMultipleTopicsYamlFile.name}`, + + `${kafkaApplicationEarliestYamlFile.name}`, `${kafkaApplicationLatestYamlFile.name}`, + `${kafkaApplicationMultipleTopicsYamlFile.name}`, + `${kafkaClientYamlFile.name}`, `${kafkaTopicYamlFile.name}`, `${kafkaClusterYamlFile.name}`, @@ -231,13 +307,27 @@ spec: topicOperator: {} userOperator: {}` -const kafkaTopicYaml = `apiVersion: kafka.strimzi.io/v1beta1 +const kafkaTopicsYaml = `apiVersion: kafka.strimzi.io/v1beta1 kind: KafkaTopic metadata: name: ${defaultTopic} labels: strimzi.io/cluster: ${defaultCluster} namespace: ${defaultNamespace} +spec: + partitions: 3 + replicas: 1 + config: + retention.ms: 604800000 + segment.bytes: 1073741824 +--- +apiVersion: kafka.strimzi.io/v1beta1 +kind: KafkaTopic +metadata: + name: ${defaultTopic2} + labels: + strimzi.io/cluster: ${defaultCluster} + namespace: ${defaultNamespace} spec: partitions: 3 replicas: 1 @@ -310,6 +400,40 @@ spec: - -c - "kafka-console-consumer --bootstrap-server ${defaultCluster}-kafka-bootstrap.${defaultNamespace}:9092 --topic ${defaultTopic} --group earliest --from-beginning --consumer-property enable.auto.commit=false"` +const kafkaApplicationMultipleTopicsYaml = ` +apiVersion: apps/v1 +kind: Deployment +metadata: + name: kafka-consumer + namespace: ${defaultNamespace} + labels: + app: kafka-consumer +spec: + selector: + matchLabels: + app: kafka-consumer + template: + metadata: + labels: + app: kafka-consumer + spec: + containers: + # only recent versions of kafka-console-consumer support the "include" flag + # the older equivalent flag would violate the language-matters commit hook + # workaround -> create two consumer containers joining the same group + - name: kafka-consumer + image: confluentinc/cp-kafka:5.2.1 + command: + - sh + - -c + - "kafka-console-consumer --bootstrap-server ${defaultCluster}-kafka-bootstrap.${defaultNamespace}:9092 --topic '${defaultTopic}' --group multiTopic --from-beginning --consumer-property enable.auto.commit=false" + - name: kafka-consumer-2 + image: confluentinc/cp-kafka:5.2.1 + command: + - sh + - -c + - "kafka-console-consumer --bootstrap-server ${defaultCluster}-kafka-bootstrap.${defaultNamespace}:9092 --topic '${defaultTopic2}' --group multiTopic --from-beginning --consumer-property enable.auto.commit=false"` + const scaledObjectEarliestYaml = `apiVersion: keda.sh/v1alpha1 kind: ScaledObject metadata: @@ -343,3 +467,19 @@ spec: consumerGroup: latest lagThreshold: '1' offsetResetPolicy: 'latest'` + +const scaledObjectMultipleTopicsYaml = `apiVersion: keda.sh/v1alpha1 +kind: ScaledObject +metadata: + name: kafka-consumer-multi-topic + namespace: ${defaultNamespace} +spec: + scaleTargetRef: + name: kafka-consumer + triggers: + - type: kafka + metadata: + bootstrapServers: ${defaultCluster}-kafka-bootstrap.${defaultNamespace}:9092 + consumerGroup: multiTopic + lagThreshold: '2' + offsetResetPolicy: 'latest'`
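
Reviewer note (not part of the patch): the standalone Go sketch below illustrates the behaviour this patch enables when `topic` is omitted. Like the new getTopicPartitions path, it passes nil to Sarama's ListConsumerGroupOffsets so the response contains a block for every topic the consumer group has committed offsets for, then sums lag as newest broker offset minus committed offset. It is a minimal illustration, not the scaler code: the broker address localhost:9092 and the group name multiTopic are placeholders, and for brevity it uses client.GetOffset per partition instead of the batched per-broker OffsetRequest that getTopicPartitionOffsets builds.

package main

import (
	"fmt"
	"log"

	"github.com/Shopify/sarama"
)

func main() {
	// Placeholders: point these at a real cluster and consumer group.
	brokers := []string{"localhost:9092"}
	group := "multiTopic"

	cfg := sarama.NewConfig()
	cfg.Version = sarama.V1_0_0_0

	client, err := sarama.NewClient(brokers, cfg)
	if err != nil {
		log.Fatalf("error creating kafka client: %s", err)
	}
	defer client.Close()

	admin, err := sarama.NewClusterAdminFromClient(client)
	if err != nil {
		log.Fatalf("error creating cluster admin: %s", err)
	}

	// Passing nil instead of an explicit topic/partition map mirrors the
	// scaler's "no topic" path: the response covers every topic the group
	// has committed offsets for.
	offsets, err := admin.ListConsumerGroupOffsets(group, nil)
	if err != nil {
		log.Fatalf("error listing consumer group offsets: %s", err)
	}

	var totalLag, totalPartitions int64
	for topic, blocks := range offsets.Blocks {
		for partition, block := range blocks {
			newest, err := client.GetOffset(topic, partition, sarama.OffsetNewest)
			if err != nil {
				log.Fatalf("error fetching newest offset for %s/%d: %s", topic, partition, err)
			}
			lag := int64(0)
			if block.Offset >= 0 { // a committed offset of -1 means nothing committed yet
				lag = newest - block.Offset
			}
			fmt.Printf("topic %s partition %d lag %d\n", topic, partition, lag)
			totalLag += lag
			totalPartitions++
		}
	}
	fmt.Printf("total lag %d across %d topic partitions\n", totalLag, totalPartitions)
}

As in the patched GetMetrics, a production scaler would additionally cap totalLag at totalTopicPartitions * lagThreshold when allowIdleConsumers is not set, so the HPA is never asked for more replicas than there are partitions across the subscribed topics.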