
✨ Implement ManifestWorkReplicaSet RollOut strategy #259

Merged
2 changes: 1 addition & 1 deletion pkg/common/testing/assertion.go
@@ -51,7 +51,7 @@ func AssertErrorWithPrefix(t *testing.T, actual error, expectedErrorPrefix strin
func AssertActions(t *testing.T, actualActions []clienttesting.Action, expectedVerbs ...string) {
t.Helper()
if len(actualActions) != len(expectedVerbs) {
t.Fatalf("expected %d call but got: %#v", len(expectedVerbs), actualActions)
t.Fatalf("expected %d call but got %d: %#v", len(expectedVerbs), len(actualActions), actualActions)
}
for i, expected := range expectedVerbs {
if actualActions[i].GetVerb() != expected {
26 changes: 26 additions & 0 deletions pkg/work/helper/helpers.go
@@ -19,12 +19,16 @@ import (
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/dynamic"
"k8s.io/klog/v2"

clusterlister "open-cluster-management.io/api/client/cluster/listers/cluster/v1beta1"
clusterv1beta1 "open-cluster-management.io/api/cluster/v1beta1"
workapiv1 "open-cluster-management.io/api/work/v1"
)

@@ -467,3 +471,25 @@ func BuildResourceMeta(
resourceMeta.Resource = mapping.Resource.Resource
return resourceMeta, mapping.Resource, err
}

type PlacementDecisionGetter struct {
Client clusterlister.PlacementDecisionLister
}

func (pdl PlacementDecisionGetter) List(selector labels.Selector, namespace string) ([]*clusterv1beta1.PlacementDecision, error) {
return pdl.Client.PlacementDecisions(namespace).List(selector)
}

// GetClusters returns the added and deleted cluster names for the given placement.
func GetClusters(client clusterlister.PlacementDecisionLister, placement *clusterv1beta1.Placement,
existingClusters sets.Set[string]) (sets.Set[string], sets.Set[string], error) {
pdtracker := GetPlacementTracker(client, placement, existingClusters)

return pdtracker.GetClusterChanges()
}

func GetPlacementTracker(client clusterlister.PlacementDecisionLister, placement *clusterv1beta1.Placement,
existingClusters sets.Set[string]) *clusterv1beta1.PlacementDecisionClustersTracker {

return clusterv1beta1.NewPlacementDecisionClustersTracker(placement, PlacementDecisionGetter{Client: client}, existingClusters)
}
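
As a usage illustration, here is a minimal, self-contained sketch of how GetClusters reports membership changes. It is hypothetical: it uses the fake cluster clientset and assumes PlacementDecisions carry the standard cluster.open-cluster-management.io/placement label (clusterv1beta1.PlacementLabel).

package main

import (
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/util/sets"

	fakecluster "open-cluster-management.io/api/client/cluster/clientset/versioned/fake"
	clusterinformers "open-cluster-management.io/api/client/cluster/informers/externalversions"
	clusterv1beta1 "open-cluster-management.io/api/cluster/v1beta1"

	"open-cluster-management.io/ocm/pkg/work/helper"
)

func main() {
	// The decision for "placement" currently selects cluster1 and cluster3.
	decision := &clusterv1beta1.PlacementDecision{
		ObjectMeta: metav1.ObjectMeta{
			Name:      "placement-decision-1",
			Namespace: "default",
			Labels:    map[string]string{clusterv1beta1.PlacementLabel: "placement"},
		},
		Status: clusterv1beta1.PlacementDecisionStatus{
			Decisions: []clusterv1beta1.ClusterDecision{
				{ClusterName: "cluster1"}, {ClusterName: "cluster3"},
			},
		},
	}

	client := fakecluster.NewSimpleClientset(decision)
	factory := clusterinformers.NewSharedInformerFactory(client, 0)
	// Prime the lister's store directly rather than starting the informer.
	_ = factory.Cluster().V1beta1().PlacementDecisions().Informer().GetStore().Add(decision)
	lister := factory.Cluster().V1beta1().PlacementDecisions().Lister()

	placement := &clusterv1beta1.Placement{
		ObjectMeta: metav1.ObjectMeta{Name: "placement", Namespace: "default"},
	}
	// ManifestWorks already exist on cluster1 and cluster2.
	existing := sets.New[string]("cluster1", "cluster2")

	added, deleted, err := helper.GetClusters(lister, placement, existing)
	fmt.Println(added.UnsortedList(), deleted.UnsortedList(), err) // [cluster3] [cluster2] <nil>
}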
@@ -37,6 +37,10 @@ const (
// TODO move this to the api repo
ManifestWorkReplicaSetControllerNameLabelKey = "work.open-cluster-management.io/manifestworkreplicaset"

// ManifestWorkReplicaSetPlacementNameLabelKey is the label key on a ManifestWork referencing the Placement
// (from the ManifestWorkReplicaSet's PlacementRefs) that selected the managed cluster.
ManifestWorkReplicaSetPlacementNameLabelKey = "work.open-cluster-management.io/placementname"

// ManifestWorkReplicaSetFinalizer is the name of the finalizer added to ManifestWorkReplicaSet. It is used to ensure
// related manifestworks are deleted
ManifestWorkReplicaSetFinalizer = "work.open-cluster-management.io/manifest-work-cleanup"
@@ -124,7 +128,7 @@ func newController(workClient workclientset.Interface,
}
}

// sync is the main reconcile loop for placeManifest work. It is triggered every 15sec
// sync is the main reconcile loop for ManifestWorkReplicaSet. It is triggered every 15sec
func (m *ManifestWorkReplicaSetController) sync(ctx context.Context, controllerContext factory.SyncContext) error {
key := controllerContext.QueueKey()
klog.V(4).Infof("Reconciling ManifestWorkReplicaSet %q", key)
@@ -180,3 +184,19 @@ func listManifestWorksByManifestWorkReplicaSet(mwrs *workapiv1alpha1.ManifestWor
selector := labels.NewSelector().Add(*req)
return manifestWorkLister.List(selector)
}

func listManifestWorksByMWRSetPlacementRef(mwrs *workapiv1alpha1.ManifestWorkReplicaSet, placementName string,
manifestWorkLister worklisterv1.ManifestWorkLister) ([]*workapiv1.ManifestWork, error) {
reqMWRSet, err := labels.NewRequirement(ManifestWorkReplicaSetControllerNameLabelKey, selection.Equals, []string{manifestWorkReplicaSetKey(mwrs)})
if err != nil {
return nil, err
}

reqPlacementRef, err := labels.NewRequirement(ManifestWorkReplicaSetPlacementNameLabelKey, selection.Equals, []string{placementName})
if err != nil {
return nil, err
}

selector := labels.NewSelector().Add(*reqMWRSet, *reqPlacementRef)
return manifestWorkLister.List(selector)
}
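
To make the selector semantics concrete, a small standalone sketch (the label values are hypothetical, and it assumes manifestWorkReplicaSetKey renders as <namespace>.<name>):

package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/labels"
	"k8s.io/apimachinery/pkg/selection"
)

func main() {
	reqMWRSet, _ := labels.NewRequirement(
		"work.open-cluster-management.io/manifestworkreplicaset", selection.Equals, []string{"default.test"})
	reqPlacement, _ := labels.NewRequirement(
		"work.open-cluster-management.io/placementname", selection.Equals, []string{"placement"})

	// Both requirements are ANDed, so only works stamped with both labels match.
	selector := labels.NewSelector().Add(*reqMWRSet, *reqPlacement)

	mwLabels := labels.Set{
		"work.open-cluster-management.io/manifestworkreplicaset": "default.test",
		"work.open-cluster-management.io/placementname":          "placement",
	}
	fmt.Println(selector.Matches(mwLabels)) // true; drop either label and this prints false
}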
@@ -150,7 +150,7 @@ func TestManifestWorkReplicaSetControllerPatchStatus(t *testing.T) {
w.Finalizers = []string{ManifestWorkReplicaSetFinalizer}
return w
}(),
works: helpertest.CreateTestManifestWorks("test", "default", "cluster1", "cluster2"),
works: helpertest.CreateTestManifestWorks("test", "default", "placement", "cluster1", "cluster2"),
placement: func() *clusterv1beta1.Placement {
p, _ := helpertest.CreateTestPlacement("placement", "default", "cluster1", "cluster2")
return p
@@ -181,7 +181,7 @@ func TestManifestWorkReplicaSetControllerPatchStatus(t *testing.T) {
w.Finalizers = []string{ManifestWorkReplicaSetFinalizer}
return w
}(),
works: helpertest.CreateTestManifestWorks("test", "default", "cluster1", "cluster2"),
works: helpertest.CreateTestManifestWorks("test", "default", "placement", "cluster1", "cluster2"),
placement: func() *clusterv1beta1.Placement {
p, _ := helpertest.CreateTestPlacement("placement", "default", "cluster2", "cluster3", "cluster4")
return p
@@ -12,12 +12,12 @@ import (

clusterlister "open-cluster-management.io/api/client/cluster/listers/cluster/v1beta1"
worklisterv1 "open-cluster-management.io/api/client/work/listers/work/v1"
clusterv1beta1 "open-cluster-management.io/api/cluster/v1beta1"
clusterv1alpha1 "open-cluster-management.io/api/cluster/v1alpha1"
"open-cluster-management.io/api/utils/work/v1/workapplier"
workv1 "open-cluster-management.io/api/work/v1"
workapiv1alpha1 "open-cluster-management.io/api/work/v1alpha1"

"open-cluster-management.io/ocm/pkg/common/helpers"
"open-cluster-management.io/ocm/pkg/work/helper"
)

// deployReconciler manages ManifestWorks based on the placement.
@@ -31,90 +31,126 @@ type deployReconciler struct {
func (d *deployReconciler) reconcile(ctx context.Context, mwrSet *workapiv1alpha1.ManifestWorkReplicaSet,
) (*workapiv1alpha1.ManifestWorkReplicaSet, reconcileState, error) {
// Manifestwork create/update/delete logic.
var placements []*clusterv1beta1.Placement
var errs []error
var plcsSummary []workapiv1alpha1.PlacementSummary
count, total := 0, 0
// Getting the placements and the created ManifestWorks related to each placement
for _, placementRef := range mwrSet.Spec.PlacementRefs {
var existingRolloutClsStatus []clusterv1alpha1.ClusterRolloutStatus
existingClusterNames := sets.New[string]()
placement, err := d.placementLister.Placements(mwrSet.Namespace).Get(placementRef.Name)

if errors.IsNotFound(err) {
apimeta.SetStatusCondition(&mwrSet.Status.Conditions, GetPlacementDecisionVerified(workapiv1alpha1.ReasonPlacementDecisionNotFound, ""))
return mwrSet, reconcileStop, nil
}

if err != nil {
return mwrSet, reconcileContinue, fmt.Errorf("failed to get placement: %w", err)
}
placements = append(placements, placement)
}

manifestWorks, err := listManifestWorksByManifestWorkReplicaSet(mwrSet, d.manifestWorkLister)
if err != nil {
return mwrSet, reconcileContinue, err
}
manifestWorks, err := listManifestWorksByMWRSetPlacementRef(mwrSet, placementRef.Name, d.manifestWorkLister)
if err != nil {
return mwrSet, reconcileContinue, err
}

var errs []error
addedClusters, deletedClusters, existingClusters := sets.New[string](), sets.New[string](), sets.New[string]()
for _, mw := range manifestWorks {
existingClusters.Insert(mw.Namespace)
}
for _, mw := range manifestWorks {
// Check if ManifestWorkTemplate changes, ManifestWork will need to be updated.
newMW := &workv1.ManifestWork{}
mw.ObjectMeta.DeepCopyInto(&newMW.ObjectMeta)
mwrSet.Spec.ManifestWorkTemplate.DeepCopyInto(&newMW.Spec)

// TODO: Create NeedToApply function by workApplier to check the manifestWork->spec hash value from the cache.
if !workapplier.ManifestWorkEqual(newMW, mw) {
continue
}

existingClusterNames.Insert(mw.Namespace)
rolloutClusterStatus, err := d.clusterRolloutStatusFunc(mw.Namespace, *mw)

for _, placement := range placements {
added, deleted, err := helpers.GetClusterChanges(d.placeDecisionLister, placement, existingClusters)
if err != nil {
errs = append(errs, err)
continue
}
existingRolloutClsStatus = append(existingRolloutClsStatus, rolloutClusterStatus)
}

placeTracker := helper.GetPlacementTracker(d.placeDecisionLister, placement, existingClusterNames)
rolloutHandler, err := clusterv1alpha1.NewRolloutHandler(placeTracker, d.clusterRolloutStatusFunc)
if err != nil {
apimeta.SetStatusCondition(&mwrSet.Status.Conditions, GetPlacementDecisionVerified(workapiv1alpha1.ReasonNotAsExpected, ""))

return mwrSet, reconcileContinue, utilerrors.NewAggregate(errs)
}

addedClusters = addedClusters.Union(added)
deletedClusters = deletedClusters.Union(deleted)
}

// Create manifestWork for added clusters
for cls := range addedClusters {
mw, err := CreateManifestWork(mwrSet, cls)
err = placeTracker.Refresh()
if err != nil {
errs = append(errs, err)
continue
}

_, err = d.workApplier.Apply(ctx, mw)
_, rolloutResult, err := rolloutHandler.GetRolloutCluster(placementRef.RolloutStrategy, existingRolloutClsStatus)

if err != nil {
errs = append(errs, err)
apimeta.SetStatusCondition(&mwrSet.Status.Conditions, GetPlacementDecisionVerified(workapiv1alpha1.ReasonNotAsExpected, ""))

continue
}
}

// Update manifestWorks in case there are changes at ManifestWork or ManifestWorkReplicaSet
for cls := range existingClusters {
// Delete manifestWork for deleted clusters
if deletedClusters.Has(cls) {
err = d.workApplier.Delete(ctx, cls, mwrSet.Name)
// Create ManifestWorks
for _, rolloutStatus := range rolloutResult.ClustersToRollout {
if rolloutStatus.Status == clusterv1alpha1.ToApply {
mw, err := CreateManifestWork(mwrSet, rolloutStatus.ClusterName, placementRef.Name)
if err != nil {
errs = append(errs, err)
continue
}

_, err = d.workApplier.Apply(ctx, mw)
if err != nil {
errs = append(errs, err)
}
if !existingClusterNames.Has(rolloutStatus.ClusterName) {
existingClusterNames.Insert(rolloutStatus.ClusterName)
}
}
}

for _, cls := range rolloutResult.ClustersRemoved {
// Delete manifestWork for removed clusters
err = d.workApplier.Delete(ctx, cls.ClusterName, mwrSet.Name)
if err != nil {
errs = append(errs, err)
continue
}
continue
existingClusterNames.Delete(cls.ClusterName)
}

mw, err := CreateManifestWork(mwrSet, cls)
if err != nil {
errs = append(errs, err)
continue
total = total + int(placement.Status.NumberOfSelectedClusters)
plcSummary := workapiv1alpha1.PlacementSummary{
Name: placementRef.Name,
AvailableDecisionGroups: getAvailableDecisionGroupProgressMessage(len(placement.Status.DecisionGroups),
len(existingClusterNames), placement.Status.NumberOfSelectedClusters),
}

_, err = d.workApplier.Apply(ctx, mw)
if err != nil {
errs = append(errs, err)
mwrSetSummary := workapiv1alpha1.ManifestWorkReplicaSetSummary{
Total: len(existingClusterNames),
}
plcSummary.Summary = mwrSetSummary
plcsSummary = append(plcsSummary, plcSummary)

count = count + len(existingClusterNames)
}
// Set the placements summary
mwrSet.Status.PlacementsSummary = plcsSummary

// Set the Summary
if mwrSet.Status.Summary == (workapiv1alpha1.ManifestWorkReplicaSetSummary{}) {
mwrSet.Status.Summary = workapiv1alpha1.ManifestWorkReplicaSetSummary{}
}
total := len(existingClusters) - len(deletedClusters) + len(addedClusters)
if total < 0 {
total = 0
}

mwrSet.Status.Summary.Total = total
if total == 0 {
mwrSet.Status.Summary.Total = count
if count == 0 {
mwrSet.Status.Summary.Applied = 0
mwrSet.Status.Summary.Available = 0
mwrSet.Status.Summary.Degraded = 0
@@ -124,9 +160,54 @@ func (d *deployReconciler) reconcile(ctx context.Context, mwrSet *workapiv1alpha
apimeta.SetStatusCondition(&mwrSet.Status.Conditions, GetPlacementDecisionVerified(workapiv1alpha1.ReasonAsExpected, ""))
}

if total == count {
apimeta.SetStatusCondition(&mwrSet.Status.Conditions, GetPlacementRollOut(workapiv1alpha1.ReasonComplete, ""))
} else {
apimeta.SetStatusCondition(&mwrSet.Status.Conditions, GetPlacementRollOut(workapiv1alpha1.ReasonProgressing, ""))
}

return mwrSet, reconcileContinue, utilerrors.NewAggregate(errs)
}

func (d *deployReconciler) clusterRolloutStatusFunc(clusterName string, manifestWork workv1.ManifestWork) (clusterv1alpha1.ClusterRolloutStatus, error) {
clsRolloutStatus := clusterv1alpha1.ClusterRolloutStatus{
ClusterName: clusterName,
LastTransitionTime: &manifestWork.CreationTimestamp,
// Default status is ToApply
Status: clusterv1alpha1.ToApply,
}

appliedCondition := apimeta.FindStatusCondition(manifestWork.Status.Conditions, workv1.WorkApplied)

// If the Applied condition does not exist, return status ToApply.
if appliedCondition == nil {
return clsRolloutStatus, nil
} else if appliedCondition.Status == metav1.ConditionTrue ||
apimeta.IsStatusConditionTrue(manifestWork.Status.Conditions, workv1.WorkProgressing) {
Member:

do we have a WorkProgressing status right now?

Member Author:

I don't think so, unless you can point me to one. I checked the manifestWork controllers and it looks like there is no WorkProgressing status defined yet. I added a comment to clarify that.

// If either the Applied or the Progressing condition is true, return status Progressing.
// A ManifestWork Progressing status is not defined yet; the check is made for future use.
clsRolloutStatus.Status = clusterv1alpha1.Progressing
} else if appliedCondition.Status == metav1.ConditionFalse {
// If the Applied condition is false, return status Failed.
clsRolloutStatus.Status = clusterv1alpha1.Failed
return clsRolloutStatus, nil
}

// If the Available condition is true, return status Succeeded.
if apimeta.IsStatusConditionTrue(manifestWork.Status.Conditions, workv1.WorkAvailable) {
clsRolloutStatus.Status = clusterv1alpha1.Succeeded
return clsRolloutStatus, nil
}

// If the Degraded condition is true, return status Failed.
// A ManifestWork Degraded status is not defined yet; the check is made for future use.
if apimeta.IsStatusConditionTrue(manifestWork.Status.Conditions, workv1.WorkDegraded) {
Member:

There is no Degraded status defined yet. We should add a note here.

Member Author:

Yes, same here: there is no Degraded status defined. I considered the Applied condition with False status as Failed, and added comments to clarify that.

clsRolloutStatus.Status = clusterv1alpha1.Failed
}

return clsRolloutStatus, nil
}
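
For illustration, a test-style sketch of the mapping above. It is hypothetical and would live in the same package, since clusterRolloutStatusFunc is a method on the unexported deployReconciler:

func TestClusterRolloutStatusFunc(t *testing.T) {
	now := metav1.Now()
	mw := workv1.ManifestWork{
		ObjectMeta: metav1.ObjectMeta{Name: "test", Namespace: "cluster1", CreationTimestamp: now},
		Status: workv1.ManifestWorkStatus{
			Conditions: []metav1.Condition{
				{Type: workv1.WorkApplied, Status: metav1.ConditionTrue},
				{Type: workv1.WorkAvailable, Status: metav1.ConditionTrue},
			},
		},
	}

	// The status func only reads the ManifestWork, so a zero-value reconciler is enough here.
	d := &deployReconciler{}
	status, err := d.clusterRolloutStatusFunc("cluster1", mw)
	if err != nil {
		t.Fatal(err)
	}
	// Applied=True marks the work Progressing; Available=True then upgrades it to Succeeded.
	if status.Status != clusterv1alpha1.Succeeded {
		t.Fatalf("expected Succeeded, got %v", status.Status)
	}
}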

// GetManifestworkApplied returns a True status only if all clusters have manifests applied as expected
func GetManifestworkApplied(reason string, message string) metav1.Condition {
if reason == workapiv1alpha1.ReasonAsExpected {
@@ -146,6 +227,15 @@ func GetPlacementDecisionVerified(reason string, message string) metav1.Conditio
return getCondition(workapiv1alpha1.ManifestWorkReplicaSetConditionPlacementVerified, reason, message, metav1.ConditionFalse)
}

// GetPlacementRollOut returns a True status only if the rollout has completed
func GetPlacementRollOut(reason string, message string) metav1.Condition {
if reason == workapiv1alpha1.ReasonComplete {
return getCondition(workapiv1alpha1.ManifestWorkReplicaSetConditionPlacementRolledOut, reason, message, metav1.ConditionTrue)
}

return getCondition(workapiv1alpha1.ManifestWorkReplicaSetConditionPlacementRolledOut, reason, message, metav1.ConditionFalse)
}

func getCondition(conditionType string, reason string, message string, status metav1.ConditionStatus) metav1.Condition {
return metav1.Condition{
Type: conditionType,
@@ -156,7 +246,7 @@ func getCondition(conditionType string, reason string, message string, status me
}
}

func CreateManifestWork(mwrSet *workapiv1alpha1.ManifestWorkReplicaSet, clusterNS string) (*workv1.ManifestWork, error) {
func CreateManifestWork(mwrSet *workapiv1alpha1.ManifestWorkReplicaSet, clusterNS string, placementRefName string) (*workv1.ManifestWork, error) {
if clusterNS == "" {
return nil, fmt.Errorf("invalid cluster namespace")
}
@@ -165,7 +255,12 @@ func CreateManifestWork(mwrSet *workapiv1alpha1.ManifestWorkReplicaSet, clusterN
ObjectMeta: metav1.ObjectMeta{
Name: mwrSet.Name,
Namespace: clusterNS,
Labels: map[string]string{ManifestWorkReplicaSetControllerNameLabelKey: manifestWorkReplicaSetKey(mwrSet)},
Labels: map[string]string{ManifestWorkReplicaSetControllerNameLabelKey: manifestWorkReplicaSetKey(mwrSet),
ManifestWorkReplicaSetPlacementNameLabelKey: placementRefName},
},
Spec: mwrSet.Spec.ManifestWorkTemplate}, nil
}

func getAvailableDecisionGroupProgressMessage(groupNum int, existingClsCount int, totalCls int32) string {
return fmt.Sprintf("%d (%d / %d clusters applied)", groupNum, existingClsCount, totalCls)
}
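
A quick in-package sketch (hypothetical test, example values) of the message this helper renders:

func TestGetAvailableDecisionGroupProgressMessage(t *testing.T) {
	// 3 decision groups; 5 of 10 selected clusters have works applied.
	got := getAvailableDecisionGroupProgressMessage(3, 5, 10)
	if got != "3 (5 / 10 clusters applied)" {
		t.Fatalf("unexpected message: %q", got)
	}
}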