From beaa7094e219d8f7eaac2e25cdf4a828924428ae Mon Sep 17 00:00:00 2001 From: Helder Santana Date: Tue, 3 Dec 2024 15:47:08 +0100 Subject: [PATCH] CLOUDP-280222: Private Endpoint controller (#1916) --- .github/workflows/test-e2e.yml | 1 + ...las.mongodb.com_atlasprivateendpoints.yaml | 9 + config/rbac/clusterwide/role.yaml | 2 + config/rbac/namespaced/role.yaml | 2 + .../translation/privateendpoint/conversion.go | 27 + pkg/api/condition.go | 6 + pkg/api/v1/atlasprivateendpoint_types.go | 6 + pkg/controller/atlas/provider.go | 3 +- .../atlasprivateendpoint_controller.go | 343 +++++ .../atlasprivateendpoint_controller_test.go | 600 ++++++++ .../atlasprivateendpoint/privateendpoint.go | 274 ++++ .../privateendpoint_test.go | 1277 +++++++++++++++++ .../atlasproject/atlasproject_controller.go | 1 + .../atlasproject/private_endpoint.go | 30 + pkg/controller/atlasproject/project.go | 18 +- pkg/controller/atlasproject/project_test.go | 83 +- pkg/controller/customresource/protection.go | 1 + .../customresource/protection_test.go | 16 + pkg/controller/workflow/reason.go | 14 + .../atlasprivateendpointcredentials.go | 24 + pkg/indexer/atlasprivateendpointsprojects.go | 44 + .../atlasprivateendpointsprojects_test.go | 49 + pkg/indexer/indexer.go | 2 + pkg/operator/builder.go | 15 +- test/e2e/privateendpoint_test.go | 626 ++++++++ test/helper/e2e/actions/cloud/provider.go | 199 ++- 26 files changed, 3642 insertions(+), 30 deletions(-) create mode 100644 pkg/controller/atlasprivateendpoint/atlasprivateendpoint_controller.go create mode 100644 pkg/controller/atlasprivateendpoint/atlasprivateendpoint_controller_test.go create mode 100644 pkg/controller/atlasprivateendpoint/privateendpoint.go create mode 100644 pkg/controller/atlasprivateendpoint/privateendpoint_test.go create mode 100644 pkg/indexer/atlasprivateendpointcredentials.go create mode 100644 pkg/indexer/atlasprivateendpointsprojects.go create mode 100644 pkg/indexer/atlasprivateendpointsprojects_test.go create mode 100644 test/e2e/privateendpoint_test.go diff --git a/.github/workflows/test-e2e.yml b/.github/workflows/test-e2e.yml index 4c6e099407..3d0e13ff9f 100644 --- a/.github/workflows/test-e2e.yml +++ b/.github/workflows/test-e2e.yml @@ -167,6 +167,7 @@ jobs: "multinamespaced", "networkpeering", "privatelink", + "private-endpoint", "project-settings", "serverless-pe", "x509auth", diff --git a/config/crd/bases/atlas.mongodb.com_atlasprivateendpoints.yaml b/config/crd/bases/atlas.mongodb.com_atlasprivateendpoints.yaml index e9fcc68f09..99f12365d5 100644 --- a/config/crd/bases/atlas.mongodb.com_atlasprivateendpoints.yaml +++ b/config/crd/bases/atlas.mongodb.com_atlasprivateendpoints.yaml @@ -73,6 +73,9 @@ spec: - id type: object type: array + x-kubernetes-list-map-keys: + - id + x-kubernetes-list-type: map azureConfiguration: description: AzureConfiguration is the specific Azure settings for the private endpoint @@ -93,6 +96,9 @@ spec: - ipAddress type: object type: array + x-kubernetes-list-map-keys: + - id + x-kubernetes-list-type: map connectionSecret: description: Name of the secret containing Atlas API private and public keys @@ -156,6 +162,9 @@ spec: - projectId type: object type: array + x-kubernetes-list-map-keys: + - groupName + x-kubernetes-list-type: map projectRef: description: Project is a reference to AtlasProject resource the user belongs to diff --git a/config/rbac/clusterwide/role.yaml b/config/rbac/clusterwide/role.yaml index 4e73644e4d..2daca39ba9 100644 --- a/config/rbac/clusterwide/role.yaml +++ b/config/rbac/clusterwide/role.yaml @@ -34,6 +34,7 @@ rules: - atlasdatafederations - atlasdeployments - atlasfederatedauths + - atlasprivateendpoints - atlasprojects - atlassearchindexconfigs - atlasstreamconnections @@ -58,6 +59,7 @@ rules: - atlasdatafederations/status - atlasdeployments/status - atlasfederatedauths/status + - atlasprivateendpoints/status - atlasprojects/status - atlassearchindexconfigs/status - atlasstreamconnections/status diff --git a/config/rbac/namespaced/role.yaml b/config/rbac/namespaced/role.yaml index a65a44d5a1..fae64cd7d9 100644 --- a/config/rbac/namespaced/role.yaml +++ b/config/rbac/namespaced/role.yaml @@ -35,6 +35,7 @@ rules: - atlasdatafederations - atlasdeployments - atlasfederatedauths + - atlasprivateendpoints - atlasprojects - atlassearchindexconfigs - atlasstreamconnections @@ -58,6 +59,7 @@ rules: - atlasdatafederations/status - atlasdeployments/status - atlasfederatedauths/status + - atlasprivateendpoints/status - atlasprojects/status - atlassearchindexconfigs/status - atlasstreamconnections/status diff --git a/internal/translation/privateendpoint/conversion.go b/internal/translation/privateendpoint/conversion.go index 9501f68156..6d7ed9121d 100644 --- a/internal/translation/privateendpoint/conversion.go +++ b/internal/translation/privateendpoint/conversion.go @@ -473,3 +473,30 @@ func interfaceCreateToAtlas(peInterface EndpointInterface, gcpProjectID string) return nil } + +type CompositeEndpointInterface struct { + AKO EndpointInterface + Atlas EndpointInterface +} + +func MapPrivateEndpoints(akoInterfaces, atlasInterfaces []EndpointInterface) map[string]CompositeEndpointInterface { + m := map[string]CompositeEndpointInterface{} + + for _, akoInterface := range akoInterfaces { + m[akoInterface.InterfaceID()] = CompositeEndpointInterface{ + AKO: akoInterface, + } + } + + for _, atlasInterface := range atlasInterfaces { + i := CompositeEndpointInterface{} + if existing, ok := m[atlasInterface.InterfaceID()]; ok { + i = existing + } + + i.Atlas = atlasInterface + m[atlasInterface.InterfaceID()] = i + } + + return m +} diff --git a/pkg/api/condition.go b/pkg/api/condition.go index 11451937f7..b67346d669 100644 --- a/pkg/api/condition.go +++ b/pkg/api/condition.go @@ -94,6 +94,12 @@ const ( TeamUnmanaged ConditionType = "TeamUnmanaged" ) +// Atlas Private Endpoint condition types +const ( + PrivateEndpointServiceReady ConditionType = "PrivateEndpointServiceReady" + PrivateEndpointReady ConditionType = "PrivateEndpointReady" +) + // Generic condition type const ( ResourceVersionStatus ConditionType = "ResourceVersionIsValid" diff --git a/pkg/api/v1/atlasprivateendpoint_types.go b/pkg/api/v1/atlasprivateendpoint_types.go index 4d66426c5d..0b6071939e 100644 --- a/pkg/api/v1/atlasprivateendpoint_types.go +++ b/pkg/api/v1/atlasprivateendpoint_types.go @@ -53,12 +53,18 @@ type AtlasPrivateEndpointSpec struct { // +kubebuilder:validation:Required Region string `json:"region"` // AWSConfiguration is the specific AWS settings for the private endpoint + // +listType=map + // +listMapKey=id // +kubebuilder:validation:Optional AWSConfiguration []AWSPrivateEndpointConfiguration `json:"awsConfiguration,omitempty"` // AzureConfiguration is the specific Azure settings for the private endpoint + // +listType=map + // +listMapKey=id // +kubebuilder:validation:Optional AzureConfiguration []AzurePrivateEndpointConfiguration `json:"azureConfiguration,omitempty"` // GCPConfiguration is the specific Google Cloud settings for the private endpoint + // +listType=map + // +listMapKey=groupName // +kubebuilder:validation:Optional GCPConfiguration []GCPPrivateEndpointConfiguration `json:"gcpConfiguration,omitempty"` } diff --git a/pkg/controller/atlas/provider.go b/pkg/controller/atlas/provider.go index ba303de8dd..51d3dad9e8 100644 --- a/pkg/controller/atlas/provider.go +++ b/pkg/controller/atlas/provider.go @@ -76,7 +76,8 @@ func (p *ProductionProvider) IsResourceSupported(resource api.AtlasCustomResourc *akov2.AtlasDatabaseUser, *akov2.AtlasSearchIndexConfig, *akov2.AtlasBackupCompliancePolicy, - *akov2.AtlasFederatedAuth: + *akov2.AtlasFederatedAuth, + *akov2.AtlasPrivateEndpoint: return true case *akov2.AtlasDataFederation, *akov2.AtlasStreamInstance, diff --git a/pkg/controller/atlasprivateendpoint/atlasprivateendpoint_controller.go b/pkg/controller/atlasprivateendpoint/atlasprivateendpoint_controller.go new file mode 100644 index 0000000000..4d70195f8f --- /dev/null +++ b/pkg/controller/atlasprivateendpoint/atlasprivateendpoint_controller.go @@ -0,0 +1,343 @@ +/* +Copyright 2024 MongoDB. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package atlasprivateendpoint + +import ( + "context" + "errors" + "fmt" + "time" + + "go.uber.org/zap" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/fields" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/tools/record" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/builder" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller" + "sigs.k8s.io/controller-runtime/pkg/handler" + "sigs.k8s.io/controller-runtime/pkg/manager" + "sigs.k8s.io/controller-runtime/pkg/predicate" + "sigs.k8s.io/controller-runtime/pkg/reconcile" + + "github.com/mongodb/mongodb-atlas-kubernetes/v2/internal/pointer" + "github.com/mongodb/mongodb-atlas-kubernetes/v2/internal/translation/privateendpoint" + "github.com/mongodb/mongodb-atlas-kubernetes/v2/internal/translation/project" + "github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/api" + akov2 "github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/api/v1" + "github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/controller/atlas" + "github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/controller/customresource" + "github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/controller/statushandler" + "github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/controller/workflow" + "github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/indexer" +) + +// AtlasPrivateEndpointReconciler reconciles a AtlasPrivateEndpoint object +type AtlasPrivateEndpointReconciler struct { + Scheme *runtime.Scheme + Client client.Client + EventRecorder record.EventRecorder + AtlasProvider atlas.Provider + GlobalPredicates []predicate.Predicate + Log *zap.SugaredLogger + privateEndpointService privateendpoint.PrivateEndpointService + + ObjectDeletionProtection bool + independentSyncPeriod time.Duration +} + +// +kubebuilder:rbac:groups=atlas.mongodb.com,resources=atlasprivateendpoints,verbs=get;list;watch;create;update;patch;delete +// +kubebuilder:rbac:groups=atlas.mongodb.com,resources=atlasprivateendpoints/status,verbs=get;update;patch +// +kubebuilder:rbac:groups=atlas.mongodb.com,namespace=default,resources=atlasprivateendpoints,verbs=get;list;watch;create;update;patch;delete +// +kubebuilder:rbac:groups=atlas.mongodb.com,namespace=default,resources=atlasprivateendpoints/status,verbs=get;update;patch +// +kubebuilder:rbac:groups="",resources=events,verbs=create;patch +// +kubebuilder:rbac:groups="",resources=secrets,verbs=get;list;watch + +func (r *AtlasPrivateEndpointReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { + r.Log.Infow("-> Starting AtlasPrivateEndpoint reconciliation") + + akoPrivateEndpoint := akov2.AtlasPrivateEndpoint{} + result := customresource.PrepareResource(ctx, r.Client, req, &akoPrivateEndpoint, r.Log) + if !result.IsOk() { + return result.ReconcileResult(), errors.New(result.GetMessage()) + } + + return r.ensureCustomResource(ctx, &akoPrivateEndpoint) +} + +func (r *AtlasPrivateEndpointReconciler) ensureCustomResource(ctx context.Context, akoPrivateEndpoint *akov2.AtlasPrivateEndpoint) (ctrl.Result, error) { + if customresource.ReconciliationShouldBeSkipped(akoPrivateEndpoint) { + return r.skip(ctx, akoPrivateEndpoint) + } + + conditions := api.InitCondition(akoPrivateEndpoint, api.FalseCondition(api.ReadyType)) + workflowCtx := workflow.NewContext(r.Log, conditions, ctx) + defer statushandler.Update(workflowCtx, r.Client, r.EventRecorder, akoPrivateEndpoint) + + isValid := customresource.ValidateResourceVersion(workflowCtx, akoPrivateEndpoint, r.Log) + if !isValid.IsOk() { + return r.invalidate(isValid) + } + + if !r.AtlasProvider.IsResourceSupported(akoPrivateEndpoint) { + return r.unsupport(workflowCtx) + } + + var atlasProject *project.Project + var err error + if akoPrivateEndpoint.Spec.ExternalProject != nil { + atlasProject, err = r.getProjectFromAtlas(ctx, akoPrivateEndpoint) + } else { + atlasProject, err = r.getProjectFromKube(ctx, akoPrivateEndpoint) + } + if err != nil { + return r.terminate(workflowCtx, akoPrivateEndpoint, nil, api.ReadyType, workflow.AtlasAPIAccessNotConfigured, err) + } + + return r.handlePrivateEndpointService(workflowCtx, atlasProject.ID, akoPrivateEndpoint) +} + +func (r *AtlasPrivateEndpointReconciler) getProjectFromAtlas(ctx context.Context, akoPrivateEndpoint *akov2.AtlasPrivateEndpoint) (*project.Project, error) { + sdkClient, _, err := r.AtlasProvider.SdkClient( + ctx, + &client.ObjectKey{Namespace: akoPrivateEndpoint.Namespace, Name: akoPrivateEndpoint.Credentials().Name}, + r.Log, + ) + if err != nil { + return nil, fmt.Errorf("failed to create Atlas SDK client: %w", err) + } + + projectService := project.NewProjectAPIService(sdkClient.ProjectsApi) + r.privateEndpointService = privateendpoint.NewPrivateEndpointAPI(sdkClient.PrivateEndpointServicesApi) + + atlasProject, err := projectService.GetProject(ctx, akoPrivateEndpoint.Spec.ExternalProject.ID) + if err != nil { + return nil, fmt.Errorf("failed to retrieve project from Atlas: %w", err) + } + + return atlasProject, nil +} + +func (r *AtlasPrivateEndpointReconciler) getProjectFromKube(ctx context.Context, akoPrivateEndpoint *akov2.AtlasPrivateEndpoint) (*project.Project, error) { + atlasProject := &akov2.AtlasProject{} + if err := r.Client.Get(ctx, akoPrivateEndpoint.AtlasProjectObjectKey(), atlasProject); err != nil { + return nil, fmt.Errorf("failed to retrieve project custom resource: %w", err) + } + + credentialsSecret, err := customresource.ComputeSecret(atlasProject, akoPrivateEndpoint) + if err != nil { + return nil, fmt.Errorf("failed to compute secret: %w", err) + } + + sdkClient, orgID, err := r.AtlasProvider.SdkClient(ctx, credentialsSecret, r.Log) + if err != nil { + return nil, fmt.Errorf("failed to create Atlas SDK client: %w", err) + } + + r.privateEndpointService = privateendpoint.NewPrivateEndpointAPI(sdkClient.PrivateEndpointServicesApi) + + return project.NewProject(atlasProject, orgID), nil +} + +func (r *AtlasPrivateEndpointReconciler) skip(ctx context.Context, akoPrivateEndpoint *akov2.AtlasPrivateEndpoint) (ctrl.Result, error) { + r.Log.Infow(fmt.Sprintf("-> Skipping AtlasPrivateEndpoint reconciliation as annotation %s=%s", customresource.ReconciliationPolicyAnnotation, customresource.ReconciliationPolicySkip), "spec", akoPrivateEndpoint.Spec) + if !akoPrivateEndpoint.GetDeletionTimestamp().IsZero() { + if err := customresource.ManageFinalizer(ctx, r.Client, akoPrivateEndpoint, customresource.UnsetFinalizer); err != nil { + result := workflow.Terminate(workflow.Internal, err.Error()) + r.Log.Errorw("Failed to remove finalizer", "terminate", err) + + return result.ReconcileResult(), nil + } + } + + return workflow.OK().ReconcileResult(), nil +} + +func (r *AtlasPrivateEndpointReconciler) invalidate(invalid workflow.Result) (ctrl.Result, error) { + // note: ValidateResourceVersion already set the state so we don't have to do it here. + r.Log.Debugf("AtlasPrivateEndpoint is invalid: %v", invalid) + return invalid.ReconcileResult(), nil +} + +func (r *AtlasPrivateEndpointReconciler) unsupport(ctx *workflow.Context) (ctrl.Result, error) { + unsupported := workflow.Terminate( + workflow.AtlasGovUnsupported, "the AtlasPrivateEndpoint is not supported by Atlas for government"). + WithoutRetry() + ctx.SetConditionFromResult(api.ReadyType, unsupported) + return unsupported.ReconcileResult(), nil +} + +func (r *AtlasPrivateEndpointReconciler) terminate( + ctx *workflow.Context, + akoPrivateEndpoint *akov2.AtlasPrivateEndpoint, + atlasPEService privateendpoint.EndpointService, + condition api.ConditionType, + reason workflow.ConditionReason, + err error, +) (ctrl.Result, error) { + r.Log.Errorf("resource %T(%s/%s) failed on condition %s: %s", akoPrivateEndpoint, akoPrivateEndpoint.GetNamespace(), akoPrivateEndpoint.GetName(), condition, err) + result := workflow.Terminate(reason, err.Error()) + ctx.SetConditionFalse(api.ReadyType). + SetConditionFromResult(condition, result) + + if atlasPEService != nil { + ctx.EnsureStatusOption(privateendpoint.NewPrivateEndpointStatus(atlasPEService)) + } + + return result.ReconcileResult(), nil +} + +func (r *AtlasPrivateEndpointReconciler) inProgress( + ctx *workflow.Context, + akoPrivateEndpoint *akov2.AtlasPrivateEndpoint, + atlasPEService privateendpoint.EndpointService, + condition api.ConditionType, + reason workflow.ConditionReason, + msg string, +) (ctrl.Result, error) { + if err := customresource.ManageFinalizer(ctx.Context, r.Client, akoPrivateEndpoint, customresource.SetFinalizer); err != nil { + return r.terminate(ctx, akoPrivateEndpoint, atlasPEService, api.ReadyType, workflow.AtlasFinalizerNotSet, err) + } + + result := workflow.InProgress(reason, msg) + ctx.SetConditionFalse(api.ReadyType). + SetConditionFromResult(condition, result). + EnsureStatusOption(privateendpoint.NewPrivateEndpointStatus(atlasPEService)) + + return result.ReconcileResult(), nil +} + +func (r *AtlasPrivateEndpointReconciler) ready(ctx *workflow.Context, akoPrivateEndpoint *akov2.AtlasPrivateEndpoint, atlasPEService privateendpoint.EndpointService) (ctrl.Result, error) { + if err := customresource.ManageFinalizer(ctx.Context, r.Client, akoPrivateEndpoint, customresource.SetFinalizer); err != nil { + return r.terminate(ctx, akoPrivateEndpoint, atlasPEService, api.ReadyType, workflow.AtlasFinalizerNotSet, err) + } + + ctx.SetConditionTrue(api.PrivateEndpointServiceReady). + SetConditionTrue(api.PrivateEndpointReady). + SetConditionTrue(api.ReadyType). + EnsureStatusOption(privateendpoint.NewPrivateEndpointStatus(atlasPEService)) + + if akoPrivateEndpoint.Spec.ExternalProject != nil { + return workflow.Requeue(r.independentSyncPeriod).ReconcileResult(), nil + } + + return workflow.OK().ReconcileResult(), nil +} + +func (r *AtlasPrivateEndpointReconciler) waitForConfiguration(ctx *workflow.Context, akoPrivateEndpoint *akov2.AtlasPrivateEndpoint, atlasPEService privateendpoint.EndpointService) (ctrl.Result, error) { + if err := customresource.ManageFinalizer(ctx.Context, r.Client, akoPrivateEndpoint, customresource.SetFinalizer); err != nil { + return r.terminate(ctx, akoPrivateEndpoint, atlasPEService, api.ReadyType, workflow.AtlasFinalizerNotSet, err) + } + + result := workflow.InProgress(workflow.PrivateEndpointConfigurationPending, "waiting for private endpoint configuration from customer side"). + WithoutRetry() + ctx.SetConditionFalse(api.ReadyType). + SetConditionTrue(api.PrivateEndpointServiceReady). + SetConditionFromResult(api.PrivateEndpointReady, result). + EnsureStatusOption(privateendpoint.NewPrivateEndpointStatus(atlasPEService)) + + return result.ReconcileResult(), nil +} + +func (r *AtlasPrivateEndpointReconciler) unmanage(ctx *workflow.Context, akoPrivateEndpoint *akov2.AtlasPrivateEndpoint) (ctrl.Result, error) { + if err := customresource.ManageFinalizer(ctx.Context, r.Client, akoPrivateEndpoint, customresource.UnsetFinalizer); err != nil { + return r.terminate(ctx, akoPrivateEndpoint, nil, api.ReadyType, workflow.AtlasFinalizerNotRemoved, err) + } + + return workflow.Deleted().ReconcileResult(), nil +} + +func (r *AtlasPrivateEndpointReconciler) SetupWithManager(mgr ctrl.Manager, skipNameValidation bool) error { + return ctrl.NewControllerManagedBy(mgr). + Named("AtlasPrivateEndpoint"). + For(&akov2.AtlasPrivateEndpoint{}, builder.WithPredicates(r.GlobalPredicates...)). + Watches( + &akov2.AtlasProject{}, + handler.EnqueueRequestsFromMapFunc(r.privateEndpointForProjectMapFunc()), + builder.WithPredicates(predicate.ResourceVersionChangedPredicate{}), + ). + Watches( + &corev1.Secret{}, + handler.EnqueueRequestsFromMapFunc(r.privateEndpointForCredentialMapFunc()), + builder.WithPredicates(predicate.ResourceVersionChangedPredicate{}), + ). + WithOptions(controller.TypedOptions[reconcile.Request]{SkipNameValidation: pointer.MakePtr(skipNameValidation)}). + Complete(r) +} + +func (r *AtlasPrivateEndpointReconciler) privateEndpointForProjectMapFunc() handler.MapFunc { + return func(ctx context.Context, obj client.Object) []reconcile.Request { + atlasProject, ok := obj.(*akov2.AtlasProject) + if !ok { + r.Log.Warnf("watching Project but got %T", obj) + + return nil + } + + peList := &akov2.AtlasPrivateEndpointList{} + listOpts := &client.ListOptions{ + FieldSelector: fields.OneTermEqualSelector( + indexer.AtlasPrivateEndpointByProjectIndex, + client.ObjectKeyFromObject(atlasProject).String(), + ), + } + err := r.Client.List(ctx, peList, listOpts) + if err != nil { + r.Log.Errorf("failed to list AtlasPrivateEndpoint: %s", err) + + return []reconcile.Request{} + } + + requests := make([]reconcile.Request, 0, len(peList.Items)) + for _, item := range peList.Items { + requests = append(requests, reconcile.Request{NamespacedName: types.NamespacedName{Name: item.Name, Namespace: item.Namespace}}) + } + + return requests + } +} + +func (r *AtlasPrivateEndpointReconciler) privateEndpointForCredentialMapFunc() handler.MapFunc { + return indexer.CredentialsIndexMapperFunc( + indexer.AtlasPrivateEndpointCredentialsIndex, + func() *akov2.AtlasPrivateEndpointList { return &akov2.AtlasPrivateEndpointList{} }, + indexer.PrivateEndpointRequests, + r.Client, + r.Log, + ) +} + +func NewAtlasPrivateEndpointReconciler( + mgr manager.Manager, + predicates []predicate.Predicate, + atlasProvider atlas.Provider, + deletionProtection bool, + logger *zap.Logger, +) *AtlasPrivateEndpointReconciler { + return &AtlasPrivateEndpointReconciler{ + Scheme: mgr.GetScheme(), + Client: mgr.GetClient(), + EventRecorder: mgr.GetEventRecorderFor("AtlasPrivateEndpoint"), + AtlasProvider: atlasProvider, + GlobalPredicates: predicates, + Log: logger.Named("controllers").Named("AtlasPrivateEndpoint").Sugar(), + ObjectDeletionProtection: deletionProtection, + } +} diff --git a/pkg/controller/atlasprivateendpoint/atlasprivateendpoint_controller_test.go b/pkg/controller/atlasprivateendpoint/atlasprivateendpoint_controller_test.go new file mode 100644 index 0000000000..b246bff308 --- /dev/null +++ b/pkg/controller/atlasprivateendpoint/atlasprivateendpoint_controller_test.go @@ -0,0 +1,600 @@ +package atlasprivateendpoint + +import ( + "context" + "errors" + "fmt" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" + "go.mongodb.org/atlas-sdk/v20231115008/admin" + "go.mongodb.org/atlas-sdk/v20231115008/mockadmin" + "go.uber.org/zap" + "go.uber.org/zap/zaptest" + "go.uber.org/zap/zaptest/observer" + k8serrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/tools/record" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/client/fake" + "sigs.k8s.io/controller-runtime/pkg/client/interceptor" + "sigs.k8s.io/controller-runtime/pkg/reconcile" + + atlasmock "github.com/mongodb/mongodb-atlas-kubernetes/v2/internal/mocks/atlas" + "github.com/mongodb/mongodb-atlas-kubernetes/v2/internal/pointer" + "github.com/mongodb/mongodb-atlas-kubernetes/v2/internal/translation/privateendpoint" + "github.com/mongodb/mongodb-atlas-kubernetes/v2/internal/translation/project" + "github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/api" + akov2 "github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/api/v1" + "github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/api/v1/common" + "github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/api/v1/status" + "github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/controller/atlas" + "github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/controller/customresource" + "github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/controller/workflow" +) + +func TestReconcile(t *testing.T) { + ctx := context.Background() + projectID := "project-id" + + testScheme := runtime.NewScheme() + require.NoError(t, akov2.AddToScheme(testScheme)) + + tests := map[string]struct { + request reconcile.Request + expectedResult reconcile.Result + expectedLogs []string + }{ + "failed to prepare resource": { + request: reconcile.Request{NamespacedName: types.NamespacedName{Namespace: "default", Name: "pe2"}}, + expectedResult: reconcile.Result{}, + expectedLogs: []string{ + "-> Starting AtlasPrivateEndpoint reconciliation", + "Object default/pe2 doesn't exist, was it deleted after reconcile request?", + }, + }, + "prepare resource for reconciliation": { + request: reconcile.Request{NamespacedName: types.NamespacedName{Namespace: "default", Name: "pe1"}}, + expectedResult: reconcile.Result{}, + expectedLogs: []string{ + "-> Starting AtlasPrivateEndpoint reconciliation", + "-> Skipping AtlasPrivateEndpoint reconciliation as annotation mongodb.com/atlas-reconciliation-policy=skip", + }, + }, + } + for name, tt := range tests { + t.Run(name, func(t *testing.T) { + pe := &akov2.AtlasPrivateEndpoint{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pe1", + Namespace: "default", + Annotations: map[string]string{ + customresource.ReconciliationPolicyAnnotation: customresource.ReconciliationPolicySkip, + }, + }, + Spec: akov2.AtlasPrivateEndpointSpec{ + ExternalProject: &akov2.ExternalProjectReference{ + ID: projectID, + }, + LocalCredentialHolder: api.LocalCredentialHolder{}, + Provider: "AWS", + Region: "US_EAST_1", + }, + Status: status.AtlasPrivateEndpointStatus{ + ServiceID: "pe-service-id", + ServiceStatus: "AVAILABLE", + }, + } + + core, logs := observer.New(zap.DebugLevel) + fakeClient := fake.NewClientBuilder(). + WithScheme(testScheme). + WithObjects(pe). + Build() + r := &AtlasPrivateEndpointReconciler{ + Client: fakeClient, + Log: zap.New(core).Sugar(), + } + result, _ := r.Reconcile(ctx, tt.request) + assert.Equal(t, tt.expectedResult, result) + assert.Equal(t, len(tt.expectedLogs), logs.Len()) + for i, logMsg := range tt.expectedLogs { + assert.Equal(t, logMsg, logs.All()[i].Message) + } + }) + } +} + +func TestEnsureCustomResource(t *testing.T) { + ctx := context.Background() + projectID := "project-id" + + testScheme := runtime.NewScheme() + require.NoError(t, akov2.AddToScheme(testScheme)) + + tests := map[string]struct { + atlasPrivateEndpoint *akov2.AtlasPrivateEndpoint + provider atlas.Provider + expectedResult reconcile.Result + expectedLogs []string + }{ + "skip custom resource reconciliation": { + atlasPrivateEndpoint: &akov2.AtlasPrivateEndpoint{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pe1", + Namespace: "default", + Annotations: map[string]string{ + customresource.ReconciliationPolicyAnnotation: customresource.ReconciliationPolicySkip, + }, + }, + Spec: akov2.AtlasPrivateEndpointSpec{ + ExternalProject: &akov2.ExternalProjectReference{ + ID: projectID, + }, + LocalCredentialHolder: api.LocalCredentialHolder{}, + Provider: "AWS", + Region: "US_EAST_1", + }, + Status: status.AtlasPrivateEndpointStatus{ + ServiceID: "pe-service-id", + ServiceStatus: "AVAILABLE", + }, + }, + expectedResult: reconcile.Result{}, + expectedLogs: []string{ + "-> Skipping AtlasPrivateEndpoint reconciliation as annotation mongodb.com/atlas-reconciliation-policy=skip", + }, + }, + "custom resource version is invalid": { + atlasPrivateEndpoint: &akov2.AtlasPrivateEndpoint{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pe1", + Namespace: "default", + Labels: map[string]string{ + customresource.ResourceVersion: "wrong", + }, + }, + Spec: akov2.AtlasPrivateEndpointSpec{ + ExternalProject: &akov2.ExternalProjectReference{ + ID: projectID, + }, + LocalCredentialHolder: api.LocalCredentialHolder{}, + Provider: "AWS", + Region: "US_EAST_1", + }, + Status: status.AtlasPrivateEndpointStatus{ + ServiceID: "pe-service-id", + ServiceStatus: "AVAILABLE", + }, + }, + expectedResult: reconcile.Result{RequeueAfter: workflow.DefaultRetry}, + expectedLogs: []string{ + "resource version for 'pe1' is invalid", + "AtlasPrivateEndpoint is invalid: {true 10000000000 wrong is not a valid semver version for label mongodb.com/atlas-resource-version AtlasResourceVersionIsInvalid true false}", + "Status update", + }, + }, + "custom resource is not supported": { + atlasPrivateEndpoint: &akov2.AtlasPrivateEndpoint{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pe1", + Namespace: "default", + }, + Spec: akov2.AtlasPrivateEndpointSpec{ + ExternalProject: &akov2.ExternalProjectReference{ + ID: projectID, + }, + LocalCredentialHolder: api.LocalCredentialHolder{}, + Provider: "AWS", + Region: "US_EAST_1", + }, + Status: status.AtlasPrivateEndpointStatus{ + ServiceID: "pe-service-id", + ServiceStatus: "AVAILABLE", + }, + }, + provider: &atlasmock.TestProvider{ + IsCloudGovFunc: func() bool { + return true + }, + IsSupportedFunc: func() bool { + return false + }, + }, + expectedResult: reconcile.Result{}, + expectedLogs: []string{ + "resource 'pe1' version is valid", + "Status update", + }, + }, + "failed to get project from atlas": { + atlasPrivateEndpoint: &akov2.AtlasPrivateEndpoint{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pe1", + Namespace: "default", + }, + Spec: akov2.AtlasPrivateEndpointSpec{ + ExternalProject: &akov2.ExternalProjectReference{ + ID: projectID, + }, + LocalCredentialHolder: api.LocalCredentialHolder{ + ConnectionSecret: &api.LocalObjectReference{ + Name: "my-secret", + }, + }, + Provider: "AWS", + Region: "US_EAST_1", + }, + Status: status.AtlasPrivateEndpointStatus{ + ServiceID: "pe-service-id", + ServiceStatus: "AVAILABLE", + }, + }, + provider: &atlasmock.TestProvider{ + IsCloudGovFunc: func() bool { + return true + }, + IsSupportedFunc: func() bool { + return true + }, + SdkClientFunc: func(secretRef *client.ObjectKey, log *zap.SugaredLogger) (*admin.APIClient, string, error) { + return nil, "", errors.New("failed to create sdk client") + }, + }, + expectedResult: reconcile.Result{RequeueAfter: workflow.DefaultRetry}, + expectedLogs: []string{ + "resource 'pe1' version is valid", + "resource *v1.AtlasPrivateEndpoint(default/pe1) failed on condition Ready: failed to create Atlas SDK client: failed to create sdk client", + "Status update", + }, + }, + "failed to get project from cluster": { + atlasPrivateEndpoint: &akov2.AtlasPrivateEndpoint{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pe1", + Namespace: "default", + }, + Spec: akov2.AtlasPrivateEndpointSpec{ + Project: &common.ResourceRefNamespaced{ + Name: "my-project", + Namespace: "default", + }, + Provider: "AWS", + Region: "US_EAST_1", + }, + Status: status.AtlasPrivateEndpointStatus{ + ServiceID: "pe-service-id", + ServiceStatus: "AVAILABLE", + }, + }, + provider: &atlasmock.TestProvider{ + IsCloudGovFunc: func() bool { + return true + }, + IsSupportedFunc: func() bool { + return true + }, + }, + expectedResult: reconcile.Result{RequeueAfter: workflow.DefaultRetry}, + expectedLogs: []string{ + "resource 'pe1' version is valid", + "resource *v1.AtlasPrivateEndpoint(default/pe1) failed on condition Ready: failed to retrieve project custom resource: atlasprojects.atlas.mongodb.com \"my-project\" not found", + "Status update", + }, + }, + "custom resource is ready for reconciliation": { + atlasPrivateEndpoint: &akov2.AtlasPrivateEndpoint{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pe1", + Namespace: "default", + }, + Spec: akov2.AtlasPrivateEndpointSpec{ + ExternalProject: &akov2.ExternalProjectReference{ + ID: projectID, + }, + LocalCredentialHolder: api.LocalCredentialHolder{ + ConnectionSecret: &api.LocalObjectReference{ + Name: "my-secret", + }, + }, + Provider: "AWS", + Region: "US_EAST_1", + }, + Status: status.AtlasPrivateEndpointStatus{ + ServiceID: "pe-service-id", + ServiceStatus: "INITIATING", + }, + }, + provider: &atlasmock.TestProvider{ + IsCloudGovFunc: func() bool { + return true + }, + IsSupportedFunc: func() bool { + return true + }, + SdkClientFunc: func(secretRef *client.ObjectKey, log *zap.SugaredLogger) (*admin.APIClient, string, error) { + projectAPI := mockadmin.NewProjectsApi(t) + projectAPI.EXPECT().GetProject(ctx, projectID).Return(admin.GetProjectApiRequest{ApiService: projectAPI}) + projectAPI.EXPECT().GetProjectExecute(mock.AnythingOfType("admin.GetProjectApiRequest")). + Return(&admin.Group{Id: &projectID}, nil, nil) + + peAPI := mockadmin.NewPrivateEndpointServicesApi(t) + peAPI.EXPECT().GetPrivateEndpointService(ctx, projectID, "AWS", "pe-service-id"). + Return(admin.GetPrivateEndpointServiceApiRequest{ApiService: peAPI}) + peAPI.EXPECT().GetPrivateEndpointServiceExecute(mock.AnythingOfType("admin.GetPrivateEndpointServiceApiRequest")). + Return( + &admin.EndpointService{ + Id: pointer.MakePtr("pe-service-id"), + CloudProvider: "AWS", + RegionName: pointer.MakePtr("US_EAST_1"), + Status: pointer.MakePtr("INITIATING"), + }, + nil, + nil, + ) + + return &admin.APIClient{ProjectsApi: projectAPI, PrivateEndpointServicesApi: peAPI}, "", nil + }, + }, + expectedResult: reconcile.Result{RequeueAfter: workflow.DefaultRetry}, + expectedLogs: []string{ + "resource 'pe1' version is valid", + "Status update", + }, + }, + } + for name, tt := range tests { + t.Run(name, func(t *testing.T) { + core, logs := observer.New(zap.DebugLevel) + fakeClient := fake.NewClientBuilder(). + WithScheme(testScheme). + WithObjects(tt.atlasPrivateEndpoint). + WithStatusSubresource(tt.atlasPrivateEndpoint). + Build() + r := &AtlasPrivateEndpointReconciler{ + Client: fakeClient, + AtlasProvider: tt.provider, + EventRecorder: record.NewFakeRecorder(10), + Log: zap.New(core).Sugar(), + } + result, err := r.ensureCustomResource(ctx, tt.atlasPrivateEndpoint) + assert.NoError(t, err) + assert.Equal(t, tt.expectedResult, result) + assert.Equal(t, len(tt.expectedLogs), logs.Len()) + for i, logMsg := range tt.expectedLogs { + assert.Equal(t, logMsg, logs.All()[i].Message) + } + }) + } +} + +func TestGetProjectFromKube(t *testing.T) { + ctx := context.Background() + projectID := "project-id" + + testScheme := runtime.NewScheme() + require.NoError(t, akov2.AddToScheme(testScheme)) + + tests := map[string]struct { + atlasPrivateEndpoint *akov2.AtlasPrivateEndpoint + provider atlas.Provider + expectedProject *project.Project + expectedErr error + }{ + "failed to resolve secret": { + atlasPrivateEndpoint: &akov2.AtlasPrivateEndpoint{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pe1", + Namespace: "default", + }, + Spec: akov2.AtlasPrivateEndpointSpec{ + Project: &common.ResourceRefNamespaced{ + Name: "my-missing-project", + Namespace: "default", + }, + Provider: "AWS", + Region: "US_EAST_1", + }, + Status: status.AtlasPrivateEndpointStatus{ + ServiceID: "pe-service-id", + ServiceStatus: "AVAILABLE", + }, + }, + expectedErr: fmt.Errorf( + "failed to retrieve project custom resource: %w", + &k8serrors.StatusError{ + ErrStatus: metav1.Status{ + Status: "Failure", + Message: "atlasprojects.atlas.mongodb.com \"my-missing-project\" not found", + Reason: "NotFound", + Details: &metav1.StatusDetails{ + Name: "my-missing-project", + Group: "atlas.mongodb.com", + Kind: "atlasprojects", + }, + Code: 404, + }, + }, + ), + }, + "failed to create sdk client": { + atlasPrivateEndpoint: &akov2.AtlasPrivateEndpoint{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pe1", + Namespace: "default", + }, + Spec: akov2.AtlasPrivateEndpointSpec{ + Project: &common.ResourceRefNamespaced{ + Name: "my-project", + Namespace: "default", + }, + Provider: "AWS", + Region: "US_EAST_1", + }, + Status: status.AtlasPrivateEndpointStatus{ + ServiceID: "pe-service-id", + ServiceStatus: "AVAILABLE", + }, + }, + provider: &atlasmock.TestProvider{ + SdkClientFunc: func(secretRef *client.ObjectKey, log *zap.SugaredLogger) (*admin.APIClient, string, error) { + return nil, "", errors.New("failed to create sdk client") + }, + }, + expectedErr: fmt.Errorf("failed to create Atlas SDK client: %w", errors.New("failed to create sdk client")), + }, + "get project from cluster": { + atlasPrivateEndpoint: &akov2.AtlasPrivateEndpoint{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pe1", + Namespace: "default", + }, + Spec: akov2.AtlasPrivateEndpointSpec{ + Project: &common.ResourceRefNamespaced{ + Name: "my-project", + Namespace: "default", + }, + Provider: "AWS", + Region: "US_EAST_1", + }, + Status: status.AtlasPrivateEndpointStatus{ + ServiceID: "pe-service-id", + ServiceStatus: "AVAILABLE", + }, + }, + provider: &atlasmock.TestProvider{ + SdkClientFunc: func(secretRef *client.ObjectKey, log *zap.SugaredLogger) (*admin.APIClient, string, error) { + return &admin.APIClient{}, "org-id", nil + }, + }, + expectedProject: &project.Project{ + OrgID: "org-id", + ID: "project-id", + Name: "My Project", + }, + }, + } + for name, tt := range tests { + t.Run(name, func(t *testing.T) { + akoProject := &akov2.AtlasProject{ + ObjectMeta: metav1.ObjectMeta{ + Name: "my-project", + Namespace: "default", + }, + Spec: akov2.AtlasProjectSpec{ + Name: "My Project", + }, + Status: status.AtlasProjectStatus{ + ID: projectID, + }, + } + + fakeClient := fake.NewClientBuilder(). + WithScheme(testScheme). + WithObjects(akoProject). + Build() + r := &AtlasPrivateEndpointReconciler{ + Client: fakeClient, + AtlasProvider: tt.provider, + } + p, err := r.getProjectFromKube(ctx, tt.atlasPrivateEndpoint) + assert.Equal(t, tt.expectedProject, p) + assert.Equal(t, tt.expectedErr, err) + }) + } +} + +func TestFailManageFinalizer(t *testing.T) { + testScheme := runtime.NewScheme() + require.NoError(t, akov2.AddToScheme(testScheme)) + pe := &akov2.AtlasPrivateEndpoint{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pe1", + Namespace: "default", + }, + Spec: akov2.AtlasPrivateEndpointSpec{ + ExternalProject: &akov2.ExternalProjectReference{ + ID: "project-id", + }, + LocalCredentialHolder: api.LocalCredentialHolder{}, + Provider: "AWS", + Region: "US_EAST_1", + }, + Status: status.AtlasPrivateEndpointStatus{ + ServiceID: "pe-service-id", + ServiceStatus: "AVAILABLE", + }, + } + atlasPE := &privateendpoint.AWSService{ + CommonEndpointService: privateendpoint.CommonEndpointService{ + ID: "pe-service-id", + CloudRegion: "US_EAST_1", + ServiceStatus: "AVAILABLE", + }, + ServiceName: "aws/service/name", + } + + fakeClient := fake.NewClientBuilder(). + WithScheme(testScheme). + WithObjects(pe). + WithStatusSubresource(pe). + WithInterceptorFuncs(interceptor.Funcs{ + Patch: func(ctx context.Context, client client.WithWatch, obj client.Object, patch client.Patch, opts ...client.PatchOption) error { + return errors.New("failed to manage finalizer") + }, + }). + Build() + + logger := zaptest.NewLogger(t).Sugar() + workflowCtx := &workflow.Context{ + Log: logger, + } + + tests := map[string]struct { + transition func(r *AtlasPrivateEndpointReconciler) (reconcile.Result, error) + }{ + "failed to manage finalizer when transitioning to in progress": { + transition: func(r *AtlasPrivateEndpointReconciler) (reconcile.Result, error) { + return r.inProgress( + workflowCtx, + pe, + atlasPE, + api.PrivateEndpointServiceReady, + workflow.PrivateEndpointServiceInitializing, + "testing transition", + ) + }, + }, + "failed to manage finalizer when transitioning to ready": { + transition: func(r *AtlasPrivateEndpointReconciler) (reconcile.Result, error) { + return r.ready(workflowCtx, pe, atlasPE) + }, + }, + "failed to manage finalizer when transitioning to waiting configuration": { + transition: func(r *AtlasPrivateEndpointReconciler) (reconcile.Result, error) { + return r.waitForConfiguration(workflowCtx, pe, atlasPE) + }, + }, + "failed to manage finalizer when transitioning to unmanage": { + transition: func(r *AtlasPrivateEndpointReconciler) (reconcile.Result, error) { + return r.unmanage(workflowCtx, pe) + }, + }, + } + for name, tt := range tests { + t.Run(name, func(t *testing.T) { + r := &AtlasPrivateEndpointReconciler{ + Client: fakeClient, + Log: zaptest.NewLogger(t).Sugar(), + } + + result, err := tt.transition(r) + assert.NoError(t, err) + assert.Equal(t, reconcile.Result{RequeueAfter: workflow.DefaultRetry}, result) + }) + } +} diff --git a/pkg/controller/atlasprivateendpoint/privateendpoint.go b/pkg/controller/atlasprivateendpoint/privateendpoint.go new file mode 100644 index 0000000000..0745478134 --- /dev/null +++ b/pkg/controller/atlasprivateendpoint/privateendpoint.go @@ -0,0 +1,274 @@ +/* +Copyright 2024 MongoDB. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package atlasprivateendpoint + +import ( + "context" + "errors" + + ctrl "sigs.k8s.io/controller-runtime" + + "github.com/mongodb/mongodb-atlas-kubernetes/v2/internal/translation/privateendpoint" + "github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/api" + akov2 "github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/api/v1" + "github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/controller/customresource" + "github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/controller/workflow" +) + +func (r *AtlasPrivateEndpointReconciler) handlePrivateEndpointService( + ctx *workflow.Context, + projectID string, + akoPrivateEndpoint *akov2.AtlasPrivateEndpoint, +) (ctrl.Result, error) { + akoPEService := privateendpoint.NewPrivateEndpoint(akoPrivateEndpoint) + atlasPEService, err := r.getOrMatchPrivateEndpointService(ctx.Context, projectID, akoPrivateEndpoint) + if err != nil { + return r.terminate(ctx, akoPrivateEndpoint, atlasPEService, api.ReadyType, workflow.Internal, err) + } + + wasDeleted := !akoPrivateEndpoint.GetDeletionTimestamp().IsZero() + existInAtlas := atlasPEService != nil + + switch { + case !existInAtlas && !wasDeleted: + return r.createPrivateEndpointService(ctx, projectID, akoPrivateEndpoint, akoPEService) + case !existInAtlas && wasDeleted: + return r.unmanage(ctx, akoPrivateEndpoint) + case existInAtlas && wasDeleted: + return r.deletePrivateEndpointService(ctx, projectID, akoPrivateEndpoint, akoPEService, atlasPEService) + } + + return r.watchServiceState(ctx, projectID, akoPrivateEndpoint, akoPEService, atlasPEService) +} + +func (r *AtlasPrivateEndpointReconciler) createPrivateEndpointService( + ctx *workflow.Context, + projectID string, + akoPrivateEndpoint *akov2.AtlasPrivateEndpoint, + akoPEService privateendpoint.EndpointService, +) (ctrl.Result, error) { + atlasPEService, err := r.privateEndpointService.CreatePrivateEndpointService(ctx.Context, projectID, akoPEService) + if err != nil { + return r.terminate(ctx, akoPrivateEndpoint, atlasPEService, api.PrivateEndpointServiceReady, workflow.PrivateEndpointServiceFailedToCreate, err) + } + + return r.watchServiceState(ctx, projectID, akoPrivateEndpoint, akoPEService, atlasPEService) +} + +func (r *AtlasPrivateEndpointReconciler) deletePrivateEndpointService( + ctx *workflow.Context, + projectID string, + akoPrivateEndpoint *akov2.AtlasPrivateEndpoint, + akoPEService privateendpoint.EndpointService, + atlasPEService privateendpoint.EndpointService, +) (ctrl.Result, error) { + if customresource.IsResourcePolicyKeepOrDefault(akoPrivateEndpoint, r.ObjectDeletionProtection) { + return r.unmanage(ctx, akoPrivateEndpoint) + } + + atlasPEService, err := r.deletePrivateEndpoint(ctx.Context, projectID, atlasPEService) + if err != nil { + return r.terminate(ctx, akoPrivateEndpoint, atlasPEService, api.PrivateEndpointServiceReady, workflow.PrivateEndpointFailedToDelete, err) + } + + return r.watchServiceState(ctx, projectID, akoPrivateEndpoint, akoPEService, atlasPEService) +} + +func (r *AtlasPrivateEndpointReconciler) watchServiceState( + ctx *workflow.Context, + projectID string, + akoPrivateEndpoint *akov2.AtlasPrivateEndpoint, + akoPEService privateendpoint.EndpointService, + atlasPEService privateendpoint.EndpointService, +) (ctrl.Result, error) { + switch atlasPEService.Status() { + case privateendpoint.StatusInitiating: + return r.inProgress(ctx, akoPrivateEndpoint, atlasPEService, api.PrivateEndpointServiceReady, workflow.PrivateEndpointServiceInitializing, "Private Endpoint is being initialized") + case privateendpoint.StatusPending, privateendpoint.StatusPendingAcceptance, privateendpoint.StatusWaitingForUser, privateendpoint.StatusVerified: + return r.inProgress(ctx, akoPrivateEndpoint, atlasPEService, api.PrivateEndpointServiceReady, workflow.PrivateEndpointServiceInitializing, "Private Endpoint is waiting for human action") + case privateendpoint.StatusFailed, privateendpoint.StatusRejected: + return r.terminate(ctx, akoPrivateEndpoint, atlasPEService, api.PrivateEndpointServiceReady, workflow.PrivateEndpointServiceFailedToConfigure, errors.New(atlasPEService.ErrorMessage())) + case privateendpoint.StatusDeleting: + return r.inProgress(ctx, akoPrivateEndpoint, atlasPEService, api.PrivateEndpointServiceReady, workflow.PrivateEndpointServiceDeleting, "Private Endpoint is being deleted") + } + + ctx.SetConditionTrue(api.PrivateEndpointServiceReady) + r.EventRecorder.Event(akoPrivateEndpoint, "Normal", string(workflow.PrivateEndpointServiceCreated), "Private Endpoint Service is available") + + return r.handlePrivateEndpointInterface(ctx, projectID, akoPrivateEndpoint, akoPEService, atlasPEService) +} + +func (r *AtlasPrivateEndpointReconciler) handlePrivateEndpointInterface( + ctx *workflow.Context, + projectID string, + akoPrivateEndpoint *akov2.AtlasPrivateEndpoint, + akoPEService privateendpoint.EndpointService, + atlasPEService privateendpoint.EndpointService, +) (ctrl.Result, error) { + compositeInterfacesMap := privateendpoint.MapPrivateEndpoints(akoPEService.EndpointInterfaces(), atlasPEService.EndpointInterfaces()) + + if len(compositeInterfacesMap) == 0 { + return r.waitForConfiguration(ctx, akoPrivateEndpoint, atlasPEService) + } + + // we want to sync all interface, if any of them is in progress, after all we transition to in progress + pendingResources := false + for _, compositeInterfaceMap := range compositeInterfacesMap { + // The interface can be in 4 state: + // * It doesn't exist and need to be created, and it's expected to be in progress on next reconciliation + // * It exist and need to be deleted, and it's expected to in progress on next reconciliation + // * It's in progress, we skip it to wait it to be ready + // * It's failed to be provisioned, we terminate the reconciliation + inAKO := compositeInterfaceMap.AKO != nil + inAtlas := compositeInterfaceMap.Atlas != nil + inProgress := isInterfaceInProgress(compositeInterfaceMap.Atlas) + failed := hasInterfaceFailed(compositeInterfaceMap.Atlas) + + switch { + case failed: + return r.terminate( + ctx, + akoPrivateEndpoint, + atlasPEService, + api.PrivateEndpointReady, + workflow.PrivateEndpointFailedToConfigure, + errors.New(compositeInterfaceMap.Atlas.ErrorMessage()), + ) + case inProgress: + pendingResources = true + continue + case inAKO && !inAtlas: + gcpProjectID := getGCPProjectID(akoPrivateEndpoint, compositeInterfaceMap.AKO.InterfaceID()) + _, err := r.privateEndpointService.CreatePrivateEndpointInterface( + ctx.Context, + projectID, + akoPrivateEndpoint.Spec.Provider, + akoPrivateEndpoint.Status.ServiceID, + gcpProjectID, + compositeInterfaceMap.AKO, + ) + if err != nil { + return r.terminate(ctx, akoPrivateEndpoint, atlasPEService, api.PrivateEndpointReady, workflow.PrivateEndpointFailedToCreate, err) + } + pendingResources = true + case !inAKO && inAtlas: + err := r.privateEndpointService.DeleteEndpointInterface( + ctx.Context, + projectID, + akoPrivateEndpoint.Spec.Provider, + akoPrivateEndpoint.Status.ServiceID, + compositeInterfaceMap.Atlas.InterfaceID(), + ) + if err != nil { + return r.terminate(ctx, akoPrivateEndpoint, atlasPEService, api.PrivateEndpointReady, workflow.PrivateEndpointFailedToDelete, err) + } + pendingResources = true + } + } + + if pendingResources { + return r.inProgress(ctx, akoPrivateEndpoint, atlasPEService, api.PrivateEndpointReady, workflow.PrivateEndpointUpdating, "Private Endpoints are being updated") + } + + return r.ready(ctx, akoPrivateEndpoint, atlasPEService) +} + +// getOrMatchPrivateEndpointService retrieve the project by ID if one is set or try to match by provider/region +// only one private endpoint service per provider/region is allowed +func (r *AtlasPrivateEndpointReconciler) getOrMatchPrivateEndpointService( + ctx context.Context, + projectID string, + akoPrivateEndpoint *akov2.AtlasPrivateEndpoint, +) (privateendpoint.EndpointService, error) { + if akoPrivateEndpoint.Status.ServiceID != "" { + return r.privateEndpointService.GetPrivateEndpoint(ctx, projectID, akoPrivateEndpoint.Spec.Provider, akoPrivateEndpoint.Status.ServiceID) + } + + endpointServices, err := r.privateEndpointService.ListPrivateEndpoints(ctx, projectID, akoPrivateEndpoint.Spec.Provider) + if err != nil { + return nil, err + } + + for _, endpointService := range endpointServices { + if endpointService.Region() == akoPrivateEndpoint.Spec.Region { + return endpointService, err + } + } + + return nil, nil +} + +func (r *AtlasPrivateEndpointReconciler) deletePrivateEndpoint( + ctx context.Context, + projectID string, + atlasPEService privateendpoint.EndpointService, +) (privateendpoint.EndpointService, error) { + if len(atlasPEService.EndpointInterfaces()) == 0 && atlasPEService.Status() != privateendpoint.StatusDeleting { + err := r.privateEndpointService.DeleteEndpointService(ctx, projectID, atlasPEService.Provider(), atlasPEService.ServiceID()) + if err != nil { + return nil, err + } + } + + for _, i := range atlasPEService.EndpointInterfaces() { + if i.Status() != privateendpoint.StatusDeleting { + err := r.privateEndpointService.DeleteEndpointInterface(ctx, projectID, atlasPEService.Provider(), atlasPEService.ServiceID(), i.InterfaceID()) + if err != nil { + return nil, err + } + } + } + + return r.privateEndpointService.GetPrivateEndpoint(ctx, projectID, atlasPEService.Provider(), atlasPEService.ServiceID()) +} + +func isInterfaceInProgress(ep privateendpoint.EndpointInterface) bool { + if ep == nil { + return false + } + + status := ep.Status() + + return status == privateendpoint.StatusInitiating || + status == privateendpoint.StatusPending || + status == privateendpoint.StatusPendingAcceptance || + status == privateendpoint.StatusWaitingForUser || + status == privateendpoint.StatusVerified || + status == privateendpoint.StatusDeleting +} + +func hasInterfaceFailed(ep privateendpoint.EndpointInterface) bool { + if ep == nil { + return false + } + + status := ep.Status() + + return status == privateendpoint.StatusFailed || status == privateendpoint.StatusRejected +} + +func getGCPProjectID(akoPrivateEndpoint *akov2.AtlasPrivateEndpoint, interfaceID string) string { + if akoPrivateEndpoint.Spec.Provider == privateendpoint.ProviderGCP && len(akoPrivateEndpoint.Spec.GCPConfiguration) > 0 { + for _, config := range akoPrivateEndpoint.Spec.GCPConfiguration { + if config.GroupName == interfaceID { + return config.ProjectID + } + } + } + + return "" +} diff --git a/pkg/controller/atlasprivateendpoint/privateendpoint_test.go b/pkg/controller/atlasprivateendpoint/privateendpoint_test.go new file mode 100644 index 0000000000..3a06770b11 --- /dev/null +++ b/pkg/controller/atlasprivateendpoint/privateendpoint_test.go @@ -0,0 +1,1277 @@ +package atlasprivateendpoint + +import ( + "context" + "errors" + "testing" + + "github.com/google/go-cmp/cmp" + "github.com/google/go-cmp/cmp/cmpopts" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" + "go.uber.org/zap/zaptest" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/tools/record" + "sigs.k8s.io/controller-runtime/pkg/client/fake" + "sigs.k8s.io/controller-runtime/pkg/reconcile" + + "github.com/mongodb/mongodb-atlas-kubernetes/v2/internal/mocks/translation" + "github.com/mongodb/mongodb-atlas-kubernetes/v2/internal/translation/privateendpoint" + "github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/api" + akov2 "github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/api/v1" + "github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/api/v1/status" + "github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/controller/customresource" + "github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/controller/workflow" +) + +func TestHandlePrivateEndpointService(t *testing.T) { + ctx := context.Background() + projectID := "project-id" + deletionTime := metav1.Now() + + testScheme := runtime.NewScheme() + require.NoError(t, akov2.AddToScheme(testScheme)) + + tests := map[string]struct { + atlasPrivateEndpoint *akov2.AtlasPrivateEndpoint + peClient func() privateendpoint.PrivateEndpointService + expectedResult reconcile.Result + expectedConditions []api.Condition + }{ + "failed to retrieve private endpoint": { + atlasPrivateEndpoint: &akov2.AtlasPrivateEndpoint{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pe1", + Namespace: "default", + }, + Spec: akov2.AtlasPrivateEndpointSpec{ + ExternalProject: &akov2.ExternalProjectReference{ + ID: projectID, + }, + LocalCredentialHolder: api.LocalCredentialHolder{}, + Provider: "AWS", + Region: "US_EAST_1", + }, + Status: status.AtlasPrivateEndpointStatus{ + ServiceID: "pe-service-id", + ServiceStatus: "AVAILABLE", + }, + }, + peClient: func() privateendpoint.PrivateEndpointService { + c := translation.NewPrivateEndpointServiceMock(t) + c.EXPECT().GetPrivateEndpoint(ctx, projectID, "AWS", "pe-service-id"). + Return(nil, errors.New("failed to get private endpoint")) + + return c + }, + expectedResult: reconcile.Result{RequeueAfter: workflow.DefaultRetry}, + expectedConditions: []api.Condition{ + api.FalseCondition(api.ReadyType). + WithReason(string(workflow.Internal)). + WithMessageRegexp("failed to get private endpoint"), + }, + }, + "failed to create private endpoint service": { + atlasPrivateEndpoint: &akov2.AtlasPrivateEndpoint{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pe1", + Namespace: "default", + }, + Spec: akov2.AtlasPrivateEndpointSpec{ + ExternalProject: &akov2.ExternalProjectReference{ + ID: projectID, + }, + LocalCredentialHolder: api.LocalCredentialHolder{}, + Provider: "AWS", + Region: "US_EAST_1", + }, + }, + peClient: func() privateendpoint.PrivateEndpointService { + c := translation.NewPrivateEndpointServiceMock(t) + c.EXPECT().ListPrivateEndpoints(ctx, projectID, "AWS"). + Return(nil, nil) + c.EXPECT().CreatePrivateEndpointService(ctx, projectID, mock.AnythingOfType("*privateendpoint.AWSService")). + Return(nil, errors.New("failed to create private endpoint")) + + return c + }, + expectedResult: reconcile.Result{RequeueAfter: workflow.DefaultRetry}, + expectedConditions: []api.Condition{ + api.FalseCondition(api.ReadyType), + api.FalseCondition(api.PrivateEndpointServiceReady). + WithReason(string(workflow.PrivateEndpointServiceFailedToCreate)). + WithMessageRegexp("failed to create private endpoint"), + }, + }, + "unmanage already deleted private endpoint": { + atlasPrivateEndpoint: &akov2.AtlasPrivateEndpoint{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pe1", + Namespace: "default", + DeletionTimestamp: &deletionTime, + Finalizers: []string{customresource.FinalizerLabel}, + }, + Spec: akov2.AtlasPrivateEndpointSpec{ + ExternalProject: &akov2.ExternalProjectReference{ + ID: projectID, + }, + LocalCredentialHolder: api.LocalCredentialHolder{}, + Provider: "AWS", + Region: "US_EAST_1", + }, + Status: status.AtlasPrivateEndpointStatus{ + ServiceID: "pe-service-id", + ServiceStatus: "AVAILABLE", + }, + }, + peClient: func() privateendpoint.PrivateEndpointService { + c := translation.NewPrivateEndpointServiceMock(t) + c.EXPECT().GetPrivateEndpoint(ctx, projectID, "AWS", "pe-service-id"). + Return(nil, nil) + + return c + }, + expectedResult: reconcile.Result{}, + }, + "unmanage protected private endpoint": { + atlasPrivateEndpoint: &akov2.AtlasPrivateEndpoint{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pe1", + Namespace: "default", + DeletionTimestamp: &deletionTime, + Finalizers: []string{customresource.FinalizerLabel}, + Annotations: map[string]string{ + customresource.ResourcePolicyAnnotation: customresource.ResourcePolicyKeep, + }, + }, + Spec: akov2.AtlasPrivateEndpointSpec{ + ExternalProject: &akov2.ExternalProjectReference{ + ID: projectID, + }, + LocalCredentialHolder: api.LocalCredentialHolder{}, + Provider: "AWS", + Region: "US_EAST_1", + }, + Status: status.AtlasPrivateEndpointStatus{ + ServiceID: "pe-service-id", + ServiceStatus: "AVAILABLE", + }, + }, + peClient: func() privateendpoint.PrivateEndpointService { + c := translation.NewPrivateEndpointServiceMock(t) + c.EXPECT().GetPrivateEndpoint(ctx, projectID, "AWS", "pe-service-id"). + Return(&privateendpoint.AWSService{}, nil) + + return c + }, + expectedResult: reconcile.Result{}, + }, + "failed to delete private endpoint": { + atlasPrivateEndpoint: &akov2.AtlasPrivateEndpoint{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pe1", + Namespace: "default", + DeletionTimestamp: &deletionTime, + Finalizers: []string{customresource.FinalizerLabel}, + }, + Spec: akov2.AtlasPrivateEndpointSpec{ + ExternalProject: &akov2.ExternalProjectReference{ + ID: projectID, + }, + LocalCredentialHolder: api.LocalCredentialHolder{}, + Provider: "AWS", + Region: "US_EAST_1", + }, + Status: status.AtlasPrivateEndpointStatus{ + ServiceID: "pe-service-id", + ServiceStatus: "AVAILABLE", + }, + }, + peClient: func() privateendpoint.PrivateEndpointService { + awsService := &privateendpoint.AWSService{ + CommonEndpointService: privateendpoint.CommonEndpointService{ + ID: "pe-service-id", + CloudRegion: "US_EAST_1", + ServiceStatus: "AVAILABLE", + Interfaces: privateendpoint.EndpointInterfaces{}, + }, + } + + c := translation.NewPrivateEndpointServiceMock(t) + c.EXPECT().GetPrivateEndpoint(ctx, projectID, "AWS", "pe-service-id"). + Return(awsService, nil) + c.EXPECT().DeleteEndpointService(ctx, projectID, "AWS", "pe-service-id"). + Return(errors.New("failed to delete private endpoint")) + + return c + }, + expectedResult: reconcile.Result{RequeueAfter: workflow.DefaultRetry}, + expectedConditions: []api.Condition{ + api.FalseCondition(api.ReadyType), + api.FalseCondition(api.PrivateEndpointServiceReady). + WithReason(string(workflow.PrivateEndpointFailedToDelete)). + WithMessageRegexp("failed to delete private endpoint"), + }, + }, + "private endpoint service is initiating": { + atlasPrivateEndpoint: &akov2.AtlasPrivateEndpoint{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pe1", + Namespace: "default", + }, + Spec: akov2.AtlasPrivateEndpointSpec{ + ExternalProject: &akov2.ExternalProjectReference{ + ID: projectID, + }, + LocalCredentialHolder: api.LocalCredentialHolder{}, + Provider: "AWS", + Region: "US_EAST_1", + }, + Status: status.AtlasPrivateEndpointStatus{ + ServiceID: "pe-service-id", + ServiceStatus: "INITIATING", + }, + }, + peClient: func() privateendpoint.PrivateEndpointService { + awsService := &privateendpoint.AWSService{ + CommonEndpointService: privateendpoint.CommonEndpointService{ + ID: "pe-service-id", + CloudRegion: "US_EAST_1", + ServiceStatus: "INITIATING", + Interfaces: privateendpoint.EndpointInterfaces{}, + }, + } + + c := translation.NewPrivateEndpointServiceMock(t) + c.EXPECT().GetPrivateEndpoint(ctx, projectID, "AWS", "pe-service-id"). + Return(awsService, nil) + + return c + }, + expectedResult: reconcile.Result{RequeueAfter: workflow.DefaultRetry}, + expectedConditions: []api.Condition{ + api.FalseCondition(api.ReadyType), + api.FalseCondition(api.PrivateEndpointServiceReady). + WithReason(string(workflow.PrivateEndpointServiceInitializing)). + WithMessageRegexp("Private Endpoint is being initialized"), + }, + }, + "private endpoint service is pending": { + atlasPrivateEndpoint: &akov2.AtlasPrivateEndpoint{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pe1", + Namespace: "default", + }, + Spec: akov2.AtlasPrivateEndpointSpec{ + ExternalProject: &akov2.ExternalProjectReference{ + ID: projectID, + }, + LocalCredentialHolder: api.LocalCredentialHolder{}, + Provider: "AWS", + Region: "US_EAST_1", + }, + Status: status.AtlasPrivateEndpointStatus{ + ServiceID: "pe-service-id", + ServiceStatus: "PENDING", + }, + }, + peClient: func() privateendpoint.PrivateEndpointService { + awsService := &privateendpoint.AWSService{ + CommonEndpointService: privateendpoint.CommonEndpointService{ + ID: "pe-service-id", + CloudRegion: "US_EAST_1", + ServiceStatus: "PENDING", + Interfaces: privateendpoint.EndpointInterfaces{}, + }, + } + + c := translation.NewPrivateEndpointServiceMock(t) + c.EXPECT().GetPrivateEndpoint(ctx, projectID, "AWS", "pe-service-id"). + Return(awsService, nil) + + return c + }, + expectedResult: reconcile.Result{RequeueAfter: workflow.DefaultRetry}, + expectedConditions: []api.Condition{ + api.FalseCondition(api.ReadyType), + api.FalseCondition(api.PrivateEndpointServiceReady). + WithReason(string(workflow.PrivateEndpointServiceInitializing)). + WithMessageRegexp("Private Endpoint is waiting for human action"), + }, + }, + "private endpoint service was rejected": { + atlasPrivateEndpoint: &akov2.AtlasPrivateEndpoint{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pe1", + Namespace: "default", + }, + Spec: akov2.AtlasPrivateEndpointSpec{ + ExternalProject: &akov2.ExternalProjectReference{ + ID: projectID, + }, + LocalCredentialHolder: api.LocalCredentialHolder{}, + Provider: "AWS", + Region: "US_EAST_1", + }, + Status: status.AtlasPrivateEndpointStatus{ + ServiceID: "pe-service-id", + ServiceStatus: "REJECTED", + }, + }, + peClient: func() privateendpoint.PrivateEndpointService { + awsService := &privateendpoint.AWSService{ + CommonEndpointService: privateendpoint.CommonEndpointService{ + ID: "pe-service-id", + CloudRegion: "US_EAST_1", + ServiceStatus: "REJECTED", + Error: "atlas could not connect the private endpoint", + Interfaces: privateendpoint.EndpointInterfaces{}, + }, + } + + c := translation.NewPrivateEndpointServiceMock(t) + c.EXPECT().GetPrivateEndpoint(ctx, projectID, "AWS", "pe-service-id"). + Return(awsService, nil) + + return c + }, + expectedResult: reconcile.Result{RequeueAfter: workflow.DefaultRetry}, + expectedConditions: []api.Condition{ + api.FalseCondition(api.ReadyType), + api.FalseCondition(api.PrivateEndpointServiceReady). + WithReason(string(workflow.PrivateEndpointServiceFailedToConfigure)). + WithMessageRegexp("atlas could not connect the private endpoint"), + }, + }, + "private endpoint service is being deleted": { + atlasPrivateEndpoint: &akov2.AtlasPrivateEndpoint{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pe1", + Namespace: "default", + }, + Spec: akov2.AtlasPrivateEndpointSpec{ + ExternalProject: &akov2.ExternalProjectReference{ + ID: projectID, + }, + LocalCredentialHolder: api.LocalCredentialHolder{}, + Provider: "AWS", + Region: "US_EAST_1", + }, + Status: status.AtlasPrivateEndpointStatus{ + ServiceID: "pe-service-id", + ServiceStatus: "DELETING", + }, + }, + peClient: func() privateendpoint.PrivateEndpointService { + awsService := &privateendpoint.AWSService{ + CommonEndpointService: privateendpoint.CommonEndpointService{ + ID: "pe-service-id", + CloudRegion: "US_EAST_1", + ServiceStatus: "DELETING", + Interfaces: privateendpoint.EndpointInterfaces{}, + }, + } + + c := translation.NewPrivateEndpointServiceMock(t) + c.EXPECT().GetPrivateEndpoint(ctx, projectID, "AWS", "pe-service-id"). + Return(awsService, nil) + + return c + }, + expectedResult: reconcile.Result{RequeueAfter: workflow.DefaultRetry}, + expectedConditions: []api.Condition{ + api.FalseCondition(api.ReadyType), + api.FalseCondition(api.PrivateEndpointServiceReady). + WithReason(string(workflow.PrivateEndpointServiceDeleting)). + WithMessageRegexp("Private Endpoint is being deleted"), + }, + }, + "private endpoint service is available": { + atlasPrivateEndpoint: &akov2.AtlasPrivateEndpoint{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pe1", + Namespace: "default", + }, + Spec: akov2.AtlasPrivateEndpointSpec{ + ExternalProject: &akov2.ExternalProjectReference{ + ID: projectID, + }, + LocalCredentialHolder: api.LocalCredentialHolder{}, + Provider: "AWS", + Region: "US_EAST_1", + }, + Status: status.AtlasPrivateEndpointStatus{ + ServiceID: "pe-service-id", + ServiceStatus: "AVAILABLE", + }, + }, + peClient: func() privateendpoint.PrivateEndpointService { + awsService := &privateendpoint.AWSService{ + CommonEndpointService: privateendpoint.CommonEndpointService{ + ID: "pe-service-id", + CloudRegion: "US_EAST_1", + ServiceStatus: "AVAILABLE", + Interfaces: privateendpoint.EndpointInterfaces{}, + }, + } + + c := translation.NewPrivateEndpointServiceMock(t) + c.EXPECT().GetPrivateEndpoint(ctx, projectID, "AWS", "pe-service-id"). + Return(awsService, nil) + + return c + }, + expectedResult: reconcile.Result{}, + expectedConditions: []api.Condition{ + api.TrueCondition(api.PrivateEndpointServiceReady), + api.FalseCondition(api.ReadyType), + api.FalseCondition(api.PrivateEndpointReady). + WithReason(string(workflow.PrivateEndpointConfigurationPending)). + WithMessageRegexp("waiting for private endpoint configuration from customer side"), + }, + }, + } + for name, tt := range tests { + t.Run(name, func(t *testing.T) { + fakeClient := fake.NewClientBuilder(). + WithScheme(testScheme). + WithObjects(tt.atlasPrivateEndpoint). + WithStatusSubresource(tt.atlasPrivateEndpoint). + Build() + + logger := zaptest.NewLogger(t).Sugar() + r := &AtlasPrivateEndpointReconciler{ + Client: fakeClient, + EventRecorder: record.NewFakeRecorder(10), + Log: logger, + privateEndpointService: tt.peClient(), + } + workflowCtx := workflow.Context{ + Context: ctx, + Log: logger, + } + + result, err := r.handlePrivateEndpointService(&workflowCtx, projectID, tt.atlasPrivateEndpoint) + assert.NoError(t, err) + assert.Equal(t, tt.expectedResult, result) + assert.True( + t, + cmp.Equal( + tt.expectedConditions, + workflowCtx.Conditions(), + cmpopts.IgnoreFields(api.Condition{}, "LastTransitionTime"), + ), + ) + }) + } +} + +func TestHandlePrivateEndpointInterfaces(t *testing.T) { + ctx := context.Background() + projectID := "project-id" + + testScheme := runtime.NewScheme() + require.NoError(t, akov2.AddToScheme(testScheme)) + + tests := map[string]struct { + atlasPrivateEndpoint *akov2.AtlasPrivateEndpoint + atlasPEService func() privateendpoint.EndpointService + peClient func() privateendpoint.PrivateEndpointService + expectedResult reconcile.Result + expectedConditions []api.Condition + }{ + "failed to create private endpoint interface": { + atlasPrivateEndpoint: &akov2.AtlasPrivateEndpoint{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pe1", + Namespace: "default", + }, + Spec: akov2.AtlasPrivateEndpointSpec{ + ExternalProject: &akov2.ExternalProjectReference{ + ID: projectID, + }, + LocalCredentialHolder: api.LocalCredentialHolder{}, + Provider: "AWS", + Region: "US_EAST_1", + AWSConfiguration: []akov2.AWSPrivateEndpointConfiguration{ + { + ID: "vpcpe-123456", + }, + }, + }, + Status: status.AtlasPrivateEndpointStatus{ + Common: api.Common{ + Conditions: []api.Condition{ + { + Type: api.ReadyType, + Status: corev1.ConditionFalse, + LastTransitionTime: metav1.Now(), + }, + { + Type: api.PrivateEndpointServiceReady, + Status: corev1.ConditionTrue, + LastTransitionTime: metav1.Now(), + }, + { + Type: api.PrivateEndpointReady, + Status: corev1.ConditionFalse, + Reason: string(workflow.PrivateEndpointConfigurationPending), + Message: "waiting for private endpoint configuration from customer side", + LastTransitionTime: metav1.Now(), + }, + }, + }, + ServiceID: "pe-service-id", + ServiceName: "aws/service/name", + ServiceStatus: "AVAILABLE", + }, + }, + atlasPEService: func() privateendpoint.EndpointService { + awsService := &privateendpoint.AWSService{ + CommonEndpointService: privateendpoint.CommonEndpointService{ + Interfaces: privateendpoint.EndpointInterfaces{}, + }, + } + + return awsService + }, + peClient: func() privateendpoint.PrivateEndpointService { + c := translation.NewPrivateEndpointServiceMock(t) + c.EXPECT(). + CreatePrivateEndpointInterface(ctx, projectID, "AWS", "pe-service-id", "", mock.AnythingOfType("*privateendpoint.AWSInterface")). + Return(nil, errors.New("failed to create private endpoint interface")) + + return c + }, + expectedResult: reconcile.Result{RequeueAfter: workflow.DefaultRetry}, + expectedConditions: []api.Condition{ + api.FalseCondition(api.ReadyType), + api.FalseCondition(api.PrivateEndpointReady). + WithReason(string(workflow.PrivateEndpointFailedToCreate)). + WithMessageRegexp("failed to create private endpoint interface"), + }, + }, + "create private endpoint interface": { + atlasPrivateEndpoint: &akov2.AtlasPrivateEndpoint{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pe1", + Namespace: "default", + }, + Spec: akov2.AtlasPrivateEndpointSpec{ + ExternalProject: &akov2.ExternalProjectReference{ + ID: projectID, + }, + LocalCredentialHolder: api.LocalCredentialHolder{}, + Provider: "GCP", + Region: "EUROPE_WEST_3", + GCPConfiguration: []akov2.GCPPrivateEndpointConfiguration{ + { + ProjectID: "customer-project-id1", + GroupName: "group-name1", + Endpoints: []akov2.GCPPrivateEndpoint{ + { + Name: "group-name1-pe1", + IP: "10.0.0.1", + }, + { + Name: "group-name1-pe2", + IP: "10.0.0.2", + }, + { + Name: "group-name1-pe3", + IP: "10.0.0.3", + }, + }, + }, + }, + }, + Status: status.AtlasPrivateEndpointStatus{ + Common: api.Common{ + Conditions: []api.Condition{ + { + Type: api.ReadyType, + Status: corev1.ConditionFalse, + LastTransitionTime: metav1.Now(), + }, + { + Type: api.PrivateEndpointServiceReady, + Status: corev1.ConditionTrue, + LastTransitionTime: metav1.Now(), + }, + { + Type: api.PrivateEndpointReady, + Status: corev1.ConditionFalse, + Reason: string(workflow.PrivateEndpointConfigurationPending), + Message: "waiting for private endpoint configuration from customer side", + LastTransitionTime: metav1.Now(), + }, + }, + }, + ServiceID: "pe-service-id", + ServiceAttachmentNames: []string{ + "atls/service/attachment/name/1", + "atls/service/attachment/name/2", + "atls/service/attachment/name/3", + }, + ServiceStatus: "AVAILABLE", + }, + }, + atlasPEService: func() privateendpoint.EndpointService { + awsService := &privateendpoint.AWSService{ + CommonEndpointService: privateendpoint.CommonEndpointService{ + Interfaces: privateendpoint.EndpointInterfaces{}, + }, + } + + return awsService + }, + peClient: func() privateendpoint.PrivateEndpointService { + c := translation.NewPrivateEndpointServiceMock(t) + c.EXPECT(). + CreatePrivateEndpointInterface( + ctx, + projectID, + "GCP", + "pe-service-id", + "customer-project-id1", + mock.AnythingOfType("*privateendpoint.GCPInterface"), + ). + Return(nil, nil) + + return c + }, + expectedResult: reconcile.Result{RequeueAfter: workflow.DefaultRetry}, + expectedConditions: []api.Condition{ + api.FalseCondition(api.ReadyType), + api.FalseCondition(api.PrivateEndpointReady). + WithReason(string(workflow.PrivateEndpointUpdating)). + WithMessageRegexp("Private Endpoints are being updated"), + }, + }, + "failed to configure private endpoint interface": { + atlasPrivateEndpoint: &akov2.AtlasPrivateEndpoint{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pe1", + Namespace: "default", + }, + Spec: akov2.AtlasPrivateEndpointSpec{ + ExternalProject: &akov2.ExternalProjectReference{ + ID: projectID, + }, + LocalCredentialHolder: api.LocalCredentialHolder{}, + Provider: "AWS", + Region: "US_EAST_1", + AWSConfiguration: []akov2.AWSPrivateEndpointConfiguration{ + { + ID: "vpcpe-123456", + }, + }, + }, + Status: status.AtlasPrivateEndpointStatus{ + Common: api.Common{ + Conditions: []api.Condition{ + { + Type: api.ReadyType, + Status: corev1.ConditionFalse, + LastTransitionTime: metav1.Now(), + }, + { + Type: api.PrivateEndpointServiceReady, + Status: corev1.ConditionTrue, + LastTransitionTime: metav1.Now(), + }, + { + Type: api.PrivateEndpointReady, + Status: corev1.ConditionFalse, + Reason: string(workflow.PrivateEndpointConfigurationPending), + Message: "waiting for private endpoint configuration from customer side", + LastTransitionTime: metav1.Now(), + }, + }, + }, + ServiceID: "pe-service-id", + ServiceName: "aws/service/name", + ServiceStatus: "AVAILABLE", + Endpoints: []status.EndpointInterfaceStatus{ + { + ID: "vpcpe-123456", + Status: "INITIATING", + }, + }, + }, + }, + atlasPEService: func() privateendpoint.EndpointService { + awsInterface := &privateendpoint.AWSInterface{ + CommonEndpointInterface: privateendpoint.CommonEndpointInterface{ + ID: "vpcpe-123456", + InterfaceStatus: "REJECTED", + Error: "failed to configure private endpoint interface", + }, + } + awsService := &privateendpoint.AWSService{ + CommonEndpointService: privateendpoint.CommonEndpointService{ + Interfaces: privateendpoint.EndpointInterfaces{awsInterface}, + }, + } + + return awsService + }, + peClient: func() privateendpoint.PrivateEndpointService { + c := translation.NewPrivateEndpointServiceMock(t) + + return c + }, + expectedResult: reconcile.Result{RequeueAfter: workflow.DefaultRetry}, + expectedConditions: []api.Condition{ + api.FalseCondition(api.ReadyType), + api.FalseCondition(api.PrivateEndpointReady). + WithReason(string(workflow.PrivateEndpointFailedToConfigure)). + WithMessageRegexp("failed to configure private endpoint interface"), + }, + }, + "failed to delete private endpoint interface": { + atlasPrivateEndpoint: &akov2.AtlasPrivateEndpoint{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pe1", + Namespace: "default", + }, + Spec: akov2.AtlasPrivateEndpointSpec{ + ExternalProject: &akov2.ExternalProjectReference{ + ID: projectID, + }, + LocalCredentialHolder: api.LocalCredentialHolder{}, + Provider: "AWS", + Region: "US_EAST_1", + }, + Status: status.AtlasPrivateEndpointStatus{ + Common: api.Common{ + Conditions: []api.Condition{ + { + Type: api.ReadyType, + Status: corev1.ConditionFalse, + LastTransitionTime: metav1.Now(), + }, + { + Type: api.PrivateEndpointServiceReady, + Status: corev1.ConditionTrue, + LastTransitionTime: metav1.Now(), + }, + { + Type: api.PrivateEndpointReady, + Status: corev1.ConditionFalse, + Reason: string(workflow.PrivateEndpointConfigurationPending), + Message: "waiting for private endpoint configuration from customer side", + LastTransitionTime: metav1.Now(), + }, + }, + }, + ServiceID: "pe-service-id", + ServiceName: "aws/service/name", + ServiceStatus: "AVAILABLE", + Endpoints: []status.EndpointInterfaceStatus{ + { + ID: "vpcpe-123456", + Status: "AVAILABLE", + }, + }, + }, + }, + atlasPEService: func() privateendpoint.EndpointService { + awsInterface := &privateendpoint.AWSInterface{ + CommonEndpointInterface: privateendpoint.CommonEndpointInterface{ + ID: "vpcpe-123456", + InterfaceStatus: "AVAILABLE", + }, + } + awsService := &privateendpoint.AWSService{ + CommonEndpointService: privateendpoint.CommonEndpointService{ + Interfaces: privateendpoint.EndpointInterfaces{awsInterface}, + }, + } + + return awsService + }, + peClient: func() privateendpoint.PrivateEndpointService { + c := translation.NewPrivateEndpointServiceMock(t) + c.EXPECT(). + DeleteEndpointInterface(ctx, projectID, "AWS", "pe-service-id", "vpcpe-123456"). + Return(errors.New("failed to delete private endpoint interface")) + + return c + }, + expectedResult: reconcile.Result{RequeueAfter: workflow.DefaultRetry}, + expectedConditions: []api.Condition{ + api.FalseCondition(api.ReadyType), + api.FalseCondition(api.PrivateEndpointReady). + WithReason(string(workflow.PrivateEndpointFailedToDelete)). + WithMessageRegexp("failed to delete private endpoint interface"), + }, + }, + "delete private endpoint interface": { + atlasPrivateEndpoint: &akov2.AtlasPrivateEndpoint{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pe1", + Namespace: "default", + }, + Spec: akov2.AtlasPrivateEndpointSpec{ + ExternalProject: &akov2.ExternalProjectReference{ + ID: projectID, + }, + LocalCredentialHolder: api.LocalCredentialHolder{}, + Provider: "AWS", + Region: "US_EAST_1", + }, + Status: status.AtlasPrivateEndpointStatus{ + Common: api.Common{ + Conditions: []api.Condition{ + { + Type: api.ReadyType, + Status: corev1.ConditionTrue, + LastTransitionTime: metav1.Now(), + }, + { + Type: api.PrivateEndpointServiceReady, + Status: corev1.ConditionTrue, + LastTransitionTime: metav1.Now(), + }, + { + Type: api.PrivateEndpointReady, + Status: corev1.ConditionTrue, + LastTransitionTime: metav1.Now(), + }, + }, + }, + ServiceID: "pe-service-id", + ServiceName: "aws/service/name", + ServiceStatus: "AVAILABLE", + Endpoints: []status.EndpointInterfaceStatus{ + { + ID: "vpcpe-123456", + Status: "AVAILABLE", + }, + }, + }, + }, + atlasPEService: func() privateendpoint.EndpointService { + awsInterface := &privateendpoint.AWSInterface{ + CommonEndpointInterface: privateendpoint.CommonEndpointInterface{ + ID: "vpcpe-123456", + InterfaceStatus: "AVAILABLE", + }, + } + awsService := &privateendpoint.AWSService{ + CommonEndpointService: privateendpoint.CommonEndpointService{ + Interfaces: privateendpoint.EndpointInterfaces{awsInterface}, + }, + } + + return awsService + }, + peClient: func() privateendpoint.PrivateEndpointService { + c := translation.NewPrivateEndpointServiceMock(t) + c.EXPECT(). + DeleteEndpointInterface(ctx, projectID, "AWS", "pe-service-id", "vpcpe-123456"). + Return(nil) + + return c + }, + expectedResult: reconcile.Result{RequeueAfter: workflow.DefaultRetry}, + expectedConditions: []api.Condition{ + api.FalseCondition(api.ReadyType), + api.FalseCondition(api.PrivateEndpointReady). + WithReason(string(workflow.PrivateEndpointUpdating)). + WithMessageRegexp("Private Endpoints are being updated"), + }, + }, + "private endpoints are in progress": { + atlasPrivateEndpoint: &akov2.AtlasPrivateEndpoint{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pe1", + Namespace: "default", + }, + Spec: akov2.AtlasPrivateEndpointSpec{ + ExternalProject: &akov2.ExternalProjectReference{ + ID: projectID, + }, + LocalCredentialHolder: api.LocalCredentialHolder{}, + Provider: "AZURE", + Region: "GERMANY_NORTH", + AzureConfiguration: []akov2.AzurePrivateEndpointConfiguration{ + { + ID: "azure/resource/id", + IP: "10.0.0.2", + }, + }, + }, + Status: status.AtlasPrivateEndpointStatus{ + Common: api.Common{ + Conditions: []api.Condition{ + { + Type: api.ReadyType, + Status: corev1.ConditionTrue, + LastTransitionTime: metav1.Now(), + }, + { + Type: api.PrivateEndpointServiceReady, + Status: corev1.ConditionTrue, + LastTransitionTime: metav1.Now(), + }, + { + Type: api.PrivateEndpointReady, + Status: corev1.ConditionTrue, + LastTransitionTime: metav1.Now(), + }, + }, + }, + ServiceID: "pe-service-id", + ResourceID: "atlas/azure/resource/id", + ServiceStatus: "AVAILABLE", + Endpoints: []status.EndpointInterfaceStatus{ + { + ID: "azure/resource/id", + ConnectionName: "atlas-connection-name", + Status: "INITIATING", + }, + }, + }, + }, + atlasPEService: func() privateendpoint.EndpointService { + azureInterface := &privateendpoint.AzureInterface{ + CommonEndpointInterface: privateendpoint.CommonEndpointInterface{ + ID: "azure/resource/id", + InterfaceStatus: "INITIATING", + }, + IP: "10.0.0.2", + ConnectionName: "atlas-connection-name", + } + azureService := &privateendpoint.AzureService{ + CommonEndpointService: privateendpoint.CommonEndpointService{ + ID: "pe-service-id", + CloudRegion: "GERMANY_NORTH", + ServiceStatus: "AVAILABLE", + Interfaces: privateendpoint.EndpointInterfaces{azureInterface}, + }, + ServiceName: "atlas/azure/service/name", + ResourceID: "atlas/azure/resource/id", + } + + return azureService + }, + peClient: func() privateendpoint.PrivateEndpointService { + c := translation.NewPrivateEndpointServiceMock(t) + + return c + }, + expectedResult: reconcile.Result{RequeueAfter: workflow.DefaultRetry}, + expectedConditions: []api.Condition{ + api.FalseCondition(api.ReadyType), + api.FalseCondition(api.PrivateEndpointReady). + WithReason(string(workflow.PrivateEndpointUpdating)). + WithMessageRegexp("Private Endpoints are being updated"), + }, + }, + "private endpoints are ready": { + atlasPrivateEndpoint: &akov2.AtlasPrivateEndpoint{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pe1", + Namespace: "default", + }, + Spec: akov2.AtlasPrivateEndpointSpec{ + ExternalProject: &akov2.ExternalProjectReference{ + ID: projectID, + }, + LocalCredentialHolder: api.LocalCredentialHolder{}, + Provider: "AZURE", + Region: "GERMANY_NORTH", + AzureConfiguration: []akov2.AzurePrivateEndpointConfiguration{ + { + ID: "azure/resource/id", + IP: "10.0.0.2", + }, + }, + }, + Status: status.AtlasPrivateEndpointStatus{ + Common: api.Common{ + Conditions: []api.Condition{ + { + Type: api.ReadyType, + Status: corev1.ConditionTrue, + LastTransitionTime: metav1.Now(), + }, + { + Type: api.PrivateEndpointServiceReady, + Status: corev1.ConditionTrue, + LastTransitionTime: metav1.Now(), + }, + { + Type: api.PrivateEndpointReady, + Status: corev1.ConditionTrue, + LastTransitionTime: metav1.Now(), + }, + }, + }, + ServiceID: "pe-service-id", + ResourceID: "atlas/azure/resource/id", + ServiceStatus: "AVAILABLE", + Endpoints: []status.EndpointInterfaceStatus{ + { + ID: "azure/resource/id", + ConnectionName: "atlas-connection-name", + Status: "AVAILABLE", + }, + }, + }, + }, + atlasPEService: func() privateendpoint.EndpointService { + azureInterface := &privateendpoint.AzureInterface{ + CommonEndpointInterface: privateendpoint.CommonEndpointInterface{ + ID: "azure/resource/id", + InterfaceStatus: "AVAILABLE", + }, + IP: "10.0.0.2", + ConnectionName: "atlas-connection-name", + } + azureService := &privateendpoint.AzureService{ + CommonEndpointService: privateendpoint.CommonEndpointService{ + ID: "pe-service-id", + CloudRegion: "GERMANY_NORTH", + ServiceStatus: "AVAILABLE", + Interfaces: privateendpoint.EndpointInterfaces{azureInterface}, + }, + ServiceName: "atlas/azure/service/name", + ResourceID: "atlas/azure/resource/id", + } + + return azureService + }, + peClient: func() privateendpoint.PrivateEndpointService { + c := translation.NewPrivateEndpointServiceMock(t) + + return c + }, + expectedResult: reconcile.Result{}, + expectedConditions: []api.Condition{ + api.TrueCondition(api.PrivateEndpointServiceReady), + api.TrueCondition(api.PrivateEndpointReady), + api.TrueCondition(api.ReadyType), + }, + }, + } + for name, tt := range tests { + t.Run(name, func(t *testing.T) { + fakeClient := fake.NewClientBuilder(). + WithScheme(testScheme). + WithObjects(tt.atlasPrivateEndpoint). + WithStatusSubresource(tt.atlasPrivateEndpoint). + Build() + + logger := zaptest.NewLogger(t).Sugar() + r := &AtlasPrivateEndpointReconciler{ + Client: fakeClient, + EventRecorder: record.NewFakeRecorder(10), + Log: logger, + privateEndpointService: tt.peClient(), + } + workflowCtx := workflow.Context{ + Context: ctx, + Log: logger, + } + + akoPEService := privateendpoint.NewPrivateEndpoint(tt.atlasPrivateEndpoint) + result, err := r.handlePrivateEndpointInterface(&workflowCtx, projectID, tt.atlasPrivateEndpoint, akoPEService, tt.atlasPEService()) + assert.NoError(t, err) + assert.Equal(t, tt.expectedResult, result) + t.Log(cmp.Diff( + tt.expectedConditions, + workflowCtx.Conditions(), + cmpopts.IgnoreFields(api.Condition{}, "LastTransitionTime"), + )) + assert.True( + t, + cmp.Equal( + tt.expectedConditions, + workflowCtx.Conditions(), + cmpopts.IgnoreFields(api.Condition{}, "LastTransitionTime"), + ), + ) + }) + } +} + +func TestGetPrivateEndpointService(t *testing.T) { + ctx := context.Background() + projectID := "project-id" + + testScheme := runtime.NewScheme() + require.NoError(t, akov2.AddToScheme(testScheme)) + + tests := map[string]struct { + atlasPrivateEndpoint *akov2.AtlasPrivateEndpoint + peClient func() privateendpoint.PrivateEndpointService + expectedResult privateendpoint.EndpointService + expectedErr error + }{ + "failed to list private endpoint to match": { + atlasPrivateEndpoint: &akov2.AtlasPrivateEndpoint{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pe1", + Namespace: "default", + }, + Spec: akov2.AtlasPrivateEndpointSpec{ + ExternalProject: &akov2.ExternalProjectReference{ + ID: projectID, + }, + LocalCredentialHolder: api.LocalCredentialHolder{}, + Provider: "AWS", + Region: "US_EAST_1", + }, + }, + peClient: func() privateendpoint.PrivateEndpointService { + c := translation.NewPrivateEndpointServiceMock(t) + c.EXPECT().ListPrivateEndpoints(ctx, projectID, "AWS"). + Return(nil, errors.New("failed to list private endpoints")) + + return c + }, + expectedErr: errors.New("failed to list private endpoints"), + }, + "match private endpoint from the list": { + atlasPrivateEndpoint: &akov2.AtlasPrivateEndpoint{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pe1", + Namespace: "default", + }, + Spec: akov2.AtlasPrivateEndpointSpec{ + ExternalProject: &akov2.ExternalProjectReference{ + ID: projectID, + }, + LocalCredentialHolder: api.LocalCredentialHolder{}, + Provider: "AWS", + Region: "US_EAST_1", + }, + }, + peClient: func() privateendpoint.PrivateEndpointService { + c := translation.NewPrivateEndpointServiceMock(t) + c.EXPECT().ListPrivateEndpoints(ctx, projectID, "AWS"). + Return( + []privateendpoint.EndpointService{ + &privateendpoint.AWSService{ + CommonEndpointService: privateendpoint.CommonEndpointService{ + ID: "pe-service-id-1", + CloudRegion: "EU_CENTRAL_1", + ServiceStatus: "AVAILABLE", + }, + }, + &privateendpoint.AWSService{ + CommonEndpointService: privateendpoint.CommonEndpointService{ + ID: "pe-service-id-2", + CloudRegion: "US_EAST_1", + ServiceStatus: "AVAILABLE", + }, + }, + }, + nil, + ) + + return c + }, + expectedResult: &privateendpoint.AWSService{ + CommonEndpointService: privateendpoint.CommonEndpointService{ + ID: "pe-service-id-2", + CloudRegion: "US_EAST_1", + ServiceStatus: "AVAILABLE", + }, + }, + }, + } + for name, tt := range tests { + t.Run(name, func(t *testing.T) { + r := &AtlasPrivateEndpointReconciler{ + privateEndpointService: tt.peClient(), + } + + result, err := r.getOrMatchPrivateEndpointService(ctx, projectID, tt.atlasPrivateEndpoint) + assert.Equal(t, tt.expectedErr, err) + assert.Equal(t, tt.expectedResult, result) + }) + } +} + +func TestDeletePrivateEndpoint(t *testing.T) { + ctx := context.Background() + projectID := "project-id" + + testScheme := runtime.NewScheme() + require.NoError(t, akov2.AddToScheme(testScheme)) + + tests := map[string]struct { + peService privateendpoint.EndpointService + peClient func() privateendpoint.PrivateEndpointService + expectedErr error + }{ + "failed to delete private endpoint interface": { + peService: &privateendpoint.AWSService{ + CommonEndpointService: privateendpoint.CommonEndpointService{ + ID: "pe-service-id", + CloudRegion: "US_EAST_1", + ServiceStatus: "AVAILABLE", + Interfaces: privateendpoint.EndpointInterfaces{ + &privateendpoint.AWSInterface{ + CommonEndpointInterface: privateendpoint.CommonEndpointInterface{ + ID: "vpcpe-123456", + InterfaceStatus: "AVAILABLE", + }, + }, + }, + }, + }, + peClient: func() privateendpoint.PrivateEndpointService { + c := translation.NewPrivateEndpointServiceMock(t) + c.EXPECT().DeleteEndpointInterface(ctx, projectID, "AWS", "pe-service-id", "vpcpe-123456"). + Return(errors.New("failed to delete private endpoint interface")) + + return c + }, + expectedErr: errors.New("failed to delete private endpoint interface"), + }, + "delete private endpoint interface": { + peService: &privateendpoint.AWSService{ + CommonEndpointService: privateendpoint.CommonEndpointService{ + ID: "pe-service-id", + CloudRegion: "US_EAST_1", + ServiceStatus: "AVAILABLE", + Interfaces: privateendpoint.EndpointInterfaces{ + &privateendpoint.AWSInterface{ + CommonEndpointInterface: privateendpoint.CommonEndpointInterface{ + ID: "vpcpe-123456", + InterfaceStatus: "AVAILABLE", + }, + }, + }, + }, + }, + peClient: func() privateendpoint.PrivateEndpointService { + c := translation.NewPrivateEndpointServiceMock(t) + c.EXPECT().DeleteEndpointInterface(ctx, projectID, "AWS", "pe-service-id", "vpcpe-123456"). + Return(nil) + c.EXPECT().GetPrivateEndpoint(ctx, projectID, "AWS", "pe-service-id"). + Return(&privateendpoint.AWSService{}, nil) + + return c + }, + }, + } + for name, tt := range tests { + t.Run(name, func(t *testing.T) { + r := &AtlasPrivateEndpointReconciler{ + privateEndpointService: tt.peClient(), + } + + _, err := r.deletePrivateEndpoint(ctx, projectID, tt.peService) + assert.Equal(t, tt.expectedErr, err) + }) + } +} diff --git a/pkg/controller/atlasproject/atlasproject_controller.go b/pkg/controller/atlasproject/atlasproject_controller.go index 100a413e8d..cd10dde041 100644 --- a/pkg/controller/atlasproject/atlasproject_controller.go +++ b/pkg/controller/atlasproject/atlasproject_controller.go @@ -101,6 +101,7 @@ func (r *AtlasProjectReconciler) Reconcile(ctx context.Context, req ctrl.Request return result.ReconcileResult(), nil } } + err := customresource.ApplyLastConfigSkipped(ctx, atlasProject, r.Client) if err != nil { log.Errorw("Failed to apply last skipped config", "error", err) diff --git a/pkg/controller/atlasproject/private_endpoint.go b/pkg/controller/atlasproject/private_endpoint.go index 9402f9f400..8e7c2a4dea 100644 --- a/pkg/controller/atlasproject/private_endpoint.go +++ b/pkg/controller/atlasproject/private_endpoint.go @@ -2,7 +2,9 @@ package atlasproject import ( "context" + "encoding/json" "errors" + "fmt" "net/http" "go.mongodb.org/atlas-sdk/v20231115008/admin" @@ -14,10 +16,24 @@ import ( akov2 "github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/api/v1" "github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/api/v1/provider" "github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/api/v1/status" + "github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/controller/customresource" "github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/controller/workflow" ) func ensurePrivateEndpoint(workflowCtx *workflow.Context, project *akov2.AtlasProject) workflow.Result { + skipped, err := hasSkippedPrivateEndpointConfiguration(project) + if err != nil { + return workflow.Terminate(workflow.Internal, err.Error()) + } + + if skipped { + workflowCtx.EnsureStatusOption(status.AtlasProjectAddPrivateEndpointsOption(nil)) + workflowCtx.UnsetCondition(api.PrivateEndpointServiceReadyType) + workflowCtx.UnsetCondition(api.PrivateEndpointReadyType) + + return workflow.OK() + } + specPEs := project.Spec.DeepCopy().PrivateEndpoints atlasPEs, err := getAllPrivateEndpoints(workflowCtx.Context, workflowCtx.SdkClient, project.ID()) @@ -555,3 +571,17 @@ type intersectionPair struct { spec akov2.PrivateEndpoint atlas atlasPE } + +func hasSkippedPrivateEndpointConfiguration(atlasProject *akov2.AtlasProject) (bool, error) { + lastSkippedSpec := akov2.AtlasProjectSpec{} + lastSkippedSpecString, ok := atlasProject.Annotations[customresource.AnnotationLastSkippedConfiguration] + if ok { + if err := json.Unmarshal([]byte(lastSkippedSpecString), &lastSkippedSpec); err != nil { + return false, fmt.Errorf("failed to parse last skipped configuration: %w", err) + } + + return len(lastSkippedSpec.PrivateEndpoints) == 0, nil + } + + return false, nil +} diff --git a/pkg/controller/atlasproject/project.go b/pkg/controller/atlasproject/project.go index dab8c274d7..11262851ac 100644 --- a/pkg/controller/atlasproject/project.go +++ b/pkg/controller/atlasproject/project.go @@ -183,5 +183,21 @@ func (r *AtlasProjectReconciler) hasDependencies(ctx *workflow.Context, project return false, err } - return len(customRoles.Items) > 0, nil + if len(customRoles.Items) > 0 { + return true, nil + } + + privateEndpoints := &akov2.AtlasPrivateEndpointList{} + listOps = &client.ListOptions{ + FieldSelector: fields.OneTermEqualSelector( + indexer.AtlasPrivateEndpointByProjectIndex, + client.ObjectKeyFromObject(project).String(), + ), + } + err = r.Client.List(ctx.Context, privateEndpoints, listOps) + if err != nil { + return false, err + } + + return len(privateEndpoints.Items) > 0, nil } diff --git a/pkg/controller/atlasproject/project_test.go b/pkg/controller/atlasproject/project_test.go index 70c038345d..a8efa341b2 100644 --- a/pkg/controller/atlasproject/project_test.go +++ b/pkg/controller/atlasproject/project_test.go @@ -620,17 +620,14 @@ func TestHandleProject(t *testing.T) { } instancesIndexer := indexer.NewAtlasStreamInstanceByProjectIndexer(zaptest.NewLogger(t)) customRoleIndexer := indexer.NewAtlasCustomRoleByProjectIndexer(logger.Desugar()) - + peIndexer := indexer.NewAtlasPrivateEndpointByProjectIndexer(zaptest.NewLogger(t)) k8sClient := fake.NewClientBuilder(). WithScheme(testScheme). WithObjects(tt.project). WithStatusSubresource(tt.project). - WithIndex( - instancesIndexer.Object(), - instancesIndexer.Name(), - instancesIndexer.Keys, - ). + WithIndex(instancesIndexer.Object(), instancesIndexer.Name(), instancesIndexer.Keys). WithIndex(customRoleIndexer.Object(), customRoleIndexer.Name(), customRoleIndexer.Keys). + WithIndex(peIndexer.Object(), peIndexer.Name(), peIndexer.Keys). WithInterceptorFuncs(tt.interceptors). Build() @@ -1083,16 +1080,14 @@ func TestDelete(t *testing.T) { instancesIndexer := indexer.NewAtlasStreamInstanceByProjectIndexer(zaptest.NewLogger(t)) customRoleIndexer := indexer.NewAtlasCustomRoleByProjectIndexer(logger.Desugar()) + peIndexer := indexer.NewAtlasPrivateEndpointByProjectIndexer(zaptest.NewLogger(t)) k8sClient := fake.NewClientBuilder(). WithScheme(testScheme). WithObjects(tt.objects...). WithStatusSubresource(tt.objects...). - WithIndex( - instancesIndexer.Object(), - instancesIndexer.Name(), - instancesIndexer.Keys, - ). + WithIndex(instancesIndexer.Object(), instancesIndexer.Name(), instancesIndexer.Keys). WithIndex(customRoleIndexer.Object(), customRoleIndexer.Name(), customRoleIndexer.Keys). + WithIndex(peIndexer.Object(), peIndexer.Name(), peIndexer.Keys). WithInterceptorFuncs(tt.interceptors). Build() @@ -1166,19 +1161,18 @@ func TestHasDependencies(t *testing.T) { ctx := &workflow.Context{ Context: context.Background(), } + logger := zaptest.NewLogger(t) instanceIndexer := indexer.NewAtlasStreamInstanceByProjectIndexer(zaptest.NewLogger(t)) customRoleIndexer := indexer.NewAtlasCustomRoleByProjectIndexer(zap.L()) + peIndexer := indexer.NewAtlasPrivateEndpointByProjectIndexer(logger) testScheme := runtime.NewScheme() require.NoError(t, akov2.AddToScheme(testScheme)) k8sClient := fake.NewClientBuilder(). WithScheme(testScheme). WithObjects(p). - WithIndex( - instanceIndexer.Object(), - instanceIndexer.Name(), - instanceIndexer.Keys, - ). + WithIndex(instanceIndexer.Object(), instanceIndexer.Name(), instanceIndexer.Keys). WithIndex(customRoleIndexer.Object(), customRoleIndexer.Name(), customRoleIndexer.Keys). + WithIndex(peIndexer.Object(), peIndexer.Name(), peIndexer.Keys). Build() reconciler := &AtlasProjectReconciler{ Client: k8sClient, @@ -1189,7 +1183,7 @@ func TestHasDependencies(t *testing.T) { assert.False(t, ok) }) - t.Run("should return true when project has dependencies", func(t *testing.T) { + t.Run("should return true when project has streams as dependencies", func(t *testing.T) { p := &akov2.AtlasProject{ ObjectMeta: metav1.ObjectMeta{ Name: "my-project", @@ -1208,17 +1202,16 @@ func TestHasDependencies(t *testing.T) { }, }, } - instanceIndexer := indexer.NewAtlasStreamInstanceByProjectIndexer(zaptest.NewLogger(t)) + logger := zaptest.NewLogger(t) + instanceIndexer := indexer.NewAtlasStreamInstanceByProjectIndexer(logger) + peIndexer := indexer.NewAtlasPrivateEndpointByProjectIndexer(logger) testScheme := runtime.NewScheme() require.NoError(t, akov2.AddToScheme(testScheme)) k8sClient := fake.NewClientBuilder(). WithScheme(testScheme). WithObjects(p, streamsInstance). - WithIndex( - instanceIndexer.Object(), - instanceIndexer.Name(), - instanceIndexer.Keys, - ). + WithIndex(instanceIndexer.Object(), instanceIndexer.Name(), instanceIndexer.Keys). + WithIndex(peIndexer.Object(), peIndexer.Name(), peIndexer.Keys). Build() reconciler := &AtlasProjectReconciler{ Client: k8sClient, @@ -1231,4 +1224,48 @@ func TestHasDependencies(t *testing.T) { require.NoError(t, err) assert.True(t, ok) }) + + t.Run("should return true when project has private endpoints as dependencies", func(t *testing.T) { + p := &akov2.AtlasProject{ + ObjectMeta: metav1.ObjectMeta{ + Name: "my-project", + Namespace: "default", + }, + } + pe := &akov2.AtlasPrivateEndpoint{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pe-0", + Namespace: "default", + }, + Spec: akov2.AtlasPrivateEndpointSpec{ + Project: &common.ResourceRefNamespaced{ + Name: "my-project", + Namespace: "default", + }, + }, + } + logger := zaptest.NewLogger(t) + ctx := &workflow.Context{ + Context: context.Background(), + } + instanceIndexer := indexer.NewAtlasStreamInstanceByProjectIndexer(logger) + customRolesIndexer := indexer.NewAtlasCustomRoleByProjectIndexer(logger) + peIndexer := indexer.NewAtlasPrivateEndpointByProjectIndexer(logger) + testScheme := runtime.NewScheme() + require.NoError(t, akov2.AddToScheme(testScheme)) + k8sClient := fake.NewClientBuilder(). + WithScheme(testScheme). + WithObjects(p, pe). + WithIndex(instanceIndexer.Object(), instanceIndexer.Name(), instanceIndexer.Keys). + WithIndex(customRolesIndexer.Object(), customRolesIndexer.Name(), customRolesIndexer.Keys). + WithIndex(peIndexer.Object(), peIndexer.Name(), peIndexer.Keys). + Build() + reconciler := &AtlasProjectReconciler{ + Client: k8sClient, + } + + ok, err := reconciler.hasDependencies(ctx, p) + require.NoError(t, err) + assert.True(t, ok) + }) } diff --git a/pkg/controller/customresource/protection.go b/pkg/controller/customresource/protection.go index 6138d61614..5d07b026ec 100644 --- a/pkg/controller/customresource/protection.go +++ b/pkg/controller/customresource/protection.go @@ -47,6 +47,7 @@ func ApplyLastConfigApplied(ctx context.Context, resource api.AtlasCustomResourc func ApplyLastConfigSkipped(ctx context.Context, resource api.AtlasCustomResource, k8sClient client.Client) error { return applyLastSpec(ctx, resource, k8sClient, AnnotationLastSkippedConfiguration) } + func applyLastSpec(ctx context.Context, resource api.AtlasCustomResource, k8sClient client.Client, annotationKey string) error { uObj, err := runtime.DefaultUnstructuredConverter.ToUnstructured(resource) if err != nil { diff --git a/pkg/controller/customresource/protection_test.go b/pkg/controller/customresource/protection_test.go index 7bea4583b4..45beaef7bd 100644 --- a/pkg/controller/customresource/protection_test.go +++ b/pkg/controller/customresource/protection_test.go @@ -153,6 +153,22 @@ func TestApplyLastConfigApplied(t *testing.T) { assert.Equal(t, expectedConfig, annot[customresource.AnnotationLastAppliedConfiguration]) } +func TestApplyLastConfigSkipped(t *testing.T) { + resource := sampleResource() + resource.Name = "foo" + resource.Spec.Username = "test-user" + + scheme := runtime.NewScheme() + utilruntime.Must(akov2.AddToScheme(scheme)) + c := fake.NewClientBuilder().WithObjects(resource).WithScheme(scheme).Build() + assert.NoError(t, customresource.ApplyLastConfigSkipped(context.Background(), resource, c)) + + annot := resource.GetAnnotations() + assert.NotEmpty(t, annot) + expectedConfig := `{"roles":null,"username":"test-user"}` + assert.Equal(t, expectedConfig, annot[customresource.AnnotationLastSkippedConfiguration]) +} + func TestIsResourceManagedByOperator(t *testing.T) { testCases := []struct { title string diff --git a/pkg/controller/workflow/reason.go b/pkg/controller/workflow/reason.go index e3759d88ac..3247bb0959 100644 --- a/pkg/controller/workflow/reason.go +++ b/pkg/controller/workflow/reason.go @@ -137,3 +137,17 @@ const ( AtlasCustomRoleNotUpdated ConditionReason = "CustomRoleNotUpdated" AtlasCustomRoleNotDeleted ConditionReason = "CustomRoleNotDeleted" ) + +// Atlas Private Endpoint reasons +const ( + PrivateEndpointServiceCreated ConditionReason = "PrivateEndpointServiceCreated" + PrivateEndpointServiceFailedToCreate ConditionReason = "PrivateEndpointServiceFailedToCreate" + PrivateEndpointServiceFailedToConfigure ConditionReason = "PrivateEndpointServiceFailedToConfigure" + PrivateEndpointServiceInitializing ConditionReason = "PrivateEndpointServiceInitializing" + PrivateEndpointServiceDeleting ConditionReason = "PrivateEndpointServiceDeleting" + PrivateEndpointFailedToCreate ConditionReason = "PrivateEndpointFailedToCreate" + PrivateEndpointUpdating ConditionReason = "PrivateEndpointUpdating" + PrivateEndpointConfigurationPending ConditionReason = "PrivateEndpointConfigurationPending" + PrivateEndpointFailedToConfigure ConditionReason = "PrivateEndpointFailedToConfigure" + PrivateEndpointFailedToDelete ConditionReason = "PrivateEndpointFailedToDelete" +) diff --git a/pkg/indexer/atlasprivateendpointcredentials.go b/pkg/indexer/atlasprivateendpointcredentials.go new file mode 100644 index 0000000000..a6df75e703 --- /dev/null +++ b/pkg/indexer/atlasprivateendpointcredentials.go @@ -0,0 +1,24 @@ +package indexer + +import ( + "go.uber.org/zap" + "sigs.k8s.io/controller-runtime/pkg/reconcile" + + akov2 "github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/api/v1" +) + +const ( + AtlasPrivateEndpointCredentialsIndex = "atlasprivateendpoint.credentials" +) + +func NewAtlasPrivateEndpointByCredentialIndexer(logger *zap.Logger) *LocalCredentialIndexer { + return NewLocalCredentialsIndexer(AtlasPrivateEndpointCredentialsIndex, &akov2.AtlasPrivateEndpoint{}, logger) +} + +func PrivateEndpointRequests(list *akov2.AtlasPrivateEndpointList) []reconcile.Request { + requests := make([]reconcile.Request, 0, len(list.Items)) + for _, item := range list.Items { + requests = append(requests, toRequest(&item)) + } + return requests +} diff --git a/pkg/indexer/atlasprivateendpointsprojects.go b/pkg/indexer/atlasprivateendpointsprojects.go new file mode 100644 index 0000000000..9abe5e12eb --- /dev/null +++ b/pkg/indexer/atlasprivateendpointsprojects.go @@ -0,0 +1,44 @@ +package indexer + +import ( + "go.uber.org/zap" + "sigs.k8s.io/controller-runtime/pkg/client" + + akov2 "github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/api/v1" +) + +const ( + AtlasPrivateEndpointByProjectIndex = "atlasprivateendpoint.spec.projectRef" +) + +type AtlasPrivateEndpointByProjectIndexer struct { + logger *zap.SugaredLogger +} + +func NewAtlasPrivateEndpointByProjectIndexer(logger *zap.Logger) *AtlasPrivateEndpointByProjectIndexer { + return &AtlasPrivateEndpointByProjectIndexer{ + logger: logger.Named(AtlasPrivateEndpointByProjectIndex).Sugar(), + } +} + +func (*AtlasPrivateEndpointByProjectIndexer) Object() client.Object { + return &akov2.AtlasPrivateEndpoint{} +} + +func (*AtlasPrivateEndpointByProjectIndexer) Name() string { + return AtlasPrivateEndpointByProjectIndex +} + +func (a *AtlasPrivateEndpointByProjectIndexer) Keys(object client.Object) []string { + pe, ok := object.(*akov2.AtlasPrivateEndpoint) + if !ok { + a.logger.Errorf("expected *akov2.AtlasPrivateEndpoint but got %T", object) + return nil + } + + if pe.Spec.Project == nil || pe.Spec.Project.Name == "" { + return nil + } + + return []string{pe.Spec.Project.GetObject(pe.GetNamespace()).String()} +} diff --git a/pkg/indexer/atlasprivateendpointsprojects_test.go b/pkg/indexer/atlasprivateendpointsprojects_test.go new file mode 100644 index 0000000000..1a6af30d2d --- /dev/null +++ b/pkg/indexer/atlasprivateendpointsprojects_test.go @@ -0,0 +1,49 @@ +package indexer + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "go.uber.org/zap/zaptest" + + akov2 "github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/api/v1" + "github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/api/v1/common" +) + +func TestAtlasPrivateEndpointsByProjectIndices(t *testing.T) { + t.Run("should return nil when instance has no project associated to it", func(t *testing.T) { + pe := &akov2.AtlasPrivateEndpoint{ + Spec: akov2.AtlasPrivateEndpointSpec{ + Provider: "AWS", + Region: "US_EAST_1", + }, + } + + indexer := NewAtlasPrivateEndpointByProjectIndexer(zaptest.NewLogger(t)) + keys := indexer.Keys(pe) + assert.Nil(t, keys) + }) + + t.Run("should return indexes slice when instance has project associated to it", func(t *testing.T) { + pe := &akov2.AtlasPrivateEndpoint{ + Spec: akov2.AtlasPrivateEndpointSpec{ + Provider: "AWS", + Region: "US_EAST_1", + Project: &common.ResourceRefNamespaced{ + Name: "project-1", + Namespace: "default", + }, + }, + } + + indexer := NewAtlasPrivateEndpointByProjectIndexer(zaptest.NewLogger(t)) + keys := indexer.Keys(pe) + assert.Equal( + t, + []string{ + "default/project-1", + }, + keys, + ) + }) +} diff --git a/pkg/indexer/indexer.go b/pkg/indexer/indexer.go index 7f8f8f6212..83e03dc299 100644 --- a/pkg/indexer/indexer.go +++ b/pkg/indexer/indexer.go @@ -38,6 +38,8 @@ func RegisterAll(ctx context.Context, mgr manager.Manager, logger *zap.Logger) e NewAtlasDataFederationByProjectIndexer(logger), NewAtlasCustomRoleByCredentialIndexer(logger), NewAtlasCustomRoleByProjectIndexer(logger), + NewAtlasPrivateEndpointByCredentialIndexer(logger), + NewAtlasPrivateEndpointByProjectIndexer(logger), ) } diff --git a/pkg/operator/builder.go b/pkg/operator/builder.go index 294870f759..8b18c2bdbb 100644 --- a/pkg/operator/builder.go +++ b/pkg/operator/builder.go @@ -7,8 +7,6 @@ import ( "os" "time" - "github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/controller/atlascustomrole" - "go.uber.org/zap" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/labels" @@ -27,10 +25,12 @@ import ( "github.com/mongodb/mongodb-atlas-kubernetes/v2/internal/featureflags" "github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/controller/atlas" "github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/controller/atlasbackupcompliancepolicy" + "github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/controller/atlascustomrole" "github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/controller/atlasdatabaseuser" "github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/controller/atlasdatafederation" "github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/controller/atlasdeployment" "github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/controller/atlasfederatedauth" + "github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/controller/atlasprivateendpoint" "github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/controller/atlasproject" "github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/controller/atlassearchindexconfig" "github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/controller/atlasstream" @@ -335,6 +335,17 @@ func (b *Builder) Build(ctx context.Context) (manager.Manager, error) { return nil, fmt.Errorf("unable to create controller AtlasCustomRole: %w", err) } + peReconciler := atlasprivateendpoint.NewAtlasPrivateEndpointReconciler( + mgr, + b.predicates, + b.atlasProvider, + b.deletionProtection, + b.logger, + ) + if err = peReconciler.SetupWithManager(mgr, b.skipNameValidation); err != nil { + return nil, fmt.Errorf("unable to create controller AtlasPrivateEndpoint: %w", err) + } + return mgr, nil } diff --git a/test/e2e/privateendpoint_test.go b/test/e2e/privateendpoint_test.go new file mode 100644 index 0000000000..e17f725f4b --- /dev/null +++ b/test/e2e/privateendpoint_test.go @@ -0,0 +1,626 @@ +package e2e_test + +import ( + "fmt" + "time" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "sigs.k8s.io/controller-runtime/pkg/client" + + "github.com/mongodb/mongodb-atlas-kubernetes/v2/internal/pointer" + "github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/api" + akov2 "github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/api/v1" + "github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/api/v1/common" + "github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/api/v1/provider" + "github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/api/v1/status" + "github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/controller/customresource" + "github.com/mongodb/mongodb-atlas-kubernetes/v2/test/helper/conditions" + "github.com/mongodb/mongodb-atlas-kubernetes/v2/test/helper/e2e/actions" + "github.com/mongodb/mongodb-atlas-kubernetes/v2/test/helper/e2e/actions/cloud" + "github.com/mongodb/mongodb-atlas-kubernetes/v2/test/helper/e2e/data" + "github.com/mongodb/mongodb-atlas-kubernetes/v2/test/helper/e2e/model" + "github.com/mongodb/mongodb-atlas-kubernetes/v2/test/helper/resources" +) + +var _ = Describe("Private Endpoints", Label("private-endpoint"), func() { + var testData *model.TestDataProvider + + _ = BeforeEach(OncePerOrdered, func() { + checkUpAWSEnvironment() + checkUpAzureEnvironment() + checkNSetUpGCPEnvironment() + }) + + _ = AfterEach(func() { + GinkgoWriter.Println() + GinkgoWriter.Println("===============================================") + GinkgoWriter.Println("Operator namespace: " + testData.Resources.Namespace) + GinkgoWriter.Println("===============================================") + if CurrentSpecReport().Failed() { + Expect(actions.SaveProjectsToFile(testData.Context, testData.K8SClient, testData.Resources.Namespace)).Should(Succeed()) + } + By("Delete Project and cluster resources", func() { + actions.DeleteTestDataProject(testData) + actions.AfterEachFinalCleanup([]model.TestDataProvider{*testData}) + }) + }) + + DescribeTable( + "Configure private endpoint for all supported cloud provider", + func(test *model.TestDataProvider, pe *akov2.AtlasPrivateEndpoint) { + var privateEndpointDetails *cloud.PrivateEndpointDetails + + testData = test + actions.ProjectCreationFlow(test) + + By("Preparing private endpoint resource", func() { + pe.Namespace = test.Resources.Namespace + pe.Spec.Project = &common.ResourceRefNamespaced{ + Name: test.Project.Name, + Namespace: test.Project.Namespace, + } + region, err := cloud.GetAtlasRegionByProvider(pe.Spec.Provider) + Expect(err).ToNot(HaveOccurred()) + pe.Spec.Region = region + }) + + By("Creating private endpoint", func() { + Expect(test.K8SClient.Create(test.Context, pe)).To(Succeed()) + + Eventually(func(g Gomega) { + g.Expect(testData.K8SClient.Get(test.Context, client.ObjectKeyFromObject(pe), pe)).To(Succeed()) + g.Expect(pe.Status.ServiceStatus).To(Equal("AVAILABLE")) + g.Expect(resources.CheckCondition(testData.K8SClient, pe, api.TrueCondition(api.PrivateEndpointServiceReady))).Should(BeTrue()) + }).WithTimeout(15 * time.Minute).WithPolling(10 * time.Second).Should(Succeed()) + }) + + By("Configuring external network", func() { + Expect(testData.K8SClient.Get(test.Context, client.ObjectKeyFromObject(pe), pe)).To(Succeed()) + action, err := prepareProviderAction() + Expect(err).ToNot(HaveOccurred()) + + cloudRegion := cloud.MapCloudProviderRegion(pe.Spec.Provider, pe.Spec.Region) + Expect(cloudRegion).ToNot(BeEmpty()) + + switch pe.Spec.Provider { + case "AWS": + awsConfig, err := cloud.GenerateCloudConfig[cloud.AWSConfig](pe.Spec.Provider, cloudRegion, testData.Resources.KeyName) + Expect(err).ToNot(HaveOccurred()) + + Expect(action.SetupNetwork(provider.ProviderName(pe.Spec.Provider), cloud.WithAWSConfig(awsConfig))).ToNot(BeEmpty()) + privateEndpointDetails = action.SetupPrivateEndpoint(&cloud.AWSPrivateEndpointRequest{ + ID: fmt.Sprintf("aws-e2e-pe-%s", testData.Resources.TestID), + Region: awsConfig.Region, + ServiceName: pe.Status.ServiceName, + }) + case "AZURE": + azureConfig, err := cloud.GenerateCloudConfig[cloud.AzureConfig](pe.Spec.Provider, cloudRegion, testData.Resources.KeyName) + Expect(err).ToNot(HaveOccurred()) + + Expect(action.SetupNetwork(provider.ProviderName(pe.Spec.Provider), cloud.WithAzureConfig(azureConfig))).ToNot(BeEmpty()) + privateEndpointDetails = action.SetupPrivateEndpoint(&cloud.AzurePrivateEndpointRequest{ + ID: fmt.Sprintf("azure-e2e-pe-%s", testData.Resources.TestID), + Region: azureConfig.Region, + ServiceResourceID: pe.Status.ResourceID, + SubnetName: randomKeyFromMap(azureConfig.Subnets), + }) + case "GCP": + gcpConfig, err := cloud.GenerateCloudConfig[cloud.GCPConfig](pe.Spec.Provider, cloudRegion, testData.Resources.KeyName) + Expect(err).ToNot(HaveOccurred()) + + Expect(action.SetupNetwork(provider.ProviderName(pe.Spec.Provider), cloud.WithGCPConfig(gcpConfig))).ToNot(BeEmpty()) + privateEndpointDetails = action.SetupPrivateEndpoint(&cloud.GCPPrivateEndpointRequest{ + ID: fmt.Sprintf("gcp-e2e-pe-%s", testData.Resources.TestID), + Region: gcpConfig.Region, + Targets: pe.Status.ServiceAttachmentNames, + SubnetName: randomKeyFromMap(gcpConfig.Subnets), + }) + } + }) + + By("Configuring private endpoint with external network details", func() { + Expect(testData.K8SClient.Get(test.Context, client.ObjectKeyFromObject(pe), pe)).To(Succeed()) + + switch pe.Spec.Provider { + case "AWS": + pe.Spec.AWSConfiguration = []akov2.AWSPrivateEndpointConfiguration{ + { + ID: privateEndpointDetails.ID, + }, + } + case "AZURE": + pe.Spec.AzureConfiguration = []akov2.AzurePrivateEndpointConfiguration{ + { + ID: privateEndpointDetails.ID, + IP: privateEndpointDetails.IP, + }, + } + case "GCP": + gcpEndpoints := make([]akov2.GCPPrivateEndpoint, 0, len(privateEndpointDetails.Endpoints)) + for _, ep := range privateEndpointDetails.Endpoints { + gcpEndpoints = append( + gcpEndpoints, + akov2.GCPPrivateEndpoint{ + Name: ep.Name, + IP: ep.IP, + }, + ) + } + pe.Spec.GCPConfiguration = []akov2.GCPPrivateEndpointConfiguration{ + { + ProjectID: privateEndpointDetails.GCPProjectID, + GroupName: privateEndpointDetails.EndpointGroupName, + Endpoints: gcpEndpoints, + }, + } + } + + Expect(test.K8SClient.Update(test.Context, pe)).To(Succeed()) + Eventually(func(g Gomega) { //nolint:dupl + g.Expect(testData.K8SClient.Get(test.Context, client.ObjectKeyFromObject(pe), pe)).To(Succeed()) + g.Expect(pe.Status.ServiceStatus).To(Equal("AVAILABLE")) + g.Expect(resources.CheckCondition(testData.K8SClient, pe, api.TrueCondition(api.PrivateEndpointServiceReady))).Should(BeTrue()) + for _, eStatus := range pe.Status.Endpoints { + g.Expect(eStatus.Status).To(Equal("AVAILABLE")) + } + g.Expect(resources.CheckCondition(testData.K8SClient, pe, api.TrueCondition(api.PrivateEndpointReady))).Should(BeTrue()) + g.Expect(resources.CheckCondition(testData.K8SClient, pe, api.TrueCondition(api.ReadyType))).To(BeTrue()) + }).WithTimeout(15 * time.Minute).WithPolling(10 * time.Second).Should(Succeed()) + }) + + By("Removing private endpoint", func() { + Expect(test.K8SClient.Delete(test.Context, pe)).To(Succeed()) + + Eventually(func(g Gomega) { + g.Expect(testData.K8SClient.Get(test.Context, client.ObjectKeyFromObject(pe), pe)).ShouldNot(Succeed()) + }).WithTimeout(15 * time.Minute).WithPolling(20 * time.Second).Should(Succeed()) + }) + }, + Entry( + "Configure AWS private endpoint", + Label("aws-private-endpoint"), + model.DataProvider( + "aws-pe-1", + model.NewEmptyAtlasKeyType().UseDefaultFullAccess(), + 40000, + []func(*model.TestDataProvider){}, + ).WithProject(data.DefaultProject()), + &akov2.AtlasPrivateEndpoint{ + ObjectMeta: metav1.ObjectMeta{ + Name: "aws-pe-1", + }, + Spec: akov2.AtlasPrivateEndpointSpec{ + Provider: "AWS", + }, + }, + ), + Entry( + "Configure Azure private endpoint", + Label("azure-private-endpoint"), + model.DataProvider( + "azure-pe-1", + model.NewEmptyAtlasKeyType().UseDefaultFullAccess(), + 40000, + []func(*model.TestDataProvider){}, + ).WithProject(data.DefaultProject()), + &akov2.AtlasPrivateEndpoint{ + ObjectMeta: metav1.ObjectMeta{ + Name: "azure-pe-1", + }, + Spec: akov2.AtlasPrivateEndpointSpec{ + Provider: "AZURE", + }, + }, + ), + Entry( + "Configure GCP private endpoint", + Label("gcp-private-endpoint"), + model.DataProvider( + "gcp-pe-1", + model.NewEmptyAtlasKeyType().UseDefaultFullAccess(), + 40000, + []func(*model.TestDataProvider){}, + ).WithProject(data.DefaultProject()), + &akov2.AtlasPrivateEndpoint{ + ObjectMeta: metav1.ObjectMeta{ + Name: "gcp-pe-1", + }, + Spec: akov2.AtlasPrivateEndpointSpec{ + Provider: "GCP", + }, + }, + ), + ) +}) + +var _ = Describe("Migrate private endpoints from sub-resources to separate custom resources", Label("private-endpoint"), func() { + var testData *model.TestDataProvider + var awsPE *akov2.AtlasPrivateEndpoint + var azurePE *akov2.AtlasPrivateEndpoint + var gcpPE *akov2.AtlasPrivateEndpoint + var awsRegion string + var azureRegion string + var gcpRegion string + privateEndpointDetails := map[string]*cloud.PrivateEndpointDetails{} + + _ = BeforeEach(func() { + checkUpAWSEnvironment() + checkUpAzureEnvironment() + checkNSetUpGCPEnvironment() + }) + + _ = AfterEach(func() { + GinkgoWriter.Println() + GinkgoWriter.Println("===============================================") + GinkgoWriter.Println("Operator namespace: " + testData.Resources.Namespace) + GinkgoWriter.Println("===============================================") + if CurrentSpecReport().Failed() { + Expect(actions.SaveProjectsToFile(testData.Context, testData.K8SClient, testData.Resources.Namespace)).Should(Succeed()) + } + By("Delete Project and cluster resources", func() { + actions.DeleteTestDataProject(testData) + actions.AfterEachFinalCleanup([]model.TestDataProvider{*testData}) + }) + }) + + It("Should migrate a private endpoint configured in a project as sub-resource to a separate custom resource", func() { + By("Setting up project", func() { + testData = model.DataProvider( + "migrate-private-endpoint", + model.NewEmptyAtlasKeyType().UseDefaultFullAccess(), + 40000, + []func(*model.TestDataProvider){}, + ).WithProject(data.DefaultProject()) + + actions.ProjectCreationFlow(testData) + }) + + By("Configuring a private endpoint as a sub-resource", func() { + By("Setting up the private endpoint service", func() { + Expect(testData.K8SClient.Get(testData.Context, client.ObjectKeyFromObject(testData.Project), testData.Project)).To(Succeed()) + var err error + awsRegion, err = cloud.GetAtlasRegionByProvider("AWS") + Expect(err).ToNot(HaveOccurred()) + azureRegion, err = cloud.GetAtlasRegionByProvider("AZURE") + Expect(err).ToNot(HaveOccurred()) + gcpRegion, err = cloud.GetAtlasRegionByProvider("GCP") + Expect(err).ToNot(HaveOccurred()) + + testData.Project.Spec.PrivateEndpoints = []akov2.PrivateEndpoint{ + { + Provider: "AWS", + Region: awsRegion, + }, + { + Provider: "AZURE", + Region: azureRegion, + }, + { + Provider: "GCP", + Region: gcpRegion, + }, + } + + Expect(testData.K8SClient.Update(testData.Context, testData.Project)).To(Succeed()) + Eventually(func(g Gomega) { + g.Expect(testData.K8SClient.Get(testData.Context, client.ObjectKeyFromObject(testData.Project), testData.Project)).To(Succeed()) + g.Expect(testData.Project.Status.Conditions).To(ContainElement(conditions.MatchCondition(api.TrueCondition(api.PrivateEndpointServiceReadyType)))) + }).WithTimeout(15 * time.Minute).WithPolling(10 * time.Second).Should(Succeed()) + }) + + By("Configuring external network", func() { + Expect(testData.K8SClient.Get(testData.Context, client.ObjectKeyFromObject(testData.Project), testData.Project)).To(Succeed()) + action, err := prepareProviderAction() + Expect(err).To(BeNil()) + + for _, pe := range testData.Project.Spec.PrivateEndpoints { + peStatus := statusForProvider(testData.Project.Status.PrivateEndpoints, pe.Provider) + Expect(peStatus).ToNot(BeNil()) + + cloudRegion := cloud.MapCloudProviderRegion(string(pe.Provider), pe.Region) + Expect(cloudRegion).ToNot(BeEmpty()) + + switch pe.Provider { + case "AWS": + awsConfig, err := cloud.GenerateCloudConfig[cloud.AWSConfig](string(pe.Provider), cloudRegion, testData.Resources.KeyName) + Expect(err).ToNot(HaveOccurred()) + + Expect(action.SetupNetwork(pe.Provider, cloud.WithAWSConfig(awsConfig))).ToNot(BeEmpty()) + privateEndpointDetails[string(pe.Provider)] = action.SetupPrivateEndpoint(&cloud.AWSPrivateEndpointRequest{ + ID: fmt.Sprintf("aws-e2e-pe-%s", testData.Resources.TestID), + Region: awsConfig.Region, + ServiceName: peStatus.ServiceName, + }) + case "AZURE": + azureConfig, err := cloud.GenerateCloudConfig[cloud.AzureConfig](string(pe.Provider), cloudRegion, testData.Resources.KeyName) + Expect(err).ToNot(HaveOccurred()) + + Expect(action.SetupNetwork(pe.Provider, cloud.WithAzureConfig(azureConfig))).ToNot(BeEmpty()) + privateEndpointDetails[string(pe.Provider)] = action.SetupPrivateEndpoint(&cloud.AzurePrivateEndpointRequest{ + ID: fmt.Sprintf("azure-e2e-pe-%s", testData.Resources.TestID), + Region: azureConfig.Region, + ServiceResourceID: peStatus.ServiceResourceID, + SubnetName: randomKeyFromMap(azureConfig.Subnets), + }) + case "GCP": + gcpConfig, err := cloud.GenerateCloudConfig[cloud.GCPConfig](string(pe.Provider), cloudRegion, testData.Resources.KeyName) + Expect(err).ToNot(HaveOccurred()) + + Expect(action.SetupNetwork(pe.Provider, cloud.WithGCPConfig(gcpConfig))).ToNot(BeEmpty()) + privateEndpointDetails[string(pe.Provider)] = action.SetupPrivateEndpoint(&cloud.GCPPrivateEndpointRequest{ + ID: fmt.Sprintf("pe-migration-gcp--%s-%s", pe.EndpointGroupName, testData.Resources.TestID), + Region: gcpConfig.Region, + Targets: peStatus.ServiceAttachmentNames, + SubnetName: randomKeyFromMap(gcpConfig.Subnets), + }) + } + } + }) + + By("Configuring private endpoint with external network details", func() { + Expect(testData.K8SClient.Get(testData.Context, client.ObjectKeyFromObject(testData.Project), testData.Project)).To(Succeed()) + + for i, pe := range testData.Project.Spec.PrivateEndpoints { + switch pe.Provider { + case "AWS": + pe.ID = privateEndpointDetails[string(pe.Provider)].ID + case "AZURE": + pe.ID = privateEndpointDetails[string(pe.Provider)].ID + pe.IP = privateEndpointDetails[string(pe.Provider)].IP + case "GCP": + gcpEndpoints := make([]akov2.GCPEndpoint, 0, len(privateEndpointDetails[string(pe.Provider)].Endpoints)) + for _, ep := range privateEndpointDetails[string(pe.Provider)].Endpoints { + gcpEndpoints = append( + gcpEndpoints, + akov2.GCPEndpoint{ + EndpointName: ep.Name, + IPAddress: ep.IP, + }, + ) + } + pe.GCPProjectID = privateEndpointDetails[string(pe.Provider)].GCPProjectID + pe.EndpointGroupName = privateEndpointDetails[string(pe.Provider)].EndpointGroupName + pe.Endpoints = gcpEndpoints + } + + testData.Project.Spec.PrivateEndpoints[i] = pe + } + + Expect(testData.K8SClient.Update(testData.Context, testData.Project)).To(Succeed()) + Eventually(func(g Gomega) { + expectedConditions := conditions.MatchConditions( + api.TrueCondition(api.PrivateEndpointServiceReadyType), + api.TrueCondition(api.PrivateEndpointReadyType), + api.TrueCondition(api.ReadyType), + ) + + g.Expect(testData.K8SClient.Get(testData.Context, client.ObjectKeyFromObject(testData.Project), testData.Project)).To(Succeed()) + g.Expect(testData.Project.Status.Conditions).To(ContainElements(expectedConditions)) + }).WithTimeout(15 * time.Minute).WithPolling(10 * time.Second).Should(Succeed()) + }) + }) + + By("Stopping reconciling project and its sub-resources", func() { + Expect(testData.K8SClient.Get(testData.Context, client.ObjectKeyFromObject(testData.Project), testData.Project)).To(Succeed()) + testData.Project.Annotations[customresource.ReconciliationPolicyAnnotation] = customresource.ReconciliationPolicySkip + testData.Project.Spec.PrivateEndpoints = nil + + Expect(testData.K8SClient.Update(testData.Context, testData.Project)).To(Succeed()) + Eventually(func(g Gomega) { + g.Expect(testData.K8SClient.Get(testData.Context, client.ObjectKeyFromObject(testData.Project), testData.Project)).To(Succeed()) + g.Expect(testData.Project.Generation).ToNot(Equal(testData.Project.Status.ObservedGeneration)) + g.Expect(customresource.AnnotationLastSkippedConfiguration).To(BeKeyOf(testData.Project.GetAnnotations())) + g.Expect(testData.Project.Status.Conditions).To(ContainElement(conditions.MatchCondition(api.TrueCondition(api.ReadyType)))) + }).WithTimeout(15 * time.Minute).WithPolling(10 * time.Second).Should(Succeed()) + }) + + By("Migrate private endpoint as separate custom resource", func() { + By("Migrating AWS private endpoint", func() { + awsPE = &akov2.AtlasPrivateEndpoint{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pe-aws-" + testData.Resources.TestID, + Namespace: testData.Resources.Namespace, + }, + Spec: akov2.AtlasPrivateEndpointSpec{ + Project: &common.ResourceRefNamespaced{ + Name: testData.Project.Name, + Namespace: testData.Project.Namespace, + }, + Provider: "AWS", + Region: awsRegion, + AWSConfiguration: []akov2.AWSPrivateEndpointConfiguration{ + { + ID: privateEndpointDetails["AWS"].ID, + }, + }, + }, + } + + Expect(testData.K8SClient.Create(testData.Context, awsPE)).To(Succeed()) + Eventually(func(g Gomega) { //nolint:dupl + expectedConditions := conditions.MatchConditions( + api.TrueCondition(api.PrivateEndpointServiceReady), + api.TrueCondition(api.PrivateEndpointReady), + api.TrueCondition(api.ReadyType), + ) + g.Expect(testData.K8SClient.Get(testData.Context, client.ObjectKeyFromObject(awsPE), awsPE)).To(Succeed()) + g.Expect(testData.Project.Status.Conditions).To(ContainElements(expectedConditions)) + g.Expect(awsPE.Status.ServiceStatus).To(Equal("AVAILABLE")) + for _, eStatus := range awsPE.Status.Endpoints { + g.Expect(eStatus.Status).To(Equal("AVAILABLE")) + } + }).WithTimeout(15 * time.Minute).WithPolling(10 * time.Second).Should(Succeed()) + }) + + By("Migrating AZURE private endpoint", func() { + azurePE = &akov2.AtlasPrivateEndpoint{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pe-azure-" + testData.Resources.TestID, + Namespace: testData.Resources.Namespace, + }, + Spec: akov2.AtlasPrivateEndpointSpec{ + Project: &common.ResourceRefNamespaced{ + Name: testData.Project.Name, + Namespace: testData.Project.Namespace, + }, + Provider: "AZURE", + Region: azureRegion, + AzureConfiguration: []akov2.AzurePrivateEndpointConfiguration{ + { + ID: privateEndpointDetails["AZURE"].ID, + IP: privateEndpointDetails["AZURE"].IP, + }, + }, + }, + } + + Expect(testData.K8SClient.Create(testData.Context, azurePE)).To(Succeed()) + Eventually(func(g Gomega) { //nolint:dupl + expectedConditions := conditions.MatchConditions( + api.TrueCondition(api.PrivateEndpointServiceReady), + api.TrueCondition(api.PrivateEndpointReady), + api.TrueCondition(api.ReadyType), + ) + g.Expect(testData.K8SClient.Get(testData.Context, client.ObjectKeyFromObject(azurePE), azurePE)).To(Succeed()) + g.Expect(testData.Project.Status.Conditions).To(ContainElements(expectedConditions)) + g.Expect(azurePE.Status.ServiceStatus).To(Equal("AVAILABLE")) + for _, eStatus := range azurePE.Status.Endpoints { + g.Expect(eStatus.Status).To(Equal("AVAILABLE")) + } + }).WithTimeout(15 * time.Minute).WithPolling(10 * time.Second).Should(Succeed()) + }) + + By("Migrating GCP private endpoint", func() { + endpoints := make([]akov2.GCPPrivateEndpoint, 0, len(privateEndpointDetails["GCP"].Endpoints)) + for _, ep := range privateEndpointDetails["GCP"].Endpoints { + endpoints = append( + endpoints, + akov2.GCPPrivateEndpoint{ + Name: ep.Name, + IP: ep.IP, + }, + ) + } + + gcpPE = &akov2.AtlasPrivateEndpoint{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pe-gcp-" + testData.Resources.TestID, + Namespace: testData.Resources.Namespace, + }, + Spec: akov2.AtlasPrivateEndpointSpec{ + Project: &common.ResourceRefNamespaced{ + Name: testData.Project.Name, + Namespace: testData.Project.Namespace, + }, + Provider: "GCP", + Region: gcpRegion, + GCPConfiguration: []akov2.GCPPrivateEndpointConfiguration{ + { + ProjectID: privateEndpointDetails["GCP"].GCPProjectID, + GroupName: privateEndpointDetails["GCP"].EndpointGroupName, + Endpoints: endpoints, + }, + }, + }, + } + + Expect(testData.K8SClient.Create(testData.Context, gcpPE)).To(Succeed()) + Eventually(func(g Gomega) { //nolint:dupl + expectedConditions := conditions.MatchConditions( + api.TrueCondition(api.PrivateEndpointServiceReady), + api.TrueCondition(api.PrivateEndpointReady), + api.TrueCondition(api.ReadyType), + ) + g.Expect(testData.K8SClient.Get(testData.Context, client.ObjectKeyFromObject(gcpPE), gcpPE)).To(Succeed()) + g.Expect(testData.Project.Status.Conditions).To(ContainElements(expectedConditions)) + g.Expect(gcpPE.Status.ServiceStatus).To(Equal("AVAILABLE")) + for _, eStatus := range gcpPE.Status.Endpoints { + g.Expect(eStatus.Status).To(Equal("AVAILABLE")) + } + }).WithTimeout(15 * time.Minute).WithPolling(10 * time.Second).Should(Succeed()) + }) + }) + + By("Restating project reconciliation", func() { + Expect(testData.K8SClient.Get(testData.Context, client.ObjectKeyFromObject(testData.Project), testData.Project)).To(Succeed()) + delete(testData.Project.Annotations, customresource.ReconciliationPolicyAnnotation) + + Expect(testData.K8SClient.Update(testData.Context, testData.Project)).To(Succeed()) + Eventually(func(g Gomega) { + g.Expect(testData.K8SClient.Get(testData.Context, client.ObjectKeyFromObject(testData.Project), testData.Project)).To(Succeed()) + g.Expect(testData.Project.Status.Conditions).To(ContainElement(conditions.MatchCondition(api.TrueCondition(api.ReadyType)))) + }).WithTimeout(15 * time.Minute).WithPolling(10 * time.Second).Should(Succeed()) + }) + + By("Updating project doesn't affect private endpoint", func() { + Expect(testData.K8SClient.Get(testData.Context, client.ObjectKeyFromObject(testData.Project), testData.Project)).To(Succeed()) + testData.Project.Spec.Settings = &akov2.ProjectSettings{ + IsSchemaAdvisorEnabled: pointer.MakePtr(true), + } + + Expect(testData.K8SClient.Update(testData.Context, testData.Project)).To(Succeed()) + Eventually(func(g Gomega) { + notExpectedConditions := conditions.MatchConditions( + api.TrueCondition(api.PrivateEndpointServiceReady), + api.TrueCondition(api.PrivateEndpointReady), + api.FalseCondition(api.PrivateEndpointServiceReady), + api.FalseCondition(api.PrivateEndpointReady), + ) + + g.Expect(testData.K8SClient.Get(testData.Context, client.ObjectKeyFromObject(testData.Project), testData.Project)).To(Succeed()) + g.Expect(testData.Project.Status.Conditions).ToNot(ContainElements(notExpectedConditions)) + g.Expect(testData.Project.Status.Conditions).To(ContainElement(conditions.MatchCondition(api.TrueCondition(api.ReadyType)))) + }).WithTimeout(15 * time.Minute).WithPolling(10 * time.Second).Should(Succeed()) + }) + + By("Private endpoint are still ready", func() { + Eventually(func(g Gomega) { //nolint:dupl + expectedConditions := conditions.MatchConditions( + api.TrueCondition(api.PrivateEndpointServiceReady), + api.TrueCondition(api.PrivateEndpointReady), + api.TrueCondition(api.ReadyType), + ) + for _, pe := range []*akov2.AtlasPrivateEndpoint{awsPE, azurePE, gcpPE} { + g.Expect(testData.K8SClient.Get(testData.Context, client.ObjectKeyFromObject(pe), pe)).To(Succeed()) + g.Expect(testData.Project.Status.Conditions).To(ContainElements(expectedConditions)) + g.Expect(pe.Status.ServiceStatus).To(Equal("AVAILABLE")) + for _, eStatus := range pe.Status.Endpoints { + g.Expect(eStatus.Status).To(Equal("AVAILABLE")) + } + } + }).WithTimeout(15 * time.Minute).WithPolling(10 * time.Second).Should(Succeed()) + }) + + By("Removing private endpoints", func() { + Expect(testData.K8SClient.Delete(testData.Context, awsPE)).To(Succeed()) + Expect(testData.K8SClient.Delete(testData.Context, azurePE)).To(Succeed()) + Expect(testData.K8SClient.Delete(testData.Context, gcpPE)).To(Succeed()) + + Eventually(func(g Gomega) { + g.Expect(testData.K8SClient.Get(testData.Context, client.ObjectKeyFromObject(awsPE), awsPE)).ShouldNot(Succeed()) + g.Expect(testData.K8SClient.Get(testData.Context, client.ObjectKeyFromObject(azurePE), azurePE)).ShouldNot(Succeed()) + g.Expect(testData.K8SClient.Get(testData.Context, client.ObjectKeyFromObject(gcpPE), gcpPE)).ShouldNot(Succeed()) + }).WithTimeout(15 * time.Minute).WithPolling(20 * time.Second).Should(Succeed()) + }) + }) +}) + +func statusForProvider(peStatus []status.ProjectPrivateEndpoint, providerName provider.ProviderName) *status.ProjectPrivateEndpoint { + for _, s := range peStatus { + if s.Provider == providerName { + return &s + } + } + + return nil +} + +func randomKeyFromMap[K comparable, V any](m map[K]V) K { + for k := range m { + return k + } + + return *new(K) +} diff --git a/test/helper/e2e/actions/cloud/provider.go b/test/helper/e2e/actions/cloud/provider.go index 3d9f651dd2..24c8950ab2 100644 --- a/test/helper/e2e/actions/cloud/provider.go +++ b/test/helper/e2e/actions/cloud/provider.go @@ -2,13 +2,19 @@ package cloud import ( "context" + "crypto/rand" + "errors" + "fmt" + "math/big" + "net" "path" + "strings" "time" - . "github.com/onsi/gomega" - "github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/network/armnetwork/v2" + "github.com/google/uuid" "github.com/onsi/ginkgo/v2/dsl/core" + . "github.com/onsi/gomega" "github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/api/v1/provider" ) @@ -46,6 +52,10 @@ type ProviderAction struct { azureProvider *AzureAction } +type CloudConfig interface { + AWSConfig | AzureConfig | GCPConfig +} + type AWSConfig struct { Region string VPC string @@ -246,7 +256,7 @@ func (a *ProviderAction) SetupPrivateEndpoint(request PrivateEndpointRequest) *P Endpoints: endpoints, } case *AzurePrivateEndpointRequest: - pe, err := a.azureProvider.CreatePrivateEndpoint(vpcName, req.SubnetName, req.ID, req.ServiceResourceID, req.Region) + pe, err := a.azureProvider.CreatePrivateEndpoint(a.azureConfig.VPC, req.SubnetName, req.ID, req.ServiceResourceID, req.Region) Expect(err).To(BeNil()) Expect(pe).ShouldNot(BeNil()) Expect(pe.Properties).ShouldNot(BeNil()) @@ -356,3 +366,186 @@ func getAzureConfigDefaults() *AzureConfig { Subnets: map[string]string{Subnet1Name: Subnet1CIDR, Subnet2Name: Subnet2CIDR}, } } + +func GenerateCloudConfig[T CloudConfig](cloudProvider, region, prefixName string) (*T, error) { + vpc, subnets, err := generateVPCWithSubnets() + if err != nil { + return nil, err + } + + uniqueID := strings.ToLower(uuid.New().String()[0:6]) + + switch cloudProvider { + case "AWS": + return any(&AWSConfig{ + Region: region, + VPC: fmt.Sprintf("%s-aws-vpc-%s", prefixName, uniqueID), + CIDR: vpc, + Subnets: map[string]string{ + fmt.Sprintf("%s-aws-sn1-%s", prefixName, uniqueID): subnets[0], + fmt.Sprintf("%s-aws-sn2-%s", prefixName, uniqueID): subnets[1], + }, + EnableCleanup: true, + }).(*T), nil + case "AZURE": + return any(&AzureConfig{ + Region: region, + VPC: fmt.Sprintf("%s-azure-vpc-%s", prefixName, uniqueID), + CIDR: vpc, + Subnets: map[string]string{ + fmt.Sprintf("%s-azure-sn1-%s", prefixName, uniqueID): subnets[0], + fmt.Sprintf("%s-azure-sn2-%s", prefixName, uniqueID): subnets[1], + }, + EnableCleanup: true, + }).(*T), nil + case "GCP": + return any(&GCPConfig{ + Region: region, + VPC: fmt.Sprintf("%s-gcp-vpc-%s", prefixName, uniqueID), + Subnets: map[string]string{ + fmt.Sprintf("%s-gcp-sn1-%s", prefixName, uniqueID): subnets[0], + fmt.Sprintf("%s-gcp-sn2-%s", prefixName, uniqueID): subnets[1], + }, + EnableCleanup: true, + }).(*T), nil + } + + return nil, errors.New("unsupported provider, valid options are: AWS, Azure, GCP") +} + +func generateVPCWithSubnets() (string, []string, error) { + privateRanges := []struct { + base string + subnetMask int + }{ + {"10.0.0.0", 24}, + {"172.16.0.0", 24}, + {"192.168.0.0", 24}, + } + + // Pick a random range + r, err := rand.Int(rand.Reader, big.NewInt(int64(len(privateRanges)))) + if err != nil { + return "", nil, err + } + privateRange := privateRanges[r.Int64()] + + _, network, err := net.ParseCIDR(fmt.Sprintf("%s/%d", privateRange.base, privateRange.subnetMask)) + if err != nil { + return "", nil, err + } + + // Generate 2 subnets + ip := network.IP.To4() + r, err = rand.Int(rand.Reader, big.NewInt(int64(255))) + if err != nil { + return "", nil, err + } + ip[2] = byte(r.Int64()) + + vpcCIDR := fmt.Sprintf("%s/%d", ip.String(), privateRange.subnetMask) + subnet1CIDR := fmt.Sprintf("%s/%d", ip.String(), privateRange.subnetMask+1) + ip[3] = 128 + subnet2CIDR := fmt.Sprintf("%s/%d", ip.String(), privateRange.subnetMask+1) + + return vpcCIDR, []string{subnet1CIDR, subnet2CIDR}, nil +} + +func GetAtlasRegionByProvider(cloudProvider string) (string, error) { + regionMap := map[string][]string{ + "AWS": { + "US_WEST_2", // North America - Oregon + "CA_CENTRAL_1", // North America - Canada Central + "SA_EAST_1", // South America - São Paulo + "EU_NORTH_1", // Europe - Stockholm + "EU_WEST_3", // Europe - Paris + "ME_SOUTH_1", // Middle East - Bahrain + "AP_SOUTH_2", // Asia - Hyderabad + "AP_NORTHEAST_2", // Asia - Seoul + "AP_SOUTHEAST_2", // Oceania - Sydney + }, + "AZURE": { + "US_WEST_3", // North America - West US + "US_EAST_2", // North America - East US + "BRAZIL_SOUTHEAST", // South America - Brazil Southeast + "EUROPE_NORTH", // Europe - North Europe (Ireland) + "NORWAY_EAST", // Europe - Norway East + "FRANCE_SOUTH", // Europe - France South + "UAE_CENTRAL", // Middle East - UAE Central + "KOREA_CENTRAL", // Asia - Korea Central + "INDIA_CENTRAL", // Asia - India Central + "AUSTRALIA_CENTRAL_2", // Oceania - Australia Central + "SOUTH_AFRICA_WEST", // Africa - South Africa West + }, + "GCP": { + "US_WEST_3", // North America - Salt Lake City + "US_EAST_5", // North America - Columbus + "SOUTH_AMERICA_EAST_1", // South America - São Paulo + "EUROPE_WEST_3", // Europe - Frankfurt + "EUROPE_NORTH_1", // Europe - Finland + "EUROPE_WEST_6", // Europe - Zurich + "ASIA_EAST_2", // Asia - Hong Kong + "ASIA_NORTHEAST_2", // Asia - Osaka + "AUSTRALIA_SOUTHEAST_2", // Oceania - Melbourne + }, + } + + // Validate the input provider + regions := regionMap[cloudProvider] + r, err := rand.Int(rand.Reader, big.NewInt(int64(len(regions)))) + if err != nil { + return "", err + } + + return regions[r.Int64()], nil +} + +func MapCloudProviderRegion(cloudProvider, atlasRegion string) string { + regionMap := map[string]map[string]string{ + "AWS": { + "US_WEST_2": "us-west-2", // North America - Oregon + "CA_CENTRAL_1": "ca-central-1", // North America - Canada Central + "SA_EAST_1": "sa-east-1", // South America - São Paulo + "EU_NORTH_1": "eu-north-1", // Europe - Stockholm + "EU_WEST_3": "eu-west-3", // Europe - Paris + "ME_SOUTH_1": "me-south-1", // Middle East - Bahrain + "AP_SOUTH_2": "ap-south-2", // Asia - Hyderabad + "AP_NORTHEAST_2": "ap-northeast-2", // Asia - Seoul + "AP_SOUTHEAST_2": "ap-southeast-2", // Oceania - Sydney + }, + "AZURE": { + "US_WEST_3": "westus3", // North America - West US + "US_EAST_2": "eastus2", // North America - East US + "BRAZIL_SOUTHEAST": "brazilsoutheast", // South America - Brazil Southeast + "EUROPE_NORTH": "northeurope", // Europe - North Europe (Ireland) + "NORWAY_EAST": "norwayeast", // Europe - Norway East + "FRANCE_SOUTH": "francesouth", // Europe - France South + "UAE_CENTRAL": "uaecentral", // Middle East - UAE Central + "KOREA_CENTRAL": "koreacentral", // Asia - Korea Central + "INDIA_CENTRAL": "centralindia", // Asia - India Central + "AUSTRALIA_CENTRAL_2": "australiacentral2", // Oceania - Australia Central + "SOUTH_AFRICA_WEST": "southafricawest", // Africa - South Africa West + }, + "GCP": { + "US_WEST_3": "us-west3", // North America - Salt Lake City + "US_EAST_5": "us-east5", // North America - Columbus + "SOUTH_AMERICA_EAST_1": "southamerica-east1", // South America - São Paulo + "EUROPE_WEST_3": "europe-west3", // Europe - Frankfurt + "EUROPE_NORTH_1": "europe-north1", // Europe - Finland + "EUROPE_WEST_6": "europe-west6", // Europe - Zurich + "ASIA_EAST_2": "asia-east2", // Asia - Hong Kong + "ASIA_NORTHEAST_2": "asia-northeast2", // Asia - Osaka + "AUSTRALIA_SOUTHEAST_2": "australia-southeast2", // Oceania - Melbourne + }, + } + + if _, ok := regionMap[cloudProvider]; !ok { + return "" + } + + if _, ok := regionMap[cloudProvider][atlasRegion]; !ok { + return "" + } + + return regionMap[cloudProvider][atlasRegion] +}