From 7d6d551ce1e2922ee7c0dc81333897f764287a6e Mon Sep 17 00:00:00 2001 From: rkthtrifork Date: Tue, 18 Jun 2024 12:58:47 +0200 Subject: [PATCH] Rewrote ISM Policy reconciler --- .github/workflows/cheetah-release.yaml | 21 ++ .github/workflows/docker-create-snapshot.yaml | 55 ++++ .../opensearch-gateway/requests/IsmPolicy.go | 9 +- .../responses/ISMPolicyResponse.go | 5 - .../opensearch-gateway/responses/IsmPolicy.go | 10 + .../services/os_ism_service.go | 33 +- .../pkg/reconcilers/indextemplate.go | 1 + .../pkg/reconcilers/ismpolicy.go | 285 +++++++++-------- .../pkg/reconcilers/ismpolicy_test.go | 289 ++++++++++-------- .../pkg/reconcilers/reconcilers.go | 16 +- 10 files changed, 419 insertions(+), 305 deletions(-) create mode 100644 .github/workflows/cheetah-release.yaml create mode 100644 .github/workflows/docker-create-snapshot.yaml delete mode 100644 opensearch-operator/opensearch-gateway/responses/ISMPolicyResponse.go create mode 100644 opensearch-operator/opensearch-gateway/responses/IsmPolicy.go diff --git a/.github/workflows/cheetah-release.yaml b/.github/workflows/cheetah-release.yaml new file mode 100644 index 000000000..07ed3b0ae --- /dev/null +++ b/.github/workflows/cheetah-release.yaml @@ -0,0 +1,21 @@ +name: Cheetah Release +on: + workflow_dispatch: + push: + +concurrency: + group: ${{ github.ref }}-${{ github.workflow }} + cancel-in-progress: true + +permissions: + contents: read + packages: write + +jobs: + create-snapshot: + uses: ./.github/workflows/docker-create-snapshot.yaml + with: + context: opensearch-operator + image-name: opensearch-k8s-operator + secrets: + TRIFORK_GITHUB_PAT: ${{ secrets.GITHUB_TOKEN }} \ No newline at end of file diff --git a/.github/workflows/docker-create-snapshot.yaml b/.github/workflows/docker-create-snapshot.yaml new file mode 100644 index 000000000..3bc0191c6 --- /dev/null +++ b/.github/workflows/docker-create-snapshot.yaml @@ -0,0 +1,55 @@ +name: Docker Create Snapshot + +on: + workflow_call: + inputs: + image-name: + description: The name of the image to create a snapshot for + required: true + type: string + context: + description: The directory to run the workflow inside + required: false + type: string + default: . + dockerfile-path: + description: The path to the Dockerfile. Defaults to {context}/Dockerfile + required: false + type: string + secrets: + TRIFORK_GITHUB_PAT: + description: A personal access token with permission to publish a package to the Trifork GitHub container registry + required: true + +jobs: + create-snapshot: + runs-on: ubuntu-latest + steps: + - name: Checkout + uses: actions/checkout@v4 + + - name: Log in to the container registry + uses: docker/login-action@1220aa36aaf257e736f1d64e3b87c4878665836f + with: + registry: ghcr.io + username: ${{ github.actor }} + password: ${{ secrets.TRIFORK_GITHUB_PAT }} + + - name: Get package suffix + id: get-package-suffix + run: echo "branch-name=$(echo '${{ github.ref_name }}' | sed 's/[^a-zA-Z0-9]/-/g')" >> $GITHUB_OUTPUT + + - name: Set up Docker Buildx + uses: docker/setup-buildx-action@f95db51fddba0c2d1ec667646a06c2ce06100226 # v3 + + - name: Build and push Docker image + uses: docker/build-push-action@0565240e2d4ab88bba5387d719585280857ece09 # v5 + with: + context: ${{ inputs.context }} + file: ${{ inputs.dockerfile-path }} + platforms: linux/amd64 + tags: ghcr.io/trifork/${{ inputs.image-name }}:2.6.0-${{ steps.get-package-suffix.outputs.branch-name }}-SNAPSHOT-${{ github.run_number }} + push: true + secrets: | + GITHUB_ACTOR=${{ github.actor }} + GITHUB_TOKEN=${{ secrets.TRIFORK_GITHUB_PAT }} \ No newline at end of file diff --git a/opensearch-operator/opensearch-gateway/requests/IsmPolicy.go b/opensearch-operator/opensearch-gateway/requests/IsmPolicy.go index 2f820a0ac..023fc79ed 100644 --- a/opensearch-operator/opensearch-gateway/requests/IsmPolicy.go +++ b/opensearch-operator/opensearch-gateway/requests/IsmPolicy.go @@ -1,14 +1,11 @@ package requests -type Policy struct { - PolicyID string `json:"_id,omitempty"` - PrimaryTerm *int `json:"_primary_term,omitempty"` - SequenceNumber *int `json:"_seq_no,omitempty"` - Policy ISMPolicy `json:"policy"` +type ISMPolicy struct { + Policy ISMPolicySpec `json:"policy"` } // ISMPolicySpec is the specification for the ISM policy for OS. -type ISMPolicy struct { +type ISMPolicySpec struct { // The default starting state for each index that uses this policy. DefaultState string `json:"default_state"` // A human-readable description of the policy. diff --git a/opensearch-operator/opensearch-gateway/responses/ISMPolicyResponse.go b/opensearch-operator/opensearch-gateway/responses/ISMPolicyResponse.go deleted file mode 100644 index 753314cc8..000000000 --- a/opensearch-operator/opensearch-gateway/responses/ISMPolicyResponse.go +++ /dev/null @@ -1,5 +0,0 @@ -package responses - -import "github.com/Opster/opensearch-k8s-operator/opensearch-operator/opensearch-gateway/requests" - -type GetISMPoliciesResponse requests.Policy diff --git a/opensearch-operator/opensearch-gateway/responses/IsmPolicy.go b/opensearch-operator/opensearch-gateway/responses/IsmPolicy.go new file mode 100644 index 000000000..29174466e --- /dev/null +++ b/opensearch-operator/opensearch-gateway/responses/IsmPolicy.go @@ -0,0 +1,10 @@ +package responses + +import "github.com/Opster/opensearch-k8s-operator/opensearch-operator/opensearch-gateway/requests" + +type GetISMPolicyResponse struct { + PolicyID string `json:"_id"` + PrimaryTerm int `json:"_primary_term"` + SequenceNumber int `json:"_seq_no"` + Policy requests.ISMPolicySpec +} diff --git a/opensearch-operator/opensearch-gateway/services/os_ism_service.go b/opensearch-operator/opensearch-gateway/services/os_ism_service.go index d82c0e90f..040e2e26c 100644 --- a/opensearch-operator/opensearch-gateway/services/os_ism_service.go +++ b/opensearch-operator/opensearch-gateway/services/os_ism_service.go @@ -7,6 +7,7 @@ import ( "fmt" "github.com/Opster/opensearch-k8s-operator/opensearch-operator/opensearch-gateway/requests" + "github.com/Opster/opensearch-k8s-operator/opensearch-operator/opensearch-gateway/responses" "github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp/cmpopts" "github.com/opensearch-project/opensearch-go/opensearchutil" @@ -16,7 +17,7 @@ import ( var ErrNotFound = errors.New("policy not found") // ShouldUpdateISMPolicy checks if the passed policy is same as existing or needs update -func ShouldUpdateISMPolicy(ctx context.Context, newPolicy, existingPolicy requests.Policy) (bool, error) { +func ShouldUpdateISMPolicy(ctx context.Context, newPolicy, existingPolicy requests.ISMPolicy) (bool, error) { if cmp.Equal(newPolicy, existingPolicy, cmpopts.EquateEmpty()) { return false, nil } @@ -27,23 +28,8 @@ func ShouldUpdateISMPolicy(ctx context.Context, newPolicy, existingPolicy reques return true, nil } -// PolicyExists checks if the passed policy already exists or not -func PolicyExists(ctx context.Context, service *OsClusterClient, policyName string) (bool, error) { - resp, err := service.GetISMConfig(ctx, policyName) - if err != nil { - return false, err - } - defer resp.Body.Close() - if resp.StatusCode == 404 { - return false, nil - } else if resp.IsError() { - return false, fmt.Errorf("response from API is %s", resp.Status()) - } - return true, nil -} - // GetPolicy fetches the passed policy -func GetPolicy(ctx context.Context, service *OsClusterClient, policyName string) (*requests.Policy, error) { +func GetPolicy(ctx context.Context, service *OsClusterClient, policyName string) (*responses.GetISMPolicyResponse, error) { resp, err := service.GetISMConfig(ctx, policyName) if err != nil { return nil, err @@ -51,10 +37,11 @@ func GetPolicy(ctx context.Context, service *OsClusterClient, policyName string) defer resp.Body.Close() if resp.StatusCode == 404 { return nil, ErrNotFound - } else if resp.IsError() { + } + if resp.IsError() { return nil, fmt.Errorf("response from API is %s", resp.Status()) } - ismResponse := requests.Policy{} + ismResponse := responses.GetISMPolicyResponse{} if resp != nil && resp.Body != nil { err := json.NewDecoder(resp.Body).Decode(&ismResponse) if err != nil { @@ -66,7 +53,7 @@ func GetPolicy(ctx context.Context, service *OsClusterClient, policyName string) } // CreateISMPolicy creates the passed policy -func CreateISMPolicy(ctx context.Context, service *OsClusterClient, ismpolicy requests.Policy, policyId string) error { +func CreateISMPolicy(ctx context.Context, service *OsClusterClient, ismpolicy requests.ISMPolicy, policyId string) error { spec := opensearchutil.NewJSONReader(ismpolicy) resp, err := service.PutISMConfig(ctx, policyId, spec) if err != nil { @@ -80,15 +67,15 @@ func CreateISMPolicy(ctx context.Context, service *OsClusterClient, ismpolicy re } // UpdateISMPolicy updates the given policy -func UpdateISMPolicy(ctx context.Context, service *OsClusterClient, ismpolicy requests.Policy, seqno, primterm *int, policyName string) error { +func UpdateISMPolicy(ctx context.Context, service *OsClusterClient, ismpolicy requests.ISMPolicy, seqno, primterm *int, policyId string) error { spec := opensearchutil.NewJSONReader(ismpolicy) - resp, err := service.UpdateISMConfig(ctx, policyName, *seqno, *primterm, spec) + resp, err := service.UpdateISMConfig(ctx, policyId, *seqno, *primterm, spec) if err != nil { return err } defer resp.Body.Close() if resp.IsError() { - return fmt.Errorf("failed to create ism policy: %s", resp.String()) + return fmt.Errorf("failed to update ism policy: %s", resp.String()) } return nil } diff --git a/opensearch-operator/pkg/reconcilers/indextemplate.go b/opensearch-operator/pkg/reconcilers/indextemplate.go index 1ae6f62a1..45f4f63e4 100644 --- a/opensearch-operator/pkg/reconcilers/indextemplate.go +++ b/opensearch-operator/pkg/reconcilers/indextemplate.go @@ -173,6 +173,7 @@ func (r *IndexTemplateReconciler) Reconcile() (result ctrl.Result, err error) { }) if err != nil { reason = fmt.Sprintf("failed to update status: %s", err) + // r.logger.Error(retErr, reason) should this be added? r.recorder.Event(r.instance, "Warning", statusError, reason) return } diff --git a/opensearch-operator/pkg/reconcilers/ismpolicy.go b/opensearch-operator/pkg/reconcilers/ismpolicy.go index f2954d457..8b757422c 100644 --- a/opensearch-operator/pkg/reconcilers/ismpolicy.go +++ b/opensearch-operator/pkg/reconcilers/ismpolicy.go @@ -13,6 +13,8 @@ import ( "github.com/Opster/opensearch-k8s-operator/opensearch-operator/pkg/reconcilers/util" "github.com/cisco-open/operator-tools/pkg/reconciler" "github.com/go-logr/logr" + "github.com/google/go-cmp/cmp" + "github.com/google/go-cmp/cmp/cmpopts" "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/tools/record" "k8s.io/utils/pointer" @@ -22,7 +24,10 @@ import ( ) const ( - ismPolicyExists = "ism policy already exists in Opensearch" + opensearchIsmPolicyExists = "ISM Policy already exists in Opensearch" + opensearchIsmPolicyNameMismatch = "OpensearchISMPolicyNameMismatch" + opensearchClusterRequeueAfter = 10 * time.Second + defaultRequeueAfter = 30 * time.Second ) type IsmPolicyReconciler struct { @@ -58,6 +63,7 @@ func NewIsmReconciler( func (r *IsmPolicyReconciler) Reconcile() (retResult ctrl.Result, retErr error) { var reason string var policyId string + defer func() { if !pointer.BoolDeref(r.updateStatus, true) { return @@ -71,24 +77,23 @@ func (r *IsmPolicyReconciler) Reconcile() (retResult ctrl.Result, retErr error) instance.Status.State = opsterv1.OpensearchISMPolicyError } // Requeue after is 10 seconds if waiting for OpenSearch cluster - if retResult.Requeue && retResult.RequeueAfter == 10*time.Second { + if retResult.Requeue && retResult.RequeueAfter == opensearchClusterRequeueAfter { instance.Status.State = opsterv1.OpensearchISMPolicyPending } - // Requeue is after 30 seconds for normal reconciliation after creation/update - if retErr == nil && retResult.RequeueAfter == 30*time.Second { + if retErr == nil && retResult.Requeue { instance.Status.State = opsterv1.OpensearchISMPolicyCreated instance.Status.PolicyId = policyId } - if reason == ismPolicyExists { + if reason == opensearchIsmPolicyExists { instance.Status.State = opsterv1.OpensearchISMPolicyIgnored } }) + if err != nil { r.logger.Error(err, "failed to update status") } }() - var err error r.cluster, retErr = util.FetchOpensearchCluster(r.client, r.ctx, types.NamespacedName{ Name: r.instance.Spec.OpensearchRef.Name, Namespace: r.instance.Namespace, @@ -97,167 +102,184 @@ func (r *IsmPolicyReconciler) Reconcile() (retResult ctrl.Result, retErr error) reason = "error fetching opensearch cluster" r.logger.Error(retErr, "failed to fetch opensearch cluster") r.recorder.Event(r.instance, "Warning", opensearchError, reason) - return + return ctrl.Result{ + Requeue: true, + RequeueAfter: opensearchClusterRequeueAfter, + }, retErr } if r.cluster == nil { r.logger.Info("opensearch cluster does not exist, requeueing") reason = "waiting for opensearch cluster to exist" r.recorder.Event(r.instance, "Normal", opensearchPending, reason) - retResult = ctrl.Result{ + return ctrl.Result{ Requeue: true, - RequeueAfter: 10 * time.Second, - } - return + RequeueAfter: opensearchClusterRequeueAfter, + }, nil } + // Check cluster ref has not changed - if r.instance.Status.ManagedCluster != nil { - if *r.instance.Status.ManagedCluster != r.cluster.UID { - reason = "cannot change the cluster a role refers to" - retErr = fmt.Errorf("%s", reason) - r.recorder.Event(r.instance, "Warning", opensearchRefMismatch, reason) - return - } - } else { - if pointer.BoolDeref(r.updateStatus, true) { - retErr = r.client.UdateObjectStatus(r.instance, func(object client.Object) { - instance := object.(*opsterv1.OpenSearchISMPolicy) - instance.Status.ManagedCluster = &r.cluster.UID - }) - if retErr != nil { - reason = fmt.Sprintf("failed to update status: %s", retErr) - r.recorder.Event(r.instance, "Warning", statusError, reason) - return - } + managedCluster := r.instance.Status.ManagedCluster + if managedCluster != nil && *managedCluster != r.cluster.UID { + reason = "cannot change the cluster a resource refers to" + retErr = fmt.Errorf("%s", reason) + r.recorder.Event(r.instance, "Warning", opensearchRefMismatch, reason) + return ctrl.Result{ + Requeue: false, + }, retErr + } + + if pointer.BoolDeref(r.updateStatus, true) { + retErr = r.client.UdateObjectStatus(r.instance, func(object client.Object) { + object.(*opsterv1.OpenSearchISMPolicy).Status.ManagedCluster = &r.cluster.UID + }) + if retErr != nil { + reason = fmt.Sprintf("failed to update status: %s", retErr) + r.recorder.Event(r.instance, "Warning", statusError, reason) + return ctrl.Result{ + Requeue: true, + RequeueAfter: opensearchClusterRequeueAfter, + }, retErr } } + // Check cluster is ready if r.cluster.Status.Phase != opsterv1.PhaseRunning { r.logger.Info("opensearch cluster is not running, requeueing") reason = "waiting for opensearch cluster status to be running" r.recorder.Event(r.instance, "Normal", opensearchPending, reason) - retResult = ctrl.Result{ + return ctrl.Result{ Requeue: true, - RequeueAfter: 10 * time.Second, - } - return + RequeueAfter: opensearchClusterRequeueAfter, + }, nil } - r.osClient, err = util.CreateClientForCluster(r.client, r.ctx, r.cluster, r.osClientTransport) - if err != nil { - reason := "error creating opensearch client" + r.osClient, retErr = util.CreateClientForCluster(r.client, r.ctx, r.cluster, r.osClientTransport) + if retErr != nil { + reason = "error creating opensearch client" r.recorder.Event(r.instance, "Warning", opensearchError, reason) - retResult = ctrl.Result{ + return ctrl.Result{ Requeue: true, - RequeueAfter: 30 * time.Second, - } - retErr = err - return - } - - // If PolicyID not provided explicitly, use metadata.name by default - policyId = r.instance.Spec.PolicyID - if r.instance.Spec.PolicyID == "" { - policyId = r.instance.Name - } - // Check ism policy state to make sure we don't touch preexisting ism policy - if r.instance.Status.ExistingISMPolicy == nil { - var exists bool - exists, retErr = services.PolicyExists(r.ctx, r.osClient, policyId) - if retErr != nil { - reason = "failed to get policy status from Opensearch API" - r.logger.Error(retErr, reason) - r.recorder.Event(r.instance, "Warning", opensearchAPIError, reason) - return - } - if pointer.BoolDeref(r.updateStatus, true) { - retErr = r.client.UdateObjectStatus(r.instance, func(object client.Object) { - instance := object.(*opsterv1.OpenSearchISMPolicy) - instance.Status.ExistingISMPolicy = &exists - }) - if retErr != nil { - reason = fmt.Sprintf("failed to update status: %s", retErr) - r.recorder.Event(r.instance, "Warning", statusError, reason) - return - } - } else { - // Emit an event for unit testing assertion - r.recorder.Event(r.instance, "Normal", "UnitTest", fmt.Sprintf("exists is %t", exists)) - return - } + RequeueAfter: opensearchClusterRequeueAfter, + }, retErr } - // If ism policy is existing do nothing - if *r.instance.Status.ExistingISMPolicy { - reason = ismPolicyExists - return + // If PolicyID is not provided explicitly, use metadata.name by default + policyId = r.instance.Name + if r.instance.Spec.PolicyID != "" { + policyId = r.instance.Spec.PolicyID } - ismpolicy, retErr := r.CreateISMPolicyRequest() + newPolicy, retErr := r.CreateISMPolicy() if retErr != nil { - reason = "failed to get create the ism policy request" r.logger.Error(retErr, reason) - r.recorder.Event(r.instance, "Warning", opensearchAPIError, reason) - return + return ctrl.Result{ + Requeue: true, + RequeueAfter: defaultRequeueAfter, + }, retErr } existingPolicy, retErr := services.GetPolicy(r.ctx, r.osClient, policyId) - if retErr != nil && retErr != services.ErrNotFound { - reason = "failed to get policy from Opensearch API" - r.logger.Error(retErr, reason) - r.recorder.Event(r.instance, "Warning", opensearchAPIError, reason) - return - } + // If not exists, create if errors.Is(retErr, services.ErrNotFound) { - r.logger.V(1).Info(fmt.Sprintf("policy %s not found, creating.", r.instance.Spec.PolicyID)) - retErr = services.CreateISMPolicy(r.ctx, r.osClient, *ismpolicy, policyId) + request := requests.ISMPolicy{ + Policy: *newPolicy, + } + retErr = services.CreateISMPolicy(r.ctx, r.osClient, request, policyId) if retErr != nil { reason = "failed to create ism policy" r.logger.Error(retErr, reason) r.recorder.Event(r.instance, "Warning", opensearchAPIError, reason) - return + return ctrl.Result{ + Requeue: true, + RequeueAfter: defaultRequeueAfter, + }, retErr } - r.recorder.Event(r.instance, "Normal", opensearchAPIUpdated, "policy created in opensearch") - return ctrl.Result{Requeue: true, RequeueAfter: 30 * time.Second}, retErr + // Mark the ISM Policy as not pre-existing (created by the operator) + retErr = r.client.UdateObjectStatus(r.instance, func(object client.Object) { + object.(*opsterv1.OpenSearchISMPolicy).Status.ExistingISMPolicy = pointer.Bool(false) + }) + if retErr != nil { + reason = "failed to update custom resource object" + r.logger.Error(retErr, reason) + return ctrl.Result{ + Requeue: true, + RequeueAfter: defaultRequeueAfter, + }, retErr + } + + r.recorder.Event(r.instance, "Normal", opensearchAPIUpdated, "policy successfully created in OpenSearch Cluster") + return ctrl.Result{ + Requeue: true, + RequeueAfter: defaultRequeueAfter, + }, nil } - priterm := existingPolicy.PrimaryTerm - seqno := existingPolicy.SequenceNumber - // Reset - existingPolicy.PrimaryTerm = nil - existingPolicy.SequenceNumber = nil - shouldUpdate, retErr := services.ShouldUpdateISMPolicy(r.ctx, *ismpolicy, *existingPolicy) + + // If other error, report if retErr != nil { - reason = "failed to compare the policies" + reason = "failed to get the ism policy from Opensearch API" r.logger.Error(retErr, reason) r.recorder.Event(r.instance, "Warning", opensearchAPIError, reason) - return + return ctrl.Result{ + Requeue: true, + RequeueAfter: defaultRequeueAfter, + }, retErr } - if !shouldUpdate { - r.logger.V(1).Info(fmt.Sprintf("policy %s is in sync", r.instance.Spec.PolicyID)) - return ctrl.Result{Requeue: true, RequeueAfter: 30 * time.Second}, retErr + // If the ISM policy exists in OpenSearch cluster and was not created by the operator, update the status and return + if r.instance.Status.ExistingISMPolicy == nil || *r.instance.Status.ExistingISMPolicy { + retErr = r.client.UdateObjectStatus(r.instance, func(object client.Object) { + object.(*opsterv1.OpenSearchISMPolicy).Status.ExistingISMPolicy = pointer.Bool(true) + }) + if retErr != nil { + reason = "failed to update custom resource object" + r.logger.Error(retErr, reason) + return ctrl.Result{ + Requeue: true, + RequeueAfter: defaultRequeueAfter, + }, retErr + } + reason = "the ISM policy already exists in the OpenSearch cluster" + r.logger.Error(errors.New(opensearchIsmPolicyExists), reason) + r.recorder.Event(r.instance, "Warning", opensearchIsmPolicyExists, reason) + return ctrl.Result{ + Requeue: true, + RequeueAfter: defaultRequeueAfter, + }, nil } - // the policyId is immutable, so check the old name (r.instance.Status.PolicyId) against the new - if r.instance.Status.PolicyId != "" && policyId != r.instance.Status.PolicyId { - reason = "can't change PolicyID" - r.recorder.Event(r.instance, "Warning", opensearchError, reason) - return + // Return if there are no changes + if r.instance.Spec.PolicyID == existingPolicy.PolicyID && cmp.Equal(*newPolicy, existingPolicy.Policy, cmpopts.EquateEmpty()) { + r.logger.V(1).Info(fmt.Sprintf("user %s is in sync", r.instance.Name)) + r.recorder.Event(r.instance, "Normal", opensearchAPIUnchanged, "policy is in sync") + return ctrl.Result{ + Requeue: true, + RequeueAfter: defaultRequeueAfter, + }, nil + } + request := requests.ISMPolicy{ + Policy: *newPolicy, } - retErr = services.UpdateISMPolicy(r.ctx, r.osClient, *ismpolicy, seqno, priterm, policyId) + retErr = services.UpdateISMPolicy(r.ctx, r.osClient, request, &existingPolicy.SequenceNumber, &existingPolicy.PrimaryTerm, existingPolicy.PolicyID) if retErr != nil { reason = "failed to update ism policy with Opensearch API" r.logger.Error(retErr, reason) r.recorder.Event(r.instance, "Warning", opensearchAPIError, reason) + return ctrl.Result{ + Requeue: true, + RequeueAfter: defaultRequeueAfter, + }, retErr } r.recorder.Event(r.instance, "Normal", opensearchAPIUpdated, "policy updated in opensearch") - - return ctrl.Result{Requeue: true, RequeueAfter: 30 * time.Second}, retErr + return ctrl.Result{ + Requeue: true, + RequeueAfter: defaultRequeueAfter, + }, nil } -func (r *IsmPolicyReconciler) CreateISMPolicyRequest() (*requests.Policy, error) { - policy := requests.ISMPolicy{ +func (r *IsmPolicyReconciler) CreateISMPolicy() (*requests.ISMPolicySpec, error) { + policy := requests.ISMPolicySpec{ DefaultState: r.instance.Spec.DefaultState, Description: r.instance.Spec.Description, } @@ -378,35 +400,35 @@ func (r *IsmPolicyReconciler) CreateISMPolicyRequest() (*requests.Policy, error) shrink.ForceUnsafe = action.Shrink.ForceUnsafe } if action.Shrink.MaxShardSize == nil && action.Shrink.NumNewShards == nil && action.Shrink.PercentageOfSourceShards == nil { - reason := "Either of MaxShardSize or NumNewShards or PercentageOfSourceShards is required" - r.recorder.Event(r.instance, "Error", opensearchError, reason) - return nil, nil + reason := "either of MaxShardSize or NumNewShards or PercentageOfSourceShards is required" + r.recorder.Event(r.instance, "Error", opensearchCustomResourceError, reason) + return nil, errors.New(reason) } if action.Shrink.MaxShardSize != nil { if action.Shrink.NumNewShards == nil && action.Shrink.PercentageOfSourceShards == nil { shrink.MaxShardSize = action.Shrink.MaxShardSize } else { - reason := "MaxShardSize can't exist with NumNewShards or PercentageOfSourceShards. Keep one of these." - r.recorder.Event(r.instance, "Error", opensearchError, reason) - return nil, nil + reason := "maxShardSize can't exist with NumNewShards or PercentageOfSourceShards. Keep one of these" + r.recorder.Event(r.instance, "Error", opensearchCustomResourceError, reason) + return nil, errors.New(reason) } if action.Shrink.NumNewShards != nil { if action.Shrink.MaxShardSize == nil && action.Shrink.PercentageOfSourceShards == nil { shrink.NumNewShards = action.Shrink.NumNewShards } else { - reason := "NumNewShards can't exist with MaxShardSize or PercentageOfSourceShards. Keep one of these." - r.recorder.Event(r.instance, "Error", opensearchError, reason) - return nil, nil + reason := "numNewShards can't exist with MaxShardSize or PercentageOfSourceShards. Keep one of these" + r.recorder.Event(r.instance, "Error", opensearchCustomResourceError, reason) + return nil, errors.New(reason) } } if action.Shrink.PercentageOfSourceShards != nil { if action.Shrink.NumNewShards == nil && action.Shrink.MaxShardSize == nil { shrink.PercentageOfSourceShards = action.Shrink.PercentageOfSourceShards } else { - reason := "PercentageOfSourceShards can't exist with MaxShardSize or NumNewShards. Keep one of these." - r.recorder.Event(r.instance, "Error", opensearchError, reason) - return nil, nil + reason := "percentageOfSourceShards can't exist with MaxShardSize or NumNewShards. Keep one of these" + r.recorder.Event(r.instance, "Error", opensearchCustomResourceError, reason) + return nil, errors.New(reason) } } if action.Shrink.TargetIndexNameTemplate != nil { @@ -515,10 +537,8 @@ func (r *IsmPolicyReconciler) CreateISMPolicyRequest() (*requests.Policy, error) policy.States = append(policy.States, requests.State{Actions: actions, Name: state.Name, Transitions: transitions}) } } - ismPolicy := requests.Policy{ - Policy: policy, - } - return &ismPolicy, nil + + return &policy, nil } // Delete ISM policy from the OS cluster @@ -527,10 +547,12 @@ func (r *IsmPolicyReconciler) Delete() error { if r.instance.Status.ExistingISMPolicy == nil { return nil } + if *r.instance.Status.ExistingISMPolicy { r.logger.Info("policy was pre-existing; not deleting") return nil } + var err error r.cluster, err = util.FetchOpensearchCluster(r.client, r.ctx, types.NamespacedName{ Name: r.instance.Spec.OpensearchRef.Name, @@ -544,15 +566,18 @@ func (r *IsmPolicyReconciler) Delete() error { // If the opensearch cluster doesn't exist, we don't need to delete anything return nil } + r.osClient, err = util.CreateClientForCluster(r.client, r.ctx, r.cluster, r.osClientTransport) if err != nil { return err } + // If PolicyID not provided explicitly, use metadata.name by default policyId := r.instance.Spec.PolicyID - if r.instance.Spec.PolicyID == "" { + if policyId == "" { policyId = r.instance.Name } + err = services.DeleteISMPolicy(r.ctx, r.osClient, policyId) if err != nil { return err diff --git a/opensearch-operator/pkg/reconcilers/ismpolicy_test.go b/opensearch-operator/pkg/reconcilers/ismpolicy_test.go index 22351613d..df3e700f6 100644 --- a/opensearch-operator/pkg/reconcilers/ismpolicy_test.go +++ b/opensearch-operator/pkg/reconcilers/ismpolicy_test.go @@ -21,7 +21,6 @@ import ( "sigs.k8s.io/controller-runtime/pkg/log" ) -var seqno *int = new(int) var _ = Describe("ism policy reconciler", func() { var ( transport *httpmock.MockTransport @@ -117,7 +116,7 @@ var _ = Describe("ism policy reconciler", func() { recorder = record.NewFakeRecorder(1) mockClient.EXPECT().GetOpenSearchCluster(mock.Anything, mock.Anything).Return(*cluster, nil) }) - It("should wait for the cluster to be running", func() { + It("should emit a unit test event and requeue", func() { go func() { defer GinkgoRecover() defer close(recorder.Events) @@ -140,6 +139,7 @@ var _ = Describe("ism policy reconciler", func() { cluster.Status.Phase = opsterv1.PhaseRunning cluster.Status.ComponentsStatus = []opsterv1.ComponentStatus{} mockClient.EXPECT().GetOpenSearchCluster(mock.Anything, mock.Anything).Return(*cluster, nil) + recorder = record.NewFakeRecorder(1) transport.RegisterResponder( http.MethodGet, @@ -162,44 +162,73 @@ var _ = Describe("ism policy reconciler", func() { ) }) - When("existing status is true", func() { + When("cluster reference mismatch", func() { BeforeEach(func() { - instance.Status.ExistingISMPolicy = pointer.Bool(true) + managedCluster := types.UID("different-uid") + instance.Status.ManagedCluster = &managedCluster }) - It("should do nothing", func() { - _, err := reconciler.Reconcile() - Expect(err).ToNot(HaveOccurred()) + It("should emit a unit test event and not requeue", func() { + go func() { + defer GinkgoRecover() + defer close(recorder.Events) + result, err := reconciler.Reconcile() + Expect(err).To(HaveOccurred()) + Expect(result.Requeue).To(BeFalse()) + }() + var events []string + for msg := range recorder.Events { + events = append(events, msg) + } + Expect(len(events)).To(Equal(1)) + Expect(events[0]).To(Equal(fmt.Sprintf("Warning %s cannot change the cluster a resource refers to", opensearchRefMismatch))) }) }) - When("existing status is nil", func() { - var localExtraCalls = 4 + When("policy does not exist in opensearch", func() { BeforeEach(func() { - policyRequest := requests.ISMPolicy{ - DefaultState: "abc", - Description: "test", - } + mockClient.EXPECT().UdateObjectStatus(mock.Anything, mock.Anything).Return(nil) - recorder = record.NewFakeRecorder(1) transport.RegisterResponder( http.MethodGet, fmt.Sprintf( - "https://%s.%s.svc.cluster.local:9200/", + "https://%s.%s.svc.cluster.local:9200/_plugins/_ism/policies/%s", cluster.Spec.General.ServiceName, cluster.Namespace, + instance.Name, ), - httpmock.NewStringResponder(200, "OK").Times(4, failMessage), + httpmock.NewStringResponder(404, "Not Found").Once(), ) transport.RegisterResponder( - http.MethodHead, + http.MethodPut, fmt.Sprintf( - "https://%s.%s.svc.cluster.local:9200/", + "https://%s.%s.svc.cluster.local:9200/_plugins/_ism/policies/%s", cluster.Spec.General.ServiceName, cluster.Namespace, + instance.Name, ), - httpmock.NewStringResponder(200, "OK").Times(2, failMessage), + httpmock.NewStringResponder(200, "OK").Once(), ) + }) + It("should create the policy, emit a unit test event, and requeue", func() { + go func() { + defer GinkgoRecover() + defer close(recorder.Events) + result, err := reconciler.Reconcile() + Expect(err).ToNot(HaveOccurred()) + Expect(result.Requeue).To(BeTrue()) + }() + var events []string + for msg := range recorder.Events { + events = append(events, msg) + } + Expect(len(events)).To(Equal(1)) + Expect(events[0]).To(Equal(fmt.Sprintf("Normal %s policy successfully created in OpenSearch Cluster", opensearchAPIUpdated))) + }) + }) + + When("failed to get policy from opensearch api", func() { + BeforeEach(func() { transport.RegisterResponder( http.MethodGet, fmt.Sprintf( @@ -208,118 +237,61 @@ var _ = Describe("ism policy reconciler", func() { cluster.Namespace, instance.Name, ), - httpmock.NewJsonResponderOrPanic(200, responses.GetISMPoliciesResponse{ - Policy: policyRequest, - }).Then( - httpmock.NewStringResponder(404, "does not exist"), - ).Then( - httpmock.NewNotFoundResponder(failMessage), - ), + httpmock.NewErrorResponder(fmt.Errorf("failed to get policy")).Once(), ) }) - - It("should do nothing and emit a unit test event", func() { + It("should emit a unit test event, requeue, and return an error", func() { go func() { defer GinkgoRecover() defer close(recorder.Events) - _, err := reconciler.Reconcile() - Expect(err).ToNot(HaveOccurred()) - _, err = reconciler.Reconcile() - Expect(err).ToNot(HaveOccurred()) - // Confirm all responders have been called - Expect(transport.GetTotalCallCount()).To(Equal(transport.NumResponders() + extraContextCalls + localExtraCalls)) + result, err := reconciler.Reconcile() + Expect(err).To(HaveOccurred()) + Expect(result.Requeue).To(BeTrue()) }() var events []string for msg := range recorder.Events { events = append(events, msg) } - Expect(len(events)).To(Equal(2)) - Expect(events[0]).To(Equal("Normal UnitTest exists is true")) - Expect(events[1]).To(Equal("Normal UnitTest exists is false")) + Expect(len(events)).To(Equal(1)) + Expect(events[0]).To(Equal(fmt.Sprintf("Warning %s failed to get the ism policy from Opensearch API", opensearchAPIError))) }) }) - When("existing status is true", func() { + When("policy exists in opensearch", func() { BeforeEach(func() { - instance.Status.ExistingISMPolicy = pointer.Bool(true) - }) - It("should do nothing", func() { - _, err := reconciler.Reconcile() - Expect(err).ToNot(HaveOccurred()) - }) - }) + instance.Spec.PolicyID = "test-policy-id" - When("existing status is false", func() { - BeforeEach(func() { - instance.Status.ExistingISMPolicy = pointer.Bool(false) + transport.RegisterResponder( + http.MethodGet, + fmt.Sprintf( + "https://%s.%s.svc.cluster.local:9200/_plugins/_ism/policies/%s", + cluster.Spec.General.ServiceName, + cluster.Namespace, + instance.Spec.PolicyID, + ), + httpmock.NewJsonResponderOrPanic(200, responses.GetISMPolicyResponse{ + PolicyID: "test-policy-id", + Policy: requests.ISMPolicySpec{ + DefaultState: "test-state", + Description: "test-policy", + }, + }).Once(), + ) }) - When("policy exists in opensearch and is the same", func() { - BeforeEach(func() { - policyRequest := requests.ISMPolicy{ - DefaultState: "", - Description: "", - } - transport.RegisterResponder( - http.MethodGet, - fmt.Sprintf( - "https://%s.%s.svc.cluster.local:9200/_plugins/_ism/policies/%s", - cluster.Spec.General.ServiceName, - cluster.Namespace, - instance.Name, - ), - httpmock.NewJsonResponderOrPanic(200, responses.GetISMPoliciesResponse{ - Policy: policyRequest, - SequenceNumber: seqno, - PrimaryTerm: seqno, - }).Once(failMessage), - ) - }) - It("should do nothing", func() { - _, err := reconciler.Reconcile() - Expect(err).ToNot(HaveOccurred()) - Expect(transport.GetTotalCallCount()).To(Equal(transport.NumResponders() + extraContextCalls)) - }) - }) - When("policy exists in opensearch and is not the same", func() { + When("existing status is nil", func() { BeforeEach(func() { - recorder = record.NewFakeRecorder(1) - policyRequest := requests.ISMPolicy{ - DefaultState: "policy", - Description: "test-policy", - } - transport.RegisterResponder( - http.MethodGet, - fmt.Sprintf( - "https://%s.%s.svc.cluster.local:9200/_plugins/_ism/policies/%s", - cluster.Spec.General.ServiceName, - cluster.Namespace, - instance.Name, - ), - httpmock.NewJsonResponderOrPanic(200, responses.GetISMPoliciesResponse{ - Policy: policyRequest, - SequenceNumber: seqno, - PrimaryTerm: seqno, - PolicyID: "test-policy", - }).Once(failMessage), - ) - transport.RegisterResponder( - http.MethodPut, - fmt.Sprintf( - "https://%s.%s.svc.cluster.local:9200/_plugins/_ism/policies/%s?if_seq_no=0&if_primary_term=0", - cluster.Spec.General.ServiceName, - cluster.Namespace, - instance.Name, - ), - httpmock.NewStringResponder(200, "OK").Once(failMessage), - ) + mockClient.EXPECT().UdateObjectStatus(mock.Anything, mock.Anything).Return(nil) + instance.Status.ExistingISMPolicy = nil }) - It("should update the policy", func() { + + It("should emit a unit test event and requeue", func() { go func() { defer GinkgoRecover() defer close(recorder.Events) - _, err := reconciler.Reconcile() + result, err := reconciler.Reconcile() Expect(err).ToNot(HaveOccurred()) + Expect(result.Requeue).To(BeTrue()) // Confirm all responders have been called Expect(transport.GetTotalCallCount()).To(Equal(transport.NumResponders() + extraContextCalls)) }() @@ -328,39 +300,23 @@ var _ = Describe("ism policy reconciler", func() { events = append(events, msg) } Expect(len(events)).To(Equal(1)) - Expect(events[0]).To(Equal(fmt.Sprintf("Normal %s policy updated in opensearch", opensearchAPIUpdated))) + Expect(events[0]).To(Equal(fmt.Sprintf("Warning %s the ISM policy already exists in the OpenSearch cluster", opensearchIsmPolicyExists))) }) }) - When("policy doesn't exist in opensearch", func() { + + When("existing status is true", func() { BeforeEach(func() { - recorder = record.NewFakeRecorder(1) - transport.RegisterResponder( - http.MethodGet, - fmt.Sprintf( - "https://%s.%s.svc.cluster.local:9200/_plugins/_ism/policies/%s", - cluster.Spec.General.ServiceName, - cluster.Namespace, - instance.Name, - ), - httpmock.NewStringResponder(404, "OK").Once(failMessage), - ) - transport.RegisterResponder( - http.MethodPut, - fmt.Sprintf( - "https://%s.%s.svc.cluster.local:9200/_plugins/_ism/policies/%s", - cluster.Spec.General.ServiceName, - cluster.Namespace, - instance.Name, - ), - httpmock.NewStringResponder(200, "OK").Once(failMessage), - ) + mockClient.EXPECT().UdateObjectStatus(mock.Anything, mock.Anything).Return(nil) + instance.Status.ExistingISMPolicy = pointer.Bool(true) }) - It("should create the policy", func() { + + It("should emit a unit test event and requeue", func() { go func() { defer GinkgoRecover() defer close(recorder.Events) - _, err := reconciler.Reconcile() + result, err := reconciler.Reconcile() Expect(err).ToNot(HaveOccurred()) + Expect(result.Requeue).To(BeTrue()) // Confirm all responders have been called Expect(transport.GetTotalCallCount()).To(Equal(transport.NumResponders() + extraContextCalls)) }() @@ -369,7 +325,72 @@ var _ = Describe("ism policy reconciler", func() { events = append(events, msg) } Expect(len(events)).To(Equal(1)) - Expect(events[0]).To(Equal(fmt.Sprintf("Normal %s policy created in opensearch", opensearchAPIUpdated))) + Expect(events[0]).To(Equal(fmt.Sprintf("Warning %s the ISM policy already exists in the OpenSearch cluster", opensearchIsmPolicyExists))) + }) + }) + + When("existing status is false", func() { + BeforeEach(func() { + instance.Status.ExistingISMPolicy = pointer.Bool(false) + }) + + When("policy is the same", func() { + BeforeEach(func() { + instance.Spec.DefaultState = "test-state" + instance.Spec.Description = "test-policy" + }) + It("should emit a unit test event and requeue", func() { + go func() { + defer GinkgoRecover() + defer close(recorder.Events) + result, err := reconciler.Reconcile() + Expect(err).ToNot(HaveOccurred()) + Expect(result.Requeue).To(BeTrue()) + // Confirm all responders have been called + Expect(transport.GetTotalCallCount()).To(Equal(transport.NumResponders() + extraContextCalls)) + }() + var events []string + for msg := range recorder.Events { + events = append(events, msg) + } + Expect(len(events)).To(Equal(1)) + Expect(events[0]).To(Equal(fmt.Sprintf("Normal %s policy is in sync", opensearchAPIUnchanged))) + }) + }) + + When("policy is not the same", func() { + BeforeEach(func() { + instance.Spec.DefaultState = "test-state2" + instance.Spec.Description = "test-policy2" + + transport.RegisterResponder( + http.MethodPut, + fmt.Sprintf( + "https://%s.%s.svc.cluster.local:9200/_plugins/_ism/policies/%s", + cluster.Spec.General.ServiceName, + cluster.Namespace, + instance.Spec.PolicyID, + ), + httpmock.NewStringResponder(200, "OK").Once(), + ) + }) + It("should update ism policy, emit a unit test event, and requeue", func() { + go func() { + defer GinkgoRecover() + defer close(recorder.Events) + result, err := reconciler.Reconcile() + Expect(err).ToNot(HaveOccurred()) + Expect(result.Requeue).To(BeTrue()) + // Confirm all responders have been called + Expect(transport.GetTotalCallCount()).To(Equal(transport.NumResponders() + extraContextCalls)) + }() + var events []string + for msg := range recorder.Events { + events = append(events, msg) + } + Expect(len(events)).To(Equal(1)) + Expect(events[0]).To(Equal(fmt.Sprintf("Normal %s policy updated in opensearch", opensearchAPIUpdated))) + }) }) }) }) diff --git a/opensearch-operator/pkg/reconcilers/reconcilers.go b/opensearch-operator/pkg/reconcilers/reconcilers.go index 6531bc6ff..c0e644ee3 100644 --- a/opensearch-operator/pkg/reconcilers/reconcilers.go +++ b/opensearch-operator/pkg/reconcilers/reconcilers.go @@ -14,13 +14,15 @@ import ( ) const ( - opensearchPending = "OpensearchPending" - opensearchError = "OpensearchError" - opensearchAPIError = "OpensearchAPIError" - opensearchRefMismatch = "OpensearchRefMismatch" - opensearchAPIUpdated = "OpensearchAPIUpdated" - passwordError = "PasswordError" - statusError = "StatusUpdateError" + opensearchPending = "OpensearchPending" + opensearchError = "OpensearchError" + opensearchAPIError = "OpensearchAPIError" + opensearchRefMismatch = "OpensearchRefMismatch" + opensearchAPIUpdated = "OpensearchAPIUpdated" + opensearchAPIUnchanged = "OpensearchAPIUnchanged" + opensearchCustomResourceError = "OpensearchCustomResourceError" + passwordError = "PasswordError" + statusError = "StatusUpdateError" ) type ComponentReconciler func() (reconcile.Result, error)