Skip to content

AUTH-543: Add optional operand deletion condition #1902

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions pkg/apiserver/jsonpatch/jsonpatch.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,22 @@ func New() *PatchSet {
return &PatchSet{}
}

// Merge builds a new PatchSet containing the operations of all given patch
// sets, appended in argument order. Entries that are nil or that report
// themselves as empty via IsEmpty are skipped; when nothing is left to merge
// the freshly created PatchSet from New is returned unchanged.
func Merge(patches ...*PatchSet) *PatchSet {
	result := New()
	for _, candidate := range patches {
		// Skip anything that contributes no operations.
		if candidate == nil || candidate.IsEmpty() {
			continue
		}
		result.patches = append(result.patches, candidate.patches...)
	}
	return result
}

func (p *PatchSet) WithRemove(path string, test TestCondition) *PatchSet {
p.WithTest(test.path, test.value)
p.addOperation(patchRemoveOperation, path, nil)
Expand Down
62 changes: 61 additions & 1 deletion pkg/apiserver/jsonpatch/jsonpatch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,67 @@ func TestJSONPatch(t *testing.T) {
t.Fatal(err)
}
if string(patchBytes) != scenario.expectedOutput {
t.Fatalf("expected = %s, got = %s", scenario.expectedOutput, patchBytes)
t.Fatalf("expected = %s, got = %s", scenario.expectedOutput, string(patchBytes))
}
})
}
}

func TestJSONPatchMerge(t *testing.T) {
for _, scenario := range []struct {
name string
patches []*PatchSet
expectedOutput string
}{
{
name: "nil patch slice to merge",
patches: nil,
expectedOutput: `null`,
},
{
name: "empty patch slice to merge",
patches: []*PatchSet{},
expectedOutput: `null`,
},
{
name: "one empty patch to merge",
patches: []*PatchSet{{}},
expectedOutput: `null`,
},
{
name: "merge one patch",
patches: []*PatchSet{
New().WithRemove("/path1", NewTestCondition("/path1", "value1")),
},
expectedOutput: `[{"op":"test","path":"/path1","value":"value1"},{"op":"remove","path":"/path1"}]`,
},
{
name: "merge multiple patches",
patches: []*PatchSet{
New().WithRemove("/path1", NewTestCondition("/path1", "value1")),
New().WithRemove("/path2", NewTestCondition("/path2", "value2")),
},
expectedOutput: `[{"op":"test","path":"/path1","value":"value1"},{"op":"remove","path":"/path1"},{"op":"test","path":"/path2","value":"value2"},{"op":"remove","path":"/path2"}]`,
},
{
name: "merge multiple patches ignoring empty ones",
patches: []*PatchSet{
{},
New().WithRemove("/path1", NewTestCondition("/path1", "value1")),
{},
New().WithRemove("/path2", NewTestCondition("/path2", "value2")),
{},
},
expectedOutput: `[{"op":"test","path":"/path1","value":"value1"},{"op":"remove","path":"/path1"},{"op":"test","path":"/path2","value":"value2"},{"op":"remove","path":"/path2"}]`,
},
} {
t.Run(scenario.name, func(t *testing.T) {
patchBytes, err := Merge(scenario.patches...).Marshal()
if err != nil {
t.Fatal(err)
}
if string(patchBytes) != scenario.expectedOutput {
t.Fatalf("expected = %s, got = %s", scenario.expectedOutput, string(patchBytes))
}
})
}
Expand Down
183 changes: 133 additions & 50 deletions pkg/operator/apiserver/controller/workload/workload.go

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it be beneficial to add some unit tests for the new deletion behavior?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The unit tests would end up using mocks for most of the stuff that the deletion does; but on second thought it might be beneficial to test the operator status, so I'll add some 👍

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be good to import and use this change in one of the consumers (e.g. cluster-authentication-operator) and show that the CI is passing there.

Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"context"
"errors"
"fmt"
"strings"

appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
Expand All @@ -14,11 +16,11 @@ import (
corev1listers "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
"strings"

operatorv1 "github.com/openshift/api/operator/v1"
openshiftconfigclientv1 "github.com/openshift/client-go/config/clientset/versioned/typed/config/v1"
applyoperatorv1 "github.com/openshift/client-go/operator/applyconfigurations/operator/v1"
"github.com/openshift/library-go/pkg/apiserver/jsonpatch"
"github.com/openshift/library-go/pkg/apps/deployment"
"github.com/openshift/library-go/pkg/controller/factory"
"github.com/openshift/library-go/pkg/operator/events"
Expand All @@ -33,7 +35,25 @@ const (
// Delegate captures a set of methods that hold a custom logic
type Delegate interface {
// Sync a method that will be used for delegation. It should bring the desired workload into operation.
Sync(ctx context.Context, controllerContext factory.SyncContext) (*appsv1.Deployment, bool, []error)
//
// It returns a reference to the workload, a bool indicating whether the operator config is at the highest
// generation, a bool indicating whether the workload references must be removed from the operator status
// (e.g. in case the workload has been removed), two strings indicating the name & namespace
// of the workload that must be cleaned up from the operator status (respective to the conditions to be
// removed) and a list of errors.
//
// When a workload is removed (as indicated by the removeWorkload boolean), not only will its respective
// status conditions be removed, but also its generations, and its version string will be
// set to "". To make sure that the StatusSyncer completely removes any versions that are equal to the
// empty string, use StatusSyncer.WithEmptyVersionRemoval().
Sync(ctx context.Context, controllerContext factory.SyncContext) (
delegateWorkload *appsv1.Deployment,
operatorConfigAtHighestGeneration bool,
removeWorkload bool,
removedWorkloadName string,
removedWorkloadNamespace string,
errs []error,
)

// PreconditionFulfilled a method that indicates whether all prerequisites are met and we can Sync.
//
Expand Down Expand Up @@ -81,7 +101,7 @@ func NewController(instanceName, operatorNamespace, targetNamespace, targetOpera
kubeClient kubernetes.Interface,
podLister corev1listers.PodLister,
informers []factory.Informer,
tagetNamespaceInformers []factory.Informer,
targetNamespaceInformers []factory.Informer,
delegate Delegate,
openshiftClusterConfigClient openshiftconfigclientv1.ClusterOperatorInterface,
eventRecorder events.Recorder,
Expand All @@ -100,11 +120,11 @@ func NewController(instanceName, operatorNamespace, targetNamespace, targetOpera
delegate: delegate,
openshiftClusterConfigClient: openshiftClusterConfigClient,
versionRecorder: versionRecorder,
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), instanceName),
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultTypedControllerRateLimiter[any](), instanceName),
}

c := factory.New()
for _, nsi := range tagetNamespaceInformers {
for _, nsi := range targetNamespaceInformers {
c.WithNamespaceInformer(nsi, targetNamespace)
}

Expand All @@ -128,14 +148,14 @@ func (c *Controller) sync(ctx context.Context, controllerContext factory.SyncCon
}

if fulfilled, err := c.delegate.PreconditionFulfilled(ctx); err != nil {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

wouldn't it be better to let the delegate communicate everything as we do with the preconditions?

return c.updateOperatorStatus(ctx, operatorStatus, nil, false, false, []error{err})
return c.updateOperatorStatus(ctx, operatorStatus, nil, false, false, false, "", "", []error{err})
} else if !fulfilled {
return c.updateOperatorStatus(ctx, operatorStatus, nil, false, false, nil)
return c.updateOperatorStatus(ctx, operatorStatus, nil, false, false, false, "", "", nil)
}

workload, operatorConfigAtHighestGeneration, errs := c.delegate.Sync(ctx, controllerContext)
workload, operatorConfigAtHighestGeneration, removeWorkload, removedWorkloadName, removedWorkloadNamespace, errs := c.delegate.Sync(ctx, controllerContext)

return c.updateOperatorStatus(ctx, operatorStatus, workload, operatorConfigAtHighestGeneration, true, errs)
return c.updateOperatorStatus(ctx, operatorStatus, workload, operatorConfigAtHighestGeneration, true, removeWorkload, removedWorkloadName, removedWorkloadNamespace, errs)
}

// shouldSync checks ManagementState to determine if we can run this operator, probably set by a cluster administrator.
Expand All @@ -156,48 +176,78 @@ func (c *Controller) shouldSync(ctx context.Context, operatorSpec *operatorv1.Op
}
}

// updateOperatorStatus updates the status based on the actual workload and errors that might have occurred during synchronization.
func (c *Controller) updateOperatorStatus(ctx context.Context, previousStatus *operatorv1.OperatorStatus, workload *appsv1.Deployment, operatorConfigAtHighestGeneration bool, preconditionsReady bool, errs []error) (err error) {
func (c *Controller) updateOperatorStatus(ctx context.Context,
previousStatus *operatorv1.OperatorStatus,
workload *appsv1.Deployment,
operatorConfigAtHighestGeneration, preconditionsReady, removeWorkload bool,
removedWorkloadName, removedWorkloadNamespace string,
errs []error,
) (err error) {

if errs == nil {
errs = []error{}
}

deploymentAvailableCondition := applyoperatorv1.OperatorCondition().
WithType(fmt.Sprintf("%sDeployment%s", c.conditionsPrefix, operatorv1.OperatorStatusTypeAvailable))
typeAvailable := fmt.Sprintf("%sDeployment%s", c.conditionsPrefix, operatorv1.OperatorStatusTypeAvailable)
typeDegraded := fmt.Sprintf("%sDeployment%s", c.conditionsPrefix, operatorv1.OperatorStatusTypeDegraded)
typeProgressing := fmt.Sprintf("%sDeployment%s", c.conditionsPrefix, operatorv1.OperatorStatusTypeProgressing)
typeWorkloadDegraded := fmt.Sprintf("%sWorkload%s", c.conditionsPrefix, operatorv1.OperatorStatusTypeDegraded)

workloadDegradedCondition := applyoperatorv1.OperatorCondition().
WithType(fmt.Sprintf("%sWorkloadDegraded", c.conditionsPrefix))
if removeWorkload {
if len(removedWorkloadName) == 0 || len(removedWorkloadNamespace) == 0 {
return kerrors.NewAggregate(append(errs, fmt.Errorf("workload marked as removed but no name and/or namespace provided (name=%s, ns=%s)", removedWorkloadName, removedWorkloadNamespace)))
}

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It might make more sense to still consider preconditions even when the workload will be deleted later. Thoughts?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Given that the delete would happen during the delegate's sync, we shouldn't normally reach the point of removing the conditions if preconditions are failing. But you are right, we should safeguard against this -- I'll add a check before removing conditions.

deploymentDegradedCondition := applyoperatorv1.OperatorCondition().
WithType(fmt.Sprintf("%sDeploymentDegraded", c.conditionsPrefix))
// the workload has been removed; remove conditions, generations and version
patch := jsonpatch.Merge(
v1helpers.RemoveConditionsJSONPatch(previousStatus, []string{typeAvailable, typeDegraded, typeProgressing, typeWorkloadDegraded}),
v1helpers.RemoveWorkloadGenerationsJSONPatch(previousStatus, removedWorkloadName, removedWorkloadNamespace),
)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we could do without this newline

deploymentProgressingCondition := applyoperatorv1.OperatorCondition().
WithType(fmt.Sprintf("%sDeployment%s", c.conditionsPrefix, operatorv1.OperatorStatusTypeProgressing))
if !patch.IsEmpty() {
c.setVersion(removedWorkloadName, "")
patchErr := c.operatorClient.PatchOperatorStatus(ctx, patch)
if patchErr != nil {
errs = append(errs, patchErr)
}
}

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

superfluous line

status := applyoperatorv1.OperatorStatus()
if workload != nil {
// The Hash field is not required since the LastGeneration field is enough to uniquely identify a Deployment's desired state
status = status.WithGenerations(applyoperatorv1.GenerationStatus().
WithGroup("apps").
WithResource("deployments").
WithNamespace(workload.Namespace).
WithName(workload.Name).
WithLastGeneration(workload.Generation),
)
return kerrors.NewAggregate(errs)
}

defer func() {
status = status.WithConditions(
deploymentAvailableCondition,
deploymentDegradedCondition,
deploymentProgressingCondition,
workloadDegradedCondition,
)
deploymentAvailableCondition := applyoperatorv1.OperatorCondition().WithType(typeAvailable)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1, easier to read in two separate blocks

deploymentDegradedCondition := applyoperatorv1.OperatorCondition().WithType(typeDegraded)
deploymentProgressingCondition := applyoperatorv1.OperatorCondition().WithType(typeProgressing)
workloadDegradedCondition := applyoperatorv1.OperatorCondition().WithType(typeWorkloadDegraded)

if applyError := c.operatorClient.ApplyOperatorStatus(ctx, c.controllerInstanceName, status); applyError != nil {
err = applyError
}
}()
// update workload conditions, generations and version
statusApplyConfig := applyoperatorv1.OperatorStatus()
statusApplyConfig = c.updateOperatorStatusConditions(previousStatus, statusApplyConfig, workload, preconditionsReady, deploymentAvailableCondition, deploymentDegradedCondition, deploymentProgressingCondition, workloadDegradedCondition, errs)
statusApplyConfig = c.updateOperatorStatusGenerationsVersion(statusApplyConfig, workload, operatorConfigAtHighestGeneration, preconditionsReady)

applyErr := c.operatorClient.ApplyOperatorStatus(ctx, c.controllerInstanceName, statusApplyConfig)
if applyErr != nil {
errs = append(errs, applyErr)
}

return kerrors.NewAggregate(errs)
}

func (c *Controller) updateOperatorStatusConditions(
previousStatus *operatorv1.OperatorStatus,
statusApplyConfig *applyoperatorv1.OperatorStatusApplyConfiguration,
workload *appsv1.Deployment,
preconditionsReady bool,
deploymentAvailableCondition, deploymentDegradedCondition, deploymentProgressingCondition, workloadDegradedCondition *applyoperatorv1.OperatorConditionApplyConfiguration,
errs []error,
) *applyoperatorv1.OperatorStatusApplyConfiguration {

defer statusApplyConfig.WithConditions(
deploymentAvailableCondition,
deploymentDegradedCondition,
deploymentProgressingCondition,
workloadDegradedCondition,
)

if !preconditionsReady {
var message string
Expand Down Expand Up @@ -228,7 +278,7 @@ func (c *Controller) updateOperatorStatus(ctx context.Context, previousStatus *o
WithReason("PreconditionNotFulfilled").
WithMessage(message)

return kerrors.NewAggregate(errs)
return statusApplyConfig
}

if len(errs) > 0 {
Expand Down Expand Up @@ -267,7 +317,7 @@ func (c *Controller) updateOperatorStatus(ctx context.Context, previousStatus *o
WithReason("NoDeployment").
WithMessage(message)

return kerrors.NewAggregate(errs)
return statusApplyConfig
}

if workload.Status.AvailableReplicas == 0 {
Expand Down Expand Up @@ -336,22 +386,55 @@ func (c *Controller) updateOperatorStatus(ctx context.Context, previousStatus *o
WithReason("AsExpected")
}

return statusApplyConfig
}

func (c *Controller) updateOperatorStatusGenerationsVersion(
statusApplyConfig *applyoperatorv1.OperatorStatusApplyConfiguration,
workload *appsv1.Deployment,
operatorConfigAtHighestGeneration, preconditionsReady bool,
) *applyoperatorv1.OperatorStatusApplyConfiguration {

if workload == nil {
return statusApplyConfig
}

statusApplyConfig = statusApplyConfig.WithGenerations(applyoperatorv1.GenerationStatus().
WithGroup("apps").
WithResource("deployments").
WithNamespace(workload.Namespace).
WithName(workload.Name).
WithLastGeneration(workload.Generation),
)

if !preconditionsReady {
return statusApplyConfig
}

desiredReplicas := int32(1)
if workload.Spec.Replicas != nil {
desiredReplicas = *(workload.Spec.Replicas)
}

workloadAtHighestGeneration := workload.ObjectMeta.Generation == workload.Status.ObservedGeneration
workloadHasAllPodsAvailable := workload.Status.AvailableReplicas >= desiredReplicas

// if the deployment is all available and at the expected generation, then update the version to the latest
// when we update, the image pull spec should immediately be different, which should immediately cause a deployment rollout
// which should immediately result in a deployment generation diff, which should cause this block to be skipped until it is ready.
workloadHasAllPodsUpdated := workload.Status.UpdatedReplicas == desiredReplicas
if workloadAtHighestGeneration && workloadHasAllPodsAvailable && workloadHasAllPodsUpdated && operatorConfigAtHighestGeneration {
operandName := workload.Name
if len(c.operandNamePrefix) > 0 {
operandName = fmt.Sprintf("%s-%s", c.operandNamePrefix, workload.Name)
}
c.versionRecorder.SetVersion(operandName, c.targetOperandVersion)
c.setVersion(workload.Name, c.targetOperandVersion)
}

if len(errs) > 0 {
return kerrors.NewAggregate(errs)
return statusApplyConfig
}

func (c *Controller) setVersion(operandName, version string) {
if len(c.operandNamePrefix) > 0 {
operandName = fmt.Sprintf("%s-%s", c.operandNamePrefix, operandName)
}
return nil
c.versionRecorder.SetVersion(operandName, version)
}

// hasDeploymentProgressed returns true if the deployment reports NewReplicaSetAvailable
Expand Down
Loading