Skip to content

Commit

Permalink
Refactor Loki Scaler (#6264)
Browse files Browse the repository at this point in the history
Signed-off-by: rickbrouwer <rickbrouwer@gmail.com>
  • Loading branch information
rickbrouwer authored Oct 23, 2024
1 parent d849c75 commit 2cf3c4c
Show file tree
Hide file tree
Showing 3 changed files with 85 additions and 169 deletions.
213 changes: 72 additions & 141 deletions pkg/scalers/loki_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,252 +19,184 @@ import (
)

const (
lokiServerAddress = "serverAddress"
lokiQuery = "query"
lokiThreshold = "threshold"
lokiActivationThreshold = "activationThreshold"
lokiNamespace = "namespace"
tenantName = "tenantName"
defaultIgnoreNullValues = true
tenantNameHeaderKey = "X-Scope-OrgID"
lokiIgnoreNullValues = "ignoreNullValues"
)

var (
lokiDefaultIgnoreNullValues = true
)

type lokiScaler struct {
metricType v2.MetricTargetType
metadata *lokiMetadata
metadata lokiMetadata
httpClient *http.Client
logger logr.Logger
}

type lokiMetadata struct {
serverAddress string
query string
threshold float64
activationThreshold float64
lokiAuth *authentication.AuthMeta
triggerIndex int
tenantName string
ignoreNullValues bool
unsafeSsl bool
ServerAddress string `keda:"name=serverAddress,order=triggerMetadata"`
Query string `keda:"name=query,order=triggerMetadata"`
Threshold float64 `keda:"name=threshold,order=triggerMetadata"`
ActivationThreshold float64 `keda:"name=activationThreshold,order=triggerMetadata,default=0"`
TenantName string `keda:"name=tenantName,order=triggerMetadata,optional"`
IgnoreNullValues bool `keda:"name=ignoreNullValues,order=triggerMetadata,default=true"`
UnsafeSsl bool `keda:"name=unsafeSsl,order=triggerMetadata,default=false"`
TriggerIndex int
Auth *authentication.AuthMeta
}

type lokiQueryResult struct {
Status string `json:"status"`
Data struct {
ResultType string `json:"resultType"`
Result []struct {
Metric struct {
} `json:"metric"`
Value []interface{} `json:"value"`
Metric struct{} `json:"metric"`
Value []interface{} `json:"value"`
} `json:"result"`
} `json:"data"`
}

// NewLokiScaler returns a new lokiScaler
func NewLokiScaler(config *scalersconfig.ScalerConfig) (Scaler, error) {
metricType, err := GetMetricTargetType(config)
if err != nil {
return nil, fmt.Errorf("error getting scaler metric type: %w", err)
}

logger := InitializeLogger(config, "loki_scaler")

meta, err := parseLokiMetadata(config)
if err != nil {
return nil, fmt.Errorf("error parsing loki metadata: %w", err)
}

httpClient := kedautil.CreateHTTPClient(config.GlobalHTTPTimeout, meta.unsafeSsl)
httpClient := kedautil.CreateHTTPClient(config.GlobalHTTPTimeout, meta.UnsafeSsl)

return &lokiScaler{
metricType: metricType,
metadata: meta,
httpClient: httpClient,
logger: logger,
logger: InitializeLogger(config, "loki_scaler"),
}, nil
}

func parseLokiMetadata(config *scalersconfig.ScalerConfig) (meta *lokiMetadata, err error) {
meta = &lokiMetadata{}

if val, ok := config.TriggerMetadata[lokiServerAddress]; ok && val != "" {
meta.serverAddress = val
} else {
return nil, fmt.Errorf("no %s given", lokiServerAddress)
}

if val, ok := config.TriggerMetadata[lokiQuery]; ok && val != "" {
meta.query = val
} else {
return nil, fmt.Errorf("no %s given", lokiQuery)
}

if val, ok := config.TriggerMetadata[lokiThreshold]; ok && val != "" {
t, err := strconv.ParseFloat(val, 64)
if err != nil {
return nil, fmt.Errorf("error parsing %s: %w", lokiThreshold, err)
}

meta.threshold = t
} else {
if config.AsMetricSource {
meta.threshold = 0
} else {
return nil, fmt.Errorf("no %s given", lokiThreshold)
}
}

meta.activationThreshold = 0
if val, ok := config.TriggerMetadata[lokiActivationThreshold]; ok {
t, err := strconv.ParseFloat(val, 64)
if err != nil {
return nil, fmt.Errorf("activationThreshold parsing error %w", err)
}

meta.activationThreshold = t
}

if val, ok := config.TriggerMetadata[tenantName]; ok && val != "" {
meta.tenantName = val
func parseLokiMetadata(config *scalersconfig.ScalerConfig) (lokiMetadata, error) {
meta := lokiMetadata{}
err := config.TypedConfig(&meta)
if err != nil {
return meta, fmt.Errorf("error parsing loki metadata: %w", err)
}

meta.ignoreNullValues = lokiDefaultIgnoreNullValues
if val, ok := config.TriggerMetadata[lokiIgnoreNullValues]; ok && val != "" {
ignoreNullValues, err := strconv.ParseBool(val)
if err != nil {
return nil, fmt.Errorf("err incorrect value for ignoreNullValues given: %s please use true or false", val)
}
meta.ignoreNullValues = ignoreNullValues
if config.AsMetricSource {
meta.Threshold = 0
}

meta.unsafeSsl = false
if val, ok := config.TriggerMetadata[unsafeSsl]; ok && val != "" {
unsafeSslValue, err := strconv.ParseBool(val)
if err != nil {
return nil, fmt.Errorf("error parsing %s: %w", unsafeSsl, err)
}

meta.unsafeSsl = unsafeSslValue
}

meta.triggerIndex = config.TriggerIndex

// parse auth configs from ScalerConfig
auth, err := authentication.GetAuthConfigs(config.TriggerMetadata, config.AuthParams)
if err != nil {
return nil, err
return meta, err
}
meta.lokiAuth = auth
meta.Auth = auth
meta.TriggerIndex = config.TriggerIndex

return meta, nil
}

// Close returns a nil error
func (s *lokiScaler) Close(context.Context) error {
if s.httpClient != nil {
s.httpClient.CloseIdleConnections()
}
return nil
}

// GetMetricSpecForScaling returns the MetricSpec for the Horizontal Pod Autoscaler
func (s *lokiScaler) GetMetricSpecForScaling(context.Context) []v2.MetricSpec {
externalMetric := &v2.ExternalMetricSource{
Metric: v2.MetricIdentifier{
Name: GenerateMetricNameWithIndex(s.metadata.triggerIndex, "loki"),
Name: GenerateMetricNameWithIndex(s.metadata.TriggerIndex, "loki"),
},
Target: GetMetricTargetMili(s.metricType, s.metadata.threshold),
}
metricSpec := v2.MetricSpec{
External: externalMetric, Type: externalMetricType,
Target: GetMetricTargetMili(s.metricType, s.metadata.Threshold),
}
metricSpec := v2.MetricSpec{External: externalMetric, Type: externalMetricType}
return []v2.MetricSpec{metricSpec}
}

// ExecuteLokiQuery returns the result of the LogQL query execution
func (s *lokiScaler) ExecuteLokiQuery(ctx context.Context) (float64, error) {
u, err := url.ParseRequestURI(s.metadata.serverAddress)
u, err := url.ParseRequestURI(s.metadata.ServerAddress)
if err != nil {
return -1, err
}
u.Path = "/loki/api/v1/query"

u.RawQuery = url.Values{
"query": []string{s.metadata.query},
}.Encode()
u.RawQuery = url.Values{"query": []string{s.metadata.Query}}.Encode()

req, err := http.NewRequestWithContext(ctx, "GET", u.String(), nil)
if err != nil {
return -1, err
}

if s.metadata.lokiAuth != nil && s.metadata.lokiAuth.EnableBearerAuth {
req.Header.Add("Authorization", authentication.GetBearerToken(s.metadata.lokiAuth))
} else if s.metadata.lokiAuth != nil && s.metadata.lokiAuth.EnableBasicAuth {
req.SetBasicAuth(s.metadata.lokiAuth.Username, s.metadata.lokiAuth.Password)
if s.metadata.Auth != nil {
if s.metadata.Auth.EnableBearerAuth {
req.Header.Add("Authorization", authentication.GetBearerToken(s.metadata.Auth))
} else if s.metadata.Auth.EnableBasicAuth {
req.SetBasicAuth(s.metadata.Auth.Username, s.metadata.Auth.Password)
}
}

if s.metadata.tenantName != "" {
req.Header.Add(tenantNameHeaderKey, s.metadata.tenantName)
if s.metadata.TenantName != "" {
req.Header.Add(tenantNameHeaderKey, s.metadata.TenantName)
}

r, err := s.httpClient.Do(req)
resp, err := s.httpClient.Do(req)
if err != nil {
return -1, err
}
defer r.Body.Close()
defer resp.Body.Close()

b, err := io.ReadAll(r.Body)
body, err := io.ReadAll(resp.Body)
if err != nil {
return -1, err
}

if !(r.StatusCode >= 200 && r.StatusCode <= 299) {
err := fmt.Errorf("loki query api returned error. status: %d response: %s", r.StatusCode, string(b))
s.logger.Error(err, "loki query api returned error")
return -1, err
if resp.StatusCode < 200 || resp.StatusCode > 299 {
return -1, fmt.Errorf("loki query api returned error. status: %d response: %s", resp.StatusCode, string(body))
}

var result lokiQueryResult
err = json.Unmarshal(b, &result)
if err != nil {
if err := json.Unmarshal(body, &result); err != nil {
return -1, err
}

var v float64 = -1
return s.parseQueryResult(result)
}

// allow for zero element or single element result sets
func (s *lokiScaler) parseQueryResult(result lokiQueryResult) (float64, error) {
if len(result.Data.Result) == 0 {
if s.metadata.ignoreNullValues {
if s.metadata.IgnoreNullValues {
return 0, nil
}
return -1, fmt.Errorf("loki metrics may be lost, the result is empty")
} else if len(result.Data.Result) > 1 {
return -1, fmt.Errorf("loki query %s returned multiple elements", s.metadata.query)
}

valueLen := len(result.Data.Result[0].Value)
if valueLen == 0 {
if s.metadata.ignoreNullValues {
if len(result.Data.Result) > 1 {
return -1, fmt.Errorf("loki query %s returned multiple elements", s.metadata.Query)
}

values := result.Data.Result[0].Value
if len(values) == 0 {
if s.metadata.IgnoreNullValues {
return 0, nil
}
return -1, fmt.Errorf("loki metrics may be lost, the value list is empty")
} else if valueLen < 2 {
return -1, fmt.Errorf("loki query %s didn't return enough values", s.metadata.query)
}

val := result.Data.Result[0].Value[1]
if val != nil {
str := val.(string)
v, err = strconv.ParseFloat(str, 64)
if err != nil {
s.logger.Error(err, "Error converting loki value", "loki_value", str)
return -1, err
}
if len(values) < 2 {
return -1, fmt.Errorf("loki query %s didn't return enough values", s.metadata.Query)
}

if values[1] == nil {
return 0, nil
}

str, ok := values[1].(string)
if !ok {
return -1, fmt.Errorf("failed to parse loki value as string")
}

v, err := strconv.ParseFloat(str, 64)
if err != nil {
return -1, fmt.Errorf("error converting loki value %s: %w", str, err)
}

return v, nil
Expand All @@ -279,6 +211,5 @@ func (s *lokiScaler) GetMetricsAndActivity(ctx context.Context, metricName strin
}

metric := GenerateMetricInMili(metricName, val)

return []external_metrics.ExternalMetricValue{metric}, val > s.metadata.activationThreshold, nil
return []external_metrics.ExternalMetricValue{metric}, val > s.metadata.ActivationThreshold, nil
}
Loading

0 comments on commit 2cf3c4c

Please sign in to comment.