Skip to content

Commit

Permalink
GCP Pub/Sub Scaler: add logic to accept subscription IDs with project…
Browse files Browse the repository at this point in the history
…ID (#2269)

Signed-off-by: Jose Maria Alvarez <jose-maria.alvarez@leroymerlin.es>

Co-authored-by: Herman <hermanbanken@gmail.com>
  • Loading branch information
jmalvarezf-lmes and hermanbanken authored Nov 23, 2021
1 parent 0e6367b commit 2664389
Show file tree
Hide file tree
Showing 4 changed files with 69 additions and 30 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
- GCP PubSub scaler may be used in SubscriptionSize and OldestUnackedMessageAge modes
- Cleanup metric names inside scalers ([#2260](https://github.com/kedacore/keda/pull/2260))
- Validating values length in prometheus query response ([#2264](https://github.com/kedacore/keda/pull/2264))
- Add possibility to reference a GCP PubSub subscription by full link, including project ID ([#2269](https://github.com/kedacore/keda/pull/2269))
- Add `unsafeSsl` parameter in SeleniumGrid scaler ([#2157](https://github.com/kedacore/keda/pull/2157))
- Improve logs of Azure Pipelines Scaler. ([#2297](https://github.com/kedacore/keda/pull/2297))

Expand Down
20 changes: 18 additions & 2 deletions pkg/scalers/gcp_pub_sub_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ import (
"context"
"errors"
"fmt"
"regexp"
"strconv"
"strings"

"k8s.io/api/autoscaling/v2beta2"
"k8s.io/apimachinery/pkg/api/resource"
Expand All @@ -18,6 +20,7 @@ import (
)

const (
compositeSubscriptionIDPrefix = "projects/[a-z][a-zA-Z0-9-]*[a-zA-Z0-9]/subscriptions/[a-zA-Z][a-zA-Z0-9-_~%\\+\\.]*"
defaultTargetSubscriptionSize = 5
defaultTargetOldestUnackedMessageAge = 10
pubSubStackDriverSubscriptionSizeMetricName = "pubsub.googleapis.com/subscription/num_undelivered_messages"
Expand Down Expand Up @@ -232,10 +235,23 @@ func (s *pubsubScaler) getMetrics(ctx context.Context, metricType string) (int64
return -1, err
}
}
subscriptionID, projectID := getSubscriptionData(s)
filter := `metric.type="` + metricType + `" AND resource.labels.subscription_id="` + subscriptionID + `"`

filter := `metric.type="` + metricType + `" AND resource.labels.subscription_id="` + s.metadata.subscriptionName + `"`
return s.client.GetMetrics(ctx, filter, projectID)
}

return s.client.GetMetrics(ctx, filter)
func getSubscriptionData(s *pubsubScaler) (string, string) {
var subscriptionID string
var projectID string
regexpExpression, _ := regexp.Compile(compositeSubscriptionIDPrefix)
if regexpExpression.MatchString(s.metadata.subscriptionName) {
subscriptionID = strings.Split(s.metadata.subscriptionName, "/")[3]
projectID = strings.Split(s.metadata.subscriptionName, "/")[1]
} else {
subscriptionID = s.metadata.subscriptionName
}
return subscriptionID, projectID
}

func getGcpAuthorization(config *ScalerConfig, resolvedEnv map[string]string) (*gcpAuthorizationMetadata, error) {
Expand Down
33 changes: 32 additions & 1 deletion pkg/scalers/gcp_pubsub_scaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,13 @@ type gcpPubSubMetricIdentifier struct {
name string
}

type gcpPubSubSubscription struct {
metadataTestData *parsePubSubMetadataTestData
scalerIndex int
name string
projectID string
}

var testPubSubMetadata = []parsePubSubMetadataTestData{
{map[string]string{}, map[string]string{}, true},
// all properly formed with deprecated field
Expand All @@ -40,14 +47,23 @@ var testPubSubMetadata = []parsePubSubMetadataTestData{
// Credentials from AuthParams
{map[string]string{"GoogleApplicationCredentials": "Creds", "podIdentityOwner": ""}, map[string]string{"subscriptionName": "mysubscription", "value": "7"}, false},
// Credentials from AuthParams with empty creds
{map[string]string{"GoogleApplicationCredentials": "", "podIdentityOwner": ""}, map[string]string{"subscriptionName": "mysubscription", "value": "7"}, true},
{map[string]string{"GoogleApplicationCredentials": "", "podIdentityOwner": ""}, map[string]string{"subscriptionName": "mysubscription", "subscriptionSize": "7"}, true},
// with full link to subscription
{nil, map[string]string{"subscriptionName": "projects/myproject/subscriptions/mysubscription", "subscriptionSize": "7", "credentialsFromEnv": "SAMPLE_CREDS"}, false},
// with full (bad) link to subscription
{nil, map[string]string{"subscriptionName": "projects/myproject/mysubscription", "subscriptionSize": "7", "credentialsFromEnv": "SAMPLE_CREDS"}, false},
}

var gcpPubSubMetricIdentifiers = []gcpPubSubMetricIdentifier{
{&testPubSubMetadata[1], 0, "s0-gcp-ps-mysubscription"},
{&testPubSubMetadata[1], 1, "s1-gcp-ps-mysubscription"},
}

var gcpSubscriptionNameTests = []gcpPubSubSubscription{
{&testPubSubMetadata[10], 1, "mysubscription", "myproject"},
{&testPubSubMetadata[11], 1, "projects/myproject/mysubscription", ""},
}

func TestPubSubParseMetadata(t *testing.T) {
for _, testData := range testPubSubMetadata {
_, err := parsePubSubMetadata(&ScalerConfig{AuthParams: testData.authParams, TriggerMetadata: testData.metadata, ResolvedEnv: testPubSubResolvedEnv})
Expand Down Expand Up @@ -75,3 +91,18 @@ func TestGcpPubSubGetMetricSpecForScaling(t *testing.T) {
}
}
}

func TestGcpPubSubSubscriptionName(t *testing.T) {
for _, testData := range gcpSubscriptionNameTests {
meta, err := parsePubSubMetadata(&ScalerConfig{TriggerMetadata: testData.metadataTestData.metadata, ResolvedEnv: testPubSubResolvedEnv, ScalerIndex: testData.scalerIndex})
if err != nil {
t.Fatal("Could not parse metadata:", err)
}
mockGcpPubSubScaler := pubsubScaler{nil, meta}
subscriptionID, projectID := getSubscriptionData(&mockGcpPubSubScaler)

if subscriptionID != testData.name || projectID != testData.projectID {
t.Error("Wrong Subscription parsing:", subscriptionID, projectID)
}
}
}
45 changes: 18 additions & 27 deletions pkg/scalers/stackdriver_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,42 +62,33 @@ func NewStackDriverClientPodIdentity(ctx context.Context) (*StackDriverClient, e
}

// GetMetrics fetches metrics from stackdriver for a specific filter for the last minute
func (s StackDriverClient) GetMetrics(ctx context.Context, filter string) (int64, error) {
func (s StackDriverClient) GetMetrics(ctx context.Context, filter string, projectID string) (int64, error) {
// Set the start time to 1 minute ago
startTime := time.Now().UTC().Add(time.Minute * -2)

// Set the end time to now
endTime := time.Now().UTC()

// Create a request with the filter and the GCP project ID
var req *monitoringpb.ListTimeSeriesRequest
if len(s.projectID) > 0 {
req = &monitoringpb.ListTimeSeriesRequest{
Name: "projects/" + s.projectID,
Filter: filter,
Interval: &monitoringpb.TimeInterval{
StartTime: &timestamp.Timestamp{
Seconds: startTime.Unix(),
},
EndTime: &timestamp.Timestamp{
Seconds: endTime.Unix(),
},
},
}
} else {
req = &monitoringpb.ListTimeSeriesRequest{
Name: "projects/" + s.credentials.ProjectID,
Filter: filter,
Interval: &monitoringpb.TimeInterval{
StartTime: &timestamp.Timestamp{
Seconds: startTime.Unix(),
},
EndTime: &timestamp.Timestamp{
Seconds: endTime.Unix(),
},
},
var req = &monitoringpb.ListTimeSeriesRequest{
Filter: filter,
Interval: &monitoringpb.TimeInterval{
StartTime: &timestamp.Timestamp{Seconds: startTime.Unix()},
EndTime: &timestamp.Timestamp{Seconds: endTime.Unix()},
},
}

switch projectID {
case "":
if len(s.projectID) > 0 {
req.Name = "projects/" + s.projectID
} else {
req.Name = "projects/" + s.credentials.ProjectID
}
default:
req.Name = "projects/" + projectID
}

// Get an iterator with the list of time series
it := s.metricsClient.ListTimeSeries(ctx, req)

Expand Down

0 comments on commit 2664389

Please sign in to comment.