Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Kubernetes Workload scaler #2010

Merged
merged 18 commits into from
Aug 6, 2021
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,11 @@ issues:
- path: scale_resolvers_test.go
linters:
- staticcheck
# Got "sigs.k8s.io/controller-runtime/pkg/client/fake is deprecated: please use pkg/envtest for testing"
# This might not be ideal, see: https://github.com/kubernetes-sigs/controller-runtime/issues/768
- path: kubernetes_workload_scaler_test.go
zroubalik marked this conversation as resolved.
Show resolved Hide resolved
linters:
- staticcheck
# https://github.com/go-critic/go-critic/issues/926
- linters:
- gocritic
Expand Down
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
- Add new scaler for Selenium Grid ([#1971](https://github.com/kedacore/keda/pull/1971))
- Support using regex to select the queues in RabbitMQ Scaler ([#1957](https://github.com/kedacore/keda/pull/1957))
- Support custom metric name in RabbitMQ Scaler ([#1976](https://github.com/kedacore/keda/pull/1976))
- Add new scaler for Kubernetes Workload ([#2010](https://github.com/kedacore/keda/pull/2010))

### Improvements

Expand Down
139 changes: 139 additions & 0 deletions pkg/scalers/kubernetes_workload_scaler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
package scalers

import (
"context"
"fmt"
"strconv"
"strings"

"k8s.io/api/autoscaling/v2beta2"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/metrics/pkg/apis/external_metrics"
"sigs.k8s.io/controller-runtime/pkg/client"

kedautil "github.com/kedacore/keda/v2/pkg/util"
)

type kubernetesWorkloadScaler struct {
metadata *kubernetesWorkloadMetadata
kubeClient client.Client
}

const (
kubernetesWorkloadMetricType = "External"
podSelectorKey = "podSelector"
namespaceKey = "namespace"
valueKey = "value"
)

type kubernetesWorkloadMetadata struct {
podSelector labels.Selector
namespace string
value int64
}

// NewKubernetesWorkloadScaler creates a new kubernetesWorkloadScaler
func NewKubernetesWorkloadScaler(kubeClient client.Client, config *ScalerConfig) (Scaler, error) {
meta, parseErr := parseWorkloadMetadata(config)
if parseErr != nil {
return nil, fmt.Errorf("error parsing kubernetes workload metadata: %s", parseErr)
}

return &kubernetesWorkloadScaler{
metadata: meta,
kubeClient: kubeClient,
}, nil
}

func parseWorkloadMetadata(config *ScalerConfig) (*kubernetesWorkloadMetadata, error) {
meta := &kubernetesWorkloadMetadata{}
var err error
meta.namespace = config.Namespace
meta.podSelector, err = labels.Parse(config.TriggerMetadata[podSelectorKey])
if err != nil || meta.podSelector.String() == "" {
return nil, fmt.Errorf("invalid pod selector")
}
meta.value, err = strconv.ParseInt(config.TriggerMetadata[valueKey], 10, 64)
if err != nil || meta.value == 0 {
return nil, fmt.Errorf("value must be an integer greater than 0")
}
return meta, nil
}

// IsActive determines if we need to scale from zero
func (s *kubernetesWorkloadScaler) IsActive(ctx context.Context) (bool, error) {
pods, err := s.getMetricValue(ctx)

if err != nil {
return false, err
}

return pods > 0, nil
}

// Close no need for kubernetes workload scaler
func (s *kubernetesWorkloadScaler) Close() error {
return nil
}

// GetMetricSpecForScaling returns the metric spec for the HPA
func (s *kubernetesWorkloadScaler) GetMetricSpecForScaling() []v2beta2.MetricSpec {
targetMetricValue := resource.NewQuantity(s.metadata.value, resource.DecimalSI)
externalMetric := &v2beta2.ExternalMetricSource{
Metric: v2beta2.MetricIdentifier{
Name: kedautil.NormalizeString(fmt.Sprintf("%s-%s-%s", "workload", s.metadata.namespace, normalizeSelectorString(s.metadata.podSelector))),
},
Target: v2beta2.MetricTarget{
Type: v2beta2.AverageValueMetricType,
AverageValue: targetMetricValue,
},
}
metricSpec := v2beta2.MetricSpec{External: externalMetric, Type: kubernetesWorkloadMetricType}
return []v2beta2.MetricSpec{metricSpec}
}

// GetMetrics returns value for a supported metric
func (s *kubernetesWorkloadScaler) GetMetrics(ctx context.Context, metricName string, metricSelector labels.Selector) ([]external_metrics.ExternalMetricValue, error) {
pods, err := s.getMetricValue(ctx)
if err != nil {
return []external_metrics.ExternalMetricValue{}, fmt.Errorf("error inspecting kubernetes workload: %s", err)
}

metric := external_metrics.ExternalMetricValue{
MetricName: metricName,
Value: *resource.NewQuantity(int64(pods), resource.DecimalSI),
Timestamp: metav1.Now(),
}

return append([]external_metrics.ExternalMetricValue{}, metric), nil
}

func (s *kubernetesWorkloadScaler) getMetricValue(ctx context.Context) (int, error) {
podList := &corev1.PodList{}
listOptions := client.ListOptions{}
listOptions.LabelSelector = s.metadata.podSelector
listOptions.Namespace = s.metadata.namespace
opts := []client.ListOption{
&listOptions,
}

err := s.kubeClient.List(ctx, podList, opts...)
if err != nil {
return 0, err
}

return len(podList.Items), nil
}

func normalizeSelectorString(selector labels.Selector) string {
s := selector.String()
s = strings.ReplaceAll(s, " ", "")
s = strings.ReplaceAll(s, "(", "-")
s = strings.ReplaceAll(s, ")", "-")
s = strings.ReplaceAll(s, ",", "-")
s = strings.ReplaceAll(s, "!", "-")
return s
}
138 changes: 138 additions & 0 deletions pkg/scalers/kubernetes_workload_scaler_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
package scalers

import (
"context"
"fmt"
"testing"
"time"

v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"sigs.k8s.io/controller-runtime/pkg/client/fake"
)

type workloadMetadataTestData struct {
metadata map[string]string
namespace string
isError bool
}

var parseWorkloadMetadataTestDataset = []workloadMetadataTestData{
{map[string]string{"value": "1", "podSelector": "app=demo"}, "test", false},
{map[string]string{"value": "1", "podSelector": "app=demo"}, "default", false},
{map[string]string{"value": "1", "podSelector": "app in (demo1, demo2)"}, "test", false},
{map[string]string{"value": "1", "podSelector": "app in (demo1, demo2),deploy in (deploy1, deploy2)"}, "test", false},
{map[string]string{"podSelector": "app=demo"}, "test", true},
{map[string]string{"podSelector": "app=demo"}, "default", true},
{map[string]string{"value": "1"}, "test", true},
{map[string]string{"value": "1"}, "default", true},
{map[string]string{"value": "a", "podSelector": "app=demo"}, "test", true},
{map[string]string{"value": "a", "podSelector": "app=demo"}, "default", true},
{map[string]string{"value": "0", "podSelector": "app=demo"}, "test", true},
{map[string]string{"value": "0", "podSelector": "app=demo"}, "default", true},
}

func TestParseWorkloadMetadata(t *testing.T) {
for _, testData := range parseWorkloadMetadataTestDataset {
_, err := parseWorkloadMetadata(&ScalerConfig{TriggerMetadata: testData.metadata, Namespace: testData.namespace})
if err != nil && !testData.isError {
t.Error("Expected success but got error", err)
}
if testData.isError && err == nil {
t.Error("Expected error but got success")
}
}
}

type workloadIsActiveTestData struct {
metadata map[string]string
namespace string
podCount int
active bool
}

var isActiveWorkloadTestDataset = []workloadIsActiveTestData{
// "podSelector": "app=demo", "namespace": "test"
{parseWorkloadMetadataTestDataset[0].metadata, parseWorkloadMetadataTestDataset[0].namespace, 0, false},
{parseWorkloadMetadataTestDataset[0].metadata, parseWorkloadMetadataTestDataset[0].namespace, 1, false},
{parseWorkloadMetadataTestDataset[0].metadata, parseWorkloadMetadataTestDataset[0].namespace, 15, false},
// "podSelector": "app=demo", "namespace": "default"
{parseWorkloadMetadataTestDataset[1].metadata, parseWorkloadMetadataTestDataset[1].namespace, 0, false},
{parseWorkloadMetadataTestDataset[1].metadata, parseWorkloadMetadataTestDataset[1].namespace, 1, true},
{parseWorkloadMetadataTestDataset[1].metadata, parseWorkloadMetadataTestDataset[1].namespace, 15, true},
}

func TestWorkloadIsActive(t *testing.T) {
for _, testData := range isActiveWorkloadTestDataset {
s, _ := NewKubernetesWorkloadScaler(
fake.NewFakeClient(createPodlist(testData.podCount)),
&ScalerConfig{
TriggerMetadata: testData.metadata,
AuthParams: map[string]string{},
GlobalHTTPTimeout: 1000 * time.Millisecond,
Namespace: testData.namespace,
},
)
isActive, _ := s.IsActive(context.TODO())
if testData.active && !isActive {
t.Error("Expected active but got inactive")
}
if !testData.active && isActive {
t.Error("Expected inactive but got active")
}
}
}

type workloadGetMetricSpecForScalingTestData struct {
metadata map[string]string
namespace string
name string
}

var getMetricSpecForScalingTestDataset = []workloadGetMetricSpecForScalingTestData{
// "podSelector": "app=demo", "namespace": "test"
{parseWorkloadMetadataTestDataset[0].metadata, parseWorkloadMetadataTestDataset[0].namespace, "workload-test-app=demo"},
// "podSelector": "app=demo", "namespace": "default"
{parseWorkloadMetadataTestDataset[1].metadata, parseWorkloadMetadataTestDataset[1].namespace, "workload-default-app=demo"},
// "podSelector": "app in (demo1, demo2)", "namespace": "test"
{parseWorkloadMetadataTestDataset[2].metadata, parseWorkloadMetadataTestDataset[2].namespace, "workload-test-appin-demo1-demo2-"},
// "podSelector": "app in (demo1, demo2),deploy in (deploy1, deploy2)", "namespace": "test"
{parseWorkloadMetadataTestDataset[3].metadata, parseWorkloadMetadataTestDataset[3].namespace, "workload-test-appin-demo1-demo2--deployin-deploy1-deploy2-"},
}

func TestWorkloadGetMetricSpecForScaling(t *testing.T) {
for _, testData := range getMetricSpecForScalingTestDataset {
s, _ := NewKubernetesWorkloadScaler(
fake.NewFakeClient(),
&ScalerConfig{
TriggerMetadata: testData.metadata,
AuthParams: map[string]string{},
GlobalHTTPTimeout: 1000 * time.Millisecond,
Namespace: testData.namespace,
},
)
metric := s.GetMetricSpecForScaling()

if metric[0].External.Metric.Name != testData.name {
t.Errorf("Expected '%s' as metric name and got '%s'", testData.name, metric[0].External.Metric.Name)
}
}
}

func createPodlist(count int) *v1.PodList {
list := &v1.PodList{}
for i := 0; i < count; i++ {
pod := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("demo-pod-v%d", i),
Namespace: "default",
Annotations: map[string]string{},
Labels: map[string]string{
"app": "demo",
},
},
}
list.Items = append(list.Items, *pod)
}
return list
}
6 changes: 4 additions & 2 deletions pkg/scaling/scale_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -373,7 +373,7 @@ func (h *scaleHandler) buildScalers(withTriggers *kedav1alpha1.WithTriggers, pod
return []scalers.Scaler{}, err
}

scaler, err := buildScaler(trigger.Type, config)
scaler, err := buildScaler(h.client, trigger.Type, config)
if err != nil {
closeScalers(scalersRes)
h.recorder.Event(withTriggers, corev1.EventTypeWarning, eventreason.KEDAScalerFailed, err.Error())
Expand All @@ -386,7 +386,7 @@ func (h *scaleHandler) buildScalers(withTriggers *kedav1alpha1.WithTriggers, pod
return scalersRes, nil
}

func buildScaler(triggerType string, config *scalers.ScalerConfig) (scalers.Scaler, error) {
func buildScaler(client client.Client, triggerType string, config *scalers.ScalerConfig) (scalers.Scaler, error) {
// TRIGGERS-START
switch triggerType {
case "artemis-queue":
Expand Down Expand Up @@ -429,6 +429,8 @@ func buildScaler(triggerType string, config *scalers.ScalerConfig) (scalers.Scal
return scalers.NewInfluxDBScaler(config)
case "kafka":
return scalers.NewKafkaScaler(config)
case "kubernetes-workload":
return scalers.NewKubernetesWorkloadScaler(client, config)
case "liiklus":
return scalers.NewLiiklusScaler(config)
case "memory":
Expand Down
Loading