diff --git a/kafka/source/README.md b/kafka/source/README.md index d338cfac36..e53e6869d0 100644 --- a/kafka/source/README.md +++ b/kafka/source/README.md @@ -51,3 +51,68 @@ event sink. A more detailed example of the `KafkaSource` can be found in the [Knative documentation](https://knative.dev/docs/eventing/samples/). + +## Experimental KEDA support in Kafka Event Source + +Warning: this is *experimental* and may be changed in future. Should not be used in production. This is mainly for discussion and evolving scaling in Knative eventing. + +The code is using Unstructured and also imported KEDA API - this is for discussion which version should be used (right now only Unstructured is fully implemented). +KEDA to provide client-go support discussion #494 + +### Install Kafka + +See above + +### Install KEDA - + +To install the version I used for the experiment: + +```bash +git clone https://github.com/kedacore/keda.git +cd keda +git pull +git checkout master +#$ git rev-parse HEAD +#0099c102d538995c81610fb48d12bde1259678a6 +git checkout 0099c102d538995c81610fb48d12bde1259678a6 +``` + +### Run Kafka Source Controller + +Install Kafka source controller with Keda support: + +```bash +export KO_DOCKER_REPO=... +ko apply -f kafka/source/config/ +``` + +#### Local testing + +```bash +go run kafka/source/cmd/controller/main.go -kubeconfig $KUBECONFIG +``` + +### Create Kafka Source that uses YAML + +And use example YAML that has minReplicaCount set to value different from 1. For example: + +```yaml +apiVersion: sources.eventing.knative.dev/v1alpha1 +kind: KafkaSource +metadata: + name: kafka-src10 + annotations: + autoscaling.knative.dev/minScale: "0" + autoscaling.knative.dev/maxScale: "10" + autoscaling.knative.dev/class: keda.autoscaling.knative.dev + keda.autoscaling.knative.dev/pollingInterval: "2" + keda.autoscaling.knative.dev/cooldownPeriod: "15" +spec: + bootstrapServers: my-cluster-kafka-bootstrap.kafka:9092 #note the .kafka in URL for namespace + consumerGroup: kafka-source10-11 + topics: my-topic-10 + sink: + apiVersion: v1 + kind: Service + name: hello-display +``` diff --git a/kafka/source/pkg/reconciler/controller.go b/kafka/source/pkg/reconciler/controller.go index d5bc0727b6..54ee033ae9 100644 --- a/kafka/source/pkg/reconciler/controller.go +++ b/kafka/source/pkg/reconciler/controller.go @@ -29,6 +29,7 @@ import ( eventtypeinformer "knative.dev/eventing/pkg/client/injection/informers/eventing/v1alpha1/eventtype" kubeclient "knative.dev/pkg/client/injection/kube/client" deploymentinformer "knative.dev/pkg/client/injection/kube/informers/apps/v1/deployment" + "knative.dev/pkg/injection/clients/dynamicclient" "knative.dev/pkg/configmap" "knative.dev/pkg/controller" @@ -56,6 +57,7 @@ func NewController( c := &Reconciler{ KubeClientSet: kubeclient.Get(ctx), + DynamicClientSet: dynamicclient.Get(ctx), EventingClientSet: eventingclient.Get(ctx), kafkaClientSet: kafkaclient.Get(ctx), kafkaLister: kafkaInformer.Lister(), diff --git a/kafka/source/pkg/reconciler/kafkasource.go b/kafka/source/pkg/reconciler/kafkasource.go index 89b5e61c3a..2f57c4a627 100644 --- a/kafka/source/pkg/reconciler/kafkasource.go +++ b/kafka/source/pkg/reconciler/kafkasource.go @@ -20,17 +20,22 @@ import ( "context" "errors" "fmt" + "strconv" "strings" "knative.dev/pkg/metrics" + "knative.dev/serving/pkg/apis/autoscaling" "go.uber.org/zap" appsv1 "k8s.io/api/apps/v1" + v1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/equality" apierrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/api/meta" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime/schema" "knative.dev/eventing-contrib/kafka/source/pkg/apis/sources/v1alpha1" @@ -40,6 +45,7 @@ import ( eventingclientset "knative.dev/eventing/pkg/client/clientset/versioned" // NewController stuff + "k8s.io/client-go/dynamic" "k8s.io/client-go/kubernetes" appsv1listers "k8s.io/client-go/listers/apps/v1" versioned "knative.dev/eventing-contrib/kafka/source/pkg/client/clientset/versioned" @@ -86,8 +92,8 @@ func newDeploymentFailed(namespace, name string, err error) pkgreconciler.Event type Reconciler struct { // KubeClientSet allows us to talk to the k8s for core APIs - KubeClientSet kubernetes.Interface - + KubeClientSet kubernetes.Interface + DynamicClientSet dynamic.Interface // EventingClientSet allows us to configure Eventing objects EventingClientSet eventingclientset.Interface @@ -103,6 +109,12 @@ type Reconciler struct { metricsConfig *metrics.ExporterOptions sinkResolver *resolver.URIResolver + + minReplicaCount *int32 + maxReplicaCount *int32 + cooldownPeriod *int32 + pollingInterval *int32 + triggerLagThreshold *int32 } // Check that our Reconciler implements Interface @@ -163,6 +175,7 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, src *v1alpha1.KafkaSourc logging.FromContext(ctx).Error("Unable to create the receive adapter", zap.Error(err)) return err } + r.scaleKafkaSource(ctx, ra, src) src.Status.MarkDeployed(ra) err = r.reconcileEventTypes(ctx, src) @@ -206,6 +219,188 @@ func checkResourcesStatus(src *v1alpha1.KafkaSource) error { return nil } +func (r *Reconciler) scaleKafkaSource(ctx context.Context, ra *v1.Deployment, src *v1alpha1.KafkaSource) { + r.readScalingAnnotations(ctx, src) + // no scaling annotatins so no scaling work needed + if r.minReplicaCount == nil && r.maxReplicaCount == nil { + return + } + _, error := r.deployKedaScaledObject(ctx, ra, src) + if error != nil { + // additional logging? + } +} + +func (r *Reconciler) deployKedaScaledObject(ctx context.Context, ra *v1.Deployment, src *v1alpha1.KafkaSource) (*unstructured.Unstructured, error) { + logger := logging.FromContext(ctx).Desugar() + deploymentName := ra.GetName() + logger.Info("Got ra", zap.Any("receiveAdapter", ra)) + logger.Info("Got ra name "+deploymentName, zap.Any("deploymentName", deploymentName)) + namespace := src.Namespace + name := src.Name + gvk := schema.GroupVersionKind{ + Group: "keda.k8s.io", + Version: "v1alpha1", + Kind: "ScaledObject", + } + gvr, _ := meta.UnsafeGuessKindToResource(gvk) + scaledObjectResourceInterface := r.DynamicClientSet.Resource(gvr).Namespace(namespace) + if scaledObjectResourceInterface == nil { + return nil, fmt.Errorf("unable to create dynamic client for ScaledObject") + } + scaledObjectUnstr, err := r.generateKedaScaledObjectUnstructured(ctx, ra, src) + if err != nil { + return nil, err + } + created, err := scaledObjectResourceInterface.Create(scaledObjectUnstr, metav1.CreateOptions{}) + if err != nil { + logger.Error("Failed to create ScaledObject so going to do update", zap.Error(err)) + //fmt.Printf("Doing update as failed to create ScaledObject: %s \n", err) + // will replace - https://github.com/kubernetes/client-go/blob/master/examples/dynamic-create-update-delete-deployment/main.go + // doing kubectl "replace" https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#concurrency-control-and-consistency + // first get resourceVersion + existing, err := scaledObjectResourceInterface.Get(name, metav1.GetOptions{}) + if err != nil { + logger.Error("Failed to create ScaledObject:", zap.Error(err)) + return nil, err + } + resourceVersion := existing.GetResourceVersion() + scaledObjectUnstr.SetResourceVersion(resourceVersion) + updated, err := scaledObjectResourceInterface.Update(scaledObjectUnstr, metav1.UpdateOptions{}) + if err != nil { + logger.Error("Update failed to create ScaledObject", zap.Error(err)) + return nil, err + } else { + logger.Info("Update success", zap.Any("updated", updated)) + return updated, nil + } + } + return created, nil +} + +const ( + kedaAutoscalingAnnotationClass = "keda.autoscaling.knative.dev" + kedaCooldownPeriodAnnodationKey = kedaAutoscalingAnnotationClass + "/cooldownPeriod" + kedaPollingIntervalAnnodationKey = kedaAutoscalingAnnotationClass + "/pollingInterval" + kedaTriggerLagThresholdAnnodationKey = kedaAutoscalingAnnotationClass + "/trigger.lagThreshold" +) + +func convertMapKeyToInt32(dict map[string]string, key string, logger *zap.Logger) *int32 { + val, ok := dict[key] + if !ok { + return nil + } + i, err := strconv.ParseInt(val, 10, 32) + if err != nil { + logger.Error("Expected annotation value to be integer but got "+val, zap.Any("annotations key", key)) + return nil + } + i32 := int32(i) + return &i32 +} + +func (r *Reconciler) readScalingAnnotations(ctx context.Context, src *v1alpha1.KafkaSource) { + logger := logging.FromContext(ctx).Desugar() + meta := src.GetObjectMeta() + annotations := meta.GetAnnotations() + if annotations != nil { + scalingClass := annotations[autoscaling.ClassAnnotationKey] + r.minReplicaCount = convertMapKeyToInt32(annotations, autoscaling.MinScaleAnnotationKey, logger) + r.maxReplicaCount = convertMapKeyToInt32(annotations, autoscaling.MaxScaleAnnotationKey, logger) + if scalingClass == kedaAutoscalingAnnotationClass { + r.cooldownPeriod = convertMapKeyToInt32(annotations, kedaCooldownPeriodAnnodationKey, logger) + r.pollingInterval = convertMapKeyToInt32(annotations, kedaPollingIntervalAnnodationKey, logger) + r.pollingInterval = convertMapKeyToInt32(annotations, kedaPollingIntervalAnnodationKey, logger) + r.triggerLagThreshold = convertMapKeyToInt32(annotations, kedaTriggerLagThresholdAnnodationKey, logger) + + } + } +} + +func (r *Reconciler) generateKedaScaledObjectUnstructured(ctx context.Context, ra *v1.Deployment, src *v1alpha1.KafkaSource) (*unstructured.Unstructured, error) { + logger := logging.FromContext(ctx).Desugar() + deploymentName := ra.GetName() + namespace := src.Namespace + name := src.Name + logger.Info("Unstructured ScaledObject name "+name, zap.Any("name", name)) + srcName := src.GetName() + srcUID := src.GetUID() + srcResVersion := src.GetResourceVersion() + logger.Info("Got srcResVersion="+srcResVersion, zap.Any("srcResVersion", srcResVersion)) + srcKind := src.GetGroupVersionKind().Kind + logger.Info("Got srcKind srcName srcUID", zap.Any("srcKind", srcKind), zap.Any("srcName", srcName), zap.Any("srcUID", srcUID)) + srcBrokerList := src.Spec.BootstrapServers + srcConcumerGroup := src.Spec.ConsumerGroup + triggers := make([]map[string]interface{}, 0, 1) + topics := strings.Split(src.Spec.Topics, ",") + if len(topics) == 0 { + return nil, fmt.Errorf("Comma-separated list of topics can not be empty") + } + for _, topic := range topics { + triggerMetadata := map[string]interface{}{ + "brokerList": srcBrokerList, + "consumerGroup": srcConcumerGroup, + "topic": topic, + } + if r.triggerLagThreshold != nil { + logger.Info("Got triggerLagThreshold", zap.Any("triggerLagThreshold", r.triggerLagThreshold)) + triggerMetadata["lagThreshold"] = strconv.Itoa(int(*r.triggerLagThreshold)) + } + trigger := map[string]interface{}{ + "type": "kafka", + "metadata": triggerMetadata, + } + triggers = append(triggers, trigger) + } + spec := map[string]interface{}{ + "scaleTargetRef": map[string]interface{}{ + "deploymentName": deploymentName, + }, + "triggers": triggers, + } + if r.minReplicaCount != nil { + logger.Info("Got minReplicaCount", zap.Any("minReplicaCount", r.minReplicaCount)) + spec["minReplicaCount"] = *r.minReplicaCount + } + if r.maxReplicaCount != nil { + logger.Info("Got maxReplicaCount", zap.Any("maxReplicaCount", r.maxReplicaCount)) + spec["maxReplicaCount"] = *r.maxReplicaCount + } + if r.cooldownPeriod != nil { + logger.Info("Got cooldownPeriod", zap.Any("cooldownPeriod", r.cooldownPeriod)) + spec["cooldownPeriod"] = *r.cooldownPeriod + } + if r.pollingInterval != nil { + logger.Info("Got pollingInterval", zap.Any("pollingInterval", r.minReplicaCount)) + spec["pollingInterval"] = *r.pollingInterval + } + soUnstr := &unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": "keda.k8s.io/v1alpha1", + "kind": "ScaledObject", + "metadata": map[string]interface{}{ + "creationTimestamp": nil, + "namespace": namespace, + "name": name, + "labels": map[string]interface{}{ + "deploymentName": deploymentName, + }, + "ownerReferences": []map[string]interface{}{{ + "apiVersion": "sources.eventing.knative.dev/v1alpha1", + "kind": srcKind, + "name": srcName, + "uid": srcUID, + "blockOwnerDeletion": true, + "controller": true, + }}, + }, + "spec": spec, + }, + } + logger.Info("Unstructured SO name "+name, zap.Any("name", name), zap.Any("soUnstr", soUnstr)) + return soUnstr, nil +} + func (r *Reconciler) createReceiveAdapter(ctx context.Context, src *v1alpha1.KafkaSource, sinkURI string) (*appsv1.Deployment, error) { if err := checkResourcesStatus(src); err != nil {