diff --git a/.github/workflows/test-e2e.yml b/.github/workflows/test-e2e.yml index fcdb9f4ef0..70b5083505 100644 --- a/.github/workflows/test-e2e.yml +++ b/.github/workflows/test-e2e.yml @@ -167,6 +167,7 @@ jobs: "multinamespaced", "networkpeering", "privatelink", + "private-endpoint" "project-settings", "serverless-pe", "x509auth", diff --git a/config/rbac/clusterwide/role.yaml b/config/rbac/clusterwide/role.yaml index 972ad91984..267c6e23da 100644 --- a/config/rbac/clusterwide/role.yaml +++ b/config/rbac/clusterwide/role.yaml @@ -33,6 +33,7 @@ rules: - atlasdatafederations - atlasdeployments - atlasfederatedauths + - atlasprivateendpoints - atlasprojects - atlassearchindexconfigs - atlasstreamconnections @@ -56,6 +57,7 @@ rules: - atlasdatafederations/status - atlasdeployments/status - atlasfederatedauths/status + - atlasprivateendpoints/status - atlasprojects/status - atlassearchindexconfigs/status - atlasstreamconnections/status diff --git a/config/rbac/namespaced/role.yaml b/config/rbac/namespaced/role.yaml index 75c34b6a38..5efed787b5 100644 --- a/config/rbac/namespaced/role.yaml +++ b/config/rbac/namespaced/role.yaml @@ -34,6 +34,7 @@ rules: - atlasdatafederations - atlasdeployments - atlasfederatedauths + - atlasprivateendpoints - atlasprojects - atlassearchindexconfigs - atlasstreamconnections @@ -56,6 +57,7 @@ rules: - atlasdatafederations/status - atlasdeployments/status - atlasfederatedauths/status + - atlasprivateendpoints/status - atlasprojects/status - atlassearchindexconfigs/status - atlasstreamconnections/status diff --git a/pkg/api/condition.go b/pkg/api/condition.go index 11451937f7..1a304bad4d 100644 --- a/pkg/api/condition.go +++ b/pkg/api/condition.go @@ -94,6 +94,12 @@ const ( TeamUnmanaged ConditionType = "TeamUnmanaged" ) +// Atlas Private Endpoint condition types +const ( + PrivateEndpointServiceReady ConditionType = "PrivateEndpointServiceReady" + PrivateEndpointReady ConditionType = "PrivateEndpointReady" +) + // Generic condition type const ( ResourceVersionStatus ConditionType = "ResourceVersionIsValid" @@ -116,7 +122,7 @@ type Condition struct { Message string `json:"message,omitempty"` } -// TrueCondition returns the Condition that has the 'Status' set to 'true' and 'Type' to 'conditionType'. +// TrueCondition returns the Condition that has the 'ServiceStatus' set to 'true' and 'Type' to 'conditionType'. // It explicitly omits the 'Reason' and 'Message' fields. func TrueCondition(conditionType ConditionType) Condition { return Condition{ @@ -126,7 +132,7 @@ func TrueCondition(conditionType ConditionType) Condition { } } -// FalseCondition returns the Condition that has the 'Status' set to 'false' and 'Type' to 'conditionType'. +// FalseCondition returns the Condition that has the 'ServiceStatus' set to 'false' and 'Type' to 'conditionType'. // The reason and message can be provided optionally func FalseCondition(conditionType ConditionType) Condition { condition := Condition{ diff --git a/pkg/controller/atlas/provider.go b/pkg/controller/atlas/provider.go index ba303de8dd..51d3dad9e8 100644 --- a/pkg/controller/atlas/provider.go +++ b/pkg/controller/atlas/provider.go @@ -76,7 +76,8 @@ func (p *ProductionProvider) IsResourceSupported(resource api.AtlasCustomResourc *akov2.AtlasDatabaseUser, *akov2.AtlasSearchIndexConfig, *akov2.AtlasBackupCompliancePolicy, - *akov2.AtlasFederatedAuth: + *akov2.AtlasFederatedAuth, + *akov2.AtlasPrivateEndpoint: return true case *akov2.AtlasDataFederation, *akov2.AtlasStreamInstance, diff --git a/pkg/controller/atlasprivateendpoint/atlasprivateendpoint_controller.go b/pkg/controller/atlasprivateendpoint/atlasprivateendpoint_controller.go new file mode 100644 index 0000000000..2af32b3cf4 --- /dev/null +++ b/pkg/controller/atlasprivateendpoint/atlasprivateendpoint_controller.go @@ -0,0 +1,304 @@ +/* +Copyright 2024 MongoDB. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package atlasprivateendpoint + +import ( + "context" + "errors" + "fmt" + "time" + + "go.uber.org/zap" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/tools/record" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/builder" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller" + "sigs.k8s.io/controller-runtime/pkg/handler" + "sigs.k8s.io/controller-runtime/pkg/manager" + "sigs.k8s.io/controller-runtime/pkg/predicate" + "sigs.k8s.io/controller-runtime/pkg/reconcile" + + "github.com/mongodb/mongodb-atlas-kubernetes/v2/internal/pointer" + "github.com/mongodb/mongodb-atlas-kubernetes/v2/internal/translation/privateendpoint" + "github.com/mongodb/mongodb-atlas-kubernetes/v2/internal/translation/project" + "github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/api" + akov2 "github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/api/v1" + "github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/controller/atlas" + "github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/controller/customresource" + "github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/controller/statushandler" + "github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/controller/workflow" + "github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/indexer" +) + +// AtlasPrivateEndpointReconciler reconciles a AtlasPrivateEndpoint object +type AtlasPrivateEndpointReconciler struct { + Scheme *runtime.Scheme + Client client.Client + EventRecorder record.EventRecorder + AtlasProvider atlas.Provider + GlobalPredicates []predicate.Predicate + Log *zap.SugaredLogger + privateEndpointService privateendpoint.PrivateEndpointService + + ObjectDeletionProtection bool + independentSyncPeriod time.Duration +} + +// +kubebuilder:rbac:groups=atlas.mongodb.com,resources=atlasprivateendpoints,verbs=get;list;watch;create;update;patch;delete +// +kubebuilder:rbac:groups=atlas.mongodb.com,resources=atlasprivateendpoints/status,verbs=get;update;patch +// +kubebuilder:rbac:groups=atlas.mongodb.com,namespace=default,resources=atlasprivateendpoints,verbs=get;list;watch;create;update;patch;delete +// +kubebuilder:rbac:groups=atlas.mongodb.com,namespace=default,resources=atlasprivateendpoints/status,verbs=get;update;patch +// +kubebuilder:rbac:groups="",resources=events,verbs=create;patch +// +kubebuilder:rbac:groups="",resources=secrets,verbs=get;list;watch + +func (r *AtlasPrivateEndpointReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { + r.Log.Infow("-> Starting AtlasPrivateEndpoint reconciliation") + + akoPrivateEndpoint := akov2.AtlasPrivateEndpoint{} + result := customresource.PrepareResource(ctx, r.Client, req, &akoPrivateEndpoint, r.Log) + if !result.IsOk() { + return result.ReconcileResult(), errors.New(result.GetMessage()) + } + + return r.ensureCustomResource(ctx, &akoPrivateEndpoint) +} + +func (r *AtlasPrivateEndpointReconciler) ensureCustomResource(ctx context.Context, akoPrivateEndpoint *akov2.AtlasPrivateEndpoint) (ctrl.Result, error) { + if customresource.ReconciliationShouldBeSkipped(akoPrivateEndpoint) { + return r.skip(ctx, akoPrivateEndpoint) + } + + conditions := api.InitCondition(akoPrivateEndpoint, api.FalseCondition(api.ReadyType)) + workflowCtx := workflow.NewContext(r.Log, conditions, ctx) + defer statushandler.Update(workflowCtx, r.Client, r.EventRecorder, akoPrivateEndpoint) + + isValid := customresource.ValidateResourceVersion(workflowCtx, akoPrivateEndpoint, r.Log) + if !isValid.IsOk() { + return r.invalidate(isValid) + } + + if !r.AtlasProvider.IsResourceSupported(akoPrivateEndpoint) { + return r.unsupport(workflowCtx) + } + + var atlasProject *project.Project + var err error + if akoPrivateEndpoint.Spec.ExternalProject != nil { + atlasProject, err = r.getProjectFromAtlas(ctx, akoPrivateEndpoint) + } else { + atlasProject, err = r.getProjectFromKube(ctx, akoPrivateEndpoint) + } + if err != nil { + return r.terminate(workflowCtx, akoPrivateEndpoint, nil, api.ReadyType, workflow.AtlasAPIAccessNotConfigured, err) + } + + return r.handlePrivateEndpointService(workflowCtx, atlasProject.ID, akoPrivateEndpoint) +} + +func (r *AtlasPrivateEndpointReconciler) getProjectFromAtlas(ctx context.Context, akoPrivateEndpoint *akov2.AtlasPrivateEndpoint) (*project.Project, error) { + sdkClient, _, err := r.AtlasProvider.SdkClient( + ctx, + &client.ObjectKey{Namespace: akoPrivateEndpoint.Namespace, Name: akoPrivateEndpoint.Credentials().Name}, + r.Log, + ) + if err != nil { + return nil, err + } + + projectService := project.NewProjectAPIService(sdkClient.ProjectsApi) + r.privateEndpointService = privateendpoint.NewPrivateEndpointAPI(sdkClient.PrivateEndpointServicesApi) + + atlasProject, err := projectService.GetProject(ctx, akoPrivateEndpoint.Spec.ExternalProject.ID) + if err != nil { + return nil, err + } + + return atlasProject, nil +} + +func (r *AtlasPrivateEndpointReconciler) getProjectFromKube(ctx context.Context, akoPrivateEndpoint *akov2.AtlasPrivateEndpoint) (*project.Project, error) { + atlasProject := &akov2.AtlasProject{} + if err := r.Client.Get(ctx, akoPrivateEndpoint.AtlasProjectObjectKey(), atlasProject); err != nil { + return nil, err + } + + credentialsSecret, err := customresource.ComputeSecret(atlasProject, akoPrivateEndpoint) + if err != nil { + return nil, err + } + + sdkClient, orgID, err := r.AtlasProvider.SdkClient(ctx, credentialsSecret, r.Log) + if err != nil { + return nil, err + } + + r.privateEndpointService = privateendpoint.NewPrivateEndpointAPI(sdkClient.PrivateEndpointServicesApi) + + return project.NewProject(atlasProject, orgID), nil +} + +func (r *AtlasPrivateEndpointReconciler) skip(ctx context.Context, akoPrivateEndpoint *akov2.AtlasPrivateEndpoint) (ctrl.Result, error) { + r.Log.Infow(fmt.Sprintf("-> Skipping AtlasPrivateEndpoint reconciliation as annotation %s=%s", customresource.ReconciliationPolicyAnnotation, customresource.ReconciliationPolicySkip), "spec", akoPrivateEndpoint.Spec) + if !akoPrivateEndpoint.GetDeletionTimestamp().IsZero() { + if err := customresource.ManageFinalizer(ctx, r.Client, akoPrivateEndpoint, customresource.UnsetFinalizer); err != nil { + result := workflow.Terminate(workflow.Internal, err.Error()) + r.Log.Errorw("Failed to remove finalizer", "terminate", err) + + return result.ReconcileResult(), nil + } + } + + return workflow.OK().ReconcileResult(), nil +} + +func (r *AtlasPrivateEndpointReconciler) invalidate(invalid workflow.Result) (ctrl.Result, error) { + // note: ValidateResourceVersion already set the state so we don't have to do it here. + r.Log.Debugf("AtlasPrivateEndpoint is invalid: %v", invalid) + return invalid.ReconcileResult(), nil +} + +func (r *AtlasPrivateEndpointReconciler) unsupport(ctx *workflow.Context) (ctrl.Result, error) { + unsupported := workflow.Terminate( + workflow.AtlasGovUnsupported, "the AtlasPrivateEndpoint is not supported by Atlas for government"). + WithoutRetry() + ctx.SetConditionFromResult(api.ReadyType, unsupported) + return unsupported.ReconcileResult(), nil +} + +func (r *AtlasPrivateEndpointReconciler) terminate( + ctx *workflow.Context, + akoPrivateEndpoint *akov2.AtlasPrivateEndpoint, + atlasPEService privateendpoint.EndpointService, + condition api.ConditionType, + reason workflow.ConditionReason, + err error, +) (ctrl.Result, error) { + r.Log.Errorf("resource %T(%s/%s) failed on condition %s: %s", akoPrivateEndpoint, akoPrivateEndpoint.GetNamespace(), akoPrivateEndpoint.GetName(), condition, err) + result := workflow.Terminate(reason, err.Error()) + ctx.SetConditionFalse(api.ReadyType). + SetConditionFromResult(condition, result) + + if atlasPEService != nil { + ctx.EnsureStatusOption(privateendpoint.NewPrivateEndpointStatus(atlasPEService)) + } + + return result.ReconcileResult(), nil +} + +func (r *AtlasPrivateEndpointReconciler) inProgress( + ctx *workflow.Context, + akoPrivateEndpoint *akov2.AtlasPrivateEndpoint, + atlasPEService privateendpoint.EndpointService, + condition api.ConditionType, + reason workflow.ConditionReason, + msg string, +) (ctrl.Result, error) { + if err := customresource.ManageFinalizer(ctx.Context, r.Client, akoPrivateEndpoint, customresource.SetFinalizer); err != nil { + return r.terminate(ctx, akoPrivateEndpoint, atlasPEService, api.ReadyType, workflow.AtlasFinalizerNotSet, err) + } + + result := workflow.InProgress(reason, msg) + ctx.SetConditionFalse(api.ReadyType). + SetConditionFromResult(condition, result). + EnsureStatusOption(privateendpoint.NewPrivateEndpointStatus(atlasPEService)) + + return result.ReconcileResult(), nil +} + +func (r *AtlasPrivateEndpointReconciler) ready(ctx *workflow.Context, akoPrivateEndpoint *akov2.AtlasPrivateEndpoint, atlasPEService privateendpoint.EndpointService) (ctrl.Result, error) { + if err := customresource.ManageFinalizer(ctx.Context, r.Client, akoPrivateEndpoint, customresource.SetFinalizer); err != nil { + return r.terminate(ctx, akoPrivateEndpoint, atlasPEService, api.ReadyType, workflow.AtlasFinalizerNotSet, err) + } + + ctx.SetConditionTrue(api.PrivateEndpointServiceReady). + SetConditionTrue(api.PrivateEndpointReady). + SetConditionTrue(api.ReadyType). + EnsureStatusOption(privateendpoint.NewPrivateEndpointStatus(atlasPEService)) + + if akoPrivateEndpoint.Spec.ExternalProject != nil { + return workflow.Requeue(r.independentSyncPeriod).ReconcileResult(), nil + } + + return workflow.OK().ReconcileResult(), nil +} + +func (r *AtlasPrivateEndpointReconciler) waitForConfiguration(ctx *workflow.Context, akoPrivateEndpoint *akov2.AtlasPrivateEndpoint, atlasPEService privateendpoint.EndpointService) (ctrl.Result, error) { + if err := customresource.ManageFinalizer(ctx.Context, r.Client, akoPrivateEndpoint, customresource.SetFinalizer); err != nil { + return r.terminate(ctx, akoPrivateEndpoint, atlasPEService, api.ReadyType, workflow.AtlasFinalizerNotSet, err) + } + + result := workflow.InProgress(workflow.PrivateEndpointConfigurationPending, "waiting for private endpoint configuration from customer side"). + WithoutRetry() + ctx.SetConditionFalse(api.ReadyType). + SetConditionTrue(api.PrivateEndpointServiceReady). + SetConditionFromResult(api.PrivateEndpointReady, result). + EnsureStatusOption(privateendpoint.NewPrivateEndpointStatus(atlasPEService)) + + return result.ReconcileResult(), nil +} + +func (r *AtlasPrivateEndpointReconciler) unmanage(ctx *workflow.Context, akoPrivateEndpoint *akov2.AtlasPrivateEndpoint) (ctrl.Result, error) { + if err := customresource.ManageFinalizer(ctx.Context, r.Client, akoPrivateEndpoint, customresource.UnsetFinalizer); err != nil { + return r.terminate(ctx, akoPrivateEndpoint, nil, api.ReadyType, workflow.AtlasFinalizerNotRemoved, err) + } + + return workflow.Deleted().ReconcileResult(), nil +} + +func (r *AtlasPrivateEndpointReconciler) SetupWithManager(mgr ctrl.Manager, skipNameValidation bool) error { + return ctrl.NewControllerManagedBy(mgr). + Named("AtlasPrivateEndpoint"). + For(&akov2.AtlasPrivateEndpoint{}, builder.WithPredicates(r.GlobalPredicates...)). + Watches( + &corev1.Secret{}, + handler.EnqueueRequestsFromMapFunc(r.privateEndpointForCredentialMapFunc()), + builder.WithPredicates(predicate.ResourceVersionChangedPredicate{}), + ). + WithOptions(controller.TypedOptions[reconcile.Request]{SkipNameValidation: pointer.MakePtr(skipNameValidation)}). + Complete(r) +} + +func (r *AtlasPrivateEndpointReconciler) privateEndpointForCredentialMapFunc() handler.MapFunc { + return indexer.CredentialsIndexMapperFunc( + indexer.AtlasPrivateEndpointCredentialsIndex, + func() *akov2.AtlasPrivateEndpointList { return &akov2.AtlasPrivateEndpointList{} }, + indexer.PrivateEndpointRequests, + r.Client, + r.Log, + ) +} + +func NewAtlasPrivateEndpointReconciler( + mgr manager.Manager, + predicates []predicate.Predicate, + atlasProvider atlas.Provider, + deletionProtection bool, + logger *zap.Logger, +) *AtlasPrivateEndpointReconciler { + return &AtlasPrivateEndpointReconciler{ + Scheme: mgr.GetScheme(), + Client: mgr.GetClient(), + EventRecorder: mgr.GetEventRecorderFor("AtlasPrivateEndpoint"), + AtlasProvider: atlasProvider, + GlobalPredicates: predicates, + Log: logger.Named("controllers").Named("AtlasPrivateEndpoint").Sugar(), + ObjectDeletionProtection: deletionProtection, + } +} diff --git a/pkg/controller/atlasprivateendpoint/atlasprivateendpoint_controller_test.go b/pkg/controller/atlasprivateendpoint/atlasprivateendpoint_controller_test.go new file mode 100644 index 0000000000..c3d2d693d7 --- /dev/null +++ b/pkg/controller/atlasprivateendpoint/atlasprivateendpoint_controller_test.go @@ -0,0 +1,596 @@ +package atlasprivateendpoint + +import ( + "context" + "errors" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" + "go.mongodb.org/atlas-sdk/v20231115008/admin" + "go.mongodb.org/atlas-sdk/v20231115008/mockadmin" + "go.uber.org/zap" + "go.uber.org/zap/zaptest" + "go.uber.org/zap/zaptest/observer" + k8serrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/tools/record" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/client/fake" + "sigs.k8s.io/controller-runtime/pkg/client/interceptor" + "sigs.k8s.io/controller-runtime/pkg/reconcile" + + atlasmock "github.com/mongodb/mongodb-atlas-kubernetes/v2/internal/mocks/atlas" + "github.com/mongodb/mongodb-atlas-kubernetes/v2/internal/pointer" + "github.com/mongodb/mongodb-atlas-kubernetes/v2/internal/translation/privateendpoint" + "github.com/mongodb/mongodb-atlas-kubernetes/v2/internal/translation/project" + "github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/api" + akov2 "github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/api/v1" + "github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/api/v1/common" + "github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/api/v1/status" + "github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/controller/atlas" + "github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/controller/customresource" + "github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/controller/workflow" +) + +func TestReconcile(t *testing.T) { + ctx := context.Background() + projectID := "project-id" + + testScheme := runtime.NewScheme() + require.NoError(t, akov2.AddToScheme(testScheme)) + + tests := map[string]struct { + request reconcile.Request + expectedResult reconcile.Result + expectedLogs []string + }{ + "failed to prepare resource": { + request: reconcile.Request{NamespacedName: types.NamespacedName{Namespace: "default", Name: "pe2"}}, + expectedResult: reconcile.Result{}, + expectedLogs: []string{ + "-> Starting AtlasPrivateEndpoint reconciliation", + "Object default/pe2 doesn't exist, was it deleted after reconcile request?", + }, + }, + "prepare resource for reconciliation": { + request: reconcile.Request{NamespacedName: types.NamespacedName{Namespace: "default", Name: "pe1"}}, + expectedResult: reconcile.Result{}, + expectedLogs: []string{ + "-> Starting AtlasPrivateEndpoint reconciliation", + "-> Skipping AtlasPrivateEndpoint reconciliation as annotation mongodb.com/atlas-reconciliation-policy=skip", + }, + }, + } + for name, tt := range tests { + t.Run(name, func(t *testing.T) { + pe := &akov2.AtlasPrivateEndpoint{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pe1", + Namespace: "default", + Annotations: map[string]string{ + customresource.ReconciliationPolicyAnnotation: customresource.ReconciliationPolicySkip, + }, + }, + Spec: akov2.AtlasPrivateEndpointSpec{ + ExternalProject: &akov2.ExternalProjectReference{ + ID: projectID, + }, + LocalCredentialHolder: api.LocalCredentialHolder{}, + Provider: "AWS", + Region: "US_EAST_1", + }, + Status: status.AtlasPrivateEndpointStatus{ + ServiceID: "pe-service-id", + ServiceStatus: "AVAILABLE", + }, + } + + core, logs := observer.New(zap.DebugLevel) + fakeClient := fake.NewClientBuilder(). + WithScheme(testScheme). + WithObjects(pe). + Build() + r := &AtlasPrivateEndpointReconciler{ + Client: fakeClient, + Log: zap.New(core).Sugar(), + } + result, _ := r.Reconcile(ctx, tt.request) + assert.Equal(t, tt.expectedResult, result) + assert.Equal(t, len(tt.expectedLogs), logs.Len()) + for i, logMsg := range tt.expectedLogs { + assert.Equal(t, logMsg, logs.All()[i].Message) + } + }) + } +} + +func TestEnsureCustomResource(t *testing.T) { + ctx := context.Background() + projectID := "project-id" + + testScheme := runtime.NewScheme() + require.NoError(t, akov2.AddToScheme(testScheme)) + + tests := map[string]struct { + atlasPrivateEndpoint *akov2.AtlasPrivateEndpoint + provider atlas.Provider + expectedResult reconcile.Result + expectedLogs []string + }{ + "skip custom resource reconciliation": { + atlasPrivateEndpoint: &akov2.AtlasPrivateEndpoint{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pe1", + Namespace: "default", + Annotations: map[string]string{ + customresource.ReconciliationPolicyAnnotation: customresource.ReconciliationPolicySkip, + }, + }, + Spec: akov2.AtlasPrivateEndpointSpec{ + ExternalProject: &akov2.ExternalProjectReference{ + ID: projectID, + }, + LocalCredentialHolder: api.LocalCredentialHolder{}, + Provider: "AWS", + Region: "US_EAST_1", + }, + Status: status.AtlasPrivateEndpointStatus{ + ServiceID: "pe-service-id", + ServiceStatus: "AVAILABLE", + }, + }, + expectedResult: reconcile.Result{}, + expectedLogs: []string{ + "-> Skipping AtlasPrivateEndpoint reconciliation as annotation mongodb.com/atlas-reconciliation-policy=skip", + }, + }, + "custom resource version is invalid": { + atlasPrivateEndpoint: &akov2.AtlasPrivateEndpoint{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pe1", + Namespace: "default", + Labels: map[string]string{ + customresource.ResourceVersion: "wrong", + }, + }, + Spec: akov2.AtlasPrivateEndpointSpec{ + ExternalProject: &akov2.ExternalProjectReference{ + ID: projectID, + }, + LocalCredentialHolder: api.LocalCredentialHolder{}, + Provider: "AWS", + Region: "US_EAST_1", + }, + Status: status.AtlasPrivateEndpointStatus{ + ServiceID: "pe-service-id", + ServiceStatus: "AVAILABLE", + }, + }, + expectedResult: reconcile.Result{RequeueAfter: workflow.DefaultRetry}, + expectedLogs: []string{ + "resource version for 'pe1' is invalid", + "AtlasPrivateEndpoint is invalid: {true 10000000000 wrong is not a valid semver version for label mongodb.com/atlas-resource-version AtlasResourceVersionIsInvalid true false}", + "Status update", + }, + }, + "custom resource is not supported": { + atlasPrivateEndpoint: &akov2.AtlasPrivateEndpoint{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pe1", + Namespace: "default", + }, + Spec: akov2.AtlasPrivateEndpointSpec{ + ExternalProject: &akov2.ExternalProjectReference{ + ID: projectID, + }, + LocalCredentialHolder: api.LocalCredentialHolder{}, + Provider: "AWS", + Region: "US_EAST_1", + }, + Status: status.AtlasPrivateEndpointStatus{ + ServiceID: "pe-service-id", + ServiceStatus: "AVAILABLE", + }, + }, + provider: &atlasmock.TestProvider{ + IsCloudGovFunc: func() bool { + return true + }, + IsSupportedFunc: func() bool { + return false + }, + }, + expectedResult: reconcile.Result{}, + expectedLogs: []string{ + "resource 'pe1' version is valid", + "Status update", + }, + }, + "failed to get project from atlas": { + atlasPrivateEndpoint: &akov2.AtlasPrivateEndpoint{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pe1", + Namespace: "default", + }, + Spec: akov2.AtlasPrivateEndpointSpec{ + ExternalProject: &akov2.ExternalProjectReference{ + ID: projectID, + }, + LocalCredentialHolder: api.LocalCredentialHolder{ + ConnectionSecret: &api.LocalObjectReference{ + Name: "my-secret", + }, + }, + Provider: "AWS", + Region: "US_EAST_1", + }, + Status: status.AtlasPrivateEndpointStatus{ + ServiceID: "pe-service-id", + ServiceStatus: "AVAILABLE", + }, + }, + provider: &atlasmock.TestProvider{ + IsCloudGovFunc: func() bool { + return true + }, + IsSupportedFunc: func() bool { + return true + }, + SdkClientFunc: func(secretRef *client.ObjectKey, log *zap.SugaredLogger) (*admin.APIClient, string, error) { + return nil, "", errors.New("failed to create sdk client") + }, + }, + expectedResult: reconcile.Result{RequeueAfter: workflow.DefaultRetry}, + expectedLogs: []string{ + "resource 'pe1' version is valid", + "resource *v1.AtlasPrivateEndpoint(default/pe1) failed on condition Ready: failed to create sdk client", + "Status update", + }, + }, + "failed to get project from cluster": { + atlasPrivateEndpoint: &akov2.AtlasPrivateEndpoint{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pe1", + Namespace: "default", + }, + Spec: akov2.AtlasPrivateEndpointSpec{ + Project: &common.ResourceRefNamespaced{ + Name: "my-project", + Namespace: "default", + }, + Provider: "AWS", + Region: "US_EAST_1", + }, + Status: status.AtlasPrivateEndpointStatus{ + ServiceID: "pe-service-id", + ServiceStatus: "AVAILABLE", + }, + }, + provider: &atlasmock.TestProvider{ + IsCloudGovFunc: func() bool { + return true + }, + IsSupportedFunc: func() bool { + return true + }, + }, + expectedResult: reconcile.Result{RequeueAfter: workflow.DefaultRetry}, + expectedLogs: []string{ + "resource 'pe1' version is valid", + "resource *v1.AtlasPrivateEndpoint(default/pe1) failed on condition Ready: atlasprojects.atlas.mongodb.com \"my-project\" not found", + "Status update", + }, + }, + "custom resource is ready for reconciliation": { + atlasPrivateEndpoint: &akov2.AtlasPrivateEndpoint{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pe1", + Namespace: "default", + }, + Spec: akov2.AtlasPrivateEndpointSpec{ + ExternalProject: &akov2.ExternalProjectReference{ + ID: projectID, + }, + LocalCredentialHolder: api.LocalCredentialHolder{ + ConnectionSecret: &api.LocalObjectReference{ + Name: "my-secret", + }, + }, + Provider: "AWS", + Region: "US_EAST_1", + }, + Status: status.AtlasPrivateEndpointStatus{ + ServiceID: "pe-service-id", + ServiceStatus: "INITIATING", + }, + }, + provider: &atlasmock.TestProvider{ + IsCloudGovFunc: func() bool { + return true + }, + IsSupportedFunc: func() bool { + return true + }, + SdkClientFunc: func(secretRef *client.ObjectKey, log *zap.SugaredLogger) (*admin.APIClient, string, error) { + projectAPI := mockadmin.NewProjectsApi(t) + projectAPI.EXPECT().GetProject(ctx, projectID).Return(admin.GetProjectApiRequest{ApiService: projectAPI}) + projectAPI.EXPECT().GetProjectExecute(mock.AnythingOfType("admin.GetProjectApiRequest")). + Return(&admin.Group{Id: &projectID}, nil, nil) + + peAPI := mockadmin.NewPrivateEndpointServicesApi(t) + peAPI.EXPECT().GetPrivateEndpointService(ctx, projectID, "AWS", "pe-service-id"). + Return(admin.GetPrivateEndpointServiceApiRequest{ApiService: peAPI}) + peAPI.EXPECT().GetPrivateEndpointServiceExecute(mock.AnythingOfType("admin.GetPrivateEndpointServiceApiRequest")). + Return( + &admin.EndpointService{ + Id: pointer.MakePtr("pe-service-id"), + CloudProvider: "AWS", + RegionName: pointer.MakePtr("US_EAST_1"), + Status: pointer.MakePtr("INITIATING"), + }, + nil, + nil, + ) + + return &admin.APIClient{ProjectsApi: projectAPI, PrivateEndpointServicesApi: peAPI}, "", nil + }, + }, + expectedResult: reconcile.Result{RequeueAfter: workflow.DefaultRetry}, + expectedLogs: []string{ + "resource 'pe1' version is valid", + "Status update", + }, + }, + } + for name, tt := range tests { + t.Run(name, func(t *testing.T) { + core, logs := observer.New(zap.DebugLevel) + fakeClient := fake.NewClientBuilder(). + WithScheme(testScheme). + WithObjects(tt.atlasPrivateEndpoint). + WithStatusSubresource(tt.atlasPrivateEndpoint). + Build() + r := &AtlasPrivateEndpointReconciler{ + Client: fakeClient, + AtlasProvider: tt.provider, + EventRecorder: record.NewFakeRecorder(10), + Log: zap.New(core).Sugar(), + } + result, err := r.ensureCustomResource(ctx, tt.atlasPrivateEndpoint) + assert.NoError(t, err) + assert.Equal(t, tt.expectedResult, result) + assert.Equal(t, len(tt.expectedLogs), logs.Len()) + for i, logMsg := range tt.expectedLogs { + assert.Equal(t, logMsg, logs.All()[i].Message) + } + }) + } +} + +func TestGetProjectFromKube(t *testing.T) { + ctx := context.Background() + projectID := "project-id" + + testScheme := runtime.NewScheme() + require.NoError(t, akov2.AddToScheme(testScheme)) + + tests := map[string]struct { + atlasPrivateEndpoint *akov2.AtlasPrivateEndpoint + provider atlas.Provider + expectedProject *project.Project + expectedErr error + }{ + "failed to resolve secret": { + atlasPrivateEndpoint: &akov2.AtlasPrivateEndpoint{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pe1", + Namespace: "default", + }, + Spec: akov2.AtlasPrivateEndpointSpec{ + Project: &common.ResourceRefNamespaced{ + Name: "my-missing-project", + Namespace: "default", + }, + Provider: "AWS", + Region: "US_EAST_1", + }, + Status: status.AtlasPrivateEndpointStatus{ + ServiceID: "pe-service-id", + ServiceStatus: "AVAILABLE", + }, + }, + expectedErr: &k8serrors.StatusError{ + ErrStatus: metav1.Status{ + Status: "Failure", + Message: "atlasprojects.atlas.mongodb.com \"my-missing-project\" not found", + Reason: "NotFound", + Details: &metav1.StatusDetails{ + Name: "my-missing-project", + Group: "atlas.mongodb.com", + Kind: "atlasprojects", + }, + Code: 404, + }, + }, + }, + "failed to create sdk client": { + atlasPrivateEndpoint: &akov2.AtlasPrivateEndpoint{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pe1", + Namespace: "default", + }, + Spec: akov2.AtlasPrivateEndpointSpec{ + Project: &common.ResourceRefNamespaced{ + Name: "my-project", + Namespace: "default", + }, + Provider: "AWS", + Region: "US_EAST_1", + }, + Status: status.AtlasPrivateEndpointStatus{ + ServiceID: "pe-service-id", + ServiceStatus: "AVAILABLE", + }, + }, + provider: &atlasmock.TestProvider{ + SdkClientFunc: func(secretRef *client.ObjectKey, log *zap.SugaredLogger) (*admin.APIClient, string, error) { + return nil, "", errors.New("failed to create sdk client") + }, + }, + expectedErr: errors.New("failed to create sdk client"), + }, + "get project from cluster": { + atlasPrivateEndpoint: &akov2.AtlasPrivateEndpoint{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pe1", + Namespace: "default", + }, + Spec: akov2.AtlasPrivateEndpointSpec{ + Project: &common.ResourceRefNamespaced{ + Name: "my-project", + Namespace: "default", + }, + Provider: "AWS", + Region: "US_EAST_1", + }, + Status: status.AtlasPrivateEndpointStatus{ + ServiceID: "pe-service-id", + ServiceStatus: "AVAILABLE", + }, + }, + provider: &atlasmock.TestProvider{ + SdkClientFunc: func(secretRef *client.ObjectKey, log *zap.SugaredLogger) (*admin.APIClient, string, error) { + return &admin.APIClient{}, "org-id", nil + }, + }, + expectedProject: &project.Project{ + OrgID: "org-id", + ID: "project-id", + Name: "My Project", + }, + }, + } + for name, tt := range tests { + t.Run(name, func(t *testing.T) { + akoProject := &akov2.AtlasProject{ + ObjectMeta: metav1.ObjectMeta{ + Name: "my-project", + Namespace: "default", + }, + Spec: akov2.AtlasProjectSpec{ + Name: "My Project", + }, + Status: status.AtlasProjectStatus{ + ID: projectID, + }, + } + + fakeClient := fake.NewClientBuilder(). + WithScheme(testScheme). + WithObjects(akoProject). + Build() + r := &AtlasPrivateEndpointReconciler{ + Client: fakeClient, + AtlasProvider: tt.provider, + } + p, err := r.getProjectFromKube(ctx, tt.atlasPrivateEndpoint) + assert.Equal(t, tt.expectedProject, p) + assert.Equal(t, tt.expectedErr, err) + }) + } +} + +func TestFailManageFinalizer(t *testing.T) { + testScheme := runtime.NewScheme() + require.NoError(t, akov2.AddToScheme(testScheme)) + pe := &akov2.AtlasPrivateEndpoint{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pe1", + Namespace: "default", + }, + Spec: akov2.AtlasPrivateEndpointSpec{ + ExternalProject: &akov2.ExternalProjectReference{ + ID: "project-id", + }, + LocalCredentialHolder: api.LocalCredentialHolder{}, + Provider: "AWS", + Region: "US_EAST_1", + }, + Status: status.AtlasPrivateEndpointStatus{ + ServiceID: "pe-service-id", + ServiceStatus: "AVAILABLE", + }, + } + atlasPE := &privateendpoint.AWSService{ + CommonEndpointService: privateendpoint.CommonEndpointService{ + ID: "pe-service-id", + CloudRegion: "US_EAST_1", + ServiceStatus: "AVAILABLE", + }, + ServiceName: "aws/service/name", + } + + fakeClient := fake.NewClientBuilder(). + WithScheme(testScheme). + WithObjects(pe). + WithStatusSubresource(pe). + WithInterceptorFuncs(interceptor.Funcs{ + Patch: func(ctx context.Context, client client.WithWatch, obj client.Object, patch client.Patch, opts ...client.PatchOption) error { + return errors.New("failed to manage finalizer") + }, + }). + Build() + + logger := zaptest.NewLogger(t).Sugar() + workflowCtx := &workflow.Context{ + Log: logger, + } + + tests := map[string]struct { + transition func(r *AtlasPrivateEndpointReconciler) (reconcile.Result, error) + }{ + "failed to manage finalizer when transitioning to in progress": { + transition: func(r *AtlasPrivateEndpointReconciler) (reconcile.Result, error) { + return r.inProgress( + workflowCtx, + pe, + atlasPE, + api.PrivateEndpointServiceReady, + workflow.PrivateEndpointServiceInitializing, + "testing transition", + ) + }, + }, + "failed to manage finalizer when transitioning to ready": { + transition: func(r *AtlasPrivateEndpointReconciler) (reconcile.Result, error) { + return r.ready(workflowCtx, pe, atlasPE) + }, + }, + "failed to manage finalizer when transitioning to waiting configuration": { + transition: func(r *AtlasPrivateEndpointReconciler) (reconcile.Result, error) { + return r.waitForConfiguration(workflowCtx, pe, atlasPE) + }, + }, + "failed to manage finalizer when transitioning to unmanage": { + transition: func(r *AtlasPrivateEndpointReconciler) (reconcile.Result, error) { + return r.unmanage(workflowCtx, pe) + }, + }, + } + for name, tt := range tests { + t.Run(name, func(t *testing.T) { + r := &AtlasPrivateEndpointReconciler{ + Client: fakeClient, + Log: zaptest.NewLogger(t).Sugar(), + } + + result, err := tt.transition(r) + assert.NoError(t, err) + assert.Equal(t, reconcile.Result{RequeueAfter: workflow.DefaultRetry}, result) + }) + } +} diff --git a/pkg/controller/atlasprivateendpoint/privateendpoint.go b/pkg/controller/atlasprivateendpoint/privateendpoint.go new file mode 100644 index 0000000000..e2b40bbdb7 --- /dev/null +++ b/pkg/controller/atlasprivateendpoint/privateendpoint.go @@ -0,0 +1,220 @@ +/* +Copyright 2024 MongoDB. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package atlasprivateendpoint + +import ( + "context" + "errors" + "reflect" + + ctrl "sigs.k8s.io/controller-runtime" + + "github.com/mongodb/mongodb-atlas-kubernetes/v2/internal/translation/privateendpoint" + "github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/api" + akov2 "github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/api/v1" + "github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/controller/customresource" + "github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/controller/workflow" +) + +func (r *AtlasPrivateEndpointReconciler) handlePrivateEndpointService( + ctx *workflow.Context, + projectID string, + akoPrivateEndpoint *akov2.AtlasPrivateEndpoint, +) (ctrl.Result, error) { + akoPEService := privateendpoint.NewPrivateEndpoint(akoPrivateEndpoint) + atlasPEService, err := r.getPrivateEndpointService(ctx.Context, projectID, akoPrivateEndpoint) + if err != nil { + return r.terminate(ctx, akoPrivateEndpoint, atlasPEService, api.ReadyType, workflow.Internal, err) + } + + wasDeleted := !akoPrivateEndpoint.GetDeletionTimestamp().IsZero() + existInAtlas := atlasPEService != nil + + switch { + case !existInAtlas && !wasDeleted: + atlasPEService, err = r.privateEndpointService.CreatePrivateEndpointService(ctx.Context, projectID, akoPEService) + if err != nil { + return r.terminate(ctx, akoPrivateEndpoint, atlasPEService, api.PrivateEndpointServiceReady, workflow.PrivateEndpointServiceFailedToCreate, err) + } + case !existInAtlas && wasDeleted: + return r.unmanage(ctx, akoPrivateEndpoint) + case existInAtlas && wasDeleted: + if customresource.IsResourcePolicyKeepOrDefault(akoPrivateEndpoint, r.ObjectDeletionProtection) { + return r.unmanage(ctx, akoPrivateEndpoint) + } + + atlasPEService, err = r.deletePrivateEndpoint(ctx.Context, projectID, atlasPEService) + if err != nil { + return r.terminate(ctx, akoPrivateEndpoint, atlasPEService, api.PrivateEndpointServiceReady, workflow.PrivateEndpointFailedToDelete, err) + } + } + + switch atlasPEService.Status() { + case privateendpoint.StatusInitiating: + return r.inProgress(ctx, akoPrivateEndpoint, atlasPEService, api.PrivateEndpointServiceReady, workflow.PrivateEndpointServiceInitializing, "Private Endpoint is being initialized") + case privateendpoint.StatusPending, privateendpoint.StatusPendingAcceptance, privateendpoint.StatusWaitingForUser, privateendpoint.StatusVerified: + return r.inProgress(ctx, akoPrivateEndpoint, atlasPEService, api.PrivateEndpointServiceReady, workflow.PrivateEndpointServiceInitializing, "Private Endpoint is waiting for human action") + case privateendpoint.StatusFailed, privateendpoint.StatusRejected: + return r.terminate(ctx, akoPrivateEndpoint, atlasPEService, api.PrivateEndpointServiceReady, workflow.PrivateEndpointServiceFailedToConfigure, errors.New(atlasPEService.ErrorMessage())) + case privateendpoint.StatusDeleting: + return r.inProgress(ctx, akoPrivateEndpoint, atlasPEService, api.PrivateEndpointServiceReady, workflow.PrivateEndpointServiceDeleting, "Private Endpoint is being deleted") + } + + ctx.SetConditionTrue(api.PrivateEndpointServiceReady) + r.EventRecorder.Event(akoPrivateEndpoint, "Normal", string(workflow.PrivateEndpointServiceCreated), "Private Endpoint Service is available") + + return r.handlePrivateEndpointInterface(ctx, projectID, akoPrivateEndpoint, akoPEService, atlasPEService) +} + +func (r *AtlasPrivateEndpointReconciler) handlePrivateEndpointInterface( + ctx *workflow.Context, + projectID string, + akoPrivateEndpoint *akov2.AtlasPrivateEndpoint, + akoPEService privateendpoint.EndpointService, + atlasPEService privateendpoint.EndpointService, +) (ctrl.Result, error) { + if len(akoPEService.EndpointInterfaces()) == 0 && len(atlasPEService.EndpointInterfaces()) == 0 { + return r.waitForConfiguration(ctx, akoPrivateEndpoint, atlasPEService) + } + + inProgress := false + for _, akoPEInterface := range akoPEService.EndpointInterfaces() { + atlasPEInterfaces := atlasPEService.EndpointInterfaces() + atlasPEInterface := atlasPEInterfaces.Get(akoPEInterface.InterfaceID()) + existInAtlas := atlasPEInterface != nil + inProgress = isInterfaceInProgress(akoPEInterface.Status()) + var err error + + if !existInAtlas { + gcpProjectID := getGCPProjectID(akoPrivateEndpoint, akoPEInterface.InterfaceID()) + _, err = r.privateEndpointService.CreatePrivateEndpointInterface( + ctx.Context, + projectID, + akoPrivateEndpoint.Spec.Provider, + akoPrivateEndpoint.Status.ServiceID, + gcpProjectID, + akoPEInterface, + ) + if err != nil { + return r.terminate(ctx, akoPrivateEndpoint, atlasPEService, api.PrivateEndpointReady, workflow.PrivateEndpointFailedToCreate, err) + } + + inProgress = true + } + } + + for _, atlasPEInterface := range atlasPEService.EndpointInterfaces() { + akoPEInterfaces := akoPEService.EndpointInterfaces() + akoPEInterface := akoPEInterfaces.Get(atlasPEInterface.InterfaceID()) + wasDeleted := akoPEInterface == nil + inProgress = isInterfaceInProgress(atlasPEInterface.Status()) + var err error + + if wasDeleted && atlasPEInterface.Status() != privateendpoint.StatusDeleting { + err = r.privateEndpointService.DeleteEndpointInterface(ctx.Context, projectID, akoPrivateEndpoint.Spec.Provider, akoPrivateEndpoint.Status.ServiceID, atlasPEInterface.InterfaceID()) + if err != nil { + return r.terminate(ctx, akoPrivateEndpoint, atlasPEService, api.PrivateEndpointReady, workflow.PrivateEndpointFailedToDelete, err) + } + + inProgress = true + + continue + } + + if hasInterfaceFailed(atlasPEInterface.Status()) { + return r.terminate(ctx, akoPrivateEndpoint, atlasPEService, api.PrivateEndpointReady, workflow.PrivateEndpointFailedToConfigure, errors.New(atlasPEInterface.ErrorMessage())) + } + } + + if !inProgress && reflect.DeepEqual(akoPEService.EndpointInterfaces(), atlasPEService.EndpointInterfaces()) { + return r.ready(ctx, akoPrivateEndpoint, atlasPEService) + } + + return r.inProgress(ctx, akoPrivateEndpoint, atlasPEService, api.PrivateEndpointReady, workflow.PrivateEndpointUpdating, "Private Endpoints are being updated") +} + +func (r *AtlasPrivateEndpointReconciler) getPrivateEndpointService( + ctx context.Context, + projectID string, + akoPrivateEndpoint *akov2.AtlasPrivateEndpoint, +) (privateendpoint.EndpointService, error) { + if akoPrivateEndpoint.Status.ServiceID != "" { + return r.privateEndpointService.GetPrivateEndpoint(ctx, projectID, akoPrivateEndpoint.Spec.Provider, akoPrivateEndpoint.Status.ServiceID) + } + + endpointServices, err := r.privateEndpointService.ListPrivateEndpoints(ctx, projectID, akoPrivateEndpoint.Spec.Provider) + if err != nil { + return nil, err + } + + for _, endpointService := range endpointServices { + if endpointService.Region() == akoPrivateEndpoint.Spec.Region { + return endpointService, err + } + } + + return nil, nil +} + +func (r *AtlasPrivateEndpointReconciler) deletePrivateEndpoint( + ctx context.Context, + projectID string, + atlasPEService privateendpoint.EndpointService, +) (privateendpoint.EndpointService, error) { + if len(atlasPEService.EndpointInterfaces()) == 0 && atlasPEService.Status() != privateendpoint.StatusDeleting { + err := r.privateEndpointService.DeleteEndpointService(ctx, projectID, atlasPEService.Provider(), atlasPEService.ServiceID()) + if err != nil { + return atlasPEService, err + } + } + + for _, i := range atlasPEService.EndpointInterfaces() { + if i.Status() != privateendpoint.StatusDeleting { + err := r.privateEndpointService.DeleteEndpointInterface(ctx, projectID, atlasPEService.Provider(), atlasPEService.ServiceID(), i.InterfaceID()) + if err != nil { + return atlasPEService, err + } + } + } + + return r.privateEndpointService.GetPrivateEndpoint(ctx, projectID, atlasPEService.Provider(), atlasPEService.ServiceID()) +} + +func isInterfaceInProgress(status string) bool { + return status == privateendpoint.StatusInitiating || + status == privateendpoint.StatusPending || + status == privateendpoint.StatusPendingAcceptance || + status == privateendpoint.StatusWaitingForUser || + status == privateendpoint.StatusVerified || + status == privateendpoint.StatusDeleting +} + +func hasInterfaceFailed(status string) bool { + return status == privateendpoint.StatusFailed || status == privateendpoint.StatusRejected +} + +func getGCPProjectID(akoPrivateEndpoint *akov2.AtlasPrivateEndpoint, interfaceID string) string { + if akoPrivateEndpoint.Spec.Provider == privateendpoint.ProviderGCP && len(akoPrivateEndpoint.Spec.GCPConfiguration) > 0 { + for _, config := range akoPrivateEndpoint.Spec.GCPConfiguration { + if config.GroupName == interfaceID { + return config.ProjectID + } + } + } + + return "" +} diff --git a/pkg/controller/atlasprivateendpoint/privateendpoint_test.go b/pkg/controller/atlasprivateendpoint/privateendpoint_test.go new file mode 100644 index 0000000000..39ade72418 --- /dev/null +++ b/pkg/controller/atlasprivateendpoint/privateendpoint_test.go @@ -0,0 +1,1185 @@ +package atlasprivateendpoint + +import ( + "context" + "errors" + "testing" + + "github.com/google/go-cmp/cmp" + "github.com/google/go-cmp/cmp/cmpopts" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" + "go.uber.org/zap/zaptest" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/tools/record" + "sigs.k8s.io/controller-runtime/pkg/client/fake" + "sigs.k8s.io/controller-runtime/pkg/reconcile" + + "github.com/mongodb/mongodb-atlas-kubernetes/v2/internal/mocks/translation" + "github.com/mongodb/mongodb-atlas-kubernetes/v2/internal/translation/privateendpoint" + "github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/api" + akov2 "github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/api/v1" + "github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/api/v1/status" + "github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/controller/customresource" + "github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/controller/workflow" +) + +func TestHandlePrivateEndpointService(t *testing.T) { + ctx := context.Background() + projectID := "project-id" + deletionTime := metav1.Now() + + testScheme := runtime.NewScheme() + require.NoError(t, akov2.AddToScheme(testScheme)) + + tests := map[string]struct { + atlasPrivateEndpoint *akov2.AtlasPrivateEndpoint + peClient func() privateendpoint.PrivateEndpointService + expectedResult reconcile.Result + expectedConditions []api.Condition + }{ + "failed to retrieve private endpoint": { + atlasPrivateEndpoint: &akov2.AtlasPrivateEndpoint{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pe1", + Namespace: "default", + }, + Spec: akov2.AtlasPrivateEndpointSpec{ + ExternalProject: &akov2.ExternalProjectReference{ + ID: projectID, + }, + LocalCredentialHolder: api.LocalCredentialHolder{}, + Provider: "AWS", + Region: "US_EAST_1", + }, + Status: status.AtlasPrivateEndpointStatus{ + ServiceID: "pe-service-id", + ServiceStatus: "AVAILABLE", + }, + }, + peClient: func() privateendpoint.PrivateEndpointService { + c := translation.NewPrivateEndpointServiceMock(t) + c.EXPECT().GetPrivateEndpoint(ctx, projectID, "AWS", "pe-service-id"). + Return(nil, errors.New("failed to get private endpoint")) + + return c + }, + expectedResult: reconcile.Result{RequeueAfter: workflow.DefaultRetry}, + expectedConditions: []api.Condition{ + api.FalseCondition(api.ReadyType). + WithReason(string(workflow.Internal)). + WithMessageRegexp("failed to get private endpoint"), + }, + }, + "failed to create private endpoint service": { + atlasPrivateEndpoint: &akov2.AtlasPrivateEndpoint{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pe1", + Namespace: "default", + }, + Spec: akov2.AtlasPrivateEndpointSpec{ + ExternalProject: &akov2.ExternalProjectReference{ + ID: projectID, + }, + LocalCredentialHolder: api.LocalCredentialHolder{}, + Provider: "AWS", + Region: "US_EAST_1", + }, + }, + peClient: func() privateendpoint.PrivateEndpointService { + c := translation.NewPrivateEndpointServiceMock(t) + c.EXPECT().ListPrivateEndpoints(ctx, projectID, "AWS"). + Return(nil, nil) + c.EXPECT().CreatePrivateEndpointService(ctx, projectID, mock.AnythingOfType("*privateendpoint.AWSService")). + Return(nil, errors.New("failed to create private endpoint")) + + return c + }, + expectedResult: reconcile.Result{RequeueAfter: workflow.DefaultRetry}, + expectedConditions: []api.Condition{ + api.FalseCondition(api.ReadyType), + api.FalseCondition(api.PrivateEndpointServiceReady). + WithReason(string(workflow.PrivateEndpointServiceFailedToCreate)). + WithMessageRegexp("failed to create private endpoint"), + }, + }, + "unmanage already deleted private endpoint": { + atlasPrivateEndpoint: &akov2.AtlasPrivateEndpoint{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pe1", + Namespace: "default", + DeletionTimestamp: &deletionTime, + Finalizers: []string{customresource.FinalizerLabel}, + }, + Spec: akov2.AtlasPrivateEndpointSpec{ + ExternalProject: &akov2.ExternalProjectReference{ + ID: projectID, + }, + LocalCredentialHolder: api.LocalCredentialHolder{}, + Provider: "AWS", + Region: "US_EAST_1", + }, + Status: status.AtlasPrivateEndpointStatus{ + ServiceID: "pe-service-id", + ServiceStatus: "AVAILABLE", + }, + }, + peClient: func() privateendpoint.PrivateEndpointService { + c := translation.NewPrivateEndpointServiceMock(t) + c.EXPECT().GetPrivateEndpoint(ctx, projectID, "AWS", "pe-service-id"). + Return(nil, nil) + + return c + }, + expectedResult: reconcile.Result{}, + }, + "unmanage protected private endpoint": { + atlasPrivateEndpoint: &akov2.AtlasPrivateEndpoint{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pe1", + Namespace: "default", + DeletionTimestamp: &deletionTime, + Finalizers: []string{customresource.FinalizerLabel}, + Annotations: map[string]string{ + customresource.ResourcePolicyAnnotation: customresource.ResourcePolicyKeep, + }, + }, + Spec: akov2.AtlasPrivateEndpointSpec{ + ExternalProject: &akov2.ExternalProjectReference{ + ID: projectID, + }, + LocalCredentialHolder: api.LocalCredentialHolder{}, + Provider: "AWS", + Region: "US_EAST_1", + }, + Status: status.AtlasPrivateEndpointStatus{ + ServiceID: "pe-service-id", + ServiceStatus: "AVAILABLE", + }, + }, + peClient: func() privateendpoint.PrivateEndpointService { + c := translation.NewPrivateEndpointServiceMock(t) + c.EXPECT().GetPrivateEndpoint(ctx, projectID, "AWS", "pe-service-id"). + Return(&privateendpoint.AWSService{}, nil) + + return c + }, + expectedResult: reconcile.Result{}, + }, + "failed to delete private endpoint": { + atlasPrivateEndpoint: &akov2.AtlasPrivateEndpoint{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pe1", + Namespace: "default", + DeletionTimestamp: &deletionTime, + Finalizers: []string{customresource.FinalizerLabel}, + }, + Spec: akov2.AtlasPrivateEndpointSpec{ + ExternalProject: &akov2.ExternalProjectReference{ + ID: projectID, + }, + LocalCredentialHolder: api.LocalCredentialHolder{}, + Provider: "AWS", + Region: "US_EAST_1", + }, + Status: status.AtlasPrivateEndpointStatus{ + ServiceID: "pe-service-id", + ServiceStatus: "AVAILABLE", + }, + }, + peClient: func() privateendpoint.PrivateEndpointService { + awsService := &privateendpoint.AWSService{ + CommonEndpointService: privateendpoint.CommonEndpointService{ + ID: "pe-service-id", + CloudRegion: "US_EAST_1", + ServiceStatus: "AVAILABLE", + Interfaces: privateendpoint.EndpointInterfaces{}, + }, + } + + c := translation.NewPrivateEndpointServiceMock(t) + c.EXPECT().GetPrivateEndpoint(ctx, projectID, "AWS", "pe-service-id"). + Return(awsService, nil) + c.EXPECT().DeleteEndpointService(ctx, projectID, "AWS", "pe-service-id"). + Return(errors.New("failed to delete private endpoint")) + + return c + }, + expectedResult: reconcile.Result{RequeueAfter: workflow.DefaultRetry}, + expectedConditions: []api.Condition{ + api.FalseCondition(api.ReadyType), + api.FalseCondition(api.PrivateEndpointServiceReady). + WithReason(string(workflow.PrivateEndpointFailedToDelete)). + WithMessageRegexp("failed to delete private endpoint"), + }, + }, + "private endpoint service is initiating": { + atlasPrivateEndpoint: &akov2.AtlasPrivateEndpoint{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pe1", + Namespace: "default", + }, + Spec: akov2.AtlasPrivateEndpointSpec{ + ExternalProject: &akov2.ExternalProjectReference{ + ID: projectID, + }, + LocalCredentialHolder: api.LocalCredentialHolder{}, + Provider: "AWS", + Region: "US_EAST_1", + }, + Status: status.AtlasPrivateEndpointStatus{ + ServiceID: "pe-service-id", + ServiceStatus: "INITIATING", + }, + }, + peClient: func() privateendpoint.PrivateEndpointService { + awsService := &privateendpoint.AWSService{ + CommonEndpointService: privateendpoint.CommonEndpointService{ + ID: "pe-service-id", + CloudRegion: "US_EAST_1", + ServiceStatus: "INITIATING", + Interfaces: privateendpoint.EndpointInterfaces{}, + }, + } + + c := translation.NewPrivateEndpointServiceMock(t) + c.EXPECT().GetPrivateEndpoint(ctx, projectID, "AWS", "pe-service-id"). + Return(awsService, nil) + + return c + }, + expectedResult: reconcile.Result{RequeueAfter: workflow.DefaultRetry}, + expectedConditions: []api.Condition{ + api.FalseCondition(api.ReadyType), + api.FalseCondition(api.PrivateEndpointServiceReady). + WithReason(string(workflow.PrivateEndpointServiceInitializing)). + WithMessageRegexp("Private Endpoint is being initialized"), + }, + }, + "private endpoint service is pending": { + atlasPrivateEndpoint: &akov2.AtlasPrivateEndpoint{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pe1", + Namespace: "default", + }, + Spec: akov2.AtlasPrivateEndpointSpec{ + ExternalProject: &akov2.ExternalProjectReference{ + ID: projectID, + }, + LocalCredentialHolder: api.LocalCredentialHolder{}, + Provider: "AWS", + Region: "US_EAST_1", + }, + Status: status.AtlasPrivateEndpointStatus{ + ServiceID: "pe-service-id", + ServiceStatus: "PENDING", + }, + }, + peClient: func() privateendpoint.PrivateEndpointService { + awsService := &privateendpoint.AWSService{ + CommonEndpointService: privateendpoint.CommonEndpointService{ + ID: "pe-service-id", + CloudRegion: "US_EAST_1", + ServiceStatus: "PENDING", + Interfaces: privateendpoint.EndpointInterfaces{}, + }, + } + + c := translation.NewPrivateEndpointServiceMock(t) + c.EXPECT().GetPrivateEndpoint(ctx, projectID, "AWS", "pe-service-id"). + Return(awsService, nil) + + return c + }, + expectedResult: reconcile.Result{RequeueAfter: workflow.DefaultRetry}, + expectedConditions: []api.Condition{ + api.FalseCondition(api.ReadyType), + api.FalseCondition(api.PrivateEndpointServiceReady). + WithReason(string(workflow.PrivateEndpointServiceInitializing)). + WithMessageRegexp("Private Endpoint is waiting for human action"), + }, + }, + "private endpoint service was rejected": { + atlasPrivateEndpoint: &akov2.AtlasPrivateEndpoint{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pe1", + Namespace: "default", + }, + Spec: akov2.AtlasPrivateEndpointSpec{ + ExternalProject: &akov2.ExternalProjectReference{ + ID: projectID, + }, + LocalCredentialHolder: api.LocalCredentialHolder{}, + Provider: "AWS", + Region: "US_EAST_1", + }, + Status: status.AtlasPrivateEndpointStatus{ + ServiceID: "pe-service-id", + ServiceStatus: "REJECTED", + }, + }, + peClient: func() privateendpoint.PrivateEndpointService { + awsService := &privateendpoint.AWSService{ + CommonEndpointService: privateendpoint.CommonEndpointService{ + ID: "pe-service-id", + CloudRegion: "US_EAST_1", + ServiceStatus: "REJECTED", + Error: "atlas could not connect the private endpoint", + Interfaces: privateendpoint.EndpointInterfaces{}, + }, + } + + c := translation.NewPrivateEndpointServiceMock(t) + c.EXPECT().GetPrivateEndpoint(ctx, projectID, "AWS", "pe-service-id"). + Return(awsService, nil) + + return c + }, + expectedResult: reconcile.Result{RequeueAfter: workflow.DefaultRetry}, + expectedConditions: []api.Condition{ + api.FalseCondition(api.ReadyType), + api.FalseCondition(api.PrivateEndpointServiceReady). + WithReason(string(workflow.PrivateEndpointServiceFailedToConfigure)). + WithMessageRegexp("atlas could not connect the private endpoint"), + }, + }, + "private endpoint service is being deleted": { + atlasPrivateEndpoint: &akov2.AtlasPrivateEndpoint{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pe1", + Namespace: "default", + }, + Spec: akov2.AtlasPrivateEndpointSpec{ + ExternalProject: &akov2.ExternalProjectReference{ + ID: projectID, + }, + LocalCredentialHolder: api.LocalCredentialHolder{}, + Provider: "AWS", + Region: "US_EAST_1", + }, + Status: status.AtlasPrivateEndpointStatus{ + ServiceID: "pe-service-id", + ServiceStatus: "DELETING", + }, + }, + peClient: func() privateendpoint.PrivateEndpointService { + awsService := &privateendpoint.AWSService{ + CommonEndpointService: privateendpoint.CommonEndpointService{ + ID: "pe-service-id", + CloudRegion: "US_EAST_1", + ServiceStatus: "DELETING", + Interfaces: privateendpoint.EndpointInterfaces{}, + }, + } + + c := translation.NewPrivateEndpointServiceMock(t) + c.EXPECT().GetPrivateEndpoint(ctx, projectID, "AWS", "pe-service-id"). + Return(awsService, nil) + + return c + }, + expectedResult: reconcile.Result{RequeueAfter: workflow.DefaultRetry}, + expectedConditions: []api.Condition{ + api.FalseCondition(api.ReadyType), + api.FalseCondition(api.PrivateEndpointServiceReady). + WithReason(string(workflow.PrivateEndpointServiceDeleting)). + WithMessageRegexp("Private Endpoint is being deleted"), + }, + }, + "private endpoint service is available": { + atlasPrivateEndpoint: &akov2.AtlasPrivateEndpoint{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pe1", + Namespace: "default", + }, + Spec: akov2.AtlasPrivateEndpointSpec{ + ExternalProject: &akov2.ExternalProjectReference{ + ID: projectID, + }, + LocalCredentialHolder: api.LocalCredentialHolder{}, + Provider: "AWS", + Region: "US_EAST_1", + }, + Status: status.AtlasPrivateEndpointStatus{ + ServiceID: "pe-service-id", + ServiceStatus: "AVAILABLE", + }, + }, + peClient: func() privateendpoint.PrivateEndpointService { + awsService := &privateendpoint.AWSService{ + CommonEndpointService: privateendpoint.CommonEndpointService{ + ID: "pe-service-id", + CloudRegion: "US_EAST_1", + ServiceStatus: "AVAILABLE", + Interfaces: privateendpoint.EndpointInterfaces{}, + }, + } + + c := translation.NewPrivateEndpointServiceMock(t) + c.EXPECT().GetPrivateEndpoint(ctx, projectID, "AWS", "pe-service-id"). + Return(awsService, nil) + + return c + }, + expectedResult: reconcile.Result{}, + expectedConditions: []api.Condition{ + api.TrueCondition(api.PrivateEndpointServiceReady), + api.FalseCondition(api.ReadyType), + api.FalseCondition(api.PrivateEndpointReady). + WithReason(string(workflow.PrivateEndpointConfigurationPending)). + WithMessageRegexp("waiting for private endpoint configuration from customer side"), + }, + }, + } + for name, tt := range tests { + t.Run(name, func(t *testing.T) { + fakeClient := fake.NewClientBuilder(). + WithScheme(testScheme). + WithObjects(tt.atlasPrivateEndpoint). + WithStatusSubresource(tt.atlasPrivateEndpoint). + Build() + + logger := zaptest.NewLogger(t).Sugar() + r := &AtlasPrivateEndpointReconciler{ + Client: fakeClient, + EventRecorder: record.NewFakeRecorder(10), + Log: logger, + privateEndpointService: tt.peClient(), + } + workflowCtx := workflow.Context{ + Context: ctx, + Log: logger, + } + + result, err := r.handlePrivateEndpointService(&workflowCtx, projectID, tt.atlasPrivateEndpoint) + assert.NoError(t, err) + assert.Equal(t, tt.expectedResult, result) + assert.True( + t, + cmp.Equal( + tt.expectedConditions, + workflowCtx.Conditions(), + cmpopts.IgnoreFields(api.Condition{}, "LastTransitionTime"), + ), + ) + }) + } +} + +func TestHandlePrivateEndpointInterfaces(t *testing.T) { + ctx := context.Background() + projectID := "project-id" + + testScheme := runtime.NewScheme() + require.NoError(t, akov2.AddToScheme(testScheme)) + + tests := map[string]struct { + atlasPrivateEndpoint *akov2.AtlasPrivateEndpoint + atlasPEService func() privateendpoint.EndpointService + peClient func() privateendpoint.PrivateEndpointService + expectedResult reconcile.Result + expectedConditions []api.Condition + }{ + "failed to create private endpoint interface": { + atlasPrivateEndpoint: &akov2.AtlasPrivateEndpoint{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pe1", + Namespace: "default", + }, + Spec: akov2.AtlasPrivateEndpointSpec{ + ExternalProject: &akov2.ExternalProjectReference{ + ID: projectID, + }, + LocalCredentialHolder: api.LocalCredentialHolder{}, + Provider: "AWS", + Region: "US_EAST_1", + AWSConfiguration: []akov2.AWSPrivateEndpointConfiguration{ + { + ID: "vpcpe-123456", + }, + }, + }, + Status: status.AtlasPrivateEndpointStatus{ + Common: api.Common{ + Conditions: []api.Condition{ + { + Type: api.ReadyType, + Status: corev1.ConditionFalse, + LastTransitionTime: metav1.Now(), + }, + { + Type: api.PrivateEndpointServiceReady, + Status: corev1.ConditionTrue, + LastTransitionTime: metav1.Now(), + }, + { + Type: api.PrivateEndpointReady, + Status: corev1.ConditionFalse, + Reason: string(workflow.PrivateEndpointConfigurationPending), + Message: "waiting for private endpoint configuration from customer side", + LastTransitionTime: metav1.Now(), + }, + }, + }, + ServiceID: "pe-service-id", + ServiceName: "aws/service/name", + ServiceStatus: "AVAILABLE", + }, + }, + atlasPEService: func() privateendpoint.EndpointService { + awsService := &privateendpoint.AWSService{ + CommonEndpointService: privateendpoint.CommonEndpointService{ + Interfaces: privateendpoint.EndpointInterfaces{}, + }, + } + + return awsService + }, + peClient: func() privateendpoint.PrivateEndpointService { + c := translation.NewPrivateEndpointServiceMock(t) + c.EXPECT(). + CreatePrivateEndpointInterface(ctx, projectID, "AWS", "pe-service-id", "", mock.AnythingOfType("*privateendpoint.AWSInterface")). + Return(nil, errors.New("failed to create private endpoint interface")) + + return c + }, + expectedResult: reconcile.Result{RequeueAfter: workflow.DefaultRetry}, + expectedConditions: []api.Condition{ + api.FalseCondition(api.ReadyType), + api.FalseCondition(api.PrivateEndpointReady). + WithReason(string(workflow.PrivateEndpointFailedToCreate)). + WithMessageRegexp("failed to create private endpoint interface"), + }, + }, + "create private endpoint interface": { + atlasPrivateEndpoint: &akov2.AtlasPrivateEndpoint{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pe1", + Namespace: "default", + }, + Spec: akov2.AtlasPrivateEndpointSpec{ + ExternalProject: &akov2.ExternalProjectReference{ + ID: projectID, + }, + LocalCredentialHolder: api.LocalCredentialHolder{}, + Provider: "GCP", + Region: "EUROPE_WEST_3", + GCPConfiguration: []akov2.GCPPrivateEndpointConfiguration{ + { + ProjectID: "customer-project-id1", + GroupName: "group-name1", + Endpoints: []akov2.GCPPrivateEndpoint{ + { + Name: "group-name1-pe1", + IP: "10.0.0.1", + }, + { + Name: "group-name1-pe2", + IP: "10.0.0.2", + }, + { + Name: "group-name1-pe3", + IP: "10.0.0.3", + }, + }, + }, + }, + }, + Status: status.AtlasPrivateEndpointStatus{ + Common: api.Common{ + Conditions: []api.Condition{ + { + Type: api.ReadyType, + Status: corev1.ConditionFalse, + LastTransitionTime: metav1.Now(), + }, + { + Type: api.PrivateEndpointServiceReady, + Status: corev1.ConditionTrue, + LastTransitionTime: metav1.Now(), + }, + { + Type: api.PrivateEndpointReady, + Status: corev1.ConditionFalse, + Reason: string(workflow.PrivateEndpointConfigurationPending), + Message: "waiting for private endpoint configuration from customer side", + LastTransitionTime: metav1.Now(), + }, + }, + }, + ServiceID: "pe-service-id", + ServiceAttachmentNames: []string{ + "atls/service/attachment/name/1", + "atls/service/attachment/name/2", + "atls/service/attachment/name/3", + }, + ServiceStatus: "AVAILABLE", + }, + }, + atlasPEService: func() privateendpoint.EndpointService { + awsService := &privateendpoint.AWSService{ + CommonEndpointService: privateendpoint.CommonEndpointService{ + Interfaces: privateendpoint.EndpointInterfaces{}, + }, + } + + return awsService + }, + peClient: func() privateendpoint.PrivateEndpointService { + c := translation.NewPrivateEndpointServiceMock(t) + c.EXPECT(). + CreatePrivateEndpointInterface( + ctx, + projectID, + "GCP", + "pe-service-id", + "customer-project-id1", + mock.AnythingOfType("*privateendpoint.GCPInterface"), + ). + Return(nil, nil) + + return c + }, + expectedResult: reconcile.Result{RequeueAfter: workflow.DefaultRetry}, + expectedConditions: []api.Condition{ + api.FalseCondition(api.ReadyType), + api.FalseCondition(api.PrivateEndpointReady). + WithReason(string(workflow.PrivateEndpointUpdating)). + WithMessageRegexp("Private Endpoints are being updated"), + }, + }, + "failed to configure private endpoint interface": { + atlasPrivateEndpoint: &akov2.AtlasPrivateEndpoint{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pe1", + Namespace: "default", + }, + Spec: akov2.AtlasPrivateEndpointSpec{ + ExternalProject: &akov2.ExternalProjectReference{ + ID: projectID, + }, + LocalCredentialHolder: api.LocalCredentialHolder{}, + Provider: "AWS", + Region: "US_EAST_1", + AWSConfiguration: []akov2.AWSPrivateEndpointConfiguration{ + { + ID: "vpcpe-123456", + }, + }, + }, + Status: status.AtlasPrivateEndpointStatus{ + Common: api.Common{ + Conditions: []api.Condition{ + { + Type: api.ReadyType, + Status: corev1.ConditionFalse, + LastTransitionTime: metav1.Now(), + }, + { + Type: api.PrivateEndpointServiceReady, + Status: corev1.ConditionTrue, + LastTransitionTime: metav1.Now(), + }, + { + Type: api.PrivateEndpointReady, + Status: corev1.ConditionFalse, + Reason: string(workflow.PrivateEndpointConfigurationPending), + Message: "waiting for private endpoint configuration from customer side", + LastTransitionTime: metav1.Now(), + }, + }, + }, + ServiceID: "pe-service-id", + ServiceName: "aws/service/name", + ServiceStatus: "AVAILABLE", + Endpoints: []status.EndpointInterfaceStatus{ + { + ID: "vpcpe-123456", + Status: "INITIATING", + }, + }, + }, + }, + atlasPEService: func() privateendpoint.EndpointService { + awsInterface := &privateendpoint.AWSInterface{ + CommonEndpointInterface: privateendpoint.CommonEndpointInterface{ + ID: "vpcpe-123456", + InterfaceStatus: "REJECTED", + Error: "failed to configure private endpoint interface", + }, + } + awsService := &privateendpoint.AWSService{ + CommonEndpointService: privateendpoint.CommonEndpointService{ + Interfaces: privateendpoint.EndpointInterfaces{awsInterface}, + }, + } + + return awsService + }, + peClient: func() privateendpoint.PrivateEndpointService { + c := translation.NewPrivateEndpointServiceMock(t) + + return c + }, + expectedResult: reconcile.Result{RequeueAfter: workflow.DefaultRetry}, + expectedConditions: []api.Condition{ + api.FalseCondition(api.ReadyType), + api.FalseCondition(api.PrivateEndpointReady). + WithReason(string(workflow.PrivateEndpointFailedToConfigure)). + WithMessageRegexp("failed to configure private endpoint interface"), + }, + }, + "failed to delete private endpoint interface": { + atlasPrivateEndpoint: &akov2.AtlasPrivateEndpoint{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pe1", + Namespace: "default", + }, + Spec: akov2.AtlasPrivateEndpointSpec{ + ExternalProject: &akov2.ExternalProjectReference{ + ID: projectID, + }, + LocalCredentialHolder: api.LocalCredentialHolder{}, + Provider: "AWS", + Region: "US_EAST_1", + }, + Status: status.AtlasPrivateEndpointStatus{ + Common: api.Common{ + Conditions: []api.Condition{ + { + Type: api.ReadyType, + Status: corev1.ConditionFalse, + LastTransitionTime: metav1.Now(), + }, + { + Type: api.PrivateEndpointServiceReady, + Status: corev1.ConditionTrue, + LastTransitionTime: metav1.Now(), + }, + { + Type: api.PrivateEndpointReady, + Status: corev1.ConditionFalse, + Reason: string(workflow.PrivateEndpointConfigurationPending), + Message: "waiting for private endpoint configuration from customer side", + LastTransitionTime: metav1.Now(), + }, + }, + }, + ServiceID: "pe-service-id", + ServiceName: "aws/service/name", + ServiceStatus: "AVAILABLE", + Endpoints: []status.EndpointInterfaceStatus{ + { + ID: "vpcpe-123456", + Status: "AVAILABLE", + }, + }, + }, + }, + atlasPEService: func() privateendpoint.EndpointService { + awsInterface := &privateendpoint.AWSInterface{ + CommonEndpointInterface: privateendpoint.CommonEndpointInterface{ + ID: "vpcpe-123456", + InterfaceStatus: "AVAILABLE", + }, + } + awsService := &privateendpoint.AWSService{ + CommonEndpointService: privateendpoint.CommonEndpointService{ + Interfaces: privateendpoint.EndpointInterfaces{awsInterface}, + }, + } + + return awsService + }, + peClient: func() privateendpoint.PrivateEndpointService { + c := translation.NewPrivateEndpointServiceMock(t) + c.EXPECT(). + DeleteEndpointInterface(ctx, projectID, "AWS", "pe-service-id", "vpcpe-123456"). + Return(errors.New("failed to delete private endpoint interface")) + + return c + }, + expectedResult: reconcile.Result{RequeueAfter: workflow.DefaultRetry}, + expectedConditions: []api.Condition{ + api.FalseCondition(api.ReadyType), + api.FalseCondition(api.PrivateEndpointReady). + WithReason(string(workflow.PrivateEndpointFailedToDelete)). + WithMessageRegexp("failed to delete private endpoint interface"), + }, + }, + "delete private endpoint interface": { + atlasPrivateEndpoint: &akov2.AtlasPrivateEndpoint{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pe1", + Namespace: "default", + }, + Spec: akov2.AtlasPrivateEndpointSpec{ + ExternalProject: &akov2.ExternalProjectReference{ + ID: projectID, + }, + LocalCredentialHolder: api.LocalCredentialHolder{}, + Provider: "AWS", + Region: "US_EAST_1", + }, + Status: status.AtlasPrivateEndpointStatus{ + Common: api.Common{ + Conditions: []api.Condition{ + { + Type: api.ReadyType, + Status: corev1.ConditionTrue, + LastTransitionTime: metav1.Now(), + }, + { + Type: api.PrivateEndpointServiceReady, + Status: corev1.ConditionTrue, + LastTransitionTime: metav1.Now(), + }, + { + Type: api.PrivateEndpointReady, + Status: corev1.ConditionTrue, + LastTransitionTime: metav1.Now(), + }, + }, + }, + ServiceID: "pe-service-id", + ServiceName: "aws/service/name", + ServiceStatus: "AVAILABLE", + Endpoints: []status.EndpointInterfaceStatus{ + { + ID: "vpcpe-123456", + Status: "AVAILABLE", + }, + }, + }, + }, + atlasPEService: func() privateendpoint.EndpointService { + awsInterface := &privateendpoint.AWSInterface{ + CommonEndpointInterface: privateendpoint.CommonEndpointInterface{ + ID: "vpcpe-123456", + InterfaceStatus: "AVAILABLE", + }, + } + awsService := &privateendpoint.AWSService{ + CommonEndpointService: privateendpoint.CommonEndpointService{ + Interfaces: privateendpoint.EndpointInterfaces{awsInterface}, + }, + } + + return awsService + }, + peClient: func() privateendpoint.PrivateEndpointService { + c := translation.NewPrivateEndpointServiceMock(t) + c.EXPECT(). + DeleteEndpointInterface(ctx, projectID, "AWS", "pe-service-id", "vpcpe-123456"). + Return(nil) + + return c + }, + expectedResult: reconcile.Result{RequeueAfter: workflow.DefaultRetry}, + expectedConditions: []api.Condition{ + api.FalseCondition(api.ReadyType), + api.FalseCondition(api.PrivateEndpointReady). + WithReason(string(workflow.PrivateEndpointUpdating)). + WithMessageRegexp("Private Endpoints are being updated"), + }, + }, + "private endpoints are ready": { + atlasPrivateEndpoint: &akov2.AtlasPrivateEndpoint{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pe1", + Namespace: "default", + }, + Spec: akov2.AtlasPrivateEndpointSpec{ + ExternalProject: &akov2.ExternalProjectReference{ + ID: projectID, + }, + LocalCredentialHolder: api.LocalCredentialHolder{}, + Provider: "AZURE", + Region: "GERMANY_NORTH", + AzureConfiguration: []akov2.AzurePrivateEndpointConfiguration{ + { + ID: "azure/resource/id", + IP: "10.0.0.2", + }, + }, + }, + Status: status.AtlasPrivateEndpointStatus{ + Common: api.Common{ + Conditions: []api.Condition{ + { + Type: api.ReadyType, + Status: corev1.ConditionTrue, + LastTransitionTime: metav1.Now(), + }, + { + Type: api.PrivateEndpointServiceReady, + Status: corev1.ConditionTrue, + LastTransitionTime: metav1.Now(), + }, + { + Type: api.PrivateEndpointReady, + Status: corev1.ConditionTrue, + LastTransitionTime: metav1.Now(), + }, + }, + }, + ServiceID: "pe-service-id", + ResourceID: "atlas/azure/resource/id", + ServiceStatus: "AVAILABLE", + Endpoints: []status.EndpointInterfaceStatus{ + { + ID: "azure/resource/id", + ConnectionName: "atlas-connection-name", + Status: "AVAILABLE", + }, + }, + }, + }, + atlasPEService: func() privateendpoint.EndpointService { + azureInterface := &privateendpoint.AzureInterface{ + CommonEndpointInterface: privateendpoint.CommonEndpointInterface{ + ID: "azure/resource/id", + InterfaceStatus: "AVAILABLE", + }, + IP: "10.0.0.2", + ConnectionName: "atlas-connection-name", + } + azureService := &privateendpoint.AzureService{ + CommonEndpointService: privateendpoint.CommonEndpointService{ + ID: "pe-service-id", + CloudRegion: "GERMANY_NORTH", + ServiceStatus: "AVAILABLE", + Interfaces: privateendpoint.EndpointInterfaces{azureInterface}, + }, + ServiceName: "atlas/azure/service/name", + ResourceID: "atlas/azure/resource/id", + } + + return azureService + }, + peClient: func() privateendpoint.PrivateEndpointService { + c := translation.NewPrivateEndpointServiceMock(t) + + return c + }, + expectedResult: reconcile.Result{}, + expectedConditions: []api.Condition{ + api.TrueCondition(api.PrivateEndpointServiceReady), + api.TrueCondition(api.PrivateEndpointReady), + api.TrueCondition(api.ReadyType), + }, + }, + } + for name, tt := range tests { + t.Run(name, func(t *testing.T) { + fakeClient := fake.NewClientBuilder(). + WithScheme(testScheme). + WithObjects(tt.atlasPrivateEndpoint). + WithStatusSubresource(tt.atlasPrivateEndpoint). + Build() + + logger := zaptest.NewLogger(t).Sugar() + r := &AtlasPrivateEndpointReconciler{ + Client: fakeClient, + EventRecorder: record.NewFakeRecorder(10), + Log: logger, + privateEndpointService: tt.peClient(), + } + workflowCtx := workflow.Context{ + Context: ctx, + Log: logger, + } + + akoPEService := privateendpoint.NewPrivateEndpoint(tt.atlasPrivateEndpoint) + result, err := r.handlePrivateEndpointInterface(&workflowCtx, projectID, tt.atlasPrivateEndpoint, akoPEService, tt.atlasPEService()) + assert.NoError(t, err) + assert.Equal(t, tt.expectedResult, result) + assert.True( + t, + cmp.Equal( + tt.expectedConditions, + workflowCtx.Conditions(), + cmpopts.IgnoreFields(api.Condition{}, "LastTransitionTime"), + ), + ) + }) + } +} + +func TestGetPrivateEndpointService(t *testing.T) { + ctx := context.Background() + projectID := "project-id" + + testScheme := runtime.NewScheme() + require.NoError(t, akov2.AddToScheme(testScheme)) + + tests := map[string]struct { + atlasPrivateEndpoint *akov2.AtlasPrivateEndpoint + peClient func() privateendpoint.PrivateEndpointService + expectedResult privateendpoint.EndpointService + expectedErr error + }{ + "failed to list private endpoint to match": { + atlasPrivateEndpoint: &akov2.AtlasPrivateEndpoint{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pe1", + Namespace: "default", + }, + Spec: akov2.AtlasPrivateEndpointSpec{ + ExternalProject: &akov2.ExternalProjectReference{ + ID: projectID, + }, + LocalCredentialHolder: api.LocalCredentialHolder{}, + Provider: "AWS", + Region: "US_EAST_1", + }, + }, + peClient: func() privateendpoint.PrivateEndpointService { + c := translation.NewPrivateEndpointServiceMock(t) + c.EXPECT().ListPrivateEndpoints(ctx, projectID, "AWS"). + Return(nil, errors.New("failed to list private endpoints")) + + return c + }, + expectedErr: errors.New("failed to list private endpoints"), + }, + "match private endpoint from the list": { + atlasPrivateEndpoint: &akov2.AtlasPrivateEndpoint{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pe1", + Namespace: "default", + }, + Spec: akov2.AtlasPrivateEndpointSpec{ + ExternalProject: &akov2.ExternalProjectReference{ + ID: projectID, + }, + LocalCredentialHolder: api.LocalCredentialHolder{}, + Provider: "AWS", + Region: "US_EAST_1", + }, + }, + peClient: func() privateendpoint.PrivateEndpointService { + c := translation.NewPrivateEndpointServiceMock(t) + c.EXPECT().ListPrivateEndpoints(ctx, projectID, "AWS"). + Return( + []privateendpoint.EndpointService{ + &privateendpoint.AWSService{ + CommonEndpointService: privateendpoint.CommonEndpointService{ + ID: "pe-service-id-1", + CloudRegion: "EU_CENTRAL_1", + ServiceStatus: "AVAILABLE", + }, + }, + &privateendpoint.AWSService{ + CommonEndpointService: privateendpoint.CommonEndpointService{ + ID: "pe-service-id-2", + CloudRegion: "US_EAST_1", + ServiceStatus: "AVAILABLE", + }, + }, + }, + nil, + ) + + return c + }, + expectedResult: &privateendpoint.AWSService{ + CommonEndpointService: privateendpoint.CommonEndpointService{ + ID: "pe-service-id-2", + CloudRegion: "US_EAST_1", + ServiceStatus: "AVAILABLE", + }, + }, + }, + } + for name, tt := range tests { + t.Run(name, func(t *testing.T) { + r := &AtlasPrivateEndpointReconciler{ + privateEndpointService: tt.peClient(), + } + + result, err := r.getPrivateEndpointService(ctx, projectID, tt.atlasPrivateEndpoint) + assert.Equal(t, tt.expectedErr, err) + assert.Equal(t, tt.expectedResult, result) + }) + } +} + +func TestDeletePrivateEndpoint(t *testing.T) { + ctx := context.Background() + projectID := "project-id" + + testScheme := runtime.NewScheme() + require.NoError(t, akov2.AddToScheme(testScheme)) + + tests := map[string]struct { + peService privateendpoint.EndpointService + peClient func() privateendpoint.PrivateEndpointService + expectedErr error + }{ + "failed to delete private endpoint interface": { + peService: &privateendpoint.AWSService{ + CommonEndpointService: privateendpoint.CommonEndpointService{ + ID: "pe-service-id", + CloudRegion: "US_EAST_1", + ServiceStatus: "AVAILABLE", + Interfaces: privateendpoint.EndpointInterfaces{ + &privateendpoint.AWSInterface{ + CommonEndpointInterface: privateendpoint.CommonEndpointInterface{ + ID: "vpcpe-123456", + InterfaceStatus: "AVAILABLE", + }, + }, + }, + }, + }, + peClient: func() privateendpoint.PrivateEndpointService { + c := translation.NewPrivateEndpointServiceMock(t) + c.EXPECT().DeleteEndpointInterface(ctx, projectID, "AWS", "pe-service-id", "vpcpe-123456"). + Return(errors.New("failed to delete private endpoint interface")) + + return c + }, + expectedErr: errors.New("failed to delete private endpoint interface"), + }, + "delete private endpoint interface": { + peService: &privateendpoint.AWSService{ + CommonEndpointService: privateendpoint.CommonEndpointService{ + ID: "pe-service-id", + CloudRegion: "US_EAST_1", + ServiceStatus: "AVAILABLE", + Interfaces: privateendpoint.EndpointInterfaces{ + &privateendpoint.AWSInterface{ + CommonEndpointInterface: privateendpoint.CommonEndpointInterface{ + ID: "vpcpe-123456", + InterfaceStatus: "AVAILABLE", + }, + }, + }, + }, + }, + peClient: func() privateendpoint.PrivateEndpointService { + c := translation.NewPrivateEndpointServiceMock(t) + c.EXPECT().DeleteEndpointInterface(ctx, projectID, "AWS", "pe-service-id", "vpcpe-123456"). + Return(nil) + c.EXPECT().GetPrivateEndpoint(ctx, projectID, "AWS", "pe-service-id"). + Return(&privateendpoint.AWSService{}, nil) + + return c + }, + }, + } + for name, tt := range tests { + t.Run(name, func(t *testing.T) { + r := &AtlasPrivateEndpointReconciler{ + privateEndpointService: tt.peClient(), + } + + _, err := r.deletePrivateEndpoint(ctx, projectID, tt.peService) + assert.Equal(t, tt.expectedErr, err) + }) + } +} diff --git a/pkg/controller/atlasproject/atlasproject_controller.go b/pkg/controller/atlasproject/atlasproject_controller.go index 7419f2f2d8..d7a1880d9b 100644 --- a/pkg/controller/atlasproject/atlasproject_controller.go +++ b/pkg/controller/atlasproject/atlasproject_controller.go @@ -99,6 +99,13 @@ func (r *AtlasProjectReconciler) Reconcile(ctx context.Context, req ctrl.Request return result.ReconcileResult(), nil } } + + err := customresource.ApplyLastConfigSkipped(ctx, atlasProject, r.Client) + if err != nil { + log.Errorw("Failed to apply last skipped config", "error", err) + return workflow.Terminate(workflow.Internal, err.Error()).ReconcileResult(), nil + } + return workflow.OK().ReconcileResult(), nil } diff --git a/pkg/controller/atlasproject/private_endpoint.go b/pkg/controller/atlasproject/private_endpoint.go index 9402f9f400..8e7c2a4dea 100644 --- a/pkg/controller/atlasproject/private_endpoint.go +++ b/pkg/controller/atlasproject/private_endpoint.go @@ -2,7 +2,9 @@ package atlasproject import ( "context" + "encoding/json" "errors" + "fmt" "net/http" "go.mongodb.org/atlas-sdk/v20231115008/admin" @@ -14,10 +16,24 @@ import ( akov2 "github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/api/v1" "github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/api/v1/provider" "github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/api/v1/status" + "github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/controller/customresource" "github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/controller/workflow" ) func ensurePrivateEndpoint(workflowCtx *workflow.Context, project *akov2.AtlasProject) workflow.Result { + skipped, err := hasSkippedPrivateEndpointConfiguration(project) + if err != nil { + return workflow.Terminate(workflow.Internal, err.Error()) + } + + if skipped { + workflowCtx.EnsureStatusOption(status.AtlasProjectAddPrivateEndpointsOption(nil)) + workflowCtx.UnsetCondition(api.PrivateEndpointServiceReadyType) + workflowCtx.UnsetCondition(api.PrivateEndpointReadyType) + + return workflow.OK() + } + specPEs := project.Spec.DeepCopy().PrivateEndpoints atlasPEs, err := getAllPrivateEndpoints(workflowCtx.Context, workflowCtx.SdkClient, project.ID()) @@ -555,3 +571,17 @@ type intersectionPair struct { spec akov2.PrivateEndpoint atlas atlasPE } + +func hasSkippedPrivateEndpointConfiguration(atlasProject *akov2.AtlasProject) (bool, error) { + lastSkippedSpec := akov2.AtlasProjectSpec{} + lastSkippedSpecString, ok := atlasProject.Annotations[customresource.AnnotationLastSkippedConfiguration] + if ok { + if err := json.Unmarshal([]byte(lastSkippedSpecString), &lastSkippedSpec); err != nil { + return false, fmt.Errorf("failed to parse last skipped configuration: %w", err) + } + + return len(lastSkippedSpec.PrivateEndpoints) == 0, nil + } + + return false, nil +} diff --git a/pkg/controller/atlasproject/project.go b/pkg/controller/atlasproject/project.go index 291af67161..7ba0725a67 100644 --- a/pkg/controller/atlasproject/project.go +++ b/pkg/controller/atlasproject/project.go @@ -168,5 +168,21 @@ func (r *AtlasProjectReconciler) hasDependencies(ctx *workflow.Context, project return false, err } - return len(streamInstances.Items) > 0, nil + if len(streamInstances.Items) > 0 { + return true, nil + } + + privateEndpoints := &akov2.AtlasPrivateEndpointList{} + listOps = &client.ListOptions{ + FieldSelector: fields.OneTermEqualSelector( + indexer.AtlasPrivateEndpointByProjectIndex, + client.ObjectKeyFromObject(project).String(), + ), + } + err = r.Client.List(ctx.Context, privateEndpoints, listOps) + if err != nil { + return false, err + } + + return len(privateEndpoints.Items) > 0, nil } diff --git a/pkg/controller/atlasproject/project_test.go b/pkg/controller/atlasproject/project_test.go index 9fcb46564a..81d5d47ada 100644 --- a/pkg/controller/atlasproject/project_test.go +++ b/pkg/controller/atlasproject/project_test.go @@ -612,15 +612,13 @@ func TestHandleProject(t *testing.T) { require.NoError(t, akov2.AddToScheme(testScheme)) require.NoError(t, corev1.AddToScheme(testScheme)) instancesIndexer := indexer.NewAtlasStreamInstanceByProjectIndexer(zaptest.NewLogger(t)) + peIndexer := indexer.NewAtlasPrivateEndpointByProjectIndexer(zaptest.NewLogger(t)) k8sClient := fake.NewClientBuilder(). WithScheme(testScheme). WithObjects(tt.project). WithStatusSubresource(tt.project). - WithIndex( - instancesIndexer.Object(), - instancesIndexer.Name(), - instancesIndexer.Keys, - ). + WithIndex(instancesIndexer.Object(), instancesIndexer.Name(), instancesIndexer.Keys). + WithIndex(peIndexer.Object(), peIndexer.Name(), peIndexer.Keys). WithInterceptorFuncs(tt.interceptors). Build() logger := zaptest.NewLogger(t).Sugar() @@ -1070,15 +1068,13 @@ func TestDelete(t *testing.T) { require.NoError(t, akov2.AddToScheme(testScheme)) require.NoError(t, corev1.AddToScheme(testScheme)) instancesIndexer := indexer.NewAtlasStreamInstanceByProjectIndexer(zaptest.NewLogger(t)) + peIndexer := indexer.NewAtlasPrivateEndpointByProjectIndexer(zaptest.NewLogger(t)) k8sClient := fake.NewClientBuilder(). WithScheme(testScheme). WithObjects(tt.objects...). WithStatusSubresource(tt.objects...). - WithIndex( - instancesIndexer.Object(), - instancesIndexer.Name(), - instancesIndexer.Keys, - ). + WithIndex(instancesIndexer.Object(), instancesIndexer.Name(), instancesIndexer.Keys). + WithIndex(peIndexer.Object(), peIndexer.Name(), peIndexer.Keys). WithInterceptorFuncs(tt.interceptors). Build() logger := zaptest.NewLogger(t).Sugar() @@ -1155,17 +1151,16 @@ func TestHasDependencies(t *testing.T) { Namespace: "default", }, } - instanceIndexer := indexer.NewAtlasStreamInstanceByProjectIndexer(zaptest.NewLogger(t)) + logger := zaptest.NewLogger(t) + instanceIndexer := indexer.NewAtlasStreamInstanceByProjectIndexer(logger) + peIndexer := indexer.NewAtlasPrivateEndpointByProjectIndexer(logger) testScheme := runtime.NewScheme() require.NoError(t, akov2.AddToScheme(testScheme)) k8sClient := fake.NewClientBuilder(). WithScheme(testScheme). WithObjects(p). - WithIndex( - instanceIndexer.Object(), - instanceIndexer.Name(), - instanceIndexer.Keys, - ). + WithIndex(instanceIndexer.Object(), instanceIndexer.Name(), instanceIndexer.Keys). + WithIndex(peIndexer.Object(), peIndexer.Name(), peIndexer.Keys). Build() reconciler := &AtlasProjectReconciler{ Client: k8sClient, @@ -1179,7 +1174,7 @@ func TestHasDependencies(t *testing.T) { assert.False(t, ok) }) - t.Run("should return true when project has dependencies", func(t *testing.T) { + t.Run("should return true when project has streams as dependencies", func(t *testing.T) { p := &akov2.AtlasProject{ ObjectMeta: metav1.ObjectMeta{ Name: "my-project", @@ -1198,17 +1193,58 @@ func TestHasDependencies(t *testing.T) { }, }, } - instanceIndexer := indexer.NewAtlasStreamInstanceByProjectIndexer(zaptest.NewLogger(t)) + logger := zaptest.NewLogger(t) + instanceIndexer := indexer.NewAtlasStreamInstanceByProjectIndexer(logger) + peIndexer := indexer.NewAtlasPrivateEndpointByProjectIndexer(logger) testScheme := runtime.NewScheme() require.NoError(t, akov2.AddToScheme(testScheme)) k8sClient := fake.NewClientBuilder(). WithScheme(testScheme). WithObjects(p, streamsInstance). - WithIndex( - instanceIndexer.Object(), - instanceIndexer.Name(), - instanceIndexer.Keys, - ). + WithIndex(instanceIndexer.Object(), instanceIndexer.Name(), instanceIndexer.Keys). + WithIndex(peIndexer.Object(), peIndexer.Name(), peIndexer.Keys). + Build() + reconciler := &AtlasProjectReconciler{ + Client: k8sClient, + } + ctx := &workflow.Context{ + Context: context.Background(), + } + + ok, err := reconciler.hasDependencies(ctx, p) + require.NoError(t, err) + assert.True(t, ok) + }) + + t.Run("should return true when project has private endpoints as dependencies", func(t *testing.T) { + p := &akov2.AtlasProject{ + ObjectMeta: metav1.ObjectMeta{ + Name: "my-project", + Namespace: "default", + }, + } + pe := &akov2.AtlasPrivateEndpoint{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pe-0", + Namespace: "default", + }, + Spec: akov2.AtlasPrivateEndpointSpec{ + Project: &common.ResourceRefNamespaced{ + Name: "my-project", + Namespace: "default", + }, + }, + } + logger := zaptest.NewLogger(t) + instanceIndexer := indexer.NewAtlasStreamInstanceByProjectIndexer(logger) + peIndexer := indexer.NewAtlasPrivateEndpointByProjectIndexer(logger) + testScheme := runtime.NewScheme() + require.NoError(t, akov2.AddToScheme(testScheme)) + k8sClient := fake.NewClientBuilder(). + WithScheme(testScheme). + WithObjects(p, pe). + WithIndex(instanceIndexer.Object(), instanceIndexer.Name(), instanceIndexer.Keys). + WithIndex(peIndexer.Object(), peIndexer.Name(), peIndexer.Keys). Build() reconciler := &AtlasProjectReconciler{ Client: k8sClient, diff --git a/pkg/controller/customresource/protection.go b/pkg/controller/customresource/protection.go index 491611c802..5d07b026ec 100644 --- a/pkg/controller/customresource/protection.go +++ b/pkg/controller/customresource/protection.go @@ -12,6 +12,7 @@ import ( const ( AnnotationLastAppliedConfiguration = "mongodb.com/last-applied-configuration" + AnnotationLastSkippedConfiguration = "mongodb.com/last-skipped-configuration" ) type OperatorChecker func(resource api.AtlasCustomResource) (bool, error) @@ -40,6 +41,14 @@ func IsOwner(resource api.AtlasCustomResource, protectionFlag bool, operatorChec } func ApplyLastConfigApplied(ctx context.Context, resource api.AtlasCustomResource, k8sClient client.Client) error { + return applyLastSpec(ctx, resource, k8sClient, AnnotationLastAppliedConfiguration) +} + +func ApplyLastConfigSkipped(ctx context.Context, resource api.AtlasCustomResource, k8sClient client.Client) error { + return applyLastSpec(ctx, resource, k8sClient, AnnotationLastSkippedConfiguration) +} + +func applyLastSpec(ctx context.Context, resource api.AtlasCustomResource, k8sClient client.Client, annotationKey string) error { uObj, err := runtime.DefaultUnstructuredConverter.ToUnstructured(resource) if err != nil { return err @@ -58,7 +67,7 @@ func ApplyLastConfigApplied(ctx context.Context, resource api.AtlasCustomResourc annotations = map[string]string{} } - annotations[AnnotationLastAppliedConfiguration] = string(js) + annotations[annotationKey] = string(js) resourceCopy.SetAnnotations(annotations) err = k8sClient.Patch(ctx, resourceCopy, client.MergeFrom(resource)) if err != nil { diff --git a/pkg/controller/customresource/protection_test.go b/pkg/controller/customresource/protection_test.go index 7bea4583b4..45beaef7bd 100644 --- a/pkg/controller/customresource/protection_test.go +++ b/pkg/controller/customresource/protection_test.go @@ -153,6 +153,22 @@ func TestApplyLastConfigApplied(t *testing.T) { assert.Equal(t, expectedConfig, annot[customresource.AnnotationLastAppliedConfiguration]) } +func TestApplyLastConfigSkipped(t *testing.T) { + resource := sampleResource() + resource.Name = "foo" + resource.Spec.Username = "test-user" + + scheme := runtime.NewScheme() + utilruntime.Must(akov2.AddToScheme(scheme)) + c := fake.NewClientBuilder().WithObjects(resource).WithScheme(scheme).Build() + assert.NoError(t, customresource.ApplyLastConfigSkipped(context.Background(), resource, c)) + + annot := resource.GetAnnotations() + assert.NotEmpty(t, annot) + expectedConfig := `{"roles":null,"username":"test-user"}` + assert.Equal(t, expectedConfig, annot[customresource.AnnotationLastSkippedConfiguration]) +} + func TestIsResourceManagedByOperator(t *testing.T) { testCases := []struct { title string diff --git a/pkg/controller/workflow/reason.go b/pkg/controller/workflow/reason.go index adccb2f385..3df9c7be52 100644 --- a/pkg/controller/workflow/reason.go +++ b/pkg/controller/workflow/reason.go @@ -131,3 +131,16 @@ const ( StreamConnectionNotRemoved ConditionReason = "StreamConnectionNotRemoved" StreamConnectionNotUpdated ConditionReason = "StreamConnectionNotUpdated" ) + +const ( + PrivateEndpointServiceCreated ConditionReason = "PrivateEndpointServiceCreated" + PrivateEndpointServiceFailedToCreate ConditionReason = "PrivateEndpointServiceFailedToCreate" + PrivateEndpointServiceFailedToConfigure ConditionReason = "PrivateEndpointServiceFailedToConfigure" + PrivateEndpointServiceInitializing ConditionReason = "PrivateEndpointServiceInitializing" + PrivateEndpointServiceDeleting ConditionReason = "PrivateEndpointServiceDeleting" + PrivateEndpointFailedToCreate ConditionReason = "PrivateEndpointFailedToCreate" + PrivateEndpointUpdating ConditionReason = "PrivateEndpointUpdating" + PrivateEndpointConfigurationPending ConditionReason = "PrivateEndpointConfigurationPending" + PrivateEndpointFailedToConfigure ConditionReason = "PrivateEndpointFailedToConfigure" + PrivateEndpointFailedToDelete ConditionReason = "PrivateEndpointFailedToDelete" +) diff --git a/pkg/indexer/atlasprivateendpointcredentials.go b/pkg/indexer/atlasprivateendpointcredentials.go new file mode 100644 index 0000000000..a6df75e703 --- /dev/null +++ b/pkg/indexer/atlasprivateendpointcredentials.go @@ -0,0 +1,24 @@ +package indexer + +import ( + "go.uber.org/zap" + "sigs.k8s.io/controller-runtime/pkg/reconcile" + + akov2 "github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/api/v1" +) + +const ( + AtlasPrivateEndpointCredentialsIndex = "atlasprivateendpoint.credentials" +) + +func NewAtlasPrivateEndpointByCredentialIndexer(logger *zap.Logger) *LocalCredentialIndexer { + return NewLocalCredentialsIndexer(AtlasPrivateEndpointCredentialsIndex, &akov2.AtlasPrivateEndpoint{}, logger) +} + +func PrivateEndpointRequests(list *akov2.AtlasPrivateEndpointList) []reconcile.Request { + requests := make([]reconcile.Request, 0, len(list.Items)) + for _, item := range list.Items { + requests = append(requests, toRequest(&item)) + } + return requests +} diff --git a/pkg/indexer/atlasprivateendpoints.go b/pkg/indexer/atlasprivateendpoints.go new file mode 100644 index 0000000000..741d008c84 --- /dev/null +++ b/pkg/indexer/atlasprivateendpoints.go @@ -0,0 +1,50 @@ +package indexer + +import ( + "go.uber.org/zap" + "sigs.k8s.io/controller-runtime/pkg/client" + + akov2 "github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/api/v1" +) + +const ( + AtlasPrivateEndpointByProjectIndex = "atlasprivateendpoint.spec.projectRef" +) + +type AtlasPrivateEndpointByProjectIndexer struct { + logger *zap.SugaredLogger +} + +func NewAtlasPrivateEndpointByProjectIndexer(logger *zap.Logger) *AtlasPrivateEndpointByProjectIndexer { + return &AtlasPrivateEndpointByProjectIndexer{ + logger: logger.Named(AtlasPrivateEndpointByProjectIndex).Sugar(), + } +} + +func (*AtlasPrivateEndpointByProjectIndexer) Object() client.Object { + return &akov2.AtlasPrivateEndpoint{} +} + +func (*AtlasPrivateEndpointByProjectIndexer) Name() string { + return AtlasPrivateEndpointByProjectIndex +} + +func (a *AtlasPrivateEndpointByProjectIndexer) Keys(object client.Object) []string { + pe, ok := object.(*akov2.AtlasPrivateEndpoint) + if !ok { + a.logger.Errorf("expected *akov2.AtlasPrivateEndpoint but got %T", object) + return nil + } + + if pe.Spec.Project == nil { + return nil + } + + if pe.Spec.Project.Name == "" { + return nil + } + + key := pe.Spec.Project.GetObject(pe.GetNamespace()) + + return []string{key.String()} +} diff --git a/pkg/indexer/atlasprivateendpoints_test.go b/pkg/indexer/atlasprivateendpoints_test.go new file mode 100644 index 0000000000..1a6af30d2d --- /dev/null +++ b/pkg/indexer/atlasprivateendpoints_test.go @@ -0,0 +1,49 @@ +package indexer + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "go.uber.org/zap/zaptest" + + akov2 "github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/api/v1" + "github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/api/v1/common" +) + +func TestAtlasPrivateEndpointsByProjectIndices(t *testing.T) { + t.Run("should return nil when instance has no project associated to it", func(t *testing.T) { + pe := &akov2.AtlasPrivateEndpoint{ + Spec: akov2.AtlasPrivateEndpointSpec{ + Provider: "AWS", + Region: "US_EAST_1", + }, + } + + indexer := NewAtlasPrivateEndpointByProjectIndexer(zaptest.NewLogger(t)) + keys := indexer.Keys(pe) + assert.Nil(t, keys) + }) + + t.Run("should return indexes slice when instance has project associated to it", func(t *testing.T) { + pe := &akov2.AtlasPrivateEndpoint{ + Spec: akov2.AtlasPrivateEndpointSpec{ + Provider: "AWS", + Region: "US_EAST_1", + Project: &common.ResourceRefNamespaced{ + Name: "project-1", + Namespace: "default", + }, + }, + } + + indexer := NewAtlasPrivateEndpointByProjectIndexer(zaptest.NewLogger(t)) + keys := indexer.Keys(pe) + assert.Equal( + t, + []string{ + "default/project-1", + }, + keys, + ) + }) +} diff --git a/pkg/indexer/indexer.go b/pkg/indexer/indexer.go index 4fef3f28d5..2e5a7cb4f7 100644 --- a/pkg/indexer/indexer.go +++ b/pkg/indexer/indexer.go @@ -36,6 +36,8 @@ func RegisterAll(ctx context.Context, mgr manager.Manager, logger *zap.Logger) e NewAtlasDeploymentByCredentialIndexer(logger), NewAtlasDatabaseUserByProjectIndexer(ctx, mgr.GetClient(), logger), NewAtlasDataFederationByProjectIndexer(logger), + NewAtlasPrivateEndpointByCredentialIndexer(logger), + NewAtlasPrivateEndpointByProjectIndexer(logger), ) } diff --git a/pkg/operator/builder.go b/pkg/operator/builder.go index eb41669df0..a2dad461c8 100644 --- a/pkg/operator/builder.go +++ b/pkg/operator/builder.go @@ -7,6 +7,8 @@ import ( "os" "time" + "github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/controller/atlasprivateendpoint" + "go.uber.org/zap" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/labels" @@ -321,6 +323,17 @@ func (b *Builder) Build(ctx context.Context) (manager.Manager, error) { return nil, fmt.Errorf("unable to create controller AtlasBackupCompliancePolicy: %w", err) } + peReconciler := atlasprivateendpoint.NewAtlasPrivateEndpointReconciler( + mgr, + b.predicates, + b.atlasProvider, + b.deletionProtection, + b.logger, + ) + if err = peReconciler.SetupWithManager(mgr, b.skipNameValidation); err != nil { + return nil, fmt.Errorf("unable to create controller AtlasPrivateEndpoint: %w", err) + } + return mgr, nil } diff --git a/test/e2e/privateendpoint_test.go b/test/e2e/privateendpoint_test.go new file mode 100644 index 0000000000..cc70b60995 --- /dev/null +++ b/test/e2e/privateendpoint_test.go @@ -0,0 +1,620 @@ +package e2e_test + +import ( + "fmt" + "time" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "sigs.k8s.io/controller-runtime/pkg/client" + + "github.com/mongodb/mongodb-atlas-kubernetes/v2/internal/pointer" + "github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/api" + akov2 "github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/api/v1" + "github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/api/v1/common" + "github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/api/v1/provider" + "github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/api/v1/status" + "github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/controller/customresource" + "github.com/mongodb/mongodb-atlas-kubernetes/v2/test/helper/conditions" + "github.com/mongodb/mongodb-atlas-kubernetes/v2/test/helper/e2e/actions" + "github.com/mongodb/mongodb-atlas-kubernetes/v2/test/helper/e2e/actions/cloud" + "github.com/mongodb/mongodb-atlas-kubernetes/v2/test/helper/e2e/data" + "github.com/mongodb/mongodb-atlas-kubernetes/v2/test/helper/e2e/model" + "github.com/mongodb/mongodb-atlas-kubernetes/v2/test/helper/resources" +) + +var _ = Describe("Private Endpoints", Label("private-endpoint"), func() { + var testData *model.TestDataProvider + + _ = BeforeEach(OncePerOrdered, func() { + checkUpAWSEnvironment() + checkUpAzureEnvironment() + checkNSetUpGCPEnvironment() + }) + + _ = AfterEach(func() { + GinkgoWriter.Println() + GinkgoWriter.Println("===============================================") + GinkgoWriter.Println("Operator namespace: " + testData.Resources.Namespace) + GinkgoWriter.Println("===============================================") + if CurrentSpecReport().Failed() { + Expect(actions.SaveProjectsToFile(testData.Context, testData.K8SClient, testData.Resources.Namespace)).Should(Succeed()) + } + By("Delete Project and cluster resources", func() { + actions.DeleteTestDataProject(testData) + actions.AfterEachFinalCleanup([]model.TestDataProvider{*testData}) + }) + }) + + DescribeTable( + "Configure private endpoint for all supported cloud provider", + func(test *model.TestDataProvider, pe *akov2.AtlasPrivateEndpoint) { + var privateEndpointDetails *cloud.PrivateEndpointDetails + + testData = test + actions.ProjectCreationFlow(test) + + By("Referring to a project", func() { + pe.Namespace = test.Resources.Namespace + pe.Spec.Project = &common.ResourceRefNamespaced{ + Name: test.Project.Name, + Namespace: test.Project.Namespace, + } + }) + + By("Creating private endpoint", func() { + Expect(test.K8SClient.Create(test.Context, pe)).To(Succeed()) + + Eventually(func(g Gomega) { + g.Expect(testData.K8SClient.Get(test.Context, client.ObjectKeyFromObject(pe), pe)).To(Succeed()) + g.Expect(pe.Status.ServiceStatus).To(Equal("AVAILABLE")) + g.Expect(resources.CheckCondition(testData.K8SClient, pe, api.TrueCondition(api.PrivateEndpointServiceReady))).Should(BeTrue()) + }).WithTimeout(15 * time.Minute).WithPolling(10 * time.Second).Should(Succeed()) + }) + + By("Configuring external network", func() { + Expect(testData.K8SClient.Get(test.Context, client.ObjectKeyFromObject(pe), pe)).To(Succeed()) + action, err := prepareProviderAction() + Expect(err).To(BeNil()) + + switch pe.Spec.Provider { + case "AWS": + awsConfig := cloud.AWSConfig{ + Region: "eu-central-1", + VPC: fmt.Sprintf("%s-%s", pe.Name, test.Resources.TestID), + EnableCleanup: true, + } + + Expect(action.SetupNetwork(provider.ProviderName(pe.Spec.Provider), cloud.WithAWSConfig(&awsConfig))).ToNot(BeEmpty()) + privateEndpointDetails = action.SetupPrivateEndpoint(&cloud.AWSPrivateEndpointRequest{ + ID: "aws-e2e-pe", + Region: "eu-central-1", + ServiceName: pe.Status.ServiceName, + }) + case "AZURE": + azureConfig := cloud.AzureConfig{ + Region: "northeurope", + VPC: fmt.Sprintf("%s-%s", pe.Name, test.Resources.TestID), + EnableCleanup: true, + } + + Expect(action.SetupNetwork(provider.ProviderName(pe.Spec.Provider), cloud.WithAzureConfig(&azureConfig))).ToNot(BeEmpty()) + privateEndpointDetails = action.SetupPrivateEndpoint(&cloud.AzurePrivateEndpointRequest{ + ID: "azure-e2e-pe", + Region: "northeurope", + ServiceResourceID: pe.Status.ResourceID, + SubnetName: cloud.Subnet1Name, + }) + case "GCP": + gcpConfig := cloud.GCPConfig{ + Region: "europe-west3", + VPC: fmt.Sprintf("%s-%s", pe.Name, test.Resources.TestID), + EnableCleanup: true, + } + + Expect(action.SetupNetwork(provider.ProviderName(pe.Spec.Provider), cloud.WithGCPConfig(&gcpConfig))).ToNot(BeEmpty()) + privateEndpointDetails = action.SetupPrivateEndpoint(&cloud.GCPPrivateEndpointRequest{ + ID: fmt.Sprintf("%s-%s", pe.Name, test.Resources.TestID), + Region: "europe-west3", + Targets: pe.Status.ServiceAttachmentNames, + SubnetName: cloud.Subnet1Name, + }) + } + }) + + By("Configuring private endpoint with external network details", func() { + Expect(testData.K8SClient.Get(test.Context, client.ObjectKeyFromObject(pe), pe)).To(Succeed()) + + switch pe.Spec.Provider { + case "AWS": + pe.Spec.AWSConfiguration = []akov2.AWSPrivateEndpointConfiguration{ + { + ID: privateEndpointDetails.ID, + }, + } + case "AZURE": + pe.Spec.AzureConfiguration = []akov2.AzurePrivateEndpointConfiguration{ + { + ID: privateEndpointDetails.ID, + IP: privateEndpointDetails.IP, + }, + } + case "GCP": + gcpEndpoints := make([]akov2.GCPPrivateEndpoint, 0, len(privateEndpointDetails.Endpoints)) + for _, ep := range privateEndpointDetails.Endpoints { + gcpEndpoints = append( + gcpEndpoints, + akov2.GCPPrivateEndpoint{ + Name: ep.Name, + IP: ep.IP, + }, + ) + } + pe.Spec.GCPConfiguration = []akov2.GCPPrivateEndpointConfiguration{ + { + ProjectID: privateEndpointDetails.GCPProjectID, + GroupName: privateEndpointDetails.EndpointGroupName, + Endpoints: gcpEndpoints, + }, + } + } + + Expect(test.K8SClient.Update(test.Context, pe)).To(Succeed()) + Eventually(func(g Gomega) { //nolint:dupl + g.Expect(testData.K8SClient.Get(test.Context, client.ObjectKeyFromObject(pe), pe)).To(Succeed()) + g.Expect(pe.Status.ServiceStatus).To(Equal("AVAILABLE")) + g.Expect(resources.CheckCondition(testData.K8SClient, pe, api.TrueCondition(api.PrivateEndpointServiceReady))).Should(BeTrue()) + for _, eStatus := range pe.Status.Endpoints { + g.Expect(eStatus.Status).To(Equal("AVAILABLE")) + } + g.Expect(resources.CheckCondition(testData.K8SClient, pe, api.TrueCondition(api.PrivateEndpointReady))).Should(BeTrue()) + g.Expect(resources.CheckCondition(testData.K8SClient, pe, api.TrueCondition(api.ReadyType))).To(BeTrue()) + }).WithTimeout(15 * time.Minute).WithPolling(10 * time.Second).Should(Succeed()) + }) + + By("Removing private endpoint", func() { + Expect(test.K8SClient.Delete(test.Context, pe)).To(Succeed()) + + Eventually(func(g Gomega) { + g.Expect(testData.K8SClient.Get(test.Context, client.ObjectKeyFromObject(pe), pe)).ShouldNot(Succeed()) + }).WithTimeout(15 * time.Minute).WithPolling(20 * time.Second).Should(Succeed()) + }) + }, + Entry( + "Configure AWS private endpoint", + Label("aws-private-endpoint"), + model.DataProvider( + "aws-pe-1", + model.NewEmptyAtlasKeyType().UseDefaultFullAccess(), + 40000, + []func(*model.TestDataProvider){}, + ).WithProject(data.DefaultProject()), + &akov2.AtlasPrivateEndpoint{ + ObjectMeta: metav1.ObjectMeta{ + Name: "aws-pe-1", + }, + Spec: akov2.AtlasPrivateEndpointSpec{ + Provider: "AWS", + Region: "EU_CENTRAL_1", + }, + }, + ), + Entry( + "Configure Azure private endpoint", + Label("azure-private-endpoint"), + model.DataProvider( + "azure-pe-1", + model.NewEmptyAtlasKeyType().UseDefaultFullAccess(), + 40000, + []func(*model.TestDataProvider){}, + ).WithProject(data.DefaultProject()), + &akov2.AtlasPrivateEndpoint{ + ObjectMeta: metav1.ObjectMeta{ + Name: "azure-pe-1", + }, + Spec: akov2.AtlasPrivateEndpointSpec{ + Provider: "AZURE", + Region: "EUROPE_NORTH", + }, + }, + ), + Entry( + "Configure GCP private endpoint", + Label("gcp-private-endpoint"), + model.DataProvider( + "gcp-pe-1", + model.NewEmptyAtlasKeyType().UseDefaultFullAccess(), + 40000, + []func(*model.TestDataProvider){}, + ).WithProject(data.DefaultProject()), + &akov2.AtlasPrivateEndpoint{ + ObjectMeta: metav1.ObjectMeta{ + Name: "gcp-pe-1", + }, + Spec: akov2.AtlasPrivateEndpointSpec{ + Provider: "GCP", + Region: "EUROPE_WEST_3", + }, + }, + ), + ) +}) + +var _ = Describe("Migrate private endpoints from sub-resources to separate custom resources", Label("private-endpoint"), func() { + var testData *model.TestDataProvider + var awsPE *akov2.AtlasPrivateEndpoint + var azurePE *akov2.AtlasPrivateEndpoint + var gcpPE *akov2.AtlasPrivateEndpoint + privateEndpointDetails := map[string]*cloud.PrivateEndpointDetails{} + + _ = BeforeEach(func() { + checkUpAWSEnvironment() + checkUpAzureEnvironment() + checkNSetUpGCPEnvironment() + }) + + _ = AfterEach(func() { + GinkgoWriter.Println() + GinkgoWriter.Println("===============================================") + GinkgoWriter.Println("Operator namespace: " + testData.Resources.Namespace) + GinkgoWriter.Println("===============================================") + if CurrentSpecReport().Failed() { + Expect(actions.SaveProjectsToFile(testData.Context, testData.K8SClient, testData.Resources.Namespace)).Should(Succeed()) + } + By("Delete Project and cluster resources", func() { + actions.DeleteTestDataProject(testData) + actions.AfterEachFinalCleanup([]model.TestDataProvider{*testData}) + }) + }) + + It("Should migrate a private endpoint configured in a project as sub-resource to a separate custom resource", func() { + By("Setting up project", func() { + testData = model.DataProvider( + "project-with-private-endpoint", + model.NewEmptyAtlasKeyType().UseDefaultFullAccess(), + 40000, + []func(*model.TestDataProvider){}, + ).WithProject(data.DefaultProject()) + + actions.ProjectCreationFlow(testData) + }) + + By("Configuring a private endpoint as a sub-resource", func() { + By("Setting up the private endpoint service", func() { + Expect(testData.K8SClient.Get(testData.Context, client.ObjectKeyFromObject(testData.Project), testData.Project)).To(Succeed()) + + testData.Project.Spec.PrivateEndpoints = []akov2.PrivateEndpoint{ + { + Provider: "AWS", + Region: "EU_CENTRAL_1", + }, + { + Provider: "AZURE", + Region: "EUROPE_NORTH", + }, + { + Provider: "GCP", + Region: "EUROPE_WEST_3", + }, + } + + Expect(testData.K8SClient.Update(testData.Context, testData.Project)).To(Succeed()) + Eventually(func(g Gomega) { + g.Expect(testData.K8SClient.Get(testData.Context, client.ObjectKeyFromObject(testData.Project), testData.Project)).To(Succeed()) + g.Expect(testData.Project.Status.Conditions).To(ContainElement(conditions.MatchCondition(api.TrueCondition(api.PrivateEndpointServiceReadyType)))) + }).WithTimeout(15 * time.Minute).WithPolling(10 * time.Second).Should(Succeed()) + }) + + By("Configuring external network", func() { + Expect(testData.K8SClient.Get(testData.Context, client.ObjectKeyFromObject(testData.Project), testData.Project)).To(Succeed()) + action, err := prepareProviderAction() + Expect(err).To(BeNil()) + + for _, pe := range testData.Project.Spec.PrivateEndpoints { + peStatus := statusForProvider(testData.Project.Status.PrivateEndpoints, pe.Provider) + Expect(peStatus).ToNot(BeNil()) + + switch pe.Provider { + case "AWS": + awsConfig := cloud.AWSConfig{ + Region: "eu-central-1", + VPC: fmt.Sprintf("pe-migration-aws-%s", testData.Resources.TestID), + EnableCleanup: true, + } + + Expect(action.SetupNetwork(pe.Provider, cloud.WithAWSConfig(&awsConfig))).ToNot(BeEmpty()) + privateEndpointDetails[string(pe.Provider)] = action.SetupPrivateEndpoint(&cloud.AWSPrivateEndpointRequest{ + ID: "aws-e2e-pe", + Region: "eu-central-1", + ServiceName: peStatus.ServiceName, + }) + case "AZURE": + azureConfig := cloud.AzureConfig{ + Region: "northeurope", + VPC: fmt.Sprintf("pe-migration-azure-%s", testData.Resources.TestID), + EnableCleanup: true, + } + + Expect(action.SetupNetwork(pe.Provider, cloud.WithAzureConfig(&azureConfig))).ToNot(BeEmpty()) + privateEndpointDetails[string(pe.Provider)] = action.SetupPrivateEndpoint(&cloud.AzurePrivateEndpointRequest{ + ID: "azure-e2e-pe", + Region: "northeurope", + ServiceResourceID: peStatus.ServiceResourceID, + SubnetName: cloud.Subnet1Name, + }) + case "GCP": + gcpConfig := cloud.GCPConfig{ + Region: "europe-west3", + VPC: fmt.Sprintf("pe-migration-gcp-%s", testData.Resources.TestID), + EnableCleanup: true, + } + + Expect(action.SetupNetwork(pe.Provider, cloud.WithGCPConfig(&gcpConfig))).ToNot(BeEmpty()) + privateEndpointDetails[string(pe.Provider)] = action.SetupPrivateEndpoint(&cloud.GCPPrivateEndpointRequest{ + ID: fmt.Sprintf("pe-migration-gcp-%s", testData.Resources.TestID), + Region: "europe-west3", + Targets: peStatus.ServiceAttachmentNames, + SubnetName: cloud.Subnet1Name, + }) + } + } + }) + + By("Configuring private endpoint with external network details", func() { + Expect(testData.K8SClient.Get(testData.Context, client.ObjectKeyFromObject(testData.Project), testData.Project)).To(Succeed()) + + for i, pe := range testData.Project.Spec.PrivateEndpoints { + switch pe.Provider { + case "AWS": + pe.ID = privateEndpointDetails[string(pe.Provider)].ID + case "AZURE": + pe.ID = privateEndpointDetails[string(pe.Provider)].ID + pe.IP = privateEndpointDetails[string(pe.Provider)].IP + case "GCP": + gcpEndpoints := make([]akov2.GCPEndpoint, 0, len(privateEndpointDetails[string(pe.Provider)].Endpoints)) + for _, ep := range privateEndpointDetails[string(pe.Provider)].Endpoints { + gcpEndpoints = append( + gcpEndpoints, + akov2.GCPEndpoint{ + EndpointName: ep.Name, + IPAddress: ep.IP, + }, + ) + } + pe.GCPProjectID = privateEndpointDetails[string(pe.Provider)].GCPProjectID + pe.EndpointGroupName = privateEndpointDetails[string(pe.Provider)].EndpointGroupName + pe.Endpoints = gcpEndpoints + } + + testData.Project.Spec.PrivateEndpoints[i] = pe + } + + Expect(testData.K8SClient.Update(testData.Context, testData.Project)).To(Succeed()) + Eventually(func(g Gomega) { + expectedConditions := conditions.MatchConditions( + api.TrueCondition(api.PrivateEndpointServiceReadyType), + api.TrueCondition(api.PrivateEndpointReadyType), + api.TrueCondition(api.ReadyType), + ) + + g.Expect(testData.K8SClient.Get(testData.Context, client.ObjectKeyFromObject(testData.Project), testData.Project)).To(Succeed()) + g.Expect(testData.Project.Status.Conditions).To(ContainElements(expectedConditions)) + }).WithTimeout(15 * time.Minute).WithPolling(10 * time.Second).Should(Succeed()) + }) + }) + + By("Stopping reconciling project and its sub-resources", func() { + Expect(testData.K8SClient.Get(testData.Context, client.ObjectKeyFromObject(testData.Project), testData.Project)).To(Succeed()) + testData.Project.Annotations[customresource.ReconciliationPolicyAnnotation] = customresource.ReconciliationPolicySkip + testData.Project.Spec.PrivateEndpoints = nil + + Expect(testData.K8SClient.Update(testData.Context, testData.Project)).To(Succeed()) + Eventually(func(g Gomega) { + g.Expect(testData.K8SClient.Get(testData.Context, client.ObjectKeyFromObject(testData.Project), testData.Project)).To(Succeed()) + g.Expect(testData.Project.Generation).ToNot(Equal(testData.Project.Status.ObservedGeneration)) + g.Expect(customresource.AnnotationLastSkippedConfiguration).To(BeKeyOf(testData.Project.GetAnnotations())) + g.Expect(testData.Project.Status.Conditions).To(ContainElement(conditions.MatchCondition(api.TrueCondition(api.ReadyType)))) + }).WithTimeout(15 * time.Minute).WithPolling(10 * time.Second).Should(Succeed()) + }) + + By("Migrate private endpoint as separate custom resource", func() { + By("Migrating AWS private endpoint", func() { + awsPE = &akov2.AtlasPrivateEndpoint{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pe-aws-" + testData.Resources.TestID, + Namespace: testData.Resources.Namespace, + }, + Spec: akov2.AtlasPrivateEndpointSpec{ + Project: &common.ResourceRefNamespaced{ + Name: testData.Project.Name, + Namespace: testData.Project.Namespace, + }, + Provider: "AWS", + Region: "EU_CENTRAL_1", + AWSConfiguration: []akov2.AWSPrivateEndpointConfiguration{ + { + ID: privateEndpointDetails["AWS"].ID, + }, + }, + }, + } + + Expect(testData.K8SClient.Create(testData.Context, awsPE)).To(Succeed()) + Eventually(func(g Gomega) { //nolint:dupl + expectedConditions := conditions.MatchConditions( + api.TrueCondition(api.PrivateEndpointServiceReady), + api.TrueCondition(api.PrivateEndpointReady), + api.TrueCondition(api.ReadyType), + ) + g.Expect(testData.K8SClient.Get(testData.Context, client.ObjectKeyFromObject(awsPE), awsPE)).To(Succeed()) + g.Expect(testData.Project.Status.Conditions).To(ContainElements(expectedConditions)) + g.Expect(awsPE.Status.ServiceStatus).To(Equal("AVAILABLE")) + for _, eStatus := range awsPE.Status.Endpoints { + g.Expect(eStatus.Status).To(Equal("AVAILABLE")) + } + }).WithTimeout(15 * time.Minute).WithPolling(10 * time.Second).Should(Succeed()) + }) + + By("Migrating AZURE private endpoint", func() { + azurePE = &akov2.AtlasPrivateEndpoint{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pe-azure-" + testData.Resources.TestID, + Namespace: testData.Resources.Namespace, + }, + Spec: akov2.AtlasPrivateEndpointSpec{ + Project: &common.ResourceRefNamespaced{ + Name: testData.Project.Name, + Namespace: testData.Project.Namespace, + }, + Provider: "AZURE", + Region: "EUROPE_NORTH", + AzureConfiguration: []akov2.AzurePrivateEndpointConfiguration{ + { + ID: privateEndpointDetails["AZURE"].ID, + IP: privateEndpointDetails["AZURE"].IP, + }, + }, + }, + } + + Expect(testData.K8SClient.Create(testData.Context, azurePE)).To(Succeed()) + Eventually(func(g Gomega) { //nolint:dupl + expectedConditions := conditions.MatchConditions( + api.TrueCondition(api.PrivateEndpointServiceReady), + api.TrueCondition(api.PrivateEndpointReady), + api.TrueCondition(api.ReadyType), + ) + g.Expect(testData.K8SClient.Get(testData.Context, client.ObjectKeyFromObject(azurePE), azurePE)).To(Succeed()) + g.Expect(testData.Project.Status.Conditions).To(ContainElements(expectedConditions)) + g.Expect(azurePE.Status.ServiceStatus).To(Equal("AVAILABLE")) + for _, eStatus := range azurePE.Status.Endpoints { + g.Expect(eStatus.Status).To(Equal("AVAILABLE")) + } + }).WithTimeout(15 * time.Minute).WithPolling(10 * time.Second).Should(Succeed()) + }) + + By("Migrating GCP private endpoint", func() { + endpoints := make([]akov2.GCPPrivateEndpoint, 0, len(privateEndpointDetails["GCP"].Endpoints)) + for _, ep := range privateEndpointDetails["GCP"].Endpoints { + endpoints = append( + endpoints, + akov2.GCPPrivateEndpoint{ + Name: ep.Name, + IP: ep.IP, + }, + ) + } + + gcpPE = &akov2.AtlasPrivateEndpoint{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pe-gcp-" + testData.Resources.TestID, + Namespace: testData.Resources.Namespace, + }, + Spec: akov2.AtlasPrivateEndpointSpec{ + Project: &common.ResourceRefNamespaced{ + Name: testData.Project.Name, + Namespace: testData.Project.Namespace, + }, + Provider: "GCP", + Region: "EUROPE_WEST_3", + GCPConfiguration: []akov2.GCPPrivateEndpointConfiguration{ + { + ProjectID: privateEndpointDetails["GCP"].GCPProjectID, + GroupName: privateEndpointDetails["GCP"].EndpointGroupName, + Endpoints: endpoints, + }, + }, + }, + } + + Expect(testData.K8SClient.Create(testData.Context, gcpPE)).To(Succeed()) + Eventually(func(g Gomega) { //nolint:dupl + expectedConditions := conditions.MatchConditions( + api.TrueCondition(api.PrivateEndpointServiceReady), + api.TrueCondition(api.PrivateEndpointReady), + api.TrueCondition(api.ReadyType), + ) + g.Expect(testData.K8SClient.Get(testData.Context, client.ObjectKeyFromObject(gcpPE), gcpPE)).To(Succeed()) + g.Expect(testData.Project.Status.Conditions).To(ContainElements(expectedConditions)) + g.Expect(gcpPE.Status.ServiceStatus).To(Equal("AVAILABLE")) + for _, eStatus := range gcpPE.Status.Endpoints { + g.Expect(eStatus.Status).To(Equal("AVAILABLE")) + } + }).WithTimeout(15 * time.Minute).WithPolling(10 * time.Second).Should(Succeed()) + }) + }) + + By("Restating project reconciliation", func() { + Expect(testData.K8SClient.Get(testData.Context, client.ObjectKeyFromObject(testData.Project), testData.Project)).To(Succeed()) + delete(testData.Project.Annotations, customresource.ReconciliationPolicyAnnotation) + + Expect(testData.K8SClient.Update(testData.Context, testData.Project)).To(Succeed()) + Eventually(func(g Gomega) { + g.Expect(testData.K8SClient.Get(testData.Context, client.ObjectKeyFromObject(testData.Project), testData.Project)).To(Succeed()) + g.Expect(testData.Project.Status.Conditions).To(ContainElement(conditions.MatchCondition(api.TrueCondition(api.ReadyType)))) + }).WithTimeout(15 * time.Minute).WithPolling(10 * time.Second).Should(Succeed()) + }) + + By("Updating project doesn't affect private endpoint", func() { + Expect(testData.K8SClient.Get(testData.Context, client.ObjectKeyFromObject(testData.Project), testData.Project)).To(Succeed()) + testData.Project.Spec.Settings = &akov2.ProjectSettings{ + IsSchemaAdvisorEnabled: pointer.MakePtr(true), + } + + Expect(testData.K8SClient.Update(testData.Context, testData.Project)).To(Succeed()) + Eventually(func(g Gomega) { + notExpectedConditions := conditions.MatchConditions( + api.TrueCondition(api.PrivateEndpointServiceReady), + api.TrueCondition(api.PrivateEndpointReady), + api.FalseCondition(api.PrivateEndpointServiceReady), + api.FalseCondition(api.PrivateEndpointReady), + ) + + g.Expect(testData.K8SClient.Get(testData.Context, client.ObjectKeyFromObject(testData.Project), testData.Project)).To(Succeed()) + g.Expect(testData.Project.Status.Conditions).ToNot(ContainElements(notExpectedConditions)) + g.Expect(testData.Project.Status.Conditions).To(ContainElement(conditions.MatchCondition(api.TrueCondition(api.ReadyType)))) + }).WithTimeout(15 * time.Minute).WithPolling(10 * time.Second).Should(Succeed()) + }) + + By("Private endpoint are still ready", func() { + Eventually(func(g Gomega) { //nolint:dupl + expectedConditions := conditions.MatchConditions( + api.TrueCondition(api.PrivateEndpointServiceReady), + api.TrueCondition(api.PrivateEndpointReady), + api.TrueCondition(api.ReadyType), + ) + for _, pe := range []*akov2.AtlasPrivateEndpoint{awsPE, azurePE, gcpPE} { + g.Expect(testData.K8SClient.Get(testData.Context, client.ObjectKeyFromObject(pe), pe)).To(Succeed()) + g.Expect(testData.Project.Status.Conditions).To(ContainElements(expectedConditions)) + g.Expect(pe.Status.ServiceStatus).To(Equal("AVAILABLE")) + for _, eStatus := range pe.Status.Endpoints { + g.Expect(eStatus.Status).To(Equal("AVAILABLE")) + } + } + }).WithTimeout(15 * time.Minute).WithPolling(10 * time.Second).Should(Succeed()) + }) + + By("Removing private endpoints", func() { + Expect(testData.K8SClient.Delete(testData.Context, awsPE)).To(Succeed()) + Expect(testData.K8SClient.Delete(testData.Context, azurePE)).To(Succeed()) + Expect(testData.K8SClient.Delete(testData.Context, gcpPE)).To(Succeed()) + + Eventually(func(g Gomega) { + g.Expect(testData.K8SClient.Get(testData.Context, client.ObjectKeyFromObject(awsPE), awsPE)).ShouldNot(Succeed()) + g.Expect(testData.K8SClient.Get(testData.Context, client.ObjectKeyFromObject(azurePE), azurePE)).ShouldNot(Succeed()) + g.Expect(testData.K8SClient.Get(testData.Context, client.ObjectKeyFromObject(gcpPE), gcpPE)).ShouldNot(Succeed()) + }).WithTimeout(15 * time.Minute).WithPolling(20 * time.Second).Should(Succeed()) + }) + }) +}) + +func statusForProvider(peStatus []status.ProjectPrivateEndpoint, providerName provider.ProviderName) *status.ProjectPrivateEndpoint { + for _, s := range peStatus { + if s.Provider == providerName { + return &s + } + } + + return nil +}