Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

cherry-pick: fix: Use new SDK with proper errors for service bus with pod identity… #4118

Merged
merged 1 commit into from
Jan 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,8 @@ Here is an overview of all new **experimental** features:

### Fixes

- **General**: TODO ([#TODO](https://github.com/kedacore/keda/issues/TODO))
- **General**: Prevent a panic that might occur while refreshing a scaler cache ([#4092](https://github.com/kedacore/keda/issues/4092))
- **Azure Service Bus Scaler:** Use correct auth flows with pod identity ([#4026](https://github.com/kedacore/keda/issues/4026))

### Deprecations

Expand Down
10 changes: 8 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -229,9 +229,15 @@ uninstall: manifests kustomize ## Uninstall CRDs from the K8s cluster specified

deploy: manifests kustomize ## Deploy controller to the K8s cluster specified in ~/.kube/config.
cd config/manager && \
$(KUSTOMIZE) edit set image ghcr.io/kedacore/keda=${IMAGE_CONTROLLER}
$(KUSTOMIZE) edit set image ghcr.io/kedacore/keda=${IMAGE_CONTROLLER} && \
if [ "$(AZURE_RUN_AAD_POD_IDENTITY_TESTS)" = true ]; then \
$(KUSTOMIZE) edit add label --force aadpodidbinding:keda; \
fi
cd config/metrics-server && \
$(KUSTOMIZE) edit set image ghcr.io/kedacore/keda-metrics-apiserver=${IMAGE_ADAPTER}
$(KUSTOMIZE) edit set image ghcr.io/kedacore/keda-metrics-apiserver=${IMAGE_ADAPTER} && \
if [ "$(AZURE_RUN_AAD_POD_IDENTITY_TESTS)" = true ]; then \
$(KUSTOMIZE) edit add label --force aadpodidbinding:keda; \
fi
if [ "$(AZURE_RUN_WORKLOAD_IDENTITY_TESTS)" = true ]; then \
cd config/service_account && \
$(KUSTOMIZE) edit add label --force azure.workload.identity/use:true; \
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ require (
github.com/Azure/azure-kusto-go v0.9.2
github.com/Azure/azure-sdk-for-go v67.1.0+incompatible
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.2.0
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.2.0
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.3.0-beta.2
github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus v1.1.3
github.com/Azure/azure-storage-blob-go v0.15.0
github.com/Azure/azure-storage-queue-go v0.0.0-20191125232315-636801874cdd
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,8 @@ github.com/Azure/azure-sdk-for-go/sdk/azcore v0.21.1/go.mod h1:fBF9PQNqB8scdgpZ3
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.2.0 h1:sVW/AFBTGyJxDaMYlq0ct3jUXTtj12tQ6zE2GZUgVQw=
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.2.0/go.mod h1:uGG2W01BaETf0Ozp+QxxKJdMBNRWPdstHG0Fmdwn1/U=
github.com/Azure/azure-sdk-for-go/sdk/azidentity v0.11.0/go.mod h1:HcM1YX14R7CJcghJGOYCgdezslRSVzqwLf/q+4Y2r/0=
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.2.0 h1:t/W5MYAuQy81cvM8VUNfRLzhtKpXhVUAN7Cd7KVbTyc=
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.2.0/go.mod h1:NBanQUfSWiWn3QEpWDTCU0IjBECKOYvl2R8xdRtMtiM=
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.3.0-beta.2 h1:NsprcuNHEsCR48QYlLxx/gAi9OcCzcwX8VVTZVK8fdQ=
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.3.0-beta.2/go.mod h1:NBanQUfSWiWn3QEpWDTCU0IjBECKOYvl2R8xdRtMtiM=
github.com/Azure/azure-sdk-for-go/sdk/internal v0.7.0/go.mod h1:yqy467j36fJxcRV2TzfVZ1pCb5vxm4BtZPUdYWe/Xo8=
github.com/Azure/azure-sdk-for-go/sdk/internal v0.8.3/go.mod h1:KLF4gFr6DcKFZwSuH8w8yEK6DpFl3LP5rhdvAb7Yz5I=
github.com/Azure/azure-sdk-for-go/sdk/internal v1.0.1 h1:XUNQ4mw+zJmaA2KXzP9JlQiecy1SI+Eog7xVkPiqIbg=
Expand Down
51 changes: 51 additions & 0 deletions pkg/scalers/azure/azure_aad_podidentity.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,13 @@ import (
"io"
"net/http"
"net/url"
"os"
"strconv"
"time"

"github.com/Azure/azure-sdk-for-go/sdk/azcore"
"github.com/Azure/azure-sdk-for-go/sdk/azcore/policy"
"github.com/Azure/azure-sdk-for-go/sdk/azidentity"

"github.com/kedacore/keda/v2/pkg/util"
)
Expand All @@ -17,6 +24,20 @@ const (
MSIURLWithClientID = "http://169.254.169.254/metadata/identity/oauth2/token?api-version=2018-02-01&resource=%s&client_id=%s"
)

var globalHTTPTimeout time.Duration

func init() {
valueStr, found := os.LookupEnv("KEDA_HTTP_DEFAULT_TIMEOUT")
globalHTTPTimeoutMS := 3000
if found && valueStr != "" {
value, err := strconv.Atoi(valueStr)
if err == nil {
globalHTTPTimeoutMS = value
}
}
globalHTTPTimeout = time.Duration(globalHTTPTimeoutMS) * time.Millisecond
}

// GetAzureADPodIdentityToken returns the AADToken for resource
func GetAzureADPodIdentityToken(ctx context.Context, httpClient util.HTTPDoer, identityID, audience string) (AADToken, error) {
var token AADToken
Expand Down Expand Up @@ -54,3 +75,33 @@ func GetAzureADPodIdentityToken(ctx context.Context, httpClient util.HTTPDoer, i

return token, nil
}

type ManagedIdentityWrapper struct {
cred *azidentity.ManagedIdentityCredential
}

func ManagedIdentityWrapperCredential(clientID string) (*ManagedIdentityWrapper, error) {
opts := &azidentity.ManagedIdentityCredentialOptions{}
if clientID != "" {
opts.ID = azidentity.ClientID(clientID)
}

msiCred, err := azidentity.NewManagedIdentityCredential(opts)
if err != nil {
return nil, err
}
return &ManagedIdentityWrapper{
cred: msiCred,
}, nil
}

func (w *ManagedIdentityWrapper) GetToken(ctx context.Context, opts policy.TokenRequestOptions) (azcore.AccessToken, error) {
c, cancel := context.WithTimeout(ctx, globalHTTPTimeout)
defer cancel()
tk, err := w.cred.GetToken(c, opts)
if ctxErr := c.Err(); errors.Is(ctxErr, context.DeadlineExceeded) {
// timeout: signal the chain to try its next credential, if any
err = azidentity.NewCredentialUnavailableError("managed identity timed out")
}
return tk, err
}
69 changes: 21 additions & 48 deletions pkg/scalers/azure/azure_aad_workload_identity.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,7 @@ import (
"time"

amqpAuth "github.com/Azure/azure-amqp-common-go/v3/auth"
"github.com/Azure/azure-sdk-for-go/sdk/azcore"
"github.com/Azure/azure-sdk-for-go/sdk/azcore/policy"
"github.com/Azure/azure-sdk-for-go/sdk/azidentity"
"github.com/Azure/go-autorest/autorest"
"github.com/Azure/go-autorest/autorest/azure/auth"
"github.com/AzureAD/microsoft-authentication-library-for-go/apps/confidential"
Expand All @@ -45,18 +44,26 @@ const (
azureAuthrityHostEnv = "AZURE_AUTHORITY_HOST"
)

var DefaultClientID string
var TenantID string
var TokenFilePath string
var AuthorityHost string

func init() {
DefaultClientID = os.Getenv(azureClientIDEnv)
TenantID = os.Getenv(azureTenantIDEnv)
TokenFilePath = os.Getenv(azureFederatedTokenFileEnv)
AuthorityHost = os.Getenv(azureAuthrityHostEnv)
}

// GetAzureADWorkloadIdentityToken returns the AADToken for resource
func GetAzureADWorkloadIdentityToken(ctx context.Context, identityID, resource string) (AADToken, error) {
clientID := os.Getenv(azureClientIDEnv)
tenantID := os.Getenv(azureTenantIDEnv)
tokenFilePath := os.Getenv(azureFederatedTokenFileEnv)
authorityHost := os.Getenv(azureAuthrityHostEnv)

clientID := DefaultClientID
if identityID != "" {
clientID = identityID
}

signedAssertion, err := readJWTFromFileSystem(tokenFilePath)
signedAssertion, err := readJWTFromFileSystem(TokenFilePath)
if err != nil {
return AADToken{}, fmt.Errorf("error reading service account token - %w", err)
}
Expand All @@ -69,7 +76,7 @@ func GetAzureADWorkloadIdentityToken(ctx context.Context, identityID, resource s
return signedAssertion, nil
})

authorityOption := confidential.WithAuthority(fmt.Sprintf("%s%s/oauth2/token", authorityHost, tenantID))
authorityOption := confidential.WithAuthority(fmt.Sprintf("%s%s/oauth2/token", AuthorityHost, TenantID))
confidentialClient, err := confidential.New(
clientID,
cred,
Expand Down Expand Up @@ -126,46 +133,12 @@ func (aadWiConfig ADWorkloadIdentityConfig) Authorizer() (autorest.Authorizer, e
aadWiConfig.ctx, aadWiConfig.IdentityID, aadWiConfig.Resource)), nil
}

// ADWorkloadIdentityCredential is a type that implements the TokenCredential interface.
// Once azure-sdk-for-go supports Workload Identity we can remove this and use default implementation
// https://github.com/Azure/azure-sdk-for-go/issues/15615
type ADWorkloadIdentityCredential struct {
ctx context.Context
IdentityID string
Resource string
aadToken AADToken
}

func NewADWorkloadIdentityCredential(ctx context.Context, identityID, resource string) *ADWorkloadIdentityCredential {
return &ADWorkloadIdentityCredential{ctx: ctx, IdentityID: identityID, Resource: resource}
}

func (wiCredential *ADWorkloadIdentityCredential) refresh() error {
if time.Now().Before(wiCredential.aadToken.ExpiresOnTimeObject) {
return nil
}

aadToken, err := GetAzureADWorkloadIdentityToken(wiCredential.ctx, wiCredential.IdentityID, wiCredential.Resource)
if err != nil {
return err
}

wiCredential.aadToken = aadToken
return nil
}

// GetToken is for implementing the TokenCredential interface
func (wiCredential *ADWorkloadIdentityCredential) GetToken(ctx context.Context, opts policy.TokenRequestOptions) (azcore.AccessToken, error) {
accessToken := azcore.AccessToken{}
err := wiCredential.refresh()
if err != nil {
return accessToken, err
func NewADWorkloadIdentityCredential(identityID string) (*azidentity.WorkloadIdentityCredential, error) {
clientID := DefaultClientID
if identityID != "" {
clientID = identityID
}

accessToken.Token = wiCredential.aadToken.AccessToken
accessToken.ExpiresOn = wiCredential.aadToken.ExpiresOnTimeObject

return accessToken, nil
return azidentity.NewWorkloadIdentityCredential(TenantID, clientID, TokenFilePath, &azidentity.WorkloadIdentityCredentialOptions{})
}

// ADWorkloadIdentityTokenProvider is a type that implements the adal.OAuthTokenProvider and adal.Refresher interfaces.
Expand Down
35 changes: 35 additions & 0 deletions pkg/scalers/azure/azure_azidentity_chain.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package azure

import (
"os"

"github.com/Azure/azure-sdk-for-go/sdk/azcore"
"github.com/Azure/azure-sdk-for-go/sdk/azidentity"
)

func NewChainedCredential(identityID string) (*azidentity.ChainedTokenCredential, error) {
var creds []azcore.TokenCredential

// Used for local debug based on az-cli user
// As production images don't have shell, we can't register this provider always
if _, err := os.Stat("/bin/sh"); err == nil {
cliCred, err := azidentity.NewAzureCLICredential(&azidentity.AzureCLICredentialOptions{})
if err == nil {
creds = append(creds, cliCred)
}
}

// Used for aad-pod-identity
msiCred, err := ManagedIdentityWrapperCredential(identityID)
if err == nil {
creds = append(creds, msiCred)
}

wiCred, err := NewADWorkloadIdentityCredential(identityID)
if err == nil {
creds = append(creds, wiCred)
}

// Create the chained credential based on the previous 3
return azidentity.NewChainedTokenCredential(creds, nil)
}
52 changes: 6 additions & 46 deletions pkg/scalers/azure_servicebus_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,6 @@ import (
"regexp"
"strconv"

"github.com/Azure/azure-sdk-for-go/sdk/azcore"
"github.com/Azure/azure-sdk-for-go/sdk/azidentity"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/admin"
az "github.com/Azure/go-autorest/autorest/azure"
"github.com/go-logr/logr"
Expand All @@ -44,8 +42,6 @@ const (
messageCountMetricName = "messageCount"
activationMessageCountMetricName = "activationMessageCount"
defaultTargetMessageCount = 5
// Service bus resource id is "https://servicebus.azure.net/" in all cloud environments
serviceBusResource = "https://servicebus.azure.net/"
)

type azureServiceBusScaler struct {
Expand Down Expand Up @@ -275,7 +271,7 @@ func (s *azureServiceBusScaler) GetMetricsAndActivity(ctx context.Context, metri
// Returns the length of the queue or subscription
func (s *azureServiceBusScaler) getAzureServiceBusLength(ctx context.Context) (int64, error) {
// get adminClient
adminClient, err := s.getServiceBusAdminClient(ctx)
adminClient, err := s.getServiceBusAdminClient()
if err != nil {
return -1, err
}
Expand All @@ -291,58 +287,22 @@ func (s *azureServiceBusScaler) getAzureServiceBusLength(ctx context.Context) (i
}

// Returns service bus namespace object
func (s *azureServiceBusScaler) getServiceBusAdminClient(ctx context.Context) (*admin.Client, error) {
func (s *azureServiceBusScaler) getServiceBusAdminClient() (*admin.Client, error) {
if s.client != nil {
return s.client, nil
}

var adminClient *admin.Client
var err error

switch s.podIdentity.Provider {
case "", kedav1alpha1.PodIdentityProviderNone:
adminClient, err = admin.NewClientFromConnectionString(s.metadata.connection, nil)
if err != nil {
return nil, err
}

return admin.NewClientFromConnectionString(s.metadata.connection, nil)
case kedav1alpha1.PodIdentityProviderAzure, kedav1alpha1.PodIdentityProviderAzureWorkload:
var creds []azcore.TokenCredential
options := &azidentity.DefaultAzureCredentialOptions{}

// Used for local debug based on az-cli user
cliCred, err := azidentity.NewAzureCLICredential(&azidentity.AzureCLICredentialOptions{TenantID: options.TenantID})
if err == nil {
creds = append(creds, cliCred)
}

// Once azure-sdk-for-go supports Workload Identity we can remove this and use default implementation
// https://github.com/Azure/azure-sdk-for-go/issues/15615
wiCred := azure.NewADWorkloadIdentityCredential(ctx, s.podIdentity.IdentityID, serviceBusResource)
creds = append(creds, wiCred)

// Used for aad-pod-identity
o := &azidentity.ManagedIdentityCredentialOptions{ClientOptions: options.ClientOptions}
if s.podIdentity.IdentityID != "" {
o.ID = azidentity.ClientID(s.podIdentity.IdentityID)
}
msiCred, err := azidentity.NewManagedIdentityCredential(o)
if err == nil {
creds = append(creds, msiCred)
}

// Create the chained credential based on the previous 3
chain, err := azidentity.NewChainedTokenCredential(creds, nil)
if err != nil {
return nil, err
}
adminClient, err = admin.NewClient(s.metadata.fullyQualifiedNamespace, chain, nil)
creds, err := azure.NewChainedCredential(s.podIdentity.IdentityID)
if err != nil {
return nil, err
}
return admin.NewClient(s.metadata.fullyQualifiedNamespace, creds, nil)
}

return adminClient, nil
return nil, fmt.Errorf("incorrect podIdentity type")
}

func getQueueLength(ctx context.Context, adminClient *admin.Client, meta *azureServiceBusMetadata) (int64, error) {
Expand Down
4 changes: 4 additions & 0 deletions tests/helper/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
)

const (
AzureAdPodIdentityNamespace = "azure-ad-identity-system"
AzureWorkloadIdentityNamespace = "azure-workload-identity-system"
AwsIdentityNamespace = "aws-identity-system"
GcpIdentityNamespace = "gcp-identity-system"
Expand All @@ -53,7 +54,10 @@ var random = rand.New(rand.NewSource(time.Now().UnixNano()))

// Env variables required for setup and cleanup.
var (
AzureADMsiID = os.Getenv("TF_AZURE_IDENTITY_1_APP_FULL_ID")
AzureADMsiClientID = os.Getenv("TF_AZURE_IDENTITY_1_APP_ID")
AzureADTenantID = os.Getenv("TF_AZURE_SP_TENANT")
AzureRunAadPodIdentityTests = os.Getenv("AZURE_RUN_AAD_POD_IDENTITY_TESTS")
AzureRunWorkloadIdentityTests = os.Getenv("AZURE_RUN_WORKLOAD_IDENTITY_TESTS")
AwsIdentityTests = os.Getenv("AWS_RUN_IDENTITY_TESTS")
GcpIdentityTests = os.Getenv("GCP_RUN_IDENTITY_TESTS")
Expand Down
Loading