Skip to content
This repository has been archived by the owner on Jun 4, 2021. It is now read-only.

Commit

Permalink
Autoscaling with KEDA patched to latest master
Browse files Browse the repository at this point in the history
  • Loading branch information
aslom committed Feb 20, 2020
1 parent bd2decb commit dd0d2d0
Show file tree
Hide file tree
Showing 3 changed files with 264 additions and 2 deletions.
65 changes: 65 additions & 0 deletions kafka/source/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -51,3 +51,68 @@ event sink.
A more detailed example of the `KafkaSource` can be found in the
[Knative documentation](https://knative.dev/docs/eventing/samples/).

## Experimental KEDA support in Kafka Event Source

Warning: this is *experimental* and may change in the future. It should not be used in production. This is mainly for discussion and for evolving scaling in Knative eventing.

The code uses Unstructured and also imports the KEDA API — which of the two approaches should be used is still under discussion (right now only the Unstructured path is fully implemented).
KEDA to provide client-go support discussion #494 <https://github.com/kedacore/keda/issues/494>

### Install Kafka

See above

### Install KEDA

To install the version I used for the experiment:

```bash
git clone https://github.com/kedacore/keda.git
cd keda
git pull
git checkout master
#$ git rev-parse HEAD
#0099c102d538995c81610fb48d12bde1259678a6
git checkout 0099c102d538995c81610fb48d12bde1259678a6
```

### Run Kafka Source Controller

Install Kafka source controller with Keda support:

```bash
export KO_DOCKER_REPO=...
ko apply -f kafka/source/config/
```

#### Local testing

```bash
go run kafka/source/cmd/controller/main.go -kubeconfig $KUBECONFIG
```

### Create Kafka Source that uses YAML

Use an example YAML that has the minimum scale annotation (`autoscaling.knative.dev/minScale`) set to a value different from 1. For example:

```yaml
apiVersion: sources.eventing.knative.dev/v1alpha1
kind: KafkaSource
metadata:
name: kafka-src10
annotations:
autoscaling.knative.dev/minScale: "0"
autoscaling.knative.dev/maxScale: "10"
autoscaling.knative.dev/class: keda.autoscaling.knative.dev
keda.autoscaling.knative.dev/pollingInterval: "2"
keda.autoscaling.knative.dev/cooldownPeriod: "15"
spec:
bootstrapServers: my-cluster-kafka-bootstrap.kafka:9092 #note the .kafka in URL for namespace
consumerGroup: kafka-source10-11
topics: my-topic-10
sink:
apiVersion: v1
kind: Service
name: hello-display
```
2 changes: 2 additions & 0 deletions kafka/source/pkg/reconciler/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
eventtypeinformer "knative.dev/eventing/pkg/client/injection/informers/eventing/v1alpha1/eventtype"
kubeclient "knative.dev/pkg/client/injection/kube/client"
deploymentinformer "knative.dev/pkg/client/injection/kube/informers/apps/v1/deployment"
"knative.dev/pkg/injection/clients/dynamicclient"

"knative.dev/pkg/configmap"
"knative.dev/pkg/controller"
Expand Down Expand Up @@ -56,6 +57,7 @@ func NewController(

c := &Reconciler{
KubeClientSet: kubeclient.Get(ctx),
DynamicClientSet: dynamicclient.Get(ctx),
EventingClientSet: eventingclient.Get(ctx),
kafkaClientSet: kafkaclient.Get(ctx),
kafkaLister: kafkaInformer.Lister(),
Expand Down
199 changes: 197 additions & 2 deletions kafka/source/pkg/reconciler/kafkasource.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,22 @@ import (
"context"
"errors"
"fmt"
"strconv"
"strings"

"knative.dev/pkg/metrics"
"knative.dev/serving/pkg/apis/autoscaling"

"go.uber.org/zap"
appsv1 "k8s.io/api/apps/v1"
v1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/equality"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime/schema"
"knative.dev/eventing-contrib/kafka/source/pkg/apis/sources/v1alpha1"
Expand All @@ -40,6 +45,7 @@ import (
eventingclientset "knative.dev/eventing/pkg/client/clientset/versioned"

// NewController stuff
"k8s.io/client-go/dynamic"
"k8s.io/client-go/kubernetes"
appsv1listers "k8s.io/client-go/listers/apps/v1"
versioned "knative.dev/eventing-contrib/kafka/source/pkg/client/clientset/versioned"
Expand Down Expand Up @@ -86,8 +92,8 @@ func newDeploymentFailed(namespace, name string, err error) pkgreconciler.Event

type Reconciler struct {
// KubeClientSet allows us to talk to the k8s for core APIs
KubeClientSet kubernetes.Interface

KubeClientSet kubernetes.Interface
DynamicClientSet dynamic.Interface
// EventingClientSet allows us to configure Eventing objects
EventingClientSet eventingclientset.Interface

Expand All @@ -103,6 +109,12 @@ type Reconciler struct {
metricsConfig *metrics.ExporterOptions

sinkResolver *resolver.URIResolver

minReplicaCount *int32
maxReplicaCount *int32
cooldownPeriod *int32
pollingInterval *int32
triggerLagThreshold *int32
}

// Check that our Reconciler implements Interface
Expand Down Expand Up @@ -163,6 +175,7 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, src *v1alpha1.KafkaSourc
logging.FromContext(ctx).Error("Unable to create the receive adapter", zap.Error(err))
return err
}
r.scaleKafkaSource(ctx, ra, src)
src.Status.MarkDeployed(ra)

err = r.reconcileEventTypes(ctx, src)
Expand Down Expand Up @@ -206,6 +219,188 @@ func checkResourcesStatus(src *v1alpha1.KafkaSource) error {
return nil
}

// scaleKafkaSource deploys a KEDA ScaledObject targeting the receive adapter
// deployment when the source carries autoscaling annotations; it is a no-op
// when neither minScale nor maxScale is set.
func (r *Reconciler) scaleKafkaSource(ctx context.Context, ra *v1.Deployment, src *v1alpha1.KafkaSource) {
	r.readScalingAnnotations(ctx, src)
	// No scaling annotations, so no scaling work is needed.
	if r.minReplicaCount == nil && r.maxReplicaCount == nil {
		return
	}
	if _, err := r.deployKedaScaledObject(ctx, ra, src); err != nil {
		// Scaling is best-effort (experimental): log the failure but do not
		// fail reconciliation of the source itself.
		logging.FromContext(ctx).Desugar().Error("Failed to deploy KEDA ScaledObject", zap.Error(err))
	}
}

// deployKedaScaledObject creates the KEDA ScaledObject that autoscales the
// receive adapter deployment for src. If the object already exists, it falls
// back to a "replace"-style update using the stored resourceVersion. It
// returns the ScaledObject as persisted by the API server.
func (r *Reconciler) deployKedaScaledObject(ctx context.Context, ra *v1.Deployment, src *v1alpha1.KafkaSource) (*unstructured.Unstructured, error) {
	logger := logging.FromContext(ctx).Desugar()
	deploymentName := ra.GetName()
	logger.Info("Got ra", zap.Any("receiveAdapter", ra))
	logger.Info("Got ra name "+deploymentName, zap.Any("deploymentName", deploymentName))
	namespace := src.Namespace
	name := src.Name
	gvk := schema.GroupVersionKind{
		Group:   "keda.k8s.io",
		Version: "v1alpha1",
		Kind:    "ScaledObject",
	}
	// ScaledObject pluralizes regularly ("scaledobjects"), so the unsafe
	// guess is safe here.
	gvr, _ := meta.UnsafeGuessKindToResource(gvk)
	scaledObjectResourceInterface := r.DynamicClientSet.Resource(gvr).Namespace(namespace)
	if scaledObjectResourceInterface == nil {
		return nil, fmt.Errorf("unable to create dynamic client for ScaledObject")
	}
	scaledObjectUnstr, err := r.generateKedaScaledObjectUnstructured(ctx, ra, src)
	if err != nil {
		return nil, err
	}
	created, err := scaledObjectResourceInterface.Create(scaledObjectUnstr, metav1.CreateOptions{})
	if err == nil {
		return created, nil
	}
	// Create failed (typically AlreadyExists): do a kubectl-style "replace" —
	// fetch the current resourceVersion and update with it. See
	// https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#concurrency-control-and-consistency
	logger.Info("ScaledObject create failed, falling back to update", zap.Error(err))
	existing, err := scaledObjectResourceInterface.Get(name, metav1.GetOptions{})
	if err != nil {
		logger.Error("Failed to get existing ScaledObject", zap.Error(err))
		return nil, err
	}
	scaledObjectUnstr.SetResourceVersion(existing.GetResourceVersion())
	updated, err := scaledObjectResourceInterface.Update(scaledObjectUnstr, metav1.UpdateOptions{})
	if err != nil {
		logger.Error("Failed to update ScaledObject", zap.Error(err))
		return nil, err
	}
	logger.Info("Update success", zap.Any("updated", updated))
	return updated, nil
}

const (
	// kedaAutoscalingAnnotationClass is the value of the
	// autoscaling.knative.dev/class annotation that selects KEDA-based
	// scaling; it also serves as the prefix for the KEDA-specific
	// annotation keys below.
	kedaAutoscalingAnnotationClass = "keda.autoscaling.knative.dev"
	// NOTE(review): "Annodation" in the identifier names below is a typo for
	// "Annotation"; the annotation string values themselves are correct.
	kedaCooldownPeriodAnnodationKey      = kedaAutoscalingAnnotationClass + "/cooldownPeriod"
	kedaPollingIntervalAnnodationKey     = kedaAutoscalingAnnotationClass + "/pollingInterval"
	kedaTriggerLagThresholdAnnodationKey = kedaAutoscalingAnnotationClass + "/trigger.lagThreshold"
)

// convertMapKeyToInt32 looks up key in dict and parses its value as a 32-bit
// integer. It returns nil when the key is absent or the value does not parse
// as an integer; a parse failure is logged.
func convertMapKeyToInt32(dict map[string]string, key string, logger *zap.Logger) *int32 {
	raw, found := dict[key]
	if !found {
		return nil
	}
	parsed, err := strconv.ParseInt(raw, 10, 32)
	if err != nil {
		logger.Error("Expected annotation value to be integer but got "+raw, zap.Any("annotations key", key))
		return nil
	}
	result := int32(parsed)
	return &result
}

// readScalingAnnotations caches the autoscaling annotations of src on the
// reconciler: min/max scale are always read, while the KEDA-specific knobs
// (cooldownPeriod, pollingInterval, trigger.lagThreshold) are read only when
// the autoscaling class annotation selects KEDA. Fields are left nil when the
// corresponding annotation is absent or unparsable.
func (r *Reconciler) readScalingAnnotations(ctx context.Context, src *v1alpha1.KafkaSource) {
	logger := logging.FromContext(ctx).Desugar()
	// Renamed from "meta" to avoid shadowing the imported meta package.
	annotations := src.GetObjectMeta().GetAnnotations()
	if annotations == nil {
		return
	}
	r.minReplicaCount = convertMapKeyToInt32(annotations, autoscaling.MinScaleAnnotationKey, logger)
	r.maxReplicaCount = convertMapKeyToInt32(annotations, autoscaling.MaxScaleAnnotationKey, logger)
	if annotations[autoscaling.ClassAnnotationKey] == kedaAutoscalingAnnotationClass {
		r.cooldownPeriod = convertMapKeyToInt32(annotations, kedaCooldownPeriodAnnodationKey, logger)
		// BUG FIX: pollingInterval was previously assigned twice in a row.
		r.pollingInterval = convertMapKeyToInt32(annotations, kedaPollingIntervalAnnodationKey, logger)
		r.triggerLagThreshold = convertMapKeyToInt32(annotations, kedaTriggerLagThresholdAnnodationKey, logger)
	}
}

// generateKedaScaledObjectUnstructured builds the unstructured KEDA
// ScaledObject for src: one "kafka" trigger per comma-separated topic,
// targeting the receive adapter deployment, plus any scaling knobs previously
// cached by readScalingAnnotations. An owner reference to src is set so the
// ScaledObject is garbage collected along with the source.
func (r *Reconciler) generateKedaScaledObjectUnstructured(ctx context.Context, ra *v1.Deployment, src *v1alpha1.KafkaSource) (*unstructured.Unstructured, error) {
	logger := logging.FromContext(ctx).Desugar()
	deploymentName := ra.GetName()
	namespace := src.Namespace
	name := src.Name
	logger.Info("Unstructured ScaledObject name "+name, zap.Any("name", name))
	srcName := src.GetName()
	srcUID := src.GetUID()
	srcResVersion := src.GetResourceVersion()
	logger.Info("Got srcResVersion="+srcResVersion, zap.Any("srcResVersion", srcResVersion))
	srcKind := src.GetGroupVersionKind().Kind
	logger.Info("Got srcKind srcName srcUID", zap.Any("srcKind", srcKind), zap.Any("srcName", srcName), zap.Any("srcUID", srcUID))
	srcBrokerList := src.Spec.BootstrapServers
	srcConsumerGroup := src.Spec.ConsumerGroup
	topics := strings.Split(src.Spec.Topics, ",")
	// BUG FIX: strings.Split never returns an empty slice, so the previous
	// len(topics) == 0 check was unreachable and an empty Topics field
	// silently produced a trigger for topic "". Reject that case explicitly.
	if len(topics) == 1 && topics[0] == "" {
		return nil, fmt.Errorf("comma-separated list of topics can not be empty")
	}
	triggers := make([]map[string]interface{}, 0, len(topics))
	for _, topic := range topics {
		triggerMetadata := map[string]interface{}{
			"brokerList":    srcBrokerList,
			"consumerGroup": srcConsumerGroup,
			"topic":         topic,
		}
		if r.triggerLagThreshold != nil {
			logger.Info("Got triggerLagThreshold", zap.Any("triggerLagThreshold", r.triggerLagThreshold))
			// Trigger metadata values are strings, so convert the int32.
			triggerMetadata["lagThreshold"] = strconv.Itoa(int(*r.triggerLagThreshold))
		}
		triggers = append(triggers, map[string]interface{}{
			"type":     "kafka",
			"metadata": triggerMetadata,
		})
	}
	spec := map[string]interface{}{
		"scaleTargetRef": map[string]interface{}{
			"deploymentName": deploymentName,
		},
		"triggers": triggers,
	}
	// Only set the optional scaling knobs that were present as annotations.
	if r.minReplicaCount != nil {
		logger.Info("Got minReplicaCount", zap.Any("minReplicaCount", r.minReplicaCount))
		spec["minReplicaCount"] = *r.minReplicaCount
	}
	if r.maxReplicaCount != nil {
		logger.Info("Got maxReplicaCount", zap.Any("maxReplicaCount", r.maxReplicaCount))
		spec["maxReplicaCount"] = *r.maxReplicaCount
	}
	if r.cooldownPeriod != nil {
		logger.Info("Got cooldownPeriod", zap.Any("cooldownPeriod", r.cooldownPeriod))
		spec["cooldownPeriod"] = *r.cooldownPeriod
	}
	if r.pollingInterval != nil {
		// BUG FIX: previously logged r.minReplicaCount under this key.
		logger.Info("Got pollingInterval", zap.Any("pollingInterval", r.pollingInterval))
		spec["pollingInterval"] = *r.pollingInterval
	}
	soUnstr := &unstructured.Unstructured{
		Object: map[string]interface{}{
			"apiVersion": "keda.k8s.io/v1alpha1",
			"kind":       "ScaledObject",
			"metadata": map[string]interface{}{
				"creationTimestamp": nil,
				"namespace":         namespace,
				"name":              name,
				"labels": map[string]interface{}{
					"deploymentName": deploymentName,
				},
				// Owner reference ties the ScaledObject lifecycle to src.
				"ownerReferences": []map[string]interface{}{{
					"apiVersion":         "sources.eventing.knative.dev/v1alpha1",
					"kind":               srcKind,
					"name":               srcName,
					"uid":                srcUID,
					"blockOwnerDeletion": true,
					"controller":         true,
				}},
			},
			"spec": spec,
		},
	}
	logger.Info("Unstructured SO name "+name, zap.Any("name", name), zap.Any("soUnstr", soUnstr))
	return soUnstr, nil
}

func (r *Reconciler) createReceiveAdapter(ctx context.Context, src *v1alpha1.KafkaSource, sinkURI string) (*appsv1.Deployment, error) {

if err := checkResourcesStatus(src); err != nil {
Expand Down

0 comments on commit dd0d2d0

Please sign in to comment.