Reference ScaledObject's/ScaledJob's name in the scalers log
Signed-off-by: Zbynek Roubalik <zroubalik@gmail.com>
zroubalik committed Aug 2, 2022
1 parent 81ae8e9 commit 6b9114b
Showing 84 changed files with 791 additions and 665 deletions.
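Every scaler file in this commit follows the same pattern: the package-level logger (e.g. `var activeMQLog = logf.Log.WithName("activeMQ_scaler")`) is removed, the scaler struct gains a `logger logr.Logger` field, and the constructor fills it with `InitializeLogger(config, "<scaler_name>")` so that every log line can be traced back to the owning ScaledObject/ScaledJob. The `InitializeLogger` helper itself is not part of this excerpt; the sketch below is only a plausible shape inferred from its call sites and from the renamed `ScalableObjectName`/`ScalableObjectNamespace` fields on `ScalerConfig`, and the exact log key names are assumptions.

```go
// Hedged sketch: the real helper lives elsewhere in pkg/scalers and is not shown
// in this diff; the log keys below are illustrative assumptions.
package scalers

import (
	"github.com/go-logr/logr"
	logf "sigs.k8s.io/controller-runtime/pkg/log"
)

// InitializeLogger returns a per-scaler logger enriched with the identity of the
// ScaledObject/ScaledJob that owns the trigger, taken from the ScalerConfig.
func InitializeLogger(config *ScalerConfig, scalerName string) logr.Logger {
	return logf.Log.WithName(scalerName).WithValues(
		"scaledObject.Namespace", config.ScalableObjectNamespace, // assumed key name
		"scaledObject.Name", config.ScalableObjectName, // assumed key name
	)
}
```

Moving from one shared package logger to a per-instance logger is what makes this enrichment possible: a package-level logger cannot know which ScaledObject/ScaledJob a given scaler instance serves, while a logger created per instance carries that context on every call.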
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -47,6 +47,7 @@ To learn more about our roadmap, we recommend reading [this document](ROADMAP.md
### Improvements

- **General:** `external` extension reduces connection establishment with long links ([#3193](https://github.com/kedacore/keda/issues/3193))
- **General:** Reference ScaledObject's/ScaledJob's name in the scalers log ([3419](https://github.com/kedacore/keda/issues/3419))
- **General:** Use `mili` scale for the returned metrics ([#3135](https://github.com/kedacore/keda/issue/3135))
- **General:** Use more readable timestamps in KEDA Operator logs ([#3066](https://github.com/kedacore/keda/issue/3066))
- **AWS SQS Queue Scaler:** Support for scaling to include in-flight messages. ([#3133](https://github.com/kedacore/keda/issues/3133))
30 changes: 15 additions & 15 deletions controllers/keda/scaledobject_controller_test.go
@@ -91,11 +91,11 @@ var _ = Describe("ScaledObjectController", func() {

for i, tm := range triggerMeta {
config := &scalers.ScalerConfig{
Name: fmt.Sprintf("test.%d", i),
Namespace: "test",
TriggerMetadata: tm,
ResolvedEnv: nil,
AuthParams: nil,
ScalableObjectName: fmt.Sprintf("test.%d", i),
ScalableObjectNamespace: "test",
TriggerMetadata: tm,
ResolvedEnv: nil,
AuthParams: nil,
}

s, err := scalers.NewPrometheusScaler(config)
@@ -141,11 +141,11 @@ var _ = Describe("ScaledObjectController", func() {
expectedExternalMetricNames := make([]string, 0)

config := &scalers.ScalerConfig{
Name: "test",
Namespace: "test",
TriggerMetadata: triggerMeta[0],
ResolvedEnv: nil,
AuthParams: nil,
ScalableObjectName: "test",
ScalableObjectNamespace: "test",
TriggerMetadata: triggerMeta[0],
ResolvedEnv: nil,
AuthParams: nil,
}

s, err := scalers.NewPrometheusScaler(config)
@@ -191,11 +191,11 @@ var _ = Describe("ScaledObjectController", func() {
testScalers := make([]cache.ScalerBuilder, 0)
for i := 0; i < 4; i++ {
config := &scalers.ScalerConfig{
Name: fmt.Sprintf("test.%d", i),
Namespace: "test",
TriggerMetadata: triggerMeta[0],
ResolvedEnv: nil,
AuthParams: nil,
ScalableObjectName: fmt.Sprintf("test.%d", i),
ScalableObjectNamespace: "test",
TriggerMetadata: triggerMeta[0],
ResolvedEnv: nil,
AuthParams: nil,
}

s, err := scalers.NewPrometheusScaler(config)
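For orientation, this is roughly how a trigger's `ScalerConfig` is built after the rename from `Name`/`Namespace` to `ScalableObjectName`/`ScalableObjectNamespace`. Only the two renamed fields come from the diff above; the metadata keys and values are placeholders, not the test's actual `triggerMeta` fixtures.

```go
// Illustrative sketch; values are placeholders and would not satisfy the
// Prometheus scaler's full metadata requirements.
config := &scalers.ScalerConfig{
	ScalableObjectName:      "example-scaledobject", // name of the owning ScaledObject/ScaledJob
	ScalableObjectNamespace: "default",
	TriggerMetadata:         map[string]string{"serverAddress": "http://prometheus.svc:9090"}, // assumed keys
	ResolvedEnv:             nil,
	AuthParams:              nil,
}

s, err := scalers.NewPrometheusScaler(config)
if err != nil {
	// incomplete placeholder metadata would surface as an error here
}
_ = s
```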
10 changes: 5 additions & 5 deletions pkg/scalers/activemq_scaler.go
@@ -12,10 +12,10 @@ import (
"strings"
"text/template"

"github.com/go-logr/logr"
v2beta2 "k8s.io/api/autoscaling/v2beta2"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/metrics/pkg/apis/external_metrics"
logf "sigs.k8s.io/controller-runtime/pkg/log"

kedautil "github.com/kedacore/keda/v2/pkg/util"
)
@@ -24,6 +24,7 @@ type activeMQScaler struct {
metricType v2beta2.MetricTargetType
metadata *activeMQMetadata
httpClient *http.Client
logger logr.Logger
}

type activeMQMetadata struct {
@@ -52,8 +53,6 @@ const (
defaultActiveMQRestAPITemplate = "http://{{.ManagementEndpoint}}/api/jolokia/read/org.apache.activemq:type=Broker,brokerName={{.BrokerName}},destinationType=Queue,destinationName={{.DestinationName}}/QueueSize"
)

var activeMQLog = logf.Log.WithName("activeMQ_scaler")

// NewActiveMQScaler creates a new activeMQ Scaler
func NewActiveMQScaler(config *ScalerConfig) (Scaler, error) {
metricType, err := GetMetricTargetType(config)
@@ -71,6 +70,7 @@ func NewActiveMQScaler(config *ScalerConfig) (Scaler, error) {
metricType: metricType,
metadata: meta,
httpClient: httpClient,
logger: InitializeLogger(config, "active_mq_scaler"),
}, nil
}

@@ -170,7 +170,7 @@ func parseActiveMQMetadata(config *ScalerConfig) (*activeMQMetadata, error) {
func (s *activeMQScaler) IsActive(ctx context.Context) (bool, error) {
queueSize, err := s.getQueueMessageCount(ctx)
if err != nil {
activeMQLog.Error(err, "Unable to access activeMQ management endpoint", "managementEndpoint", s.metadata.managementEndpoint)
s.logger.Error(err, "Unable to access activeMQ management endpoint", "managementEndpoint", s.metadata.managementEndpoint)
return false, err
}

@@ -260,7 +260,7 @@ func (s *activeMQScaler) getQueueMessageCount(ctx context.Context) (int64, error
return -1, fmt.Errorf("ActiveMQ management endpoint response error code : %d %d", resp.StatusCode, monitoringInfo.Status)
}

activeMQLog.V(1).Info(fmt.Sprintf("ActiveMQ scaler: Providing metrics based on current queue size %d queue size limit %d", queueMessageCount, s.metadata.targetQueueSize))
s.logger.V(1).Info(fmt.Sprintf("ActiveMQ scaler: Providing metrics based on current queue size %d queue size limit %d", queueMessageCount, s.metadata.targetQueueSize))

return queueMessageCount, nil
}
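The ActiveMQ file above is representative of the remaining scaler files in this commit. As a condensed sketch of the migration, using a hypothetical `fooScaler` (none of the `foo*` identifiers exist in KEDA):

```go
// Before: one shared, package-level logger with no ScaledObject/ScaledJob context.
// var fooLog = logf.Log.WithName("foo_scaler")

type fooScaler struct {
	metricType v2beta2.MetricTargetType
	metadata   *fooMetadata
	logger     logr.Logger // after: a per-instance logger
}

func NewFooScaler(config *ScalerConfig) (Scaler, error) {
	metricType, err := GetMetricTargetType(config)
	if err != nil {
		return nil, fmt.Errorf("error getting scaler metric type: %s", err)
	}

	meta, err := parseFooMetadata(config)
	if err != nil {
		return nil, err
	}

	return &fooScaler{
		metricType: metricType,
		metadata:   meta,
		// the logger is now bound to the ScaledObject/ScaledJob that owns this trigger
		logger: InitializeLogger(config, "foo_scaler"),
	}, nil
}
```

Call sites then switch from `fooLog.Error(...)` / `fooLog.V(1).Info(...)` to `s.logger.Error(...)` / `s.logger.V(1).Info(...)`, exactly as in `IsActive` and `getQueueMessageCount` above.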
12 changes: 6 additions & 6 deletions pkg/scalers/artemis_scaler.go
@@ -10,10 +10,10 @@ import (
"strconv"
"strings"

"github.com/go-logr/logr"
v2beta2 "k8s.io/api/autoscaling/v2beta2"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/metrics/pkg/apis/external_metrics"
logf "sigs.k8s.io/controller-runtime/pkg/log"

kedautil "github.com/kedacore/keda/v2/pkg/util"
)
@@ -22,6 +22,7 @@ type artemisScaler struct {
metricType v2beta2.MetricTargetType
metadata *artemisMetadata
httpClient *http.Client
logger logr.Logger
}

//revive:disable:var-naming breaking change on restApiTemplate, wouldn't bring any benefit to users
@@ -55,8 +56,6 @@ const (
defaultCorsHeader = "http://%s"
)

var artemisLog = logf.Log.WithName("artemis_queue_scaler")

// NewArtemisQueueScaler creates a new artemis queue Scaler
func NewArtemisQueueScaler(config *ScalerConfig) (Scaler, error) {
// do we need to guarantee this timeout for a specific
@@ -78,6 +77,7 @@ func NewArtemisQueueScaler(config *ScalerConfig) (Scaler, error) {
metricType: metricType,
metadata: artemisMetadata,
httpClient: httpClient,
logger: InitializeLogger(config, "artemis_queue_scaler"),
}, nil
}

@@ -181,7 +181,7 @@ func parseArtemisMetadata(config *ScalerConfig) (*artemisMetadata, error) {
func (s *artemisScaler) IsActive(ctx context.Context) (bool, error) {
messages, err := s.getQueueMessageCount(ctx)
if err != nil {
artemisLog.Error(err, "Unable to access the artemis management endpoint", "managementEndpoint", s.metadata.managementEndpoint)
s.logger.Error(err, "Unable to access the artemis management endpoint", "managementEndpoint", s.metadata.managementEndpoint)
return false, err
}

@@ -262,7 +262,7 @@ func (s *artemisScaler) getQueueMessageCount(ctx context.Context) (int64, error)
return -1, fmt.Errorf("artemis management endpoint response error code : %d %d", resp.StatusCode, monitoringInfo.Status)
}

artemisLog.V(1).Info(fmt.Sprintf("Artemis scaler: Providing metrics based on current queue length %d queue length limit %d", messageCount, s.metadata.queueLength))
s.logger.V(1).Info(fmt.Sprintf("Artemis scaler: Providing metrics based on current queue length %d queue length limit %d", messageCount, s.metadata.queueLength))

return messageCount, nil
}
@@ -283,7 +283,7 @@ func (s *artemisScaler) GetMetrics(ctx context.Context, metricName string, metri
messages, err := s.getQueueMessageCount(ctx)

if err != nil {
artemisLog.Error(err, "Unable to access the artemis management endpoint", "managementEndpoint", s.metadata.managementEndpoint)
s.logger.Error(err, "Unable to access the artemis management endpoint", "managementEndpoint", s.metadata.managementEndpoint)
return []external_metrics.ExternalMetricValue{}, err
}

72 changes: 36 additions & 36 deletions pkg/scalers/aws_cloudwatch_scaler.go
@@ -13,10 +13,10 @@ import (
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/cloudwatch"
"github.com/aws/aws-sdk-go/service/cloudwatch/cloudwatchiface"
"github.com/go-logr/logr"
"k8s.io/api/autoscaling/v2beta2"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/metrics/pkg/apis/external_metrics"
logf "sigs.k8s.io/controller-runtime/pkg/log"

kedautil "github.com/kedacore/keda/v2/pkg/util"
)
@@ -32,6 +32,7 @@ type awsCloudwatchScaler struct {
metricType v2beta2.MetricTargetType
metadata *awsCloudwatchMetadata
cwClient cloudwatchiface.CloudWatchAPI
logger logr.Logger
}

type awsCloudwatchMetadata struct {
@@ -58,8 +59,6 @@ type awsCloudwatchMetadata struct {
scalerIndex int
}

var cloudwatchLog = logf.Log.WithName("aws_cloudwatch_scaler")

// NewAwsCloudwatchScaler creates a new awsCloudwatchScaler
func NewAwsCloudwatchScaler(config *ScalerConfig) (Scaler, error) {
metricType, err := GetMetricTargetType(config)
@@ -76,6 +75,7 @@ func NewAwsCloudwatchScaler(config *ScalerConfig) (Scaler, error) {
metricType: metricType,
metadata: meta,
cwClient: createCloudwatchClient(meta),
logger: InitializeLogger(config, "aws_cloudwatch_scaler"),
}, nil
}

@@ -290,11 +290,11 @@ func computeQueryWindow(current time.Time, metricPeriodSec, metricEndTimeOffsetS
return
}

func (c *awsCloudwatchScaler) GetMetrics(ctx context.Context, metricName string, metricSelector labels.Selector) ([]external_metrics.ExternalMetricValue, error) {
metricValue, err := c.GetCloudwatchMetrics()
func (s *awsCloudwatchScaler) GetMetrics(ctx context.Context, metricName string, metricSelector labels.Selector) ([]external_metrics.ExternalMetricValue, error) {
metricValue, err := s.GetCloudwatchMetrics()

if err != nil {
cloudwatchLog.Error(err, "Error getting metric value")
s.logger.Error(err, "Error getting metric value")
return []external_metrics.ExternalMetricValue{}, err
}

@@ -303,70 +303,70 @@ func (c *awsCloudwatchScaler) GetMetrics(ctx context.Context, metricName string,
return append([]external_metrics.ExternalMetricValue{}, metric), nil
}

func (c *awsCloudwatchScaler) GetMetricSpecForScaling(context.Context) []v2beta2.MetricSpec {
func (s *awsCloudwatchScaler) GetMetricSpecForScaling(context.Context) []v2beta2.MetricSpec {
var metricNameSuffix string

if c.metadata.expression != "" {
metricNameSuffix = c.metadata.metricsName
if s.metadata.expression != "" {
metricNameSuffix = s.metadata.metricsName
} else {
metricNameSuffix = c.metadata.dimensionName[0]
metricNameSuffix = s.metadata.dimensionName[0]
}

externalMetric := &v2beta2.ExternalMetricSource{
Metric: v2beta2.MetricIdentifier{
Name: GenerateMetricNameWithIndex(c.metadata.scalerIndex, kedautil.NormalizeString(fmt.Sprintf("aws-cloudwatch-%s", metricNameSuffix))),
Name: GenerateMetricNameWithIndex(s.metadata.scalerIndex, kedautil.NormalizeString(fmt.Sprintf("aws-cloudwatch-%s", metricNameSuffix))),
},
Target: GetMetricTargetMili(c.metricType, c.metadata.targetMetricValue),
Target: GetMetricTargetMili(s.metricType, s.metadata.targetMetricValue),
}
metricSpec := v2beta2.MetricSpec{External: externalMetric, Type: externalMetricType}
return []v2beta2.MetricSpec{metricSpec}
}

func (c *awsCloudwatchScaler) IsActive(ctx context.Context) (bool, error) {
val, err := c.GetCloudwatchMetrics()
func (s *awsCloudwatchScaler) IsActive(ctx context.Context) (bool, error) {
val, err := s.GetCloudwatchMetrics()

if err != nil {
return false, err
}

return val > c.metadata.activationTargetMetricValue, nil
return val > s.metadata.activationTargetMetricValue, nil
}

func (c *awsCloudwatchScaler) Close(context.Context) error {
func (s *awsCloudwatchScaler) Close(context.Context) error {
return nil
}

func (c *awsCloudwatchScaler) GetCloudwatchMetrics() (float64, error) {
func (s *awsCloudwatchScaler) GetCloudwatchMetrics() (float64, error) {
var input cloudwatch.GetMetricDataInput

startTime, endTime := computeQueryWindow(time.Now(), c.metadata.metricStatPeriod, c.metadata.metricEndTimeOffset, c.metadata.metricCollectionTime)
startTime, endTime := computeQueryWindow(time.Now(), s.metadata.metricStatPeriod, s.metadata.metricEndTimeOffset, s.metadata.metricCollectionTime)

if c.metadata.expression != "" {
if s.metadata.expression != "" {
input = cloudwatch.GetMetricDataInput{
StartTime: aws.Time(startTime),
EndTime: aws.Time(endTime),
ScanBy: aws.String(cloudwatch.ScanByTimestampDescending),
MetricDataQueries: []*cloudwatch.MetricDataQuery{
{
Expression: aws.String(c.metadata.expression),
Expression: aws.String(s.metadata.expression),
Id: aws.String("q1"),
Period: aws.Int64(c.metadata.metricStatPeriod),
Label: aws.String(c.metadata.metricsName),
Period: aws.Int64(s.metadata.metricStatPeriod),
Label: aws.String(s.metadata.metricsName),
},
},
}
} else {
dimensions := []*cloudwatch.Dimension{}
for i := range c.metadata.dimensionName {
for i := range s.metadata.dimensionName {
dimensions = append(dimensions, &cloudwatch.Dimension{
Name: &c.metadata.dimensionName[i],
Value: &c.metadata.dimensionValue[i],
Name: &s.metadata.dimensionName[i],
Value: &s.metadata.dimensionValue[i],
})
}

var metricUnit *string
if c.metadata.metricUnit != "" {
metricUnit = aws.String(c.metadata.metricUnit)
if s.metadata.metricUnit != "" {
metricUnit = aws.String(s.metadata.metricUnit)
}

input = cloudwatch.GetMetricDataInput{
@@ -378,12 +378,12 @@ func (c *awsCloudwatchScaler) GetCloudwatchMetrics() (float64, error) {
Id: aws.String("c1"),
MetricStat: &cloudwatch.MetricStat{
Metric: &cloudwatch.Metric{
Namespace: aws.String(c.metadata.namespace),
Namespace: aws.String(s.metadata.namespace),
Dimensions: dimensions,
MetricName: aws.String(c.metadata.metricsName),
MetricName: aws.String(s.metadata.metricsName),
},
Period: aws.Int64(c.metadata.metricStatPeriod),
Stat: aws.String(c.metadata.metricStat),
Period: aws.Int64(s.metadata.metricStatPeriod),
Stat: aws.String(s.metadata.metricStat),
Unit: metricUnit,
},
ReturnData: aws.Bool(true),
Expand All @@ -392,20 +392,20 @@ func (c *awsCloudwatchScaler) GetCloudwatchMetrics() (float64, error) {
}
}

output, err := c.cwClient.GetMetricData(&input)
output, err := s.cwClient.GetMetricData(&input)

if err != nil {
cloudwatchLog.Error(err, "Failed to get output")
s.logger.Error(err, "Failed to get output")
return -1, err
}

cloudwatchLog.V(1).Info("Received Metric Data", "data", output)
s.logger.V(1).Info("Received Metric Data", "data", output)
var metricValue float64
if len(output.MetricDataResults) > 0 && len(output.MetricDataResults[0].Values) > 0 {
metricValue = *output.MetricDataResults[0].Values[0]
} else {
cloudwatchLog.Info("empty metric data received, returning minMetricValue")
metricValue = c.metadata.minMetricValue
s.logger.Info("empty metric data received, returning minMetricValue")
metricValue = s.metadata.minMetricValue
}

return metricValue, nil
5 changes: 3 additions & 2 deletions pkg/scalers/aws_cloudwatch_scaler_test.go
@@ -9,6 +9,7 @@ import (
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/cloudwatch"
"github.com/aws/aws-sdk-go/service/cloudwatch/cloudwatchiface"
"github.com/go-logr/logr"
"github.com/stretchr/testify/assert"
"k8s.io/apimachinery/pkg/labels"
)
@@ -487,7 +488,7 @@ func TestAWSCloudwatchGetMetricSpecForScaling(t *testing.T) {
if err != nil {
t.Fatal("Could not parse metadata:", err)
}
mockAWSCloudwatchScaler := awsCloudwatchScaler{"", meta, &mockCloudwatch{}}
mockAWSCloudwatchScaler := awsCloudwatchScaler{"", meta, &mockCloudwatch{}, logr.Logger{}}

metricSpec := mockAWSCloudwatchScaler.GetMetricSpecForScaling(ctx)
metricName := metricSpec[0].External.Metric.Name
@@ -500,7 +501,7 @@ func TestAWSCloudwatchScalerGetMetrics(t *testing.T) {
func TestAWSCloudwatchScalerGetMetrics(t *testing.T) {
var selector labels.Selector
for _, meta := range awsCloudwatchGetMetricTestData {
mockAWSCloudwatchScaler := awsCloudwatchScaler{"", &meta, &mockCloudwatch{}}
mockAWSCloudwatchScaler := awsCloudwatchScaler{"", &meta, &mockCloudwatch{}, logr.Logger{}}
value, err := mockAWSCloudwatchScaler.GetMetrics(context.Background(), meta.metricsName, selector)
switch meta.metricsName {
case testAWSCloudwatchErrorMetric:
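Because the scaler struct gained a fourth field, the tests' positional struct literals need a logger value as well; the diff passes the zero value `logr.Logger{}`. An alternative (an assumption here, not what the commit does) is `logr.Discard()`, which makes the intent of a no-op logger explicit:

```go
// Sketch: same construction as the test above, with an explicit no-op logger.
mockAWSCloudwatchScaler := awsCloudwatchScaler{"", &meta, &mockCloudwatch{}, logr.Discard()}
```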