Skip to content

Commit

Permalink
General: Bubble up AuthRef TriggerAuthentication errors as ScaledObje…
Browse files Browse the repository at this point in the history
…ct events (kedacore#5219)

Signed-off-by: Bojan Zelic <bnzelic@gmail.com>
Signed-off-by: Zbynek Roubalik <zroubalik@gmail.com>
Co-authored-by: Zbynek Roubalik <zroubalik@gmail.com>
  • Loading branch information
BojanZelic and zroubalik authored Dec 21, 2023
1 parent 4f2ea09 commit 956ddd1
Show file tree
Hide file tree
Showing 4 changed files with 210 additions and 29 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ Here is an overview of all new **experimental** features:

- **General**: Add parameter queryParameters to prometheus-scaler ([#4962](https://github.com/kedacore/keda/issues/4962))
- **General**: Add validations for replica counts when creating ScaledObjects ([#5288](https://github.com/kedacore/keda/issues/5288))
- **General**: Bubble up AuthRef TriggerAuthentication errors as ScaledObject events ([#5190](https://github.com/kedacore/keda/issues/5190))
- **General**: Support TriggerAuthentication properties from ConfigMap ([#4830](https://github.com/kedacore/keda/issues/4830))
- **General**: Use client-side round-robin load balancing for grpc calls ([#5224](https://github.com/kedacore/keda/issues/5224))
- **GCP pubsub scaler**: Support distribution-valued metrics and metrics from topics ([#5070](https://github.com/kedacore/keda/issues/5070))
Expand Down
65 changes: 37 additions & 28 deletions pkg/scaling/resolver/scale_resolvers.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,11 @@ func ResolveAuthRefAndPodIdentity(ctx context.Context, client client.Client, log
triggerAuthRef *kedav1alpha1.AuthenticationRef, podTemplateSpec *corev1.PodTemplateSpec,
namespace string, secretsLister corev1listers.SecretLister) (map[string]string, kedav1alpha1.AuthPodIdentity, error) {
if podTemplateSpec != nil {
authParams, podIdentity := resolveAuthRef(ctx, client, logger, triggerAuthRef, &podTemplateSpec.Spec, namespace, secretsLister)
authParams, podIdentity, err := resolveAuthRef(ctx, client, logger, triggerAuthRef, &podTemplateSpec.Spec, namespace, secretsLister)

if err != nil {
return authParams, podIdentity, err
}

switch podIdentity.Provider {
case kedav1alpha1.PodIdentityProviderAwsEKS:
Expand All @@ -189,7 +193,7 @@ func ResolveAuthRefAndPodIdentity(ctx context.Context, client client.Client, log
serviceAccountName = podTemplateSpec.Spec.ServiceAccountName
}
serviceAccount := &corev1.ServiceAccount{}
err := client.Get(ctx, types.NamespacedName{Name: serviceAccountName, Namespace: namespace}, serviceAccount)
err = client.Get(ctx, types.NamespacedName{Name: serviceAccountName, Namespace: namespace}, serviceAccount)
if err != nil {
return nil, kedav1alpha1.AuthPodIdentity{Provider: kedav1alpha1.PodIdentityProviderNone},
fmt.Errorf("error getting service account: '%s', error: %w", serviceAccountName, err)
Expand All @@ -210,17 +214,18 @@ func ResolveAuthRefAndPodIdentity(ctx context.Context, client client.Client, log
return authParams, podIdentity, nil
}

authParams, _ := resolveAuthRef(ctx, client, logger, triggerAuthRef, nil, namespace, secretsLister)
return authParams, kedav1alpha1.AuthPodIdentity{Provider: kedav1alpha1.PodIdentityProviderNone}, nil
authParams, _, err := resolveAuthRef(ctx, client, logger, triggerAuthRef, nil, namespace, secretsLister)
return authParams, kedav1alpha1.AuthPodIdentity{Provider: kedav1alpha1.PodIdentityProviderNone}, err
}

// resolveAuthRef provides authentication parameters needed authenticate scaler with the environment.
// based on authentication method defined in TriggerAuthentication, authParams and podIdentity is returned
func resolveAuthRef(ctx context.Context, client client.Client, logger logr.Logger,
triggerAuthRef *kedav1alpha1.AuthenticationRef, podSpec *corev1.PodSpec,
namespace string, secretsLister corev1listers.SecretLister) (map[string]string, kedav1alpha1.AuthPodIdentity) {
namespace string, secretsLister corev1listers.SecretLister) (map[string]string, kedav1alpha1.AuthPodIdentity, error) {
result := make(map[string]string)
var podIdentity kedav1alpha1.AuthPodIdentity
var err error

if namespace != "" && triggerAuthRef != nil && triggerAuthRef.Name != "" {
triggerAuthSpec, triggerNamespace, err := getTriggerAuthSpec(ctx, client, triggerAuthRef, namespace)
Expand Down Expand Up @@ -257,43 +262,47 @@ func resolveAuthRef(ctx context.Context, client client.Client, logger logr.Logge
if triggerAuthSpec.HashiCorpVault != nil && len(triggerAuthSpec.HashiCorpVault.Secrets) > 0 {
vault := NewHashicorpVaultHandler(triggerAuthSpec.HashiCorpVault)
err := vault.Initialize(logger)
defer vault.Stop()
if err != nil {
logger.Error(err, "error authenticate to Vault", "triggerAuthRef.Name", triggerAuthRef.Name)
} else {
secrets, err := vault.ResolveSecrets(triggerAuthSpec.HashiCorpVault.Secrets)
if err != nil {
logger.Error(err, "could not get secrets from vault",
"triggerAuthRef.Name", triggerAuthRef.Name,
)
} else {
for _, e := range secrets {
result[e.Parameter] = e.Value
}
}
vault.Stop()
logger.Error(err, "error authenticating to Vault", "triggerAuthRef.Name", triggerAuthRef.Name)
return result, podIdentity, err
}

secrets, err := vault.ResolveSecrets(triggerAuthSpec.HashiCorpVault.Secrets)
if err != nil {
logger.Error(err, "could not get secrets from vault",
"triggerAuthRef.Name", triggerAuthRef.Name,
)
return result, podIdentity, err
}

for _, e := range secrets {
result[e.Parameter] = e.Value
}
}
if triggerAuthSpec.AzureKeyVault != nil && len(triggerAuthSpec.AzureKeyVault.Secrets) > 0 {
vaultHandler := NewAzureKeyVaultHandler(triggerAuthSpec.AzureKeyVault)
err := vaultHandler.Initialize(ctx, client, logger, triggerNamespace, secretsLister)
if err != nil {
logger.Error(err, "error authenticating to Azure Key Vault", "triggerAuthRef.Name", triggerAuthRef.Name)
} else {
for _, secret := range triggerAuthSpec.AzureKeyVault.Secrets {
res, err := vaultHandler.Read(ctx, secret.Name, secret.Version)
if err != nil {
logger.Error(err, "error trying to read secret from Azure Key Vault", "triggerAuthRef.Name", triggerAuthRef.Name,
"secret.Name", secret.Name, "secret.Version", secret.Version)
} else {
result[secret.Parameter] = res
}
return result, podIdentity, err
}

for _, secret := range triggerAuthSpec.AzureKeyVault.Secrets {
res, err := vaultHandler.Read(ctx, secret.Name, secret.Version)
if err != nil {
logger.Error(err, "error trying to read secret from Azure Key Vault", "triggerAuthRef.Name", triggerAuthRef.Name,
"secret.Name", secret.Name, "secret.Version", secret.Version)
return result, podIdentity, err
}

result[secret.Parameter] = res
}
}
}
}

return result, podIdentity
return result, podIdentity, err
}

func getTriggerAuthSpec(ctx context.Context, client client.Client, triggerAuthRef *kedav1alpha1.AuthenticationRef, namespace string) (*kedav1alpha1.TriggerAuthenticationSpec, string, error) {
Expand Down
51 changes: 50 additions & 1 deletion pkg/scaling/resolver/scale_resolvers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,8 @@ func TestResolveAuthRef(t *testing.T) {
podSpec *corev1.PodSpec
expected map[string]string
expectedPodIdentity kedav1alpha1.AuthPodIdentity
isError bool
comment string
}{
{
name: "foo",
Expand Down Expand Up @@ -323,6 +325,44 @@ func TestResolveAuthRef(t *testing.T) {
expected: map[string]string{"host": secretData},
expectedPodIdentity: kedav1alpha1.AuthPodIdentity{Provider: kedav1alpha1.PodIdentityProviderNone},
},
{
name: "triggerauth exists but hashicorp vault can't resolve",
existing: []runtime.Object{
&kedav1alpha1.TriggerAuthentication{
ObjectMeta: metav1.ObjectMeta{
Namespace: namespace,
Name: triggerAuthenticationName,
},
Spec: kedav1alpha1.TriggerAuthenticationSpec{
HashiCorpVault: &kedav1alpha1.HashiCorpVault{
Address: "invalid-vault-address",
Authentication: "token",
Credential: &kedav1alpha1.Credential{
Token: "my-token",
},
Mount: "kubernetes",
Role: "my-role",
Secrets: []kedav1alpha1.VaultSecret{
{
Key: "password",
Parameter: "password",
Path: "secret_v2/data/my-password-path",
},
{
Key: "username",
Parameter: "username",
Path: "secret_v2/data/my-username-path",
},
},
},
},
},
},
isError: true,
comment: "\"my-vault-address-doesnt-exist/v1/auth/token/lookup-self\": unsupported protocol scheme \"\"",
soar: &kedav1alpha1.AuthenticationRef{Name: triggerAuthenticationName},
expected: map[string]string{},
},
{
name: "triggerauth exists and config map",
existing: []runtime.Object{
Expand Down Expand Up @@ -532,14 +572,23 @@ func TestResolveAuthRef(t *testing.T) {
t.Run(test.name, func(t *testing.T) {
ctx := context.Background()
os.Setenv("KEDA_CLUSTER_OBJECT_NAMESPACE", clusterNamespace) // Inject test cluster namespace.
gotMap, gotPodIdentity := resolveAuthRef(
gotMap, gotPodIdentity, err := resolveAuthRef(
ctx,
fake.NewClientBuilder().WithScheme(scheme.Scheme).WithRuntimeObjects(test.existing...).Build(),
logf.Log.WithName("test"),
test.soar,
test.podSpec,
namespace,
secretsLister)

if err != nil && !test.isError {
t.Errorf("Expected success because %s got error, %s", test.comment, err)
}

if test.isError && err == nil {
t.Errorf("Expected error because %s but got success, %#v", test.comment, test)
}

if diff := cmp.Diff(gotMap, test.expected); diff != "" {
t.Errorf("Returned authParams are different: %s", diff)
}
Expand Down
122 changes: 122 additions & 0 deletions pkg/scaling/scale_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,13 @@ import (
"github.com/antonmedv/expr"
"github.com/golang/mock/gomock"
"github.com/stretchr/testify/assert"
appsv1 "k8s.io/api/apps/v1"
v2 "k8s.io/api/autoscaling/v2"
batchv1 "k8s.io/api/batch/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/record"
"k8s.io/metrics/pkg/apis/external_metrics"

Expand Down Expand Up @@ -290,6 +292,126 @@ func TestCheckScaledObjectScalersWithError(t *testing.T) {
assert.Equal(t, true, isError)
}

func TestCheckScaledObjectScalersWithTriggerAuthError(t *testing.T) {
ctrl := gomock.NewController(t)
mockClient := mock_client.NewMockClient(ctrl)
mockExecutor := mock_executor.NewMockScaleExecutor(ctrl)
recorder := record.NewFakeRecorder(1)

scaler := mock_scalers.NewMockScaler(ctrl)
scaler.EXPECT().Close(gomock.Any())

factory := func() (scalers.Scaler, *scalers.ScalerConfig, error) {
scaler := mock_scalers.NewMockScaler(ctrl)
scaler.EXPECT().GetMetricsAndActivity(gomock.Any(), gomock.Any()).Return([]external_metrics.ExternalMetricValue{}, false, errors.New("some error"))
scaler.EXPECT().Close(gomock.Any())
return scaler, &scalers.ScalerConfig{}, nil
}

deployment := appsv1.Deployment{
ObjectMeta: metav1.ObjectMeta{
Name: "deployment-test",
Namespace: "test",
},
Spec: appsv1.DeploymentSpec{
Template: v1.PodTemplateSpec{
Spec: v1.PodSpec{
Containers: []v1.Container{
{
Name: "container",
},
},
},
},
},
}

scaledObject := kedav1alpha1.ScaledObject{
ObjectMeta: metav1.ObjectMeta{
Name: "scaledobject-test",
Namespace: "test",
},
Spec: kedav1alpha1.ScaledObjectSpec{
ScaleTargetRef: &kedav1alpha1.ScaleTarget{
Name: deployment.Name,
},
Triggers: []kedav1alpha1.ScaleTriggers{
{
Name: triggerName1,
Type: "fake_trig1",
AuthenticationRef: &kedav1alpha1.AuthenticationRef{
Name: "triggerauth-test",
},
},
},
},
Status: kedav1alpha1.ScaledObjectStatus{
ScaleTargetGVKR: &kedav1alpha1.GroupVersionKindResource{
Group: "apps",
Kind: "Deployment",
},
ExternalMetricNames: []string{metricName1, metricName2},
},
}

triggerAuth := kedav1alpha1.TriggerAuthentication{
ObjectMeta: metav1.ObjectMeta{
Name: "triggerauth-test",
Namespace: "test",
},
Spec: kedav1alpha1.TriggerAuthenticationSpec{
HashiCorpVault: &kedav1alpha1.HashiCorpVault{
Address: "invalid-vault-address",
Authentication: "token",
Credential: &kedav1alpha1.Credential{
Token: "my-token",
},
Mount: "kubernetes",
Role: "my-role",
Secrets: []kedav1alpha1.VaultSecret{
{
Parameter: "username",
Key: "username",
Path: "secret_v2/data/my-username-path",
},
},
},
},
}

scalerCache := cache.ScalersCache{
Scalers: []cache.ScalerBuilder{{
Scaler: scaler,
Factory: factory,
}},
Recorder: recorder,
}

mockClient.EXPECT().Get(gomock.Any(), types.NamespacedName{Name: deployment.Name, Namespace: deployment.Namespace}, gomock.Any()).SetArg(2, deployment)
mockClient.EXPECT().Get(gomock.Any(), types.NamespacedName{Name: triggerAuth.Name, Namespace: triggerAuth.Namespace}, gomock.Any()).SetArg(2, triggerAuth)

sh := scaleHandler{
client: mockClient,
scaleLoopContexts: &sync.Map{},
scaleExecutor: mockExecutor,
globalHTTPTimeout: time.Duration(1000),
recorder: recorder,
scalerCaches: map[string]*cache.ScalersCache{},
scalerCachesLock: &sync.RWMutex{},
scaledObjectsMetricCache: metricscache.NewMetricsCache(),
}

isActive, isError, _, _ := sh.getScaledObjectState(context.TODO(), &scaledObject)
scalerCache.Close(context.Background())

assert.Equal(t, false, isActive)
assert.Equal(t, true, isError)

failureEvent := <-recorder.Events
assert.Contains(t, failureEvent, "KEDAScalerFailed")
assert.Contains(t, failureEvent, "unsupported protocol scheme")
}

func TestCheckScaledObjectFindFirstActiveNotIgnoreOthers(t *testing.T) {
ctrl := gomock.NewController(t)
mockClient := mock_client.NewMockClient(ctrl)
Expand Down

0 comments on commit 956ddd1

Please sign in to comment.