-
Notifications
You must be signed in to change notification settings - Fork 1.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add an elasticsearch scaler based on search template #2304
Changes from 1 commit
8a1b7b4
44ba555
cc4c9f1
e8c6415
9d703c2
7b408fd
6afb6ca
bef268d
4664f21
0d5042f
6f35589
4d835d3
3108152
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -37,6 +37,7 @@ type elasticsearchMetadata struct { | |
parameters []string | ||
valueLocation string | ||
targetValue int | ||
metricName string | ||
} | ||
|
||
var elasticsearchLog = logf.Log.WithName("elasticsearch_scaler") | ||
|
@@ -73,7 +74,7 @@ func parseElasticsearchMetadata(config *ScalerConfig) (*elasticsearchMetadata, e | |
if val, ok := config.TriggerMetadata["unsafeSsl"]; ok { | ||
meta.unsafeSsl, err = strconv.ParseBool(val) | ||
if err != nil { | ||
return nil, fmt.Errorf("error parsing unsafeSsL: %s", err) | ||
return nil, fmt.Errorf("error parsing unsafeSsl: %s", err) | ||
} | ||
} else { | ||
meta.unsafeSsl = defaultUnsafeSsl | ||
|
@@ -120,6 +121,7 @@ func parseElasticsearchMetadata(config *ScalerConfig) (*elasticsearchMetadata, e | |
return nil, fmt.Errorf("targetValue parsing error %s", err.Error()) | ||
} | ||
|
||
meta.metricName = GenerateMetricNameWithIndex(config.ScalerIndex, kedautil.NormalizeString(fmt.Sprintf("elasticsearch-%s", meta.searchTemplateName))) | ||
return &meta, nil | ||
} | ||
|
||
|
@@ -133,12 +135,9 @@ func newElasticsearchClient(meta *elasticsearchMetadata) (*elasticsearch.Client, | |
config.Password = meta.password | ||
} | ||
|
||
if meta.unsafeSsl { | ||
tr := &http.Transport{ | ||
TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, | ||
} | ||
config.Transport = tr | ||
} | ||
transport := http.DefaultTransport.(*http.Transport) | ||
transport.TLSClientConfig = &tls.Config{InsecureSkipVerify: meta.unsafeSsl} | ||
config.Transport = transport | ||
|
||
esClient, err := elasticsearch.NewClient(config) | ||
if err != nil { | ||
|
@@ -160,7 +159,7 @@ func (s *elasticsearchScaler) Close(ctx context.Context) error { | |
|
||
// IsActive returns true if there are pending messages to be processed | ||
func (s *elasticsearchScaler) IsActive(ctx context.Context) (bool, error) { | ||
messages, err := s.getQueryResult() | ||
messages, err := s.getQueryResult(ctx) | ||
if err != nil { | ||
elasticsearchLog.Error(err, fmt.Sprintf("Error inspecting elasticsearch: %s", err)) | ||
return false, err | ||
|
@@ -169,7 +168,7 @@ func (s *elasticsearchScaler) IsActive(ctx context.Context) (bool, error) { | |
} | ||
|
||
// getQueryResult returns result of the scaler query | ||
func (s *elasticsearchScaler) getQueryResult() (int, error) { | ||
func (s *elasticsearchScaler) getQueryResult(ctx context.Context) (int, error) { | ||
// Build the request body. | ||
var body bytes.Buffer | ||
if err := json.NewEncoder(&body).Encode(buildQuery(s.metadata)); err != nil { | ||
|
@@ -180,6 +179,7 @@ func (s *elasticsearchScaler) getQueryResult() (int, error) { | |
res, err := s.esClient.SearchTemplate( | ||
&body, | ||
s.esClient.SearchTemplate.WithIndex(s.metadata.indexes...), | ||
s.esClient.SearchTemplate.WithContext(ctx), | ||
) | ||
if err != nil { | ||
elasticsearchLog.Error(err, fmt.Sprintf("Could not query elasticsearch: %s", err)) | ||
|
@@ -234,11 +234,10 @@ func getValueFromSearch(body []byte, valueLocation string) (int, error) { | |
// GetMetricSpecForScaling returns the MetricSpec for the Horizontal Pod Autoscaler | ||
func (s *elasticsearchScaler) GetMetricSpecForScaling(context.Context) []v2beta2.MetricSpec { | ||
targetValue := resource.NewQuantity(int64(s.metadata.targetValue), resource.DecimalSI) | ||
metricName := kedautil.NormalizeString(fmt.Sprintf("elasticsearch-%s", s.metadata.searchTemplateName)) | ||
|
||
externalMetric := &v2beta2.ExternalMetricSource{ | ||
Metric: v2beta2.MetricIdentifier{ | ||
Name: GenerateMetricNameWithIndex(s.metadata.targetValue, metricName), | ||
Name: GenerateMetricNameWithIndex(s.metadata.targetValue, s.metadata.metricName), | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You should call only once to |
||
}, | ||
Target: v2beta2.MetricTarget{ | ||
Type: v2beta2.AverageValueMetricType, | ||
|
@@ -253,7 +252,7 @@ func (s *elasticsearchScaler) GetMetricSpecForScaling(context.Context) []v2beta2 | |
|
||
// GetMetrics returns value for a supported metric and an error if there is a problem getting the metric | ||
func (s *elasticsearchScaler) GetMetrics(ctx context.Context, metricName string, metricSelector labels.Selector) ([]external_metrics.ExternalMetricValue, error) { | ||
num, err := s.getQueryResult() | ||
num, err := s.getQueryResult(ctx) | ||
if err != nil { | ||
return []external_metrics.ExternalMetricValue{}, fmt.Errorf("error inspecting elasticsearch: %s", err) | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You should pass the context until here because you could add
.WithContext
. With this change, the request context can be managed from outside the scaler.