From 81e7d3bee7c0f157b2948b3bab8c68b2d5583183 Mon Sep 17 00:00:00 2001 From: Nicolas Lassalle Date: Tue, 23 Nov 2021 16:30:59 +0100 Subject: [PATCH] Add Elasticsearch Scaler based on search template (#2311) Signed-off-by: Nicolas L --- .pre-commit-config.yaml | 2 +- CHANGELOG.md | 1 + go.mod | 1 + go.sum | 2 + pkg/scalers/elasticsearch_scaler.go | 277 +++++++++++++++ pkg/scalers/elasticsearch_scaler_test.go | 430 +++++++++++++++++++++++ pkg/scalers/metrics_api_scaler.go | 2 +- pkg/scaling/scale_handler.go | 2 + tests/scalers/elasticsearch.test.ts | 331 +++++++++++++++++ 9 files changed, 1046 insertions(+), 2 deletions(-) create mode 100644 pkg/scalers/elasticsearch_scaler.go create mode 100644 pkg/scalers/elasticsearch_scaler_test.go create mode 100644 tests/scalers/elasticsearch.test.ts diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index b31a24da3ba..1430e5c745f 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -32,7 +32,7 @@ repos: entry: "(?i)(black|white)[_-]?(list|List)" pass_filenames: true - id: sort-scalers - name: Check if scalers are sorted in scaler_handler.go + name: Check if scalers are sorted in scale_handler.go language: system entry: "bash tools/sort_scalers.sh" files: .*scale_handler\.go$ diff --git a/CHANGELOG.md b/CHANGELOG.md index b4f96a76474..6c4409db0b3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -29,6 +29,7 @@ - Improve Redis Scaler, upgrade library, add username and Sentinel support ([#2181](https://github.com/kedacore/keda/pull/2181)) - Add GCP identity authentication when using Pubsub Scaler ([#2225](https://github.com/kedacore/keda/pull/2225)) - Add ScalersCache to reuse scalers unless they need changing ([#2187](https://github.com/kedacore/keda/pull/2187)) +- Add Elasticsearch Scaler based on search template ([#2311](https://github.com/kedacore/keda/pull/2311)) - Cache metric names provided by KEDA Metrics Server ([#2279](https://github.com/kedacore/keda/pull/2279)) ### Improvements diff --git a/go.mod b/go.mod index 3b1f2e8a1c6..60793db6db5 100644 --- a/go.mod +++ b/go.mod @@ -17,6 +17,7 @@ require ( github.com/Shopify/sarama v1.30.0 github.com/aws/aws-sdk-go v1.42.3 github.com/denisenkom/go-mssqldb v0.11.0 + github.com/elastic/go-elasticsearch/v7 v7.15.1 github.com/go-logr/logr v0.4.0 github.com/go-playground/assert/v2 v2.0.1 github.com/go-redis/redis/v8 v8.11.4 diff --git a/go.sum b/go.sum index a14105f5414..1bb81244dcc 100644 --- a/go.sum +++ b/go.sum @@ -254,6 +254,8 @@ github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21 h1:YEetp8 github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21/go.mod h1:+020luEh2TKB4/GOp8oxxtq0Daoen/Cii55CzbTV6DU= github.com/eapache/queue v1.1.0 h1:YOEu7KNc61ntiQlcEeUIoDTJ2o8mQznoNvUhiigpIqc= github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I= +github.com/elastic/go-elasticsearch/v7 v7.15.1 h1:Wd8RLHb5D8xPBU8vGlnLXyflkso9G+rCmsXjqH8LLQQ= +github.com/elastic/go-elasticsearch/v7 v7.15.1/go.mod h1:OJ4wdbtDNk5g503kvlHLyErCgQwwzmDtaFC4XyOxXA4= github.com/elazarl/goproxy v0.0.0-20180725130230-947c36da3153/go.mod h1:/Zj4wYkgs4iZTTu3o/KG3Itv/qCCa8VVMlb3i9OVuzc= github.com/emicklei/go-restful v0.0.0-20170410110728-ff4f55a20633/go.mod h1:otzb+WCGbkyDHkqmQmT5YD2WR4BBwUdeQoFo8l/7tVs= github.com/emicklei/go-restful v2.9.5+incompatible/go.mod h1:otzb+WCGbkyDHkqmQmT5YD2WR4BBwUdeQoFo8l/7tVs= diff --git a/pkg/scalers/elasticsearch_scaler.go b/pkg/scalers/elasticsearch_scaler.go new file mode 100644 index 00000000000..cf5330112bc --- /dev/null +++ b/pkg/scalers/elasticsearch_scaler.go @@ -0,0 +1,277 @@ +package scalers + +import ( + "bytes" + "context" + "crypto/tls" + "encoding/json" + "fmt" + "io/ioutil" + "net/http" + "strconv" + "strings" + + "github.com/elastic/go-elasticsearch/v7" + "github.com/tidwall/gjson" + "k8s.io/api/autoscaling/v2beta2" + "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/metrics/pkg/apis/external_metrics" + logf "sigs.k8s.io/controller-runtime/pkg/log" + + kedautil "github.com/kedacore/keda/v2/pkg/util" +) + +type elasticsearchScaler struct { + metadata *elasticsearchMetadata + esClient *elasticsearch.Client +} + +type elasticsearchMetadata struct { + addresses []string + unsafeSsl bool + username string + password string + indexes []string + searchTemplateName string + parameters []string + valueLocation string + targetValue int + metricName string +} + +var elasticsearchLog = logf.Log.WithName("elasticsearch_scaler") + +// NewElasticsearchScaler creates a new elasticsearch scaler +func NewElasticsearchScaler(config *ScalerConfig) (Scaler, error) { + meta, err := parseElasticsearchMetadata(config) + if err != nil { + return nil, fmt.Errorf("error parsing elasticsearch metadata: %s", err) + } + + esClient, err := newElasticsearchClient(meta) + if err != nil { + return nil, fmt.Errorf("error getting elasticsearch client: %s", err) + } + return &elasticsearchScaler{ + metadata: meta, + esClient: esClient, + }, nil +} + +const defaultUnsafeSsl = false + +func parseElasticsearchMetadata(config *ScalerConfig) (*elasticsearchMetadata, error) { + meta := elasticsearchMetadata{} + + var err error + addresses, err := GetFromAuthOrMeta(config, "addresses") + if err != nil { + return nil, err + } + meta.addresses = splitAndTrimBySep(addresses, ",") + + if val, ok := config.TriggerMetadata["unsafeSsl"]; ok { + meta.unsafeSsl, err = strconv.ParseBool(val) + if err != nil { + return nil, fmt.Errorf("error parsing unsafeSsl: %s", err) + } + } else { + meta.unsafeSsl = defaultUnsafeSsl + } + + if val, ok := config.AuthParams["username"]; ok { + meta.username = val + } else if val, ok := config.TriggerMetadata["username"]; ok { + meta.username = val + } + + if config.AuthParams["password"] != "" { + meta.password = config.AuthParams["password"] + } else if config.TriggerMetadata["passwordFromEnv"] != "" { + meta.password = config.ResolvedEnv[config.TriggerMetadata["passwordFromEnv"]] + } + + index, err := GetFromAuthOrMeta(config, "index") + if err != nil { + return nil, err + } + meta.indexes = splitAndTrimBySep(index, ";") + + meta.searchTemplateName, err = GetFromAuthOrMeta(config, "searchTemplateName") + if err != nil { + return nil, err + } + + if val, ok := config.TriggerMetadata["parameters"]; ok { + meta.parameters = splitAndTrimBySep(val, ";") + } + + meta.valueLocation, err = GetFromAuthOrMeta(config, "valueLocation") + if err != nil { + return nil, err + } + + targetValue, err := GetFromAuthOrMeta(config, "targetValue") + if err != nil { + return nil, err + } + meta.targetValue, err = strconv.Atoi(targetValue) + if err != nil { + return nil, fmt.Errorf("targetValue parsing error %s", err.Error()) + } + + meta.metricName = GenerateMetricNameWithIndex(config.ScalerIndex, kedautil.NormalizeString(fmt.Sprintf("elasticsearch-%s", meta.searchTemplateName))) + return &meta, nil +} + +// newElasticsearchClient creates elasticsearch db connection +func newElasticsearchClient(meta *elasticsearchMetadata) (*elasticsearch.Client, error) { + config := elasticsearch.Config{Addresses: meta.addresses} + if meta.username != "" { + config.Username = meta.username + } + if meta.password != "" { + config.Password = meta.password + } + + transport := http.DefaultTransport.(*http.Transport) + transport.TLSClientConfig = &tls.Config{InsecureSkipVerify: meta.unsafeSsl} + config.Transport = transport + + esClient, err := elasticsearch.NewClient(config) + if err != nil { + elasticsearchLog.Error(err, fmt.Sprintf("Found error when creating client: %s", err)) + return nil, err + } + + _, err = esClient.Info() + if err != nil { + elasticsearchLog.Error(err, fmt.Sprintf("Found error when pinging search engine: %s", err)) + return nil, err + } + return esClient, nil +} + +func (s *elasticsearchScaler) Close(ctx context.Context) error { + return nil +} + +// IsActive returns true if there are pending messages to be processed +func (s *elasticsearchScaler) IsActive(ctx context.Context) (bool, error) { + messages, err := s.getQueryResult(ctx) + if err != nil { + elasticsearchLog.Error(err, fmt.Sprintf("Error inspecting elasticsearch: %s", err)) + return false, err + } + return messages > 0, nil +} + +// getQueryResult returns result of the scaler query +func (s *elasticsearchScaler) getQueryResult(ctx context.Context) (int, error) { + // Build the request body. + var body bytes.Buffer + if err := json.NewEncoder(&body).Encode(buildQuery(s.metadata)); err != nil { + elasticsearchLog.Error(err, "Error encoding query: %s", err) + } + + // Run the templated search + res, err := s.esClient.SearchTemplate( + &body, + s.esClient.SearchTemplate.WithIndex(s.metadata.indexes...), + s.esClient.SearchTemplate.WithContext(ctx), + ) + if err != nil { + elasticsearchLog.Error(err, fmt.Sprintf("Could not query elasticsearch: %s", err)) + return 0, err + } + + defer res.Body.Close() + b, err := ioutil.ReadAll(res.Body) + if err != nil { + return 0, err + } + v, err := getValueFromSearch(b, s.metadata.valueLocation) + if err != nil { + return 0, err + } + return v, nil +} + +func buildQuery(metadata *elasticsearchMetadata) map[string]interface{} { + parameters := map[string]interface{}{} + for _, p := range metadata.parameters { + if p != "" { + kv := splitAndTrimBySep(p, ":") + parameters[kv[0]] = kv[1] + } + } + query := map[string]interface{}{ + "id": metadata.searchTemplateName, + } + if len(parameters) > 0 { + query["params"] = parameters + } + return query +} + +func getValueFromSearch(body []byte, valueLocation string) (int, error) { + r := gjson.GetBytes(body, valueLocation) + errorMsg := "valueLocation must point to value of type number but got: '%s'" + if r.Type == gjson.String { + q, err := strconv.Atoi(r.String()) + if err != nil { + return 0, fmt.Errorf(errorMsg, r.String()) + } + return q, nil + } + if r.Type != gjson.Number { + return 0, fmt.Errorf(errorMsg, r.Type.String()) + } + return int(r.Num), nil +} + +// GetMetricSpecForScaling returns the MetricSpec for the Horizontal Pod Autoscaler +func (s *elasticsearchScaler) GetMetricSpecForScaling(context.Context) []v2beta2.MetricSpec { + targetValue := resource.NewQuantity(int64(s.metadata.targetValue), resource.DecimalSI) + + externalMetric := &v2beta2.ExternalMetricSource{ + Metric: v2beta2.MetricIdentifier{ + Name: s.metadata.metricName, + }, + Target: v2beta2.MetricTarget{ + Type: v2beta2.AverageValueMetricType, + AverageValue: targetValue, + }, + } + metricSpec := v2beta2.MetricSpec{ + External: externalMetric, Type: externalMetricType, + } + return []v2beta2.MetricSpec{metricSpec} +} + +// GetMetrics returns value for a supported metric and an error if there is a problem getting the metric +func (s *elasticsearchScaler) GetMetrics(ctx context.Context, metricName string, metricSelector labels.Selector) ([]external_metrics.ExternalMetricValue, error) { + num, err := s.getQueryResult(ctx) + if err != nil { + return []external_metrics.ExternalMetricValue{}, fmt.Errorf("error inspecting elasticsearch: %s", err) + } + + metric := external_metrics.ExternalMetricValue{ + MetricName: metricName, + Value: *resource.NewQuantity(int64(num), resource.DecimalSI), + Timestamp: metav1.Now(), + } + + return append([]external_metrics.ExternalMetricValue{}, metric), nil +} + +// Splits a string separated by a specified separator and trims space from all the elements. +func splitAndTrimBySep(s string, sep string) []string { + x := strings.Split(s, sep) + for i := range x { + x[i] = strings.Trim(x[i], " ") + } + return x +} diff --git a/pkg/scalers/elasticsearch_scaler_test.go b/pkg/scalers/elasticsearch_scaler_test.go new file mode 100644 index 00000000000..c219dbba801 --- /dev/null +++ b/pkg/scalers/elasticsearch_scaler_test.go @@ -0,0 +1,430 @@ +package scalers + +import ( + "context" + "errors" + "fmt" + "testing" + + "github.com/stretchr/testify/assert" +) + +type parseElasticsearchMetadataTestData struct { + name string + metadata map[string]string + resolvedEnv map[string]string + authParams map[string]string + expectedMetadata *elasticsearchMetadata + expectedError error +} + +type paramsTestData struct { + name string + metadata map[string]string + authParams map[string]string + expectedQuery map[string]interface{} +} + +type elasticsearchMetricIdentifier struct { + metadataTestData *parseElasticsearchMetadataTestData + scalerIndex int + name string +} + +var testCases = []parseElasticsearchMetadataTestData{ + { + name: "no addresses given", + metadata: map[string]string{}, + authParams: map[string]string{}, + expectedError: errors.New("no addresses given"), + }, + { + name: "no index given", + metadata: map[string]string{"addresses": "http://localhost:9200"}, + authParams: map[string]string{"username": "admin"}, + expectedError: errors.New("no index given"), + }, + { + name: "no searchTemplateName given", + metadata: map[string]string{ + "addresses": "http://localhost:9200", + "index": "index1", + }, + authParams: map[string]string{"username": "admin"}, + expectedError: errors.New("no searchTemplateName given"), + }, + { + name: "no valueLocation given", + metadata: map[string]string{ + "addresses": "http://localhost:9200", + "index": "index1", + "searchTemplateName": "searchTemplateName", + }, + authParams: map[string]string{"username": "admin"}, + expectedError: errors.New("no valueLocation given"), + }, + { + name: "no targetValue given", + metadata: map[string]string{ + "addresses": "http://localhost:9200", + "index": "index1", + "searchTemplateName": "searchTemplateName", + "valueLocation": "toto", + }, + authParams: map[string]string{"username": "admin"}, + expectedError: errors.New("no targetValue given"), + }, + { + name: "all fields ok", + metadata: map[string]string{ + "addresses": "http://localhost:9200", + "unsafeSsl": "true", + "index": "index1", + "searchTemplateName": "myAwesomeSearch", + "parameters": "param1:value1", + "valueLocation": "hits.hits[0]._source.value", + "targetValue": "12", + }, + authParams: map[string]string{ + "username": "admin", + "password": "password", + }, + expectedMetadata: &elasticsearchMetadata{ + addresses: []string{"http://localhost:9200"}, + unsafeSsl: true, + indexes: []string{"index1"}, + username: "admin", + password: "password", + searchTemplateName: "myAwesomeSearch", + parameters: []string{"param1:value1"}, + valueLocation: "hits.hits[0]._source.value", + targetValue: 12, + metricName: "s0-elasticsearch-myAwesomeSearch", + }, + expectedError: nil, + }, + { + name: "multi indexes", + metadata: map[string]string{ + "addresses": "http://localhost:9200", + "unsafeSsl": "false", + "index": "index1;index2", + "searchTemplateName": "myAwesomeSearch", + "parameters": "param1:value1", + "valueLocation": "hits.hits[0]._source.value", + "targetValue": "12", + }, + authParams: map[string]string{ + "username": "admin", + "password": "password", + }, + expectedMetadata: &elasticsearchMetadata{ + addresses: []string{"http://localhost:9200"}, + unsafeSsl: false, + indexes: []string{"index1", "index2"}, + username: "admin", + password: "password", + searchTemplateName: "myAwesomeSearch", + parameters: []string{"param1:value1"}, + valueLocation: "hits.hits[0]._source.value", + targetValue: 12, + metricName: "s0-elasticsearch-myAwesomeSearch", + }, + expectedError: nil, + }, + { + name: "multi indexes trimmed", + metadata: map[string]string{ + "addresses": "http://localhost:9200", + "unsafeSsl": "false", + "index": "index1 ; index2", + "searchTemplateName": "myAwesomeSearch", + "parameters": "param1:value1", + "valueLocation": "hits.hits[0]._source.value", + "targetValue": "12", + }, + authParams: map[string]string{ + "username": "admin", + "password": "password", + }, + expectedMetadata: &elasticsearchMetadata{ + addresses: []string{"http://localhost:9200"}, + unsafeSsl: false, + indexes: []string{"index1", "index2"}, + username: "admin", + password: "password", + searchTemplateName: "myAwesomeSearch", + parameters: []string{"param1:value1"}, + valueLocation: "hits.hits[0]._source.value", + targetValue: 12, + metricName: "s0-elasticsearch-myAwesomeSearch", + }, + expectedError: nil, + }, + { + name: "multi addresses", + metadata: map[string]string{ + "addresses": "http://localhost:9200,http://localhost:9201", + "unsafeSsl": "false", + "index": "index1", + "searchTemplateName": "myAwesomeSearch", + "parameters": "param1:value1", + "valueLocation": "hits.hits[0]._source.value", + "targetValue": "12", + }, + authParams: map[string]string{ + "username": "admin", + "password": "password", + }, + expectedMetadata: &elasticsearchMetadata{ + addresses: []string{"http://localhost:9200", "http://localhost:9201"}, + unsafeSsl: false, + indexes: []string{"index1"}, + username: "admin", + password: "password", + searchTemplateName: "myAwesomeSearch", + parameters: []string{"param1:value1"}, + valueLocation: "hits.hits[0]._source.value", + targetValue: 12, + metricName: "s0-elasticsearch-myAwesomeSearch", + }, + expectedError: nil, + }, + { + name: "multi addresses trimmed", + metadata: map[string]string{ + "addresses": "http://localhost:9200 , http://localhost:9201", + "unsafeSsl": "false", + "index": "index1", + "searchTemplateName": "myAwesomeSearch", + "parameters": "param1:value1", + "valueLocation": "hits.hits[0]._source.value", + "targetValue": "12", + }, + authParams: map[string]string{ + "username": "admin", + "password": "password", + }, + expectedMetadata: &elasticsearchMetadata{ + addresses: []string{"http://localhost:9200", "http://localhost:9201"}, + unsafeSsl: false, + indexes: []string{"index1"}, + username: "admin", + password: "password", + searchTemplateName: "myAwesomeSearch", + parameters: []string{"param1:value1"}, + valueLocation: "hits.hits[0]._source.value", + targetValue: 12, + metricName: "s0-elasticsearch-myAwesomeSearch", + }, + expectedError: nil, + }, + { + name: "password from env", + metadata: map[string]string{ + "addresses": "http://localhost:9200,http://localhost:9201", + "unsafeSsl": "false", + "index": "index1", + "searchTemplateName": "myAwesomeSearch", + "parameters": "param1:value1", + "valueLocation": "hits.hits[0]._source.value", + "targetValue": "12", + "passwordFromEnv": "ELASTICSEARCH_PASSWORD", + }, + authParams: map[string]string{ + "username": "admin", + }, + resolvedEnv: map[string]string{ + "ELASTICSEARCH_PASSWORD": "password", + }, + expectedMetadata: &elasticsearchMetadata{ + addresses: []string{"http://localhost:9200", "http://localhost:9201"}, + unsafeSsl: false, + indexes: []string{"index1"}, + username: "admin", + password: "password", + searchTemplateName: "myAwesomeSearch", + parameters: []string{"param1:value1"}, + valueLocation: "hits.hits[0]._source.value", + targetValue: 12, + metricName: "s0-elasticsearch-myAwesomeSearch", + }, + expectedError: nil, + }, +} + +func TestParseElasticsearchMetadata(t *testing.T) { + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + metadata, err := parseElasticsearchMetadata(&ScalerConfig{ + TriggerMetadata: tc.metadata, + AuthParams: tc.authParams, + ResolvedEnv: tc.resolvedEnv, + }) + if tc.expectedError != nil { + assert.Contains(t, err.Error(), tc.expectedError.Error()) + } else { + assert.NoError(t, err) + fmt.Println(tc.name) + assert.Equal(t, tc.expectedMetadata, metadata) + } + }) + } +} + +func TestUnsafeSslDefaultValue(t *testing.T) { + tc := &parseElasticsearchMetadataTestData{ + name: "all fields ok", + metadata: map[string]string{ + "addresses": "http://localhost:9200", + "index": "index1", + "searchTemplateName": "myAwesomeSearch", + "parameters": "param1:value1", + "valueLocation": "hits.hits[0]._source.value", + "targetValue": "12", + }, + authParams: map[string]string{ + "username": "admin", + "password": "password", + }, + expectedMetadata: &elasticsearchMetadata{ + addresses: []string{"http://localhost:9200"}, + unsafeSsl: false, + indexes: []string{"index1"}, + username: "admin", + password: "password", + searchTemplateName: "myAwesomeSearch", + parameters: []string{"param1:value1"}, + valueLocation: "hits.hits[0]._source.value", + targetValue: 12, + metricName: "s0-elasticsearch-myAwesomeSearch", + }, + expectedError: nil, + } + metadata, err := parseElasticsearchMetadata(&ScalerConfig{ + TriggerMetadata: tc.metadata, + AuthParams: tc.authParams, + }) + assert.NoError(t, err) + assert.Equal(t, tc.expectedMetadata, metadata) +} + +func TestBuildQuery(t *testing.T) { + var testCases = []paramsTestData{ + { + name: "no params", + metadata: map[string]string{ + "addresses": "http://localhost:9200", + "index": "index1", + "searchTemplateName": "myAwesomeSearch", + "parameters": "", + "valueLocation": "hits.hits[0]._source.value", + "targetValue": "12", + }, + authParams: map[string]string{ + "username": "admin", + "password": "password", + }, + expectedQuery: map[string]interface{}{ + "id": "myAwesomeSearch", + }, + }, + { + name: "one param", + metadata: map[string]string{ + "addresses": "http://localhost:9200", + "index": "index1", + "searchTemplateName": "myAwesomeSearch", + "parameters": "param1:value1", + "valueLocation": "hits.hits[0]._source.value", + "targetValue": "12", + }, + authParams: map[string]string{ + "username": "admin", + "password": "password", + }, + expectedQuery: map[string]interface{}{ + "id": "myAwesomeSearch", + "params": map[string]interface{}{ + "param1": "value1", + }, + }, + }, + { + name: "two params", + metadata: map[string]string{ + "addresses": "http://localhost:9200", + "index": "index1", + "searchTemplateName": "myAwesomeSearch", + "parameters": "param1:value1;param2:value2", + "valueLocation": "hits.hits[0]._source.value", + "targetValue": "12", + }, + authParams: map[string]string{ + "username": "admin", + "password": "password", + }, + expectedQuery: map[string]interface{}{ + "id": "myAwesomeSearch", + "params": map[string]interface{}{ + "param1": "value1", + "param2": "value2", + }, + }, + }, + { + name: "params are trimmed", + metadata: map[string]string{ + "addresses": "http://localhost:9200", + "index": "index1", + "searchTemplateName": "myAwesomeSearch", + "parameters": "param1 : value1 ; param2 : value2 ", + "valueLocation": "hits.hits[0]._source.value", + "targetValue": "12", + }, + authParams: map[string]string{ + "username": "admin", + "password": "password", + }, + expectedQuery: map[string]interface{}{ + "id": "myAwesomeSearch", + "params": map[string]interface{}{ + "param1": "value1", + "param2": "value2", + }, + }, + }, + } + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + metadata, err := parseElasticsearchMetadata(&ScalerConfig{ + TriggerMetadata: tc.metadata, + AuthParams: tc.authParams, + }) + assert.NoError(t, err) + assert.Equal(t, tc.expectedQuery, buildQuery(metadata)) + }) + } +} + +func TestElasticsearchGetMetricSpecForScaling(t *testing.T) { + var elasticsearchMetricIdentifiers = []elasticsearchMetricIdentifier{ + {&testCases[5], 0, "s0-elasticsearch-myAwesomeSearch"}, + {&testCases[6], 1, "s1-elasticsearch-myAwesomeSearch"}, + } + + for _, testData := range elasticsearchMetricIdentifiers { + ctx := context.Background() + meta, err := parseElasticsearchMetadata(&ScalerConfig{ + TriggerMetadata: testData.metadataTestData.metadata, + AuthParams: testData.metadataTestData.authParams, + ScalerIndex: testData.scalerIndex, + }) + if err != nil { + t.Fatal("Could not parse metadata:", err) + } + elasticsearchScaler := elasticsearchScaler{metadata: meta, esClient: nil} + metricSpec := elasticsearchScaler.GetMetricSpecForScaling(ctx) + assert.Equal(t, metricSpec[0].External.Metric.Name, testData.name) + } +} diff --git a/pkg/scalers/metrics_api_scaler.go b/pkg/scalers/metrics_api_scaler.go index 901b234435b..fd62a5bc773 100644 --- a/pkg/scalers/metrics_api_scaler.go +++ b/pkg/scalers/metrics_api_scaler.go @@ -186,7 +186,7 @@ func parseMetricsAPIMetadata(config *ScalerConfig) (*metricsAPIScalerMetadata, e // GetValueFromResponse uses provided valueLocation to access the numeric value in provided body func GetValueFromResponse(body []byte, valueLocation string) (*resource.Quantity, error) { r := gjson.GetBytes(body, valueLocation) - errorMsg := "valueLocation must point to value of type number or a string representing a Quanitity got: '%s'" + errorMsg := "valueLocation must point to value of type number or a string representing a Quantity got: '%s'" if r.Type == gjson.String { q, err := resource.ParseQuantity(r.String()) if err != nil { diff --git a/pkg/scaling/scale_handler.go b/pkg/scaling/scale_handler.go index 68be3bcadb9..4f815dfc59f 100644 --- a/pkg/scaling/scale_handler.go +++ b/pkg/scaling/scale_handler.go @@ -355,6 +355,8 @@ func buildScaler(ctx context.Context, client client.Client, triggerType string, return scalers.NewCPUMemoryScaler(corev1.ResourceCPU, config) case "cron": return scalers.NewCronScaler(config) + case "elasticsearch": + return scalers.NewElasticsearchScaler(config) case "external": return scalers.NewExternalScaler(config) case "external-push": diff --git a/tests/scalers/elasticsearch.test.ts b/tests/scalers/elasticsearch.test.ts new file mode 100644 index 00000000000..3b04013d3c5 --- /dev/null +++ b/tests/scalers/elasticsearch.test.ts @@ -0,0 +1,331 @@ +import * as fs from 'fs' +import * as sh from 'shelljs' +import * as tmp from 'tmp' +import test from 'ava' +import {waitForRollout} from "./helpers"; + +const testNamespace = 'elasticsearch-test' +const elasticsearchNamespace = 'elasticsearch' +const deploymentName = 'podinfo' +const indexName = 'keda' +const searchTemplateName = 'keda-search-template' +const elasticPassword = 'passw0rd!' +const kubectlExecCurl = `kubectl exec -n ${elasticsearchNamespace} elasticsearch-0 -- curl -sS -H "content-type: application/json" -u "elastic:${elasticPassword}"` + +test.before(t => { + // install elasticsearch + sh.exec(`kubectl create namespace ${elasticsearchNamespace}`) + const elasticsearchTmpFile = tmp.fileSync() + fs.writeFileSync(elasticsearchTmpFile.name, elasticsearchStatefulsetYaml.replace('{{ELASTIC_PASSWORD}}', elasticPassword)) + + t.is(0, sh.exec(`kubectl apply --namespace ${elasticsearchNamespace} -f ${elasticsearchTmpFile.name}`).code, 'creating an elasticsearch statefulset should work.') + t.is(0, waitForRollout('statefulset', "elasticsearch", elasticsearchNamespace)) + + // Create the index and the search template + sh.exec(`${kubectlExecCurl} -XPUT http://localhost:9200/${indexName} -d '${elastisearchCreateIndex}'`) + sh.exec(`${kubectlExecCurl} -XPUT http://localhost:9200/_scripts/${searchTemplateName} -d '${elasticsearchSearchTemplate}'`) + + + sh.exec(`kubectl create namespace ${testNamespace}`) + + // deploy dummy app and scaled object + const tmpFile = tmp.fileSync() + fs.writeFileSync(tmpFile.name, deployYaml.replace(/{{DEPLOYMENT_NAME}}/g, deploymentName) + .replace('{{ELASTICSEARCH_NAMESPACE}}', elasticsearchNamespace) + .replace('{{SEARCH_TEMPLATE_NAME}}', searchTemplateName) + .replace('{{INDEX_NAME}}', indexName) + .replace('{{ELASTIC_PASSWORD_BASE64}}', Buffer.from(elasticPassword).toString('base64')) + ) + + t.is( + 0, + sh.exec(`kubectl apply -f ${tmpFile.name} --namespace ${testNamespace}`).code, + 'creating a deployment should work..' + ) +}) + +test.serial('Deployment should have 0 replicas on start', t => { + const replicaCount = sh.exec( + `kubectl get deployment.apps/${deploymentName} --namespace ${testNamespace} -o jsonpath="{.spec.replicas}"` + ).stdout + t.is(replicaCount, '0', 'replica count should start out as 0') +}) + +test.serial(`Deployment should scale to 5 (the max) then back to 0`, t => { + + for (let i = 0; i < 5; i++) { + let doc = elasticsearchDummyDoc.replace("{{TIMESTAMP}}", new Date().toISOString()) + sh.exec(`${kubectlExecCurl} -XPOST http://localhost:9200/${indexName}/_doc -d '${doc}'`) + } + + let replicaCount = '0' + + const maxReplicaCount = '5' + + for (let i = 0; i < 90 && replicaCount !== maxReplicaCount; i++) { + replicaCount = sh.exec( + `kubectl get deployment.apps/${deploymentName} --namespace ${testNamespace} -o jsonpath="{.spec.replicas}"` + ).stdout + if (replicaCount !== maxReplicaCount) { + sh.exec('sleep 2s') + } + } + + t.is(maxReplicaCount, replicaCount, `Replica count should be ${maxReplicaCount} after 60 seconds`) + + for (let i = 0; i < 36 && replicaCount !== '0'; i++) { + replicaCount = sh.exec( + `kubectl get deployment.apps/${deploymentName} --namespace ${testNamespace} -o jsonpath="{.spec.replicas}"` + ).stdout + if (replicaCount !== '0') { + sh.exec('sleep 5s') + } + } + + t.is('0', replicaCount, 'Replica count should be 0 after 3 minutes') +}) + +test.after.always.cb('clean up elasticsearch deployment', t => { + sh.exec(`kubectl delete namespace ${testNamespace}`) + + // uninstall elasticsearch + sh.exec(`kubectl delete --namespace ${elasticsearchNamespace} sts/elasticsearch`) + sh.exec(`kubectl delete namespace ${elasticsearchNamespace}`) + + t.end() +}) + +const deployYaml = `apiVersion: apps/v1 +kind: Deployment +metadata: + labels: + app: {{DEPLOYMENT_NAME}} + name: {{DEPLOYMENT_NAME}} +spec: + replicas: 0 + selector: + matchLabels: + app: {{DEPLOYMENT_NAME}} + template: + metadata: + labels: + app: {{DEPLOYMENT_NAME}} + spec: + containers: + - image: stefanprodan/podinfo + name: {{DEPLOYMENT_NAME}} +--- +apiVersion: v1 +kind: Secret +metadata: + name: elasticsearch-secrets +type: Opaque +data: + password: {{ELASTIC_PASSWORD_BASE64}} +--- +apiVersion: keda.sh/v1alpha1 +kind: TriggerAuthentication +metadata: + name: keda-trigger-auth-elasticsearch-secret +spec: + secretTargetRef: + - parameter: password + name: elasticsearch-secrets + key: password +--- +apiVersion: keda.sh/v1alpha1 +kind: ScaledObject +metadata: + name: elasticsearch-scaledobject +spec: + minReplicaCount: 0 + maxReplicaCount: 5 + pollingInterval: 3 + cooldownPeriod: 5 + scaleTargetRef: + name: {{DEPLOYMENT_NAME}} + triggers: + - type: elasticsearch + metadata: + addresses: "http://elasticsearch-svc.{{ELASTICSEARCH_NAMESPACE}}.svc.cluster.local:9200" + username: "elastic" + index: {{INDEX_NAME}} + searchTemplateName: {{SEARCH_TEMPLATE_NAME}} + valueLocation: "hits.total.value" + targetValue: "1" + parameters: "dummy_value:1;dumb_value:oOooo" + authenticationRef: + name: keda-trigger-auth-elasticsearch-secret +` + +const elasticsearchStatefulsetYaml = ` +kind: Service +apiVersion: v1 +metadata: + name: elasticsearch-svc +spec: + type: ClusterIP + ports: + - name: http + port: 9200 + targetPort: 9200 + protocol: TCP + selector: + name: elasticsearch +--- +apiVersion: apps/v1 +kind: StatefulSet +metadata: + name: elasticsearch +spec: + replicas: 1 + selector: + matchLabels: + name: elasticsearch + template: + metadata: + labels: + name: elasticsearch + spec: + containers: + - name: elasticsearch + image: docker.elastic.co/elasticsearch/elasticsearch:7.15.1 + imagePullPolicy: IfNotPresent + env: + - name: POD_IP + valueFrom: + fieldRef: + apiVersion: v1 + fieldPath: status.podIP + - name: POD_NAME + valueFrom: + fieldRef: + apiVersion: v1 + fieldPath: metadata.name + - name: NODE_NAME + valueFrom: + fieldRef: + apiVersion: v1 + fieldPath: spec.nodeName + - name: NAMESPACE + valueFrom: + fieldRef: + apiVersion: v1 + fieldPath: metadata.namespace + - name: ES_JAVA_OPTS + value: -Xms256m -Xmx256m + - name: cluster.name + value: elasticsearch-keda + - name: cluster.initial_master_nodes + valueFrom: + fieldRef: + apiVersion: v1 + fieldPath: status.podIP + - name: node.data + value: "true" + - name: node.ml + value: "false" + - name: node.ingest + value: "false" + - name: node.master + value: "true" + - name: node.remote_cluster_client + value: "false" + - name: node.transform + value: "false" + - name: ELASTIC_PASSWORD + value: "{{ELASTIC_PASSWORD}}" + - name: xpack.security.enabled + value: "true" + - name: node.store.allow_mmap + value: "false" + ports: + - containerPort: 9200 + name: http + protocol: TCP + - containerPort: 9300 + name: transport + protocol: TCP + resources: + requests: + cpu: 100m + memory: 1Gi + limits: + memory: 1Gi + readinessProbe: + exec: + command: + - /usr/bin/curl + - -sS + - -u "elastic:{{ELASTIC_PASSWORD}}" + - http://localhost:9200 + failureThreshold: 3 + initialDelaySeconds: 10 + periodSeconds: 5 + successThreshold: 1 + timeoutSeconds: 5 + + + serviceName: elasticsearch-svc +` + +const elastisearchCreateIndex = ` +{ + "mappings": { + "properties": { + "@timestamp": { + "type": "date" + }, + "dummy": { + "type": "integer" + }, + "dumb": { + "type": "keyword" + } + } + }, + "settings": { + "number_of_replicas": 0, + "number_of_shards": 1 + } +}` + +const elasticsearchDummyDoc = ` +{ + "@timestamp": "{{TIMESTAMP}}", + "dummy": 1, + "dumb": "oOooo" +}` + +const elasticsearchSearchTemplate = ` +{ + "script": { + "lang": "mustache", + "source": { + "query": { + "bool": { + "filter": [ + { + "range": { + "@timestamp": { + "gte": "now-1m/m", + "lte": "now/m" + } + } + }, + { + "term": { + "dummy": "{{dummy_value}}" + } + }, + { + "term": { + "dumb": "{{dumb_value}}" + } + } + ] + } + } + } + } +}`