Skip to content

Commit

Permalink
Add Elasticsearch Scaler based on search template (#2311)
Browse files Browse the repository at this point in the history
Signed-off-by: Nicolas L <nicolas.lassalle@zenika.com>
  • Loading branch information
orphaner authored Nov 23, 2021
1 parent 2664389 commit 81e7d3b
Show file tree
Hide file tree
Showing 9 changed files with 1,046 additions and 2 deletions.
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ repos:
entry: "(?i)(black|white)[_-]?(list|List)"
pass_filenames: true
- id: sort-scalers
name: Check if scalers are sorted in scaler_handler.go
name: Check if scalers are sorted in scale_handler.go
language: system
entry: "bash tools/sort_scalers.sh"
files: .*scale_handler\.go$
Expand Down
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
- Improve Redis Scaler, upgrade library, add username and Sentinel support ([#2181](https://github.com/kedacore/keda/pull/2181))
- Add GCP identity authentication when using Pubsub Scaler ([#2225](https://github.com/kedacore/keda/pull/2225))
- Add ScalersCache to reuse scalers unless they need changing ([#2187](https://github.com/kedacore/keda/pull/2187))
- Add Elasticsearch Scaler based on search template ([#2311](https://github.com/kedacore/keda/pull/2311))
- Cache metric names provided by KEDA Metrics Server ([#2279](https://github.com/kedacore/keda/pull/2279))

### Improvements
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ require (
github.com/Shopify/sarama v1.30.0
github.com/aws/aws-sdk-go v1.42.3
github.com/denisenkom/go-mssqldb v0.11.0
github.com/elastic/go-elasticsearch/v7 v7.15.1
github.com/go-logr/logr v0.4.0
github.com/go-playground/assert/v2 v2.0.1
github.com/go-redis/redis/v8 v8.11.4
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,8 @@ github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21 h1:YEetp8
github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21/go.mod h1:+020luEh2TKB4/GOp8oxxtq0Daoen/Cii55CzbTV6DU=
github.com/eapache/queue v1.1.0 h1:YOEu7KNc61ntiQlcEeUIoDTJ2o8mQznoNvUhiigpIqc=
github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I=
github.com/elastic/go-elasticsearch/v7 v7.15.1 h1:Wd8RLHb5D8xPBU8vGlnLXyflkso9G+rCmsXjqH8LLQQ=
github.com/elastic/go-elasticsearch/v7 v7.15.1/go.mod h1:OJ4wdbtDNk5g503kvlHLyErCgQwwzmDtaFC4XyOxXA4=
github.com/elazarl/goproxy v0.0.0-20180725130230-947c36da3153/go.mod h1:/Zj4wYkgs4iZTTu3o/KG3Itv/qCCa8VVMlb3i9OVuzc=
github.com/emicklei/go-restful v0.0.0-20170410110728-ff4f55a20633/go.mod h1:otzb+WCGbkyDHkqmQmT5YD2WR4BBwUdeQoFo8l/7tVs=
github.com/emicklei/go-restful v2.9.5+incompatible/go.mod h1:otzb+WCGbkyDHkqmQmT5YD2WR4BBwUdeQoFo8l/7tVs=
Expand Down
277 changes: 277 additions & 0 deletions pkg/scalers/elasticsearch_scaler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,277 @@
package scalers

import (
"bytes"
"context"
"crypto/tls"
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"strconv"
"strings"

"github.com/elastic/go-elasticsearch/v7"
"github.com/tidwall/gjson"
"k8s.io/api/autoscaling/v2beta2"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/metrics/pkg/apis/external_metrics"
logf "sigs.k8s.io/controller-runtime/pkg/log"

kedautil "github.com/kedacore/keda/v2/pkg/util"
)

type elasticsearchScaler struct {
metadata *elasticsearchMetadata
esClient *elasticsearch.Client
}

type elasticsearchMetadata struct {
addresses []string
unsafeSsl bool
username string
password string
indexes []string
searchTemplateName string
parameters []string
valueLocation string
targetValue int
metricName string
}

var elasticsearchLog = logf.Log.WithName("elasticsearch_scaler")

// NewElasticsearchScaler creates a new elasticsearch scaler
func NewElasticsearchScaler(config *ScalerConfig) (Scaler, error) {
meta, err := parseElasticsearchMetadata(config)
if err != nil {
return nil, fmt.Errorf("error parsing elasticsearch metadata: %s", err)
}

esClient, err := newElasticsearchClient(meta)
if err != nil {
return nil, fmt.Errorf("error getting elasticsearch client: %s", err)
}
return &elasticsearchScaler{
metadata: meta,
esClient: esClient,
}, nil
}

const defaultUnsafeSsl = false

func parseElasticsearchMetadata(config *ScalerConfig) (*elasticsearchMetadata, error) {
meta := elasticsearchMetadata{}

var err error
addresses, err := GetFromAuthOrMeta(config, "addresses")
if err != nil {
return nil, err
}
meta.addresses = splitAndTrimBySep(addresses, ",")

if val, ok := config.TriggerMetadata["unsafeSsl"]; ok {
meta.unsafeSsl, err = strconv.ParseBool(val)
if err != nil {
return nil, fmt.Errorf("error parsing unsafeSsl: %s", err)
}
} else {
meta.unsafeSsl = defaultUnsafeSsl
}

if val, ok := config.AuthParams["username"]; ok {
meta.username = val
} else if val, ok := config.TriggerMetadata["username"]; ok {
meta.username = val
}

if config.AuthParams["password"] != "" {
meta.password = config.AuthParams["password"]
} else if config.TriggerMetadata["passwordFromEnv"] != "" {
meta.password = config.ResolvedEnv[config.TriggerMetadata["passwordFromEnv"]]
}

index, err := GetFromAuthOrMeta(config, "index")
if err != nil {
return nil, err
}
meta.indexes = splitAndTrimBySep(index, ";")

meta.searchTemplateName, err = GetFromAuthOrMeta(config, "searchTemplateName")
if err != nil {
return nil, err
}

if val, ok := config.TriggerMetadata["parameters"]; ok {
meta.parameters = splitAndTrimBySep(val, ";")
}

meta.valueLocation, err = GetFromAuthOrMeta(config, "valueLocation")
if err != nil {
return nil, err
}

targetValue, err := GetFromAuthOrMeta(config, "targetValue")
if err != nil {
return nil, err
}
meta.targetValue, err = strconv.Atoi(targetValue)
if err != nil {
return nil, fmt.Errorf("targetValue parsing error %s", err.Error())
}

meta.metricName = GenerateMetricNameWithIndex(config.ScalerIndex, kedautil.NormalizeString(fmt.Sprintf("elasticsearch-%s", meta.searchTemplateName)))
return &meta, nil
}

// newElasticsearchClient creates elasticsearch db connection
func newElasticsearchClient(meta *elasticsearchMetadata) (*elasticsearch.Client, error) {
config := elasticsearch.Config{Addresses: meta.addresses}
if meta.username != "" {
config.Username = meta.username
}
if meta.password != "" {
config.Password = meta.password
}

transport := http.DefaultTransport.(*http.Transport)
transport.TLSClientConfig = &tls.Config{InsecureSkipVerify: meta.unsafeSsl}
config.Transport = transport

esClient, err := elasticsearch.NewClient(config)
if err != nil {
elasticsearchLog.Error(err, fmt.Sprintf("Found error when creating client: %s", err))
return nil, err
}

_, err = esClient.Info()
if err != nil {
elasticsearchLog.Error(err, fmt.Sprintf("Found error when pinging search engine: %s", err))
return nil, err
}
return esClient, nil
}

func (s *elasticsearchScaler) Close(ctx context.Context) error {
return nil
}

// IsActive returns true if there are pending messages to be processed
func (s *elasticsearchScaler) IsActive(ctx context.Context) (bool, error) {
messages, err := s.getQueryResult(ctx)
if err != nil {
elasticsearchLog.Error(err, fmt.Sprintf("Error inspecting elasticsearch: %s", err))
return false, err
}
return messages > 0, nil
}

// getQueryResult returns result of the scaler query
func (s *elasticsearchScaler) getQueryResult(ctx context.Context) (int, error) {
// Build the request body.
var body bytes.Buffer
if err := json.NewEncoder(&body).Encode(buildQuery(s.metadata)); err != nil {
elasticsearchLog.Error(err, "Error encoding query: %s", err)
}

// Run the templated search
res, err := s.esClient.SearchTemplate(
&body,
s.esClient.SearchTemplate.WithIndex(s.metadata.indexes...),
s.esClient.SearchTemplate.WithContext(ctx),
)
if err != nil {
elasticsearchLog.Error(err, fmt.Sprintf("Could not query elasticsearch: %s", err))
return 0, err
}

defer res.Body.Close()
b, err := ioutil.ReadAll(res.Body)
if err != nil {
return 0, err
}
v, err := getValueFromSearch(b, s.metadata.valueLocation)
if err != nil {
return 0, err
}
return v, nil
}

func buildQuery(metadata *elasticsearchMetadata) map[string]interface{} {
parameters := map[string]interface{}{}
for _, p := range metadata.parameters {
if p != "" {
kv := splitAndTrimBySep(p, ":")
parameters[kv[0]] = kv[1]
}
}
query := map[string]interface{}{
"id": metadata.searchTemplateName,
}
if len(parameters) > 0 {
query["params"] = parameters
}
return query
}

func getValueFromSearch(body []byte, valueLocation string) (int, error) {
r := gjson.GetBytes(body, valueLocation)
errorMsg := "valueLocation must point to value of type number but got: '%s'"
if r.Type == gjson.String {
q, err := strconv.Atoi(r.String())
if err != nil {
return 0, fmt.Errorf(errorMsg, r.String())
}
return q, nil
}
if r.Type != gjson.Number {
return 0, fmt.Errorf(errorMsg, r.Type.String())
}
return int(r.Num), nil
}

// GetMetricSpecForScaling returns the MetricSpec for the Horizontal Pod Autoscaler
func (s *elasticsearchScaler) GetMetricSpecForScaling(context.Context) []v2beta2.MetricSpec {
targetValue := resource.NewQuantity(int64(s.metadata.targetValue), resource.DecimalSI)

externalMetric := &v2beta2.ExternalMetricSource{
Metric: v2beta2.MetricIdentifier{
Name: s.metadata.metricName,
},
Target: v2beta2.MetricTarget{
Type: v2beta2.AverageValueMetricType,
AverageValue: targetValue,
},
}
metricSpec := v2beta2.MetricSpec{
External: externalMetric, Type: externalMetricType,
}
return []v2beta2.MetricSpec{metricSpec}
}

// GetMetrics returns value for a supported metric and an error if there is a problem getting the metric
func (s *elasticsearchScaler) GetMetrics(ctx context.Context, metricName string, metricSelector labels.Selector) ([]external_metrics.ExternalMetricValue, error) {
num, err := s.getQueryResult(ctx)
if err != nil {
return []external_metrics.ExternalMetricValue{}, fmt.Errorf("error inspecting elasticsearch: %s", err)
}

metric := external_metrics.ExternalMetricValue{
MetricName: metricName,
Value: *resource.NewQuantity(int64(num), resource.DecimalSI),
Timestamp: metav1.Now(),
}

return append([]external_metrics.ExternalMetricValue{}, metric), nil
}

// Splits a string separated by a specified separator and trims space from all the elements.
func splitAndTrimBySep(s string, sep string) []string {
x := strings.Split(s, sep)
for i := range x {
x[i] = strings.Trim(x[i], " ")
}
return x
}
Loading

0 comments on commit 81e7d3b

Please sign in to comment.