From 015bcab7ddeb5688787e2cb68fce7e221bfc1094 Mon Sep 17 00:00:00 2001 From: Zbynek Roubalik Date: Tue, 2 Aug 2022 22:41:13 +0200 Subject: [PATCH] Reference ScaledObject's/ScaledJob's name in the scalers log Signed-off-by: Zbynek Roubalik --- CHANGELOG.md | 1 + .../keda/scaledobject_controller_test.go | 30 +++++++++---------- pkg/scalers/activemq_scaler.go | 10 +++---- pkg/scalers/artemis_scaler.go | 12 ++++---- pkg/scalers/azure_data_explorer_scaler.go | 22 +++++++------- .../azure_data_explorer_scaler_test.go | 12 +++++--- pkg/scalers/azure_log_analytics_scaler.go | 26 ++++++++-------- pkg/scalers/external_scaler.go | 26 ++++++++-------- pkg/scalers/external_scaler_test.go | 2 +- pkg/scalers/kubernetes_workload_scaler.go | 5 +++- .../kubernetes_workload_scaler_test.go | 26 ++++++++-------- pkg/scalers/scaler.go | 19 ++++++++---- pkg/scaling/scale_handler.go | 17 ++++++----- 13 files changed, 113 insertions(+), 95 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index ee242c652e3..b1a1abf7a3a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -47,6 +47,7 @@ To learn more about our roadmap, we recommend reading [this document](ROADMAP.md ### Improvements - **General:** `external` extension reduces connection establishment with long links ([#3193](https://github.com/kedacore/keda/issues/3193)) +- **General:** Reference ScaledObject's/ScaledJob's name in the scalers log ([#3419](https://github.com/kedacore/keda/issues/3419)) - **General:** Use `mili` scale for the returned metrics ([#3135](https://github.com/kedacore/keda/issue/3135)) - **General:** Use more readable timestamps in KEDA Operator logs ([#3066](https://github.com/kedacore/keda/issue/3066)) - **AWS SQS Queue Scaler:** Support for scaling to include in-flight messages. 
([#3133](https://github.com/kedacore/keda/issues/3133)) diff --git a/controllers/keda/scaledobject_controller_test.go b/controllers/keda/scaledobject_controller_test.go index aa5aa8ed95c..e88cd228526 100644 --- a/controllers/keda/scaledobject_controller_test.go +++ b/controllers/keda/scaledobject_controller_test.go @@ -91,11 +91,11 @@ var _ = Describe("ScaledObjectController", func() { for i, tm := range triggerMeta { config := &scalers.ScalerConfig{ - Name: fmt.Sprintf("test.%d", i), - Namespace: "test", - TriggerMetadata: tm, - ResolvedEnv: nil, - AuthParams: nil, + ScalableObjectName: fmt.Sprintf("test.%d", i), + ScalableObjectNamespace: "test", + TriggerMetadata: tm, + ResolvedEnv: nil, + AuthParams: nil, } s, err := scalers.NewPrometheusScaler(config) @@ -141,11 +141,11 @@ var _ = Describe("ScaledObjectController", func() { expectedExternalMetricNames := make([]string, 0) config := &scalers.ScalerConfig{ - Name: "test", - Namespace: "test", - TriggerMetadata: triggerMeta[0], - ResolvedEnv: nil, - AuthParams: nil, + ScalableObjectName: "test", + ScalableObjectNamespace: "test", + TriggerMetadata: triggerMeta[0], + ResolvedEnv: nil, + AuthParams: nil, } s, err := scalers.NewPrometheusScaler(config) @@ -191,11 +191,11 @@ var _ = Describe("ScaledObjectController", func() { testScalers := make([]cache.ScalerBuilder, 0) for i := 0; i < 4; i++ { config := &scalers.ScalerConfig{ - Name: fmt.Sprintf("test.%d", i), - Namespace: "test", - TriggerMetadata: triggerMeta[0], - ResolvedEnv: nil, - AuthParams: nil, + ScalableObjectName: fmt.Sprintf("test.%d", i), + ScalableObjectNamespace: "test", + TriggerMetadata: triggerMeta[0], + ResolvedEnv: nil, + AuthParams: nil, } s, err := scalers.NewPrometheusScaler(config) diff --git a/pkg/scalers/activemq_scaler.go b/pkg/scalers/activemq_scaler.go index 84ba3f6f5a6..c9532d59fad 100644 --- a/pkg/scalers/activemq_scaler.go +++ b/pkg/scalers/activemq_scaler.go @@ -12,10 +12,10 @@ import ( "strings" "text/template" + 
"github.com/go-logr/logr" v2beta2 "k8s.io/api/autoscaling/v2beta2" "k8s.io/apimachinery/pkg/labels" "k8s.io/metrics/pkg/apis/external_metrics" - logf "sigs.k8s.io/controller-runtime/pkg/log" kedautil "github.com/kedacore/keda/v2/pkg/util" ) @@ -24,6 +24,7 @@ type activeMQScaler struct { metricType v2beta2.MetricTargetType metadata *activeMQMetadata httpClient *http.Client + logger logr.Logger } type activeMQMetadata struct { @@ -52,8 +53,6 @@ const ( defaultActiveMQRestAPITemplate = "http://{{.ManagementEndpoint}}/api/jolokia/read/org.apache.activemq:type=Broker,brokerName={{.BrokerName}},destinationType=Queue,destinationName={{.DestinationName}}/QueueSize" ) -var activeMQLog = logf.Log.WithName("activeMQ_scaler") - // NewActiveMQScaler creates a new activeMQ Scaler func NewActiveMQScaler(config *ScalerConfig) (Scaler, error) { metricType, err := GetMetricTargetType(config) @@ -71,6 +70,7 @@ func NewActiveMQScaler(config *ScalerConfig) (Scaler, error) { metricType: metricType, metadata: meta, httpClient: httpClient, + logger: InitializeLogger(config, "active_mq_scaler"), }, nil } @@ -170,7 +170,7 @@ func parseActiveMQMetadata(config *ScalerConfig) (*activeMQMetadata, error) { func (s *activeMQScaler) IsActive(ctx context.Context) (bool, error) { queueSize, err := s.getQueueMessageCount(ctx) if err != nil { - activeMQLog.Error(err, "Unable to access activeMQ management endpoint", "managementEndpoint", s.metadata.managementEndpoint) + s.logger.Error(err, "Unable to access activeMQ management endpoint", "managementEndpoint", s.metadata.managementEndpoint) return false, err } @@ -260,7 +260,7 @@ func (s *activeMQScaler) getQueueMessageCount(ctx context.Context) (int64, error return -1, fmt.Errorf("ActiveMQ management endpoint response error code : %d %d", resp.StatusCode, monitoringInfo.Status) } - activeMQLog.V(1).Info(fmt.Sprintf("ActiveMQ scaler: Providing metrics based on current queue size %d queue size limit %d", queueMessageCount, s.metadata.targetQueueSize)) + 
s.logger.V(1).Info(fmt.Sprintf("ActiveMQ scaler: Providing metrics based on current queue size %d queue size limit %d", queueMessageCount, s.metadata.targetQueueSize)) return queueMessageCount, nil } diff --git a/pkg/scalers/artemis_scaler.go b/pkg/scalers/artemis_scaler.go index ca01ed075d6..441aeb38837 100644 --- a/pkg/scalers/artemis_scaler.go +++ b/pkg/scalers/artemis_scaler.go @@ -10,10 +10,10 @@ import ( "strconv" "strings" + "github.com/go-logr/logr" v2beta2 "k8s.io/api/autoscaling/v2beta2" "k8s.io/apimachinery/pkg/labels" "k8s.io/metrics/pkg/apis/external_metrics" - logf "sigs.k8s.io/controller-runtime/pkg/log" kedautil "github.com/kedacore/keda/v2/pkg/util" ) @@ -22,6 +22,7 @@ type artemisScaler struct { metricType v2beta2.MetricTargetType metadata *artemisMetadata httpClient *http.Client + logger logr.Logger } //revive:disable:var-naming breaking change on restApiTemplate, wouldn't bring any benefit to users @@ -55,8 +56,6 @@ const ( defaultCorsHeader = "http://%s" ) -var artemisLog = logf.Log.WithName("artemis_queue_scaler") - // NewArtemisQueueScaler creates a new artemis queue Scaler func NewArtemisQueueScaler(config *ScalerConfig) (Scaler, error) { // do we need to guarantee this timeout for a specific @@ -78,6 +77,7 @@ func NewArtemisQueueScaler(config *ScalerConfig) (Scaler, error) { metricType: metricType, metadata: artemisMetadata, httpClient: httpClient, + logger: InitializeLogger(config, "artemis_queue_scaler"), }, nil } @@ -181,7 +181,7 @@ func parseArtemisMetadata(config *ScalerConfig) (*artemisMetadata, error) { func (s *artemisScaler) IsActive(ctx context.Context) (bool, error) { messages, err := s.getQueueMessageCount(ctx) if err != nil { - artemisLog.Error(err, "Unable to access the artemis management endpoint", "managementEndpoint", s.metadata.managementEndpoint) + s.logger.Error(err, "Unable to access the artemis management endpoint", "managementEndpoint", s.metadata.managementEndpoint) return false, err } @@ -262,7 +262,7 @@ func (s 
*artemisScaler) getQueueMessageCount(ctx context.Context) (int64, error) return -1, fmt.Errorf("artemis management endpoint response error code : %d %d", resp.StatusCode, monitoringInfo.Status) } - artemisLog.V(1).Info(fmt.Sprintf("Artemis scaler: Providing metrics based on current queue length %d queue length limit %d", messageCount, s.metadata.queueLength)) + s.logger.V(1).Info(fmt.Sprintf("Artemis scaler: Providing metrics based on current queue length %d queue length limit %d", messageCount, s.metadata.queueLength)) return messageCount, nil } @@ -283,7 +283,7 @@ func (s *artemisScaler) GetMetrics(ctx context.Context, metricName string, metri messages, err := s.getQueueMessageCount(ctx) if err != nil { - artemisLog.Error(err, "Unable to access the artemis management endpoint", "managementEndpoint", s.metadata.managementEndpoint) + s.logger.Error(err, "Unable to access the artemis management endpoint", "managementEndpoint", s.metadata.managementEndpoint) return []external_metrics.ExternalMetricValue{}, err } diff --git a/pkg/scalers/azure_data_explorer_scaler.go b/pkg/scalers/azure_data_explorer_scaler.go index 21eb28dc313..e792d38e8bb 100644 --- a/pkg/scalers/azure_data_explorer_scaler.go +++ b/pkg/scalers/azure_data_explorer_scaler.go @@ -22,10 +22,10 @@ import ( "strconv" "github.com/Azure/azure-kusto-go/kusto" + "github.com/go-logr/logr" "k8s.io/api/autoscaling/v2beta2" "k8s.io/apimachinery/pkg/labels" "k8s.io/metrics/pkg/apis/external_metrics" - logf "sigs.k8s.io/controller-runtime/pkg/log" kedav1alpha1 "github.com/kedacore/keda/v2/apis/keda/v1alpha1" "github.com/kedacore/keda/v2/pkg/scalers/azure" @@ -42,15 +42,15 @@ type azureDataExplorerScaler struct { const adxName = "azure-data-explorer" -var dataExplorerLogger = logf.Log.WithName("azure_data_explorer_scaler") - func NewAzureDataExplorerScaler(ctx context.Context, config *ScalerConfig) (Scaler, error) { metricType, err := GetMetricTargetType(config) if err != nil { return nil, fmt.Errorf("error getting 
scaler metric type: %s", err) } - metadata, err := parseAzureDataExplorerMetadata(config) + logger := InitializeLogger(config, "azure_data_explorer_scaler") + + metadata, err := parseAzureDataExplorerMetadata(config, logger) if err != nil { return nil, fmt.Errorf("failed to parse azure data explorer metadata: %s", err) } @@ -64,13 +64,13 @@ func NewAzureDataExplorerScaler(ctx context.Context, config *ScalerConfig) (Scal metricType: metricType, metadata: metadata, client: client, - name: config.Name, - namespace: config.Namespace, + name: config.ScalableObjectName, + namespace: config.ScalableObjectNamespace, }, nil } -func parseAzureDataExplorerMetadata(config *ScalerConfig) (*azure.DataExplorerMetadata, error) { - metadata, err := parseAzureDataExplorerAuthParams(config) +func parseAzureDataExplorerMetadata(config *ScalerConfig, logger logr.Logger) (*azure.DataExplorerMetadata, error) { + metadata, err := parseAzureDataExplorerAuthParams(config, logger) if err != nil { return nil, err } @@ -124,7 +124,7 @@ func parseAzureDataExplorerMetadata(config *ScalerConfig) (*azure.DataExplorerMe } metadata.ActiveDirectoryEndpoint = activeDirectoryEndpoint - dataExplorerLogger.V(1).Info("Parsed azureDataExplorerMetadata", + logger.V(1).Info("Parsed azureDataExplorerMetadata", "database", metadata.DatabaseName, "endpoint", metadata.Endpoint, "metricName", metadata.MetricName, @@ -136,14 +136,14 @@ func parseAzureDataExplorerMetadata(config *ScalerConfig) (*azure.DataExplorerMe return metadata, nil } -func parseAzureDataExplorerAuthParams(config *ScalerConfig) (*azure.DataExplorerMetadata, error) { +func parseAzureDataExplorerAuthParams(config *ScalerConfig, logger logr.Logger) (*azure.DataExplorerMetadata, error) { metadata := azure.DataExplorerMetadata{} switch config.PodIdentity.Provider { case kedav1alpha1.PodIdentityProviderAzure, kedav1alpha1.PodIdentityProviderAzureWorkload: metadata.PodIdentity = config.PodIdentity case "", kedav1alpha1.PodIdentityProviderNone: - 
dataExplorerLogger.V(1).Info("Pod Identity is not provided. Trying to resolve clientId, clientSecret and tenantId.") + logger.V(1).Info("Pod Identity is not provided. Trying to resolve clientId, clientSecret and tenantId.") tenantID, err := getParameterFromConfig(config, "tenantId", true) if err != nil { diff --git a/pkg/scalers/azure_data_explorer_scaler_test.go b/pkg/scalers/azure_data_explorer_scaler_test.go index 731a716be02..be3c3f186ce 100644 --- a/pkg/scalers/azure_data_explorer_scaler_test.go +++ b/pkg/scalers/azure_data_explorer_scaler_test.go @@ -21,6 +21,8 @@ import ( "fmt" "testing" + "github.com/go-logr/logr" + kedav1alpha1 "github.com/kedacore/keda/v2/apis/keda/v1alpha1" kedautil "github.com/kedacore/keda/v2/pkg/util" ) @@ -111,7 +113,8 @@ func TestDataExplorerParseMetadata(t *testing.T) { ResolvedEnv: dataExplorerResolvedEnv, TriggerMetadata: testData.metadata, AuthParams: map[string]string{}, - PodIdentity: kedav1alpha1.AuthPodIdentity{}}) + PodIdentity: kedav1alpha1.AuthPodIdentity{}}, + logr.Discard()) if err != nil && !testData.isError { t.Error("Expected success but got error", err) @@ -128,7 +131,7 @@ func TestDataExplorerParseMetadata(t *testing.T) { ResolvedEnv: dataExplorerResolvedEnv, TriggerMetadata: testData.metadata, AuthParams: map[string]string{}, - PodIdentity: kedav1alpha1.AuthPodIdentity{Provider: kedav1alpha1.PodIdentityProviderAzure}}) + PodIdentity: kedav1alpha1.AuthPodIdentity{Provider: kedav1alpha1.PodIdentityProviderAzure}}, logr.Discard()) if err != nil && !testData.isError { t.Error("Expected success but got error", err) @@ -145,7 +148,7 @@ func TestDataExplorerParseMetadata(t *testing.T) { ResolvedEnv: dataExplorerResolvedEnv, TriggerMetadata: testData.metadata, AuthParams: map[string]string{}, - PodIdentity: kedav1alpha1.AuthPodIdentity{Provider: kedav1alpha1.PodIdentityProviderAzureWorkload}}) + PodIdentity: kedav1alpha1.AuthPodIdentity{Provider: kedav1alpha1.PodIdentityProviderAzureWorkload}}, logr.Discard()) if err != 
nil && !testData.isError { t.Error("Expected success but got error", err) @@ -164,7 +167,8 @@ func TestDataExplorerGetMetricSpecForScaling(t *testing.T) { TriggerMetadata: testData.metadataTestData.metadata, AuthParams: map[string]string{}, PodIdentity: kedav1alpha1.AuthPodIdentity{}, - ScalerIndex: testData.scalerIndex}) + ScalerIndex: testData.scalerIndex}, + logr.Discard()) if err != nil { t.Error("Failed to parse metadata:", err) } diff --git a/pkg/scalers/azure_log_analytics_scaler.go b/pkg/scalers/azure_log_analytics_scaler.go index 5f26d42e3b6..c61ccc6105b 100644 --- a/pkg/scalers/azure_log_analytics_scaler.go +++ b/pkg/scalers/azure_log_analytics_scaler.go @@ -32,10 +32,10 @@ import ( "time" "github.com/Azure/azure-amqp-common-go/v3/auth" + "github.com/go-logr/logr" v2beta2 "k8s.io/api/autoscaling/v2beta2" "k8s.io/apimachinery/pkg/labels" "k8s.io/metrics/pkg/apis/external_metrics" - logf "sigs.k8s.io/controller-runtime/pkg/log" kedav1alpha1 "github.com/kedacore/keda/v2/apis/keda/v1alpha1" "github.com/kedacore/keda/v2/pkg/scalers/azure" @@ -55,6 +55,7 @@ type azureLogAnalyticsScaler struct { name string namespace string httpClient *http.Client + logger logr.Logger } type azureLogAnalyticsMetadata struct { @@ -109,8 +110,6 @@ var tokenCache = struct { m map[string]tokenData }{m: make(map[string]tokenData)} -var logAnalyticsLog = logf.Log.WithName("azure_log_analytics_scaler") - var logAnalyticsResourceURLInCloud = map[string]string{ "AZUREPUBLICCLOUD": "https://api.loganalytics.io", "AZUREUSGOVERNMENTCLOUD": "https://api.loganalytics.us", @@ -126,16 +125,17 @@ func NewAzureLogAnalyticsScaler(config *ScalerConfig) (Scaler, error) { azureLogAnalyticsMetadata, err := parseAzureLogAnalyticsMetadata(config) if err != nil { - return nil, fmt.Errorf("failed to initialize Log Analytics scaler. Scaled object: %s. Namespace: %s. Inner Error: %v", config.Name, config.Namespace, err) + return nil, fmt.Errorf("failed to initialize Log Analytics scaler. Scaled object: %s. 
Namespace: %s. Inner Error: %v", config.ScalableObjectName, config.ScalableObjectNamespace, err) } return &azureLogAnalyticsScaler{ metricType: metricType, metadata: azureLogAnalyticsMetadata, cache: &sessionCache{metricValue: -1, metricThreshold: -1}, - name: config.Name, - namespace: config.Namespace, + name: config.ScalableObjectName, + namespace: config.ScalableObjectNamespace, httpClient: kedautil.CreateHTTPClient(config.GlobalHTTPTimeout, false), + logger: InitializeLogger(config, "azure_log_analytics_scaler"), }, nil } @@ -268,7 +268,7 @@ func (s *azureLogAnalyticsScaler) GetMetricSpecForScaling(ctx context.Context) [ err := s.updateCache(ctx) if err != nil { - logAnalyticsLog.V(1).Info("failed to get metric spec.", "Scaled object", s.name, "Namespace", s.namespace, "Inner Error", err) + s.logger.V(1).Info("failed to get metric spec.", "Scaled object", s.name, "Namespace", s.namespace, "Inner Error", err) return nil } @@ -330,7 +330,7 @@ func (s *azureLogAnalyticsScaler) getMetricData(ctx context.Context) (metricsDat return metricsData{}, err } - logAnalyticsLog.V(1).Info("Providing metric value", "metrics value", metricsInfo.value, "scaler name", s.name, "namespace", s.namespace) + s.logger.V(1).Info("Providing metric value", "metrics value", metricsInfo.value, "scaler name", s.name, "namespace", s.namespace) return metricsInfo, nil } @@ -355,10 +355,10 @@ func (s *azureLogAnalyticsScaler) getAccessToken(ctx context.Context) (tokenData switch s.metadata.podIdentity.Provider { case "", kedav1alpha1.PodIdentityProviderNone: - logAnalyticsLog.V(1).Info("Token for Service Principal has been refreshed", "clientID", s.metadata.clientID, "scaler name", s.name, "namespace", s.namespace) + s.logger.V(1).Info("Token for Service Principal has been refreshed", "clientID", s.metadata.clientID, "scaler name", s.name, "namespace", s.namespace) _ = setTokenInCache(s.metadata.clientID, s.metadata.clientSecret, newTokenInfo) case kedav1alpha1.PodIdentityProviderAzure, 
kedav1alpha1.PodIdentityProviderAzureWorkload: - logAnalyticsLog.V(1).Info("Token for Pod Identity has been refreshed", "type", s.metadata.podIdentity, "scaler name", s.name, "namespace", s.namespace) + s.logger.V(1).Info("Token for Pod Identity has been refreshed", "type", s.metadata.podIdentity, "scaler name", s.name, "namespace", s.namespace) _ = setTokenInCache(string(s.metadata.podIdentity.Provider), string(s.metadata.podIdentity.Provider), newTokenInfo) } @@ -384,10 +384,10 @@ func (s *azureLogAnalyticsScaler) executeQuery(ctx context.Context, query string switch s.metadata.podIdentity.Provider { case "", kedav1alpha1.PodIdentityProviderNone: - logAnalyticsLog.V(1).Info("Token for Service Principal has been refreshed", "clientID", s.metadata.clientID, "scaler name", s.name, "namespace", s.namespace) + s.logger.V(1).Info("Token for Service Principal has been refreshed", "clientID", s.metadata.clientID, "scaler name", s.name, "namespace", s.namespace) _ = setTokenInCache(s.metadata.clientID, s.metadata.clientSecret, tokenInfo) case kedav1alpha1.PodIdentityProviderAzure, kedav1alpha1.PodIdentityProviderAzureWorkload: - logAnalyticsLog.V(1).Info("Token for Pod Identity has been refreshed", "type", s.metadata.podIdentity, "scaler name", s.name, "namespace", s.namespace) + s.logger.V(1).Info("Token for Pod Identity has been refreshed", "type", s.metadata.podIdentity, "scaler name", s.name, "namespace", s.namespace) _ = setTokenInCache(string(s.metadata.podIdentity.Provider), string(s.metadata.podIdentity.Provider), tokenInfo) } @@ -492,7 +492,7 @@ func (s *azureLogAnalyticsScaler) refreshAccessToken(ctx context.Context) (token if currentTimeSec < tokenInfo.NotBefore { if currentTimeSec < tokenInfo.NotBefore+10 { sleepDurationSec := int(tokenInfo.NotBefore - currentTimeSec + 1) - logAnalyticsLog.V(1).Info("AAD token not ready", "delay (seconds)", sleepDurationSec, "scaler name", s.name, "namespace", s.namespace) + s.logger.V(1).Info("AAD token not ready", "delay 
(seconds)", sleepDurationSec, "scaler name", s.name, "namespace", s.namespace) time.Sleep(time.Duration(sleepDurationSec) * time.Second) } else { return tokenData{}, fmt.Errorf("error getting access token. Details: AAD token has been received, but start date begins in %d seconds, so current operation will be skipped", tokenInfo.NotBefore-currentTimeSec) diff --git a/pkg/scalers/external_scaler.go b/pkg/scalers/external_scaler.go index 6109e7724a3..22a205b262b 100644 --- a/pkg/scalers/external_scaler.go +++ b/pkg/scalers/external_scaler.go @@ -7,6 +7,7 @@ import ( "sync" "time" + "github.com/go-logr/logr" "github.com/mitchellh/hashstructure" "google.golang.org/grpc" "google.golang.org/grpc/connectivity" @@ -15,7 +16,6 @@ import ( v2beta2 "k8s.io/api/autoscaling/v2beta2" "k8s.io/apimachinery/pkg/labels" "k8s.io/metrics/pkg/apis/external_metrics" - logf "sigs.k8s.io/controller-runtime/pkg/log" pb "github.com/kedacore/keda/v2/pkg/scalers/externalscaler" ) @@ -24,6 +24,7 @@ type externalScaler struct { metricType v2beta2.MetricTargetType metadata externalScalerMetadata scaledObjectRef pb.ScaledObjectRef + logger logr.Logger } type externalPushScaler struct { @@ -44,8 +45,6 @@ type connectionGroup struct { // a pool of connectionGroup per metadata hash var connectionPool sync.Map -var externalLog = logf.Log.WithName("external_scaler") - // NewExternalScaler creates a new external scaler - calls the GRPC interface // to create a new scaler func NewExternalScaler(config *ScalerConfig) (Scaler, error) { @@ -63,10 +62,11 @@ func NewExternalScaler(config *ScalerConfig) (Scaler, error) { metricType: metricType, metadata: meta, scaledObjectRef: pb.ScaledObjectRef{ - Name: config.Name, - Namespace: config.Namespace, + Name: config.ScalableObjectName, + Namespace: config.ScalableObjectNamespace, ScalerMetadata: meta.originalMetadata, }, + logger: InitializeLogger(config, "external_scaler"), }, nil } @@ -87,8 +87,8 @@ func NewExternalPushScaler(config *ScalerConfig) (PushScaler, 
error) { metricType: metricType, metadata: meta, scaledObjectRef: pb.ScaledObjectRef{ - Name: config.Name, - Namespace: config.Namespace, + Name: config.ScalableObjectName, + Namespace: config.ScalableObjectNamespace, ScalerMetadata: meta.originalMetadata, }, }, @@ -137,7 +137,7 @@ func (s *externalScaler) IsActive(ctx context.Context) (bool, error) { response, err := grpcClient.IsActive(ctx, &s.scaledObjectRef) if err != nil { - externalLog.Error(err, "error calling IsActive on external scaler") + s.logger.Error(err, "error calling IsActive on external scaler") return false, err } @@ -154,13 +154,13 @@ func (s *externalScaler) GetMetricSpecForScaling(ctx context.Context) []v2beta2. grpcClient, err := getClientForConnectionPool(s.metadata) if err != nil { - externalLog.Error(err, "error building grpc connection") + s.logger.Error(err, "error building grpc connection") return result } response, err := grpcClient.GetMetricSpec(ctx, &s.scaledObjectRef) if err != nil { - externalLog.Error(err, "error") + s.logger.Error(err, "error") return nil } @@ -205,7 +205,7 @@ func (s *externalScaler) GetMetrics(ctx context.Context, metricName string, metr response, err := grpcClient.GetMetrics(ctx, request) if err != nil { - externalLog.Error(err, "error") + s.logger.Error(err, "error") return []external_metrics.ExternalMetricValue{}, err } @@ -224,11 +224,11 @@ func (s *externalPushScaler) Run(ctx context.Context, active chan<- bool) { runWithLog := func() { grpcClient, err := getClientForConnectionPool(s.metadata) if err != nil { - externalLog.Error(err, "error running internalRun") + s.logger.Error(err, "error running internalRun") return } if err := handleIsActiveStream(ctx, s.scaledObjectRef, grpcClient, active); err != nil { - externalLog.Error(err, "error running internalRun") + s.logger.Error(err, "error running internalRun") return } } diff --git a/pkg/scalers/external_scaler_test.go b/pkg/scalers/external_scaler_test.go index 4d97f08d455..dde0e680281 100644 --- 
a/pkg/scalers/external_scaler_test.go +++ b/pkg/scalers/external_scaler_test.go @@ -55,7 +55,7 @@ func TestExternalPushScaler_Run(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) for i := 0; i < serverCount*iterationCount; i++ { id := i % serverCount - pushScaler, _ := NewExternalPushScaler(&ScalerConfig{Name: "app", Namespace: "namespace", TriggerMetadata: map[string]string{"scalerAddress": servers[id].address}, ResolvedEnv: map[string]string{}}) + pushScaler, _ := NewExternalPushScaler(&ScalerConfig{ScalableObjectName: "app", ScalableObjectNamespace: "namespace", TriggerMetadata: map[string]string{"scalerAddress": servers[id].address}, ResolvedEnv: map[string]string{}}) go pushScaler.Run(ctx, replyCh[i]) } diff --git a/pkg/scalers/kubernetes_workload_scaler.go b/pkg/scalers/kubernetes_workload_scaler.go index e5f266b6234..6c7b3d10764 100644 --- a/pkg/scalers/kubernetes_workload_scaler.go +++ b/pkg/scalers/kubernetes_workload_scaler.go @@ -5,6 +5,7 @@ import ( "fmt" "strconv" + "github.com/go-logr/logr" "k8s.io/api/autoscaling/v2beta2" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/labels" @@ -18,6 +19,7 @@ type kubernetesWorkloadScaler struct { metricType v2beta2.MetricTargetType metadata *kubernetesWorkloadMetadata kubeClient client.Client + logger logr.Logger } const ( @@ -56,13 +58,14 @@ func NewKubernetesWorkloadScaler(kubeClient client.Client, config *ScalerConfig) metricType: metricType, metadata: meta, kubeClient: kubeClient, + logger: InitializeLogger(config, "kubernetes_workload_scaler"), }, nil } func parseWorkloadMetadata(config *ScalerConfig) (*kubernetesWorkloadMetadata, error) { meta := &kubernetesWorkloadMetadata{} var err error - meta.namespace = config.Namespace + meta.namespace = config.ScalableObjectNamespace meta.podSelector, err = labels.Parse(config.TriggerMetadata[podSelectorKey]) if err != nil || meta.podSelector.String() == "" { return nil, fmt.Errorf("invalid pod selector") diff --git 
a/pkg/scalers/kubernetes_workload_scaler_test.go b/pkg/scalers/kubernetes_workload_scaler_test.go index 53ae145ea69..d7e26e4da05 100644 --- a/pkg/scalers/kubernetes_workload_scaler_test.go +++ b/pkg/scalers/kubernetes_workload_scaler_test.go @@ -36,7 +36,7 @@ var parseWorkloadMetadataTestDataset = []workloadMetadataTestData{ func TestParseWorkloadMetadata(t *testing.T) { for _, testData := range parseWorkloadMetadataTestDataset { - _, err := parseWorkloadMetadata(&ScalerConfig{TriggerMetadata: testData.metadata, Namespace: testData.namespace}) + _, err := parseWorkloadMetadata(&ScalerConfig{TriggerMetadata: testData.metadata, ScalableObjectNamespace: testData.namespace}) if err != nil && !testData.isError { t.Error("Expected success but got error", err) } @@ -69,10 +69,10 @@ func TestWorkloadIsActive(t *testing.T) { s, _ := NewKubernetesWorkloadScaler( fake.NewClientBuilder().WithRuntimeObjects(createPodlist(testData.podCount)).Build(), &ScalerConfig{ - TriggerMetadata: testData.metadata, - AuthParams: map[string]string{}, - GlobalHTTPTimeout: 1000 * time.Millisecond, - Namespace: testData.namespace, + TriggerMetadata: testData.metadata, + AuthParams: map[string]string{}, + GlobalHTTPTimeout: 1000 * time.Millisecond, + ScalableObjectNamespace: testData.namespace, }, ) isActive, _ := s.IsActive(context.TODO()) @@ -108,11 +108,11 @@ func TestWorkloadGetMetricSpecForScaling(t *testing.T) { s, _ := NewKubernetesWorkloadScaler( fake.NewClientBuilder().Build(), &ScalerConfig{ - TriggerMetadata: testData.metadata, - AuthParams: map[string]string{}, - GlobalHTTPTimeout: 1000 * time.Millisecond, - Namespace: testData.namespace, - ScalerIndex: testData.scalerIndex, + TriggerMetadata: testData.metadata, + AuthParams: map[string]string{}, + GlobalHTTPTimeout: 1000 * time.Millisecond, + ScalableObjectNamespace: testData.namespace, + ScalerIndex: testData.scalerIndex, }, ) metric := s.GetMetricSpecForScaling(context.Background()) @@ -175,9 +175,9 @@ func TestWorkloadPhase(t 
*testing.T) { "podSelector": "app=testphases", "value": "1", }, - AuthParams: map[string]string{}, - GlobalHTTPTimeout: 1000 * time.Millisecond, - Namespace: "default", + AuthParams: map[string]string{}, + GlobalHTTPTimeout: 1000 * time.Millisecond, + ScalableObjectNamespace: "default", }, ) if err != nil { diff --git a/pkg/scalers/scaler.go b/pkg/scalers/scaler.go index e5621eb88c9..22fe8eb83f4 100644 --- a/pkg/scalers/scaler.go +++ b/pkg/scalers/scaler.go @@ -22,12 +22,14 @@ import ( "strings" "time" + "github.com/go-logr/logr" metrics "github.com/rcrowley/go-metrics" "k8s.io/api/autoscaling/v2beta2" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/metrics/pkg/apis/external_metrics" + logf "sigs.k8s.io/controller-runtime/pkg/log" kedav1alpha1 "github.com/kedacore/keda/v2/apis/keda/v1alpha1" ) @@ -64,15 +66,18 @@ type PushScaler interface { // ScalerConfig contains config fields common for all scalers type ScalerConfig struct { - // Name used for external scalers - Name string + // ScalableObjectName specifies name of the ScaledObject/ScaledJob that owns this scaler + ScalableObjectName string + + // ScalableObjectNamespace specifies namespace of the ScaledObject/ScaledJob that owns this scaler + ScalableObjectNamespace string + + // ScalableObjectType specifies whether this Scaler is owned by ScaledObject or ScaledJob + ScalableObjectType string // The timeout to be used on all HTTP requests from the controller GlobalHTTPTimeout time.Duration - // Namespace used for external scalers - Namespace string - // TriggerMetadata TriggerMetadata map[string]string @@ -127,6 +132,10 @@ func RemoveIndexFromMetricName(scalerIndex int, metricName string) (string, erro return metricNameWithoutIndex, nil } +func InitializeLogger(config *ScalerConfig, scalerName string) logr.Logger { + return logf.Log.WithName(scalerName).WithValues("type", config.ScalableObjectType, "namespace", 
config.ScalableObjectNamespace, "name", config.ScalableObjectName) +} + // GetMetricTargetType helps getting the metric target type of the scaler func GetMetricTargetType(config *ScalerConfig) (v2beta2.MetricTargetType, error) { switch config.MetricType { diff --git a/pkg/scaling/scale_handler.go b/pkg/scaling/scale_handler.go index d505509d189..3035f0bdd00 100644 --- a/pkg/scaling/scale_handler.go +++ b/pkg/scaling/scale_handler.go @@ -306,14 +306,15 @@ func (h *scaleHandler) buildScalers(ctx context.Context, withTriggers *kedav1alp } } config := &scalers.ScalerConfig{ - Name: withTriggers.Name, - Namespace: withTriggers.Namespace, - TriggerMetadata: trigger.Metadata, - ResolvedEnv: resolvedEnv, - AuthParams: make(map[string]string), - GlobalHTTPTimeout: h.globalHTTPTimeout, - ScalerIndex: triggerIndex, - MetricType: trigger.MetricType, + ScalableObjectName: withTriggers.Name, + ScalableObjectNamespace: withTriggers.Namespace, + ScalableObjectType: withTriggers.Kind, + TriggerMetadata: trigger.Metadata, + ResolvedEnv: resolvedEnv, + AuthParams: make(map[string]string), + GlobalHTTPTimeout: h.globalHTTPTimeout, + ScalerIndex: triggerIndex, + MetricType: trigger.MetricType, } config.AuthParams, config.PodIdentity, err = resolver.ResolveAuthRefAndPodIdentity(ctx, h.client, logger, trigger.AuthenticationRef, podTemplateSpec, withTriggers.Namespace)