Skip to content

Commit

Permalink
Reference ScaledObject's/ScaledJob's name in the scalers log
Browse files Browse the repository at this point in the history
Signed-off-by: Zbynek Roubalik <zroubalik@gmail.com>
  • Loading branch information
zroubalik committed Aug 3, 2022
1 parent 81ae8e9 commit 015bcab
Show file tree
Hide file tree
Showing 13 changed files with 113 additions and 95 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ To learn more about our roadmap, we recommend reading [this document](ROADMAP.md
### Improvements

- **General:** `external` extension reduces connection establishment with long links ([#3193](https://github.com/kedacore/keda/issues/3193))
- **General:** Reference ScaledObject's/ScaledJob's name in the scalers log ([#3419](https://github.com/kedacore/keda/issues/3419))
- **General:** Use `mili` scale for the returned metrics ([#3135](https://github.com/kedacore/keda/issue/3135))
- **General:** Use more readable timestamps in KEDA Operator logs ([#3066](https://github.com/kedacore/keda/issue/3066))
- **AWS SQS Queue Scaler:** Support for scaling to include in-flight messages. ([#3133](https://github.com/kedacore/keda/issues/3133))
Expand Down
30 changes: 15 additions & 15 deletions controllers/keda/scaledobject_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,11 +91,11 @@ var _ = Describe("ScaledObjectController", func() {

for i, tm := range triggerMeta {
config := &scalers.ScalerConfig{
Name: fmt.Sprintf("test.%d", i),
Namespace: "test",
TriggerMetadata: tm,
ResolvedEnv: nil,
AuthParams: nil,
ScalableObjectName: fmt.Sprintf("test.%d", i),
ScalableObjectNamespace: "test",
TriggerMetadata: tm,
ResolvedEnv: nil,
AuthParams: nil,
}

s, err := scalers.NewPrometheusScaler(config)
Expand Down Expand Up @@ -141,11 +141,11 @@ var _ = Describe("ScaledObjectController", func() {
expectedExternalMetricNames := make([]string, 0)

config := &scalers.ScalerConfig{
Name: "test",
Namespace: "test",
TriggerMetadata: triggerMeta[0],
ResolvedEnv: nil,
AuthParams: nil,
ScalableObjectName: "test",
ScalableObjectNamespace: "test",
TriggerMetadata: triggerMeta[0],
ResolvedEnv: nil,
AuthParams: nil,
}

s, err := scalers.NewPrometheusScaler(config)
Expand Down Expand Up @@ -191,11 +191,11 @@ var _ = Describe("ScaledObjectController", func() {
testScalers := make([]cache.ScalerBuilder, 0)
for i := 0; i < 4; i++ {
config := &scalers.ScalerConfig{
Name: fmt.Sprintf("test.%d", i),
Namespace: "test",
TriggerMetadata: triggerMeta[0],
ResolvedEnv: nil,
AuthParams: nil,
ScalableObjectName: fmt.Sprintf("test.%d", i),
ScalableObjectNamespace: "test",
TriggerMetadata: triggerMeta[0],
ResolvedEnv: nil,
AuthParams: nil,
}

s, err := scalers.NewPrometheusScaler(config)
Expand Down
10 changes: 5 additions & 5 deletions pkg/scalers/activemq_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,10 @@ import (
"strings"
"text/template"

"github.com/go-logr/logr"
v2beta2 "k8s.io/api/autoscaling/v2beta2"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/metrics/pkg/apis/external_metrics"
logf "sigs.k8s.io/controller-runtime/pkg/log"

kedautil "github.com/kedacore/keda/v2/pkg/util"
)
Expand All @@ -24,6 +24,7 @@ type activeMQScaler struct {
metricType v2beta2.MetricTargetType
metadata *activeMQMetadata
httpClient *http.Client
logger logr.Logger
}

type activeMQMetadata struct {
Expand Down Expand Up @@ -52,8 +53,6 @@ const (
defaultActiveMQRestAPITemplate = "http://{{.ManagementEndpoint}}/api/jolokia/read/org.apache.activemq:type=Broker,brokerName={{.BrokerName}},destinationType=Queue,destinationName={{.DestinationName}}/QueueSize"
)

var activeMQLog = logf.Log.WithName("activeMQ_scaler")

// NewActiveMQScaler creates a new activeMQ Scaler
func NewActiveMQScaler(config *ScalerConfig) (Scaler, error) {
metricType, err := GetMetricTargetType(config)
Expand All @@ -71,6 +70,7 @@ func NewActiveMQScaler(config *ScalerConfig) (Scaler, error) {
metricType: metricType,
metadata: meta,
httpClient: httpClient,
logger: InitializeLogger(config, "active_mq_scaler"),
}, nil
}

Expand Down Expand Up @@ -170,7 +170,7 @@ func parseActiveMQMetadata(config *ScalerConfig) (*activeMQMetadata, error) {
func (s *activeMQScaler) IsActive(ctx context.Context) (bool, error) {
queueSize, err := s.getQueueMessageCount(ctx)
if err != nil {
activeMQLog.Error(err, "Unable to access activeMQ management endpoint", "managementEndpoint", s.metadata.managementEndpoint)
s.logger.Error(err, "Unable to access activeMQ management endpoint", "managementEndpoint", s.metadata.managementEndpoint)
return false, err
}

Expand Down Expand Up @@ -260,7 +260,7 @@ func (s *activeMQScaler) getQueueMessageCount(ctx context.Context) (int64, error
return -1, fmt.Errorf("ActiveMQ management endpoint response error code : %d %d", resp.StatusCode, monitoringInfo.Status)
}

activeMQLog.V(1).Info(fmt.Sprintf("ActiveMQ scaler: Providing metrics based on current queue size %d queue size limit %d", queueMessageCount, s.metadata.targetQueueSize))
s.logger.V(1).Info(fmt.Sprintf("ActiveMQ scaler: Providing metrics based on current queue size %d queue size limit %d", queueMessageCount, s.metadata.targetQueueSize))

return queueMessageCount, nil
}
Expand Down
12 changes: 6 additions & 6 deletions pkg/scalers/artemis_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,10 @@ import (
"strconv"
"strings"

"github.com/go-logr/logr"
v2beta2 "k8s.io/api/autoscaling/v2beta2"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/metrics/pkg/apis/external_metrics"
logf "sigs.k8s.io/controller-runtime/pkg/log"

kedautil "github.com/kedacore/keda/v2/pkg/util"
)
Expand All @@ -22,6 +22,7 @@ type artemisScaler struct {
metricType v2beta2.MetricTargetType
metadata *artemisMetadata
httpClient *http.Client
logger logr.Logger
}

//revive:disable:var-naming breaking change on restApiTemplate, wouldn't bring any benefit to users
Expand Down Expand Up @@ -55,8 +56,6 @@ const (
defaultCorsHeader = "http://%s"
)

var artemisLog = logf.Log.WithName("artemis_queue_scaler")

// NewArtemisQueueScaler creates a new artemis queue Scaler
func NewArtemisQueueScaler(config *ScalerConfig) (Scaler, error) {
// do we need to guarantee this timeout for a specific
Expand All @@ -78,6 +77,7 @@ func NewArtemisQueueScaler(config *ScalerConfig) (Scaler, error) {
metricType: metricType,
metadata: artemisMetadata,
httpClient: httpClient,
logger: InitializeLogger(config, "artemis_queue_scaler"),
}, nil
}

Expand Down Expand Up @@ -181,7 +181,7 @@ func parseArtemisMetadata(config *ScalerConfig) (*artemisMetadata, error) {
func (s *artemisScaler) IsActive(ctx context.Context) (bool, error) {
messages, err := s.getQueueMessageCount(ctx)
if err != nil {
artemisLog.Error(err, "Unable to access the artemis management endpoint", "managementEndpoint", s.metadata.managementEndpoint)
s.logger.Error(err, "Unable to access the artemis management endpoint", "managementEndpoint", s.metadata.managementEndpoint)
return false, err
}

Expand Down Expand Up @@ -262,7 +262,7 @@ func (s *artemisScaler) getQueueMessageCount(ctx context.Context) (int64, error)
return -1, fmt.Errorf("artemis management endpoint response error code : %d %d", resp.StatusCode, monitoringInfo.Status)
}

artemisLog.V(1).Info(fmt.Sprintf("Artemis scaler: Providing metrics based on current queue length %d queue length limit %d", messageCount, s.metadata.queueLength))
s.logger.V(1).Info(fmt.Sprintf("Artemis scaler: Providing metrics based on current queue length %d queue length limit %d", messageCount, s.metadata.queueLength))

return messageCount, nil
}
Expand All @@ -283,7 +283,7 @@ func (s *artemisScaler) GetMetrics(ctx context.Context, metricName string, metri
messages, err := s.getQueueMessageCount(ctx)

if err != nil {
artemisLog.Error(err, "Unable to access the artemis management endpoint", "managementEndpoint", s.metadata.managementEndpoint)
s.logger.Error(err, "Unable to access the artemis management endpoint", "managementEndpoint", s.metadata.managementEndpoint)
return []external_metrics.ExternalMetricValue{}, err
}

Expand Down
22 changes: 11 additions & 11 deletions pkg/scalers/azure_data_explorer_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,10 @@ import (
"strconv"

"github.com/Azure/azure-kusto-go/kusto"
"github.com/go-logr/logr"
"k8s.io/api/autoscaling/v2beta2"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/metrics/pkg/apis/external_metrics"
logf "sigs.k8s.io/controller-runtime/pkg/log"

kedav1alpha1 "github.com/kedacore/keda/v2/apis/keda/v1alpha1"
"github.com/kedacore/keda/v2/pkg/scalers/azure"
Expand All @@ -42,15 +42,15 @@ type azureDataExplorerScaler struct {

const adxName = "azure-data-explorer"

var dataExplorerLogger = logf.Log.WithName("azure_data_explorer_scaler")

func NewAzureDataExplorerScaler(ctx context.Context, config *ScalerConfig) (Scaler, error) {
metricType, err := GetMetricTargetType(config)
if err != nil {
return nil, fmt.Errorf("error getting scaler metric type: %s", err)
}

metadata, err := parseAzureDataExplorerMetadata(config)
logger := InitializeLogger(config, "azure_data_explorer_scaler")

metadata, err := parseAzureDataExplorerMetadata(config, logger)
if err != nil {
return nil, fmt.Errorf("failed to parse azure data explorer metadata: %s", err)
}
Expand All @@ -64,13 +64,13 @@ func NewAzureDataExplorerScaler(ctx context.Context, config *ScalerConfig) (Scal
metricType: metricType,
metadata: metadata,
client: client,
name: config.Name,
namespace: config.Namespace,
name: config.ScalableObjectName,
namespace: config.ScalableObjectNamespace,
}, nil
}

func parseAzureDataExplorerMetadata(config *ScalerConfig) (*azure.DataExplorerMetadata, error) {
metadata, err := parseAzureDataExplorerAuthParams(config)
func parseAzureDataExplorerMetadata(config *ScalerConfig, logger logr.Logger) (*azure.DataExplorerMetadata, error) {
metadata, err := parseAzureDataExplorerAuthParams(config, logger)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -124,7 +124,7 @@ func parseAzureDataExplorerMetadata(config *ScalerConfig) (*azure.DataExplorerMe
}
metadata.ActiveDirectoryEndpoint = activeDirectoryEndpoint

dataExplorerLogger.V(1).Info("Parsed azureDataExplorerMetadata",
logger.V(1).Info("Parsed azureDataExplorerMetadata",
"database", metadata.DatabaseName,
"endpoint", metadata.Endpoint,
"metricName", metadata.MetricName,
Expand All @@ -136,14 +136,14 @@ func parseAzureDataExplorerMetadata(config *ScalerConfig) (*azure.DataExplorerMe
return metadata, nil
}

func parseAzureDataExplorerAuthParams(config *ScalerConfig) (*azure.DataExplorerMetadata, error) {
func parseAzureDataExplorerAuthParams(config *ScalerConfig, logger logr.Logger) (*azure.DataExplorerMetadata, error) {
metadata := azure.DataExplorerMetadata{}

switch config.PodIdentity.Provider {
case kedav1alpha1.PodIdentityProviderAzure, kedav1alpha1.PodIdentityProviderAzureWorkload:
metadata.PodIdentity = config.PodIdentity
case "", kedav1alpha1.PodIdentityProviderNone:
dataExplorerLogger.V(1).Info("Pod Identity is not provided. Trying to resolve clientId, clientSecret and tenantId.")
logger.V(1).Info("Pod Identity is not provided. Trying to resolve clientId, clientSecret and tenantId.")

tenantID, err := getParameterFromConfig(config, "tenantId", true)
if err != nil {
Expand Down
12 changes: 8 additions & 4 deletions pkg/scalers/azure_data_explorer_scaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import (
"fmt"
"testing"

"github.com/go-logr/logr"

kedav1alpha1 "github.com/kedacore/keda/v2/apis/keda/v1alpha1"
kedautil "github.com/kedacore/keda/v2/pkg/util"
)
Expand Down Expand Up @@ -111,7 +113,8 @@ func TestDataExplorerParseMetadata(t *testing.T) {
ResolvedEnv: dataExplorerResolvedEnv,
TriggerMetadata: testData.metadata,
AuthParams: map[string]string{},
PodIdentity: kedav1alpha1.AuthPodIdentity{}})
PodIdentity: kedav1alpha1.AuthPodIdentity{}},
logr.Discard())

if err != nil && !testData.isError {
t.Error("Expected success but got error", err)
Expand All @@ -128,7 +131,7 @@ func TestDataExplorerParseMetadata(t *testing.T) {
ResolvedEnv: dataExplorerResolvedEnv,
TriggerMetadata: testData.metadata,
AuthParams: map[string]string{},
PodIdentity: kedav1alpha1.AuthPodIdentity{Provider: kedav1alpha1.PodIdentityProviderAzure}})
PodIdentity: kedav1alpha1.AuthPodIdentity{Provider: kedav1alpha1.PodIdentityProviderAzure}}, logr.Discard())

if err != nil && !testData.isError {
t.Error("Expected success but got error", err)
Expand All @@ -145,7 +148,7 @@ func TestDataExplorerParseMetadata(t *testing.T) {
ResolvedEnv: dataExplorerResolvedEnv,
TriggerMetadata: testData.metadata,
AuthParams: map[string]string{},
PodIdentity: kedav1alpha1.AuthPodIdentity{Provider: kedav1alpha1.PodIdentityProviderAzureWorkload}})
PodIdentity: kedav1alpha1.AuthPodIdentity{Provider: kedav1alpha1.PodIdentityProviderAzureWorkload}}, logr.Discard())

if err != nil && !testData.isError {
t.Error("Expected success but got error", err)
Expand All @@ -164,7 +167,8 @@ func TestDataExplorerGetMetricSpecForScaling(t *testing.T) {
TriggerMetadata: testData.metadataTestData.metadata,
AuthParams: map[string]string{},
PodIdentity: kedav1alpha1.AuthPodIdentity{},
ScalerIndex: testData.scalerIndex})
ScalerIndex: testData.scalerIndex},
logr.Discard())
if err != nil {
t.Error("Failed to parse metadata:", err)
}
Expand Down
Loading

0 comments on commit 015bcab

Please sign in to comment.