✨ Implement ManifestWorkReplicaSet RollOut strategy #259
@@ -12,12 +12,12 @@ import (

clusterlister "open-cluster-management.io/api/client/cluster/listers/cluster/v1beta1"
worklisterv1 "open-cluster-management.io/api/client/work/listers/work/v1"
clusterv1beta1 "open-cluster-management.io/api/cluster/v1beta1"
clusterv1alpha1 "open-cluster-management.io/api/cluster/v1alpha1"
"open-cluster-management.io/api/utils/work/v1/workapplier"
workv1 "open-cluster-management.io/api/work/v1"
workapiv1alpha1 "open-cluster-management.io/api/work/v1alpha1"

"open-cluster-management.io/ocm/pkg/common/helpers"
"open-cluster-management.io/ocm/pkg/work/helper"
)

// deployReconciler is to manage ManifestWork based on the placement.
@@ -31,90 +31,126 @@ type deployReconciler struct {

func (d *deployReconciler) reconcile(ctx context.Context, mwrSet *workapiv1alpha1.ManifestWorkReplicaSet,
) (*workapiv1alpha1.ManifestWorkReplicaSet, reconcileState, error) {
// ManifestWork create/update/delete logic.
var placements []*clusterv1beta1.Placement
var errs []error
var plcsSummary []workapiv1alpha1.PlacementSummary
count, total := 0, 0
// Get the placements and the created ManifestWorks related to each placement.
for _, placementRef := range mwrSet.Spec.PlacementRefs {
var existingRolloutClsStatus []clusterv1alpha1.ClusterRolloutStatus
existingClusterNames := sets.New[string]()
placement, err := d.placementLister.Placements(mwrSet.Namespace).Get(placementRef.Name)

if errors.IsNotFound(err) {
apimeta.SetStatusCondition(&mwrSet.Status.Conditions, GetPlacementDecisionVerified(workapiv1alpha1.ReasonPlacementDecisionNotFound, ""))
return mwrSet, reconcileStop, nil
}

if err != nil {
return mwrSet, reconcileContinue, fmt.Errorf("failed get placement %w", err)
}
placements = append(placements, placement)
}

manifestWorks, err := listManifestWorksByManifestWorkReplicaSet(mwrSet, d.manifestWorkLister)
if err != nil {
return mwrSet, reconcileContinue, err
}
manifestWorks, err := listManifestWorksByMWRSetPlacementRef(mwrSet, placementRef.Name, d.manifestWorkLister)
if err != nil {
return mwrSet, reconcileContinue, err
}

var errs []error
addedClusters, deletedClusters, existingClusters := sets.New[string](), sets.New[string](), sets.New[string]()
for _, mw := range manifestWorks {
existingClusters.Insert(mw.Namespace)
}
for _, mw := range manifestWorks {
// Check whether the ManifestWorkTemplate has changed; if so, the ManifestWork needs to be updated.
newMW := &workv1.ManifestWork{}
mw.ObjectMeta.DeepCopyInto(&newMW.ObjectMeta)
mwrSet.Spec.ManifestWorkTemplate.DeepCopyInto(&newMW.Spec)

// TODO: Create a NeedToApply function in workApplier to check the manifestWork spec hash value from the cache.
if !workapplier.ManifestWorkEqual(newMW, mw) {
continue
}

existingClusterNames.Insert(mw.Namespace)
rolloutClusterStatus, err := d.clusterRolloutStatusFunc(mw.Namespace, *mw)

for _, placement := range placements {
added, deleted, err := helpers.GetClusterChanges(d.placeDecisionLister, placement, existingClusters)
if err != nil {
errs = append(errs, err)
continue
}
existingRolloutClsStatus = append(existingRolloutClsStatus, rolloutClusterStatus)
}

placeTracker := helper.GetPlacementTracker(d.placeDecisionLister, placement, existingClusterNames)
rolloutHandler, err := clusterv1alpha1.NewRolloutHandler(placeTracker, d.clusterRolloutStatusFunc)
if err != nil {
apimeta.SetStatusCondition(&mwrSet.Status.Conditions, GetPlacementDecisionVerified(workapiv1alpha1.ReasonNotAsExpected, ""))

return mwrSet, reconcileContinue, utilerrors.NewAggregate(errs)
}

addedClusters = addedClusters.Union(added)
deletedClusters = deletedClusters.Union(deleted)
}

// Create manifestWork for added clusters
for cls := range addedClusters {
mw, err := CreateManifestWork(mwrSet, cls)
err = placeTracker.Refresh()
if err != nil {
errs = append(errs, err)
continue
}

_, err = d.workApplier.Apply(ctx, mw)
_, rolloutResult, err := rolloutHandler.GetRolloutCluster(placementRef.RolloutStrategy, existingRolloutClsStatus)

if err != nil {
errs = append(errs, err)
apimeta.SetStatusCondition(&mwrSet.Status.Conditions, GetPlacementDecisionVerified(workapiv1alpha1.ReasonNotAsExpected, ""))

continue
}
}

// Update manifestWorks in case there are changes at the ManifestWork or ManifestWorkReplicaSet
for cls := range existingClusters {
// Delete manifestWork for deleted clusters
if deletedClusters.Has(cls) {
err = d.workApplier.Delete(ctx, cls, mwrSet.Name)
// Create ManifestWorks
for _, rolloutStatue := range rolloutResult.ClustersToRollout {
if rolloutStatue.Status == clusterv1alpha1.ToApply {
mw, err := CreateManifestWork(mwrSet, rolloutStatue.ClusterName, placementRef.Name)
if err != nil {
errs = append(errs, err)
continue
}

_, err = d.workApplier.Apply(ctx, mw)
if err != nil {
errs = append(errs, err)
}
if !existingClusterNames.Has(rolloutStatue.ClusterName) {
existingClusterNames.Insert(rolloutStatue.ClusterName)
}
}
}

for _, cls := range rolloutResult.ClustersRemoved {
// Delete manifestWork for removed clusters
err = d.workApplier.Delete(ctx, cls.ClusterName, mwrSet.Name)
if err != nil {
errs = append(errs, err)
continue
}
continue
existingClusterNames.Delete(cls.ClusterName)
}

mw, err := CreateManifestWork(mwrSet, cls)
if err != nil {
errs = append(errs, err)
continue
total = total + int(placement.Status.NumberOfSelectedClusters)
plcSummary := workapiv1alpha1.PlacementSummary{
Name: placementRef.Name,
AvailableDecisionGroups: getAvailableDecisionGroupProgressMessage(len(placement.Status.DecisionGroups),
len(existingClusterNames), placement.Status.NumberOfSelectedClusters),
}

_, err = d.workApplier.Apply(ctx, mw)
if err != nil {
errs = append(errs, err)
mwrSetSummary := workapiv1alpha1.ManifestWorkReplicaSetSummary{
Total: len(existingClusterNames),
}
plcSummary.Summary = mwrSetSummary
plcsSummary = append(plcsSummary, plcSummary)

count = count + len(existingClusterNames)
}
// Set the placements summary
mwrSet.Status.PlacementsSummary = plcsSummary

// Set the Summary
if mwrSet.Status.Summary == (workapiv1alpha1.ManifestWorkReplicaSetSummary{}) {
mwrSet.Status.Summary = workapiv1alpha1.ManifestWorkReplicaSetSummary{}
}
total := len(existingClusters) - len(deletedClusters) + len(addedClusters)
if total < 0 {
total = 0
}

mwrSet.Status.Summary.Total = total
if total == 0 {
mwrSet.Status.Summary.Total = count
if count == 0 {
mwrSet.Status.Summary.Applied = 0
mwrSet.Status.Summary.Available = 0
mwrSet.Status.Summary.Degraded = 0
@@ -124,9 +160,54 @@ func (d *deployReconciler) reconcile(ctx context.Context, mwrSet *workapiv1alpha1.ManifestWorkReplicaSet,
apimeta.SetStatusCondition(&mwrSet.Status.Conditions, GetPlacementDecisionVerified(workapiv1alpha1.ReasonAsExpected, ""))
}

if total == count {
apimeta.SetStatusCondition(&mwrSet.Status.Conditions, GetPlacementRollOut(workapiv1alpha1.ReasonComplete, ""))
} else {
apimeta.SetStatusCondition(&mwrSet.Status.Conditions, GetPlacementRollOut(workapiv1alpha1.ReasonProgressing, ""))
}

return mwrSet, reconcileContinue, utilerrors.NewAggregate(errs)
}
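As a worked illustration of the completion check above (all numbers are hypothetical, not from the PR): total accumulates each placement's NumberOfSelectedClusters, count accumulates the clusters that already carry a matching ManifestWork, and the PlacementRolledOut condition only becomes Complete once the two are equal.

```go
package main

import "fmt"

func main() {
	// Hypothetical numbers: two placementRefs selecting 5 and 3 clusters,
	// with 5 and 2 ManifestWorks created so far across their decision groups.
	total := 5 + 3 // sum of placement.Status.NumberOfSelectedClusters
	count := 5 + 2 // sum of len(existingClusterNames) per placementRef

	// Mirrors the condition set at the end of reconcile:
	// total == count -> ReasonComplete, otherwise ReasonProgressing.
	fmt.Println(total == count) // false, so the rollout is still Progressing
}
```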

func (d *deployReconciler) clusterRolloutStatusFunc(clusterName string, manifestWork workv1.ManifestWork) (clusterv1alpha1.ClusterRolloutStatus, error) {
clsRolloutStatus := clusterv1alpha1.ClusterRolloutStatus{
ClusterName: clusterName,
LastTransitionTime: &manifestWork.CreationTimestamp,
// Default status is ToApply
Status: clusterv1alpha1.ToApply,
}

appliedCondition := apimeta.FindStatusCondition(manifestWork.Status.Conditions, workv1.WorkApplied)

// If the Applied condition does not exist, return the status as ToApply.
if appliedCondition == nil {
return clsRolloutStatus, nil
} else if appliedCondition.Status == metav1.ConditionTrue ||
apimeta.IsStatusConditionTrue(manifestWork.Status.Conditions, workv1.WorkProgressing) {
// If the Applied OR Progressing condition is true, return the status as Progressing.
// A ManifestWork Progressing status is not defined yet; the check is made for future work availability.
clsRolloutStatus.Status = clusterv1alpha1.Progressing
} else if appliedCondition.Status == metav1.ConditionFalse {
// If the Applied condition is false, return the status as Failed.
clsRolloutStatus.Status = clusterv1alpha1.Failed
return clsRolloutStatus, nil
}

// If the Available condition is true, return the status as Succeeded.
if apimeta.IsStatusConditionTrue(manifestWork.Status.Conditions, workv1.WorkAvailable) {
clsRolloutStatus.Status = clusterv1alpha1.Succeeded
return clsRolloutStatus, nil
}

// If the Degraded condition is true, return the status as Failed.
// A ManifestWork Degraded status is not defined yet; the check is made for future work availability.
if apimeta.IsStatusConditionTrue(manifestWork.Status.Conditions, workv1.WorkDegraded) {

Review comment: There is no Degraded status defined yet. We should add a note here.
Reply: Yes, same here, there is no Degraded status defined. I considered the Applied condition with False status as Failed, and added comments to clarify that.

clsRolloutStatus.Status = clusterv1alpha1.Failed
}

return clsRolloutStatus, nil
}
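Taking the review point above together with the code, here is a minimal, self-contained sketch of the same condition-to-status mapping, using only types and constants that appear in this diff (the workv1 condition names and clusterv1alpha1.ClusterRolloutStatus with its status constants); mapWorkStatus is a hypothetical helper written for illustration, not part of the PR.

```go
package main

import (
	"fmt"

	apimeta "k8s.io/apimachinery/pkg/api/meta"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	clusterv1alpha1 "open-cluster-management.io/api/cluster/v1alpha1"
	workv1 "open-cluster-management.io/api/work/v1"
)

// mapWorkStatus mirrors the condition-to-status mapping of
// clusterRolloutStatusFunc above, written standalone for illustration.
func mapWorkStatus(mw *workv1.ManifestWork) clusterv1alpha1.ClusterRolloutStatus {
	st := clusterv1alpha1.ClusterRolloutStatus{Status: clusterv1alpha1.ToApply}
	applied := apimeta.FindStatusCondition(mw.Status.Conditions, workv1.WorkApplied)
	switch {
	case applied == nil:
		// Never applied: stays ToApply.
	case applied.Status == metav1.ConditionFalse:
		// Applied=False stands in for a Degraded signal (see review thread).
		st.Status = clusterv1alpha1.Failed
	case apimeta.IsStatusConditionTrue(mw.Status.Conditions, workv1.WorkAvailable):
		st.Status = clusterv1alpha1.Succeeded
	default:
		// Applied but not yet available.
		st.Status = clusterv1alpha1.Progressing
	}
	return st
}

func main() {
	mw := &workv1.ManifestWork{}
	apimeta.SetStatusCondition(&mw.Status.Conditions, metav1.Condition{
		Type:   workv1.WorkApplied,
		Status: metav1.ConditionFalse,
		Reason: "ApplyFailed",
	})
	fmt.Println(mapWorkStatus(mw).Status == clusterv1alpha1.Failed) // true
}
```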

// GetManifestworkApplied returns a True condition only if all clusters have their manifests applied as expected.
func GetManifestworkApplied(reason string, message string) metav1.Condition {
if reason == workapiv1alpha1.ReasonAsExpected {
@@ -146,6 +227,15 @@ func GetPlacementDecisionVerified(reason string, message string) metav1.Condition {
return getCondition(workapiv1alpha1.ManifestWorkReplicaSetConditionPlacementVerified, reason, message, metav1.ConditionFalse)
}

// GetPlacementRollOut returns a True condition only when the rollout has completed across all selected clusters.
func GetPlacementRollOut(reason string, message string) metav1.Condition {
if reason == workapiv1alpha1.ReasonComplete {
return getCondition(workapiv1alpha1.ManifestWorkReplicaSetConditionPlacementRolledOut, reason, message, metav1.ConditionTrue)
}

return getCondition(workapiv1alpha1.ManifestWorkReplicaSetConditionPlacementRolledOut, reason, message, metav1.ConditionFalse)
}

func getCondition(conditionType string, reason string, message string, status metav1.ConditionStatus) metav1.Condition {
return metav1.Condition{
Type: conditionType,

@@ -156,7 +246,7 @@ func getCondition(conditionType string, reason string, message string, status metav1.ConditionStatus) metav1.Condition {
}
}

func CreateManifestWork(mwrSet *workapiv1alpha1.ManifestWorkReplicaSet, clusterNS string) (*workv1.ManifestWork, error) {
func CreateManifestWork(mwrSet *workapiv1alpha1.ManifestWorkReplicaSet, clusterNS string, placementRefName string) (*workv1.ManifestWork, error) {
if clusterNS == "" {
return nil, fmt.Errorf("invalid cluster namespace")
}

@@ -165,7 +255,12 @@ func CreateManifestWork(mwrSet *workapiv1alpha1.ManifestWorkReplicaSet, clusterNS string) (*workv1.ManifestWork, error) {
ObjectMeta: metav1.ObjectMeta{
Name: mwrSet.Name,
Namespace: clusterNS,
Labels: map[string]string{ManifestWorkReplicaSetControllerNameLabelKey: manifestWorkReplicaSetKey(mwrSet)},
Labels: map[string]string{ManifestWorkReplicaSetControllerNameLabelKey: manifestWorkReplicaSetKey(mwrSet),
ManifestWorkReplicaSetPlacementNameLabelKey: placementRefName},
},
Spec: mwrSet.Spec.ManifestWorkTemplate}, nil
}

func getAvailableDecisionGroupProgressMessage(groupNum int, existingClsCount int, totalCls int32) string {
return fmt.Sprintf("%d (%d / %d clusters applied)", groupNum, existingClsCount, totalCls)
} |
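For illustration, the message that getAvailableDecisionGroupProgressMessage renders into PlacementSummary.AvailableDecisionGroups looks like this (the numbers are hypothetical):

```go
package main

import "fmt"

func main() {
	// Same format string as getAvailableDecisionGroupProgressMessage:
	// decision-group count, then applied vs. selected clusters.
	msg := fmt.Sprintf("%d (%d / %d clusters applied)", 3, 5, 10)
	fmt.Println(msg) // 3 (5 / 10 clusters applied)
}
```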
Review comment: Do we have a WorkProgressing status right now?
Reply: I don't think so, unless you can point me to one. I checked the ManifestWork controllers, and it looks like there is no WorkProgressing status defined yet. I added a comment to clarify that.
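For context on that thread: workv1.WorkProgressing is declared in the work API types (this diff references it), but no controller currently sets the condition, so the Progressing branch in clusterRolloutStatusFunc cannot fire yet. A minimal sketch of the check in isolation follows; isProgressing is a hypothetical helper, not part of the PR.

```go
package main

import (
	"fmt"

	apimeta "k8s.io/apimachinery/pkg/api/meta"
	workv1 "open-cluster-management.io/api/work/v1"
)

// isProgressing reports whether a ManifestWork carries a true Progressing
// condition. No work-agent controller sets this condition yet, so in
// practice this returns false until such a controller exists.
func isProgressing(mw *workv1.ManifestWork) bool {
	return apimeta.IsStatusConditionTrue(mw.Status.Conditions, workv1.WorkProgressing)
}

func main() {
	fmt.Println(isProgressing(&workv1.ManifestWork{})) // false: condition absent
}
```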