Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add an elasticsearch scaler based on search template #2304

Closed
23 changes: 11 additions & 12 deletions pkg/scalers/elasticsearch_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ type elasticsearchMetadata struct {
parameters []string
valueLocation string
targetValue int
metricName string
}

var elasticsearchLog = logf.Log.WithName("elasticsearch_scaler")
Expand Down Expand Up @@ -73,7 +74,7 @@ func parseElasticsearchMetadata(config *ScalerConfig) (*elasticsearchMetadata, e
if val, ok := config.TriggerMetadata["unsafeSsl"]; ok {
meta.unsafeSsl, err = strconv.ParseBool(val)
if err != nil {
return nil, fmt.Errorf("error parsing unsafeSsL: %s", err)
return nil, fmt.Errorf("error parsing unsafeSsl: %s", err)
}
} else {
meta.unsafeSsl = defaultUnsafeSsl
Expand Down Expand Up @@ -120,6 +121,7 @@ func parseElasticsearchMetadata(config *ScalerConfig) (*elasticsearchMetadata, e
return nil, fmt.Errorf("targetValue parsing error %s", err.Error())
}

meta.metricName = GenerateMetricNameWithIndex(config.ScalerIndex, kedautil.NormalizeString(fmt.Sprintf("elasticsearch-%s", meta.searchTemplateName)))
return &meta, nil
}

Expand All @@ -133,12 +135,9 @@ func newElasticsearchClient(meta *elasticsearchMetadata) (*elasticsearch.Client,
config.Password = meta.password
}

if meta.unsafeSsl {
tr := &http.Transport{
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
}
config.Transport = tr
}
transport := http.DefaultTransport.(*http.Transport)
transport.TLSClientConfig = &tls.Config{InsecureSkipVerify: meta.unsafeSsl}
config.Transport = transport

esClient, err := elasticsearch.NewClient(config)
if err != nil {
Expand All @@ -160,7 +159,7 @@ func (s *elasticsearchScaler) Close(ctx context.Context) error {

// IsActive returns true if there are pending messages to be processed
func (s *elasticsearchScaler) IsActive(ctx context.Context) (bool, error) {
messages, err := s.getQueryResult()
messages, err := s.getQueryResult(ctx)
if err != nil {
elasticsearchLog.Error(err, fmt.Sprintf("Error inspecting elasticsearch: %s", err))
return false, err
Expand All @@ -169,7 +168,7 @@ func (s *elasticsearchScaler) IsActive(ctx context.Context) (bool, error) {
}

// getQueryResult returns result of the scaler query
func (s *elasticsearchScaler) getQueryResult() (int, error) {
func (s *elasticsearchScaler) getQueryResult(ctx context.Context) (int, error) {
// Build the request body.
var body bytes.Buffer
if err := json.NewEncoder(&body).Encode(buildQuery(s.metadata)); err != nil {
Expand All @@ -180,6 +179,7 @@ func (s *elasticsearchScaler) getQueryResult() (int, error) {
res, err := s.esClient.SearchTemplate(
&body,
s.esClient.SearchTemplate.WithIndex(s.metadata.indexes...),
Copy link
Member

@JorTurFer JorTurFer Nov 21, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You should pass the context until here because you could add .WithContext. With this change, the request context can be managed from outside the scaler.

s.esClient.SearchTemplate.WithContext(ctx),
)
if err != nil {
elasticsearchLog.Error(err, fmt.Sprintf("Could not query elasticsearch: %s", err))
Expand Down Expand Up @@ -234,11 +234,10 @@ func getValueFromSearch(body []byte, valueLocation string) (int, error) {
// GetMetricSpecForScaling returns the MetricSpec for the Horizontal Pod Autoscaler
func (s *elasticsearchScaler) GetMetricSpecForScaling(context.Context) []v2beta2.MetricSpec {
targetValue := resource.NewQuantity(int64(s.metadata.targetValue), resource.DecimalSI)
metricName := kedautil.NormalizeString(fmt.Sprintf("elasticsearch-%s", s.metadata.searchTemplateName))

externalMetric := &v2beta2.ExternalMetricSource{
Metric: v2beta2.MetricIdentifier{
Name: GenerateMetricNameWithIndex(s.metadata.targetValue, metricName),
Name: GenerateMetricNameWithIndex(s.metadata.targetValue, s.metadata.metricName),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You should call only once to GenerateMetricNameWithIndex. This index is not related with any inside the scaler, it's related with the trigger index inside the ScaledObject to avoid conflicts with duplicated names in the same ScaledObject.
You could call to the function here or in the constructor like you are doing but don't do it both, because the final metric name with current code will be like s{s.metadata.targetValue}-s{config.ScalerIndex}-elasticsearch-%s. It should be like s{config.ScalerIndex}-elasticsearch-%s

},
Target: v2beta2.MetricTarget{
Type: v2beta2.AverageValueMetricType,
Expand All @@ -253,7 +252,7 @@ func (s *elasticsearchScaler) GetMetricSpecForScaling(context.Context) []v2beta2

// GetMetrics returns value for a supported metric and an error if there is a problem getting the metric
func (s *elasticsearchScaler) GetMetrics(ctx context.Context, metricName string, metricSelector labels.Selector) ([]external_metrics.ExternalMetricValue, error) {
num, err := s.getQueryResult()
num, err := s.getQueryResult(ctx)
if err != nil {
return []external_metrics.ExternalMetricValue{}, fmt.Errorf("error inspecting elasticsearch: %s", err)
}
Expand Down
7 changes: 7 additions & 0 deletions pkg/scalers/elasticsearch_scaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ var testCases = []parseElasticsearchMetadataTestData{
parameters: []string{"param1:value1"},
valueLocation: "hits.hits[0]._source.value",
targetValue: 12,
metricName: "s0-elasticsearch-myAwesomeSearch",
},
expectedError: nil,
},
Expand Down Expand Up @@ -120,6 +121,7 @@ var testCases = []parseElasticsearchMetadataTestData{
parameters: []string{"param1:value1"},
valueLocation: "hits.hits[0]._source.value",
targetValue: 12,
metricName: "s0-elasticsearch-myAwesomeSearch",
},
expectedError: nil,
},
Expand Down Expand Up @@ -148,6 +150,7 @@ var testCases = []parseElasticsearchMetadataTestData{
parameters: []string{"param1:value1"},
valueLocation: "hits.hits[0]._source.value",
targetValue: 12,
metricName: "s0-elasticsearch-myAwesomeSearch",
},
expectedError: nil,
},
Expand Down Expand Up @@ -176,6 +179,7 @@ var testCases = []parseElasticsearchMetadataTestData{
parameters: []string{"param1:value1"},
valueLocation: "hits.hits[0]._source.value",
targetValue: 12,
metricName: "s0-elasticsearch-myAwesomeSearch",
},
expectedError: nil,
},
Expand Down Expand Up @@ -204,6 +208,7 @@ var testCases = []parseElasticsearchMetadataTestData{
parameters: []string{"param1:value1"},
valueLocation: "hits.hits[0]._source.value",
targetValue: 12,
metricName: "s0-elasticsearch-myAwesomeSearch",
},
expectedError: nil,
},
Expand Down Expand Up @@ -235,6 +240,7 @@ var testCases = []parseElasticsearchMetadataTestData{
parameters: []string{"param1:value1"},
valueLocation: "hits.hits[0]._source.value",
targetValue: 12,
metricName: "s0-elasticsearch-myAwesomeSearch",
},
expectedError: nil,
},
Expand Down Expand Up @@ -284,6 +290,7 @@ func TestUnsafeSslDefaultValue(t *testing.T) {
parameters: []string{"param1:value1"},
valueLocation: "hits.hits[0]._source.value",
targetValue: 12,
metricName: "s0-elasticsearch-myAwesomeSearch",
},
expectedError: nil,
}
Expand Down
2 changes: 1 addition & 1 deletion tests/scalers/elasticsearch.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ test.before(t => {
fs.writeFileSync(elasticsearchTmpFile.name, elasticsearchStatefulsetYaml.replace('{{ELASTIC_PASSWORD}}', elasticPassword))

t.is(0, sh.exec(`kubectl apply --namespace ${elasticsearchNamespace} -f ${elasticsearchTmpFile.name}`).code, 'creating an elasticsearch statefulset should work.')
t.is(0, waitForRollout('statefulset', "elasticsearch", elasticsearchNamespace))
t.is(0, waitForRollout('statefulset', "elasticsearch", elasticsearchNamespace, 300))

// Create the index and the search template
sh.exec(`${kubectlExecCurl} -XPUT http://localhost:9200/${indexName} -d '${elastisearchCreateIndex}'`)
Expand Down