Skip to content

Commit

Permalink
Update API and new logic
Browse files Browse the repository at this point in the history
Signed-off-by: melserngawy <melserng@redhat.com>
  • Loading branch information
serngawy committed Sep 29, 2023
1 parent bf8bf67 commit 3de8620
Show file tree
Hide file tree
Showing 8 changed files with 274 additions and 179 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ require (
k8s.io/kube-aggregator v0.27.2
k8s.io/utils v0.0.0-20230313181309-38a27ef9d749
open-cluster-management.io/addon-framework v0.7.1-0.20230920005921-65bcbb446df8
open-cluster-management.io/api v0.12.0
open-cluster-management.io/api v0.12.1-0.20230925140632-bf4f47ea90d1
sigs.k8s.io/controller-runtime v0.15.0
sigs.k8s.io/kube-storage-version-migrator v0.0.5
)
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1158,8 +1158,8 @@ k8s.io/utils v0.0.0-20230313181309-38a27ef9d749 h1:xMMXJlJbsU8w3V5N2FLDQ8YgU8s1E
k8s.io/utils v0.0.0-20230313181309-38a27ef9d749/go.mod h1:OLgZIPagt7ERELqWJFomSt595RzquPNLL48iOWgYOg0=
open-cluster-management.io/addon-framework v0.7.1-0.20230920005921-65bcbb446df8 h1:cVyjsSeboWwgg2bkMU2s78hkUTK3LzVyQMWwEf+/gRw=
open-cluster-management.io/addon-framework v0.7.1-0.20230920005921-65bcbb446df8/go.mod h1:xdIh8sARZ7zoH/KvHp9ATYoousIdotI+Js0VZt0+qtc=
open-cluster-management.io/api v0.12.0 h1:sNkj4k2XyWA/GLsTiFg82bLIZ7JDZKkLLLyZjJUlJMs=
open-cluster-management.io/api v0.12.0/go.mod h1:/CZhelEH+30/pX7vXGSZOzLMX0zvjthYOkT/5ZTzVTQ=
open-cluster-management.io/api v0.12.1-0.20230925140632-bf4f47ea90d1 h1:8r0fdost7Yhvvz+xJb7xkj/tLZ4DigxiGoEJGakXhUg=
open-cluster-management.io/api v0.12.1-0.20230925140632-bf4f47ea90d1/go.mod h1:/CZhelEH+30/pX7vXGSZOzLMX0zvjthYOkT/5ZTzVTQ=
rsc.io/binaryregexp v0.2.0/go.mod h1:qTv7/COck+e2FymRvadv62gMdZztPaShugOCi3I+8D8=
rsc.io/quote/v3 v3.1.0/go.mod h1:yEA65RcK8LyAZtP9Kv3t0HmxON59tX3rD+tICJqUlj0=
rsc.io/sampler v1.3.0/go.mod h1:T1hPZKmBbMNahiBKFy5HrXp6adAjACjK9JXDnKaTXpA=
Expand Down
19 changes: 10 additions & 9 deletions pkg/work/helper/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import (
"k8s.io/klog/v2"

clusterlister "open-cluster-management.io/api/client/cluster/listers/cluster/v1beta1"
clusterv1alpha1 "open-cluster-management.io/api/cluster/v1alpha1"
//clusterv1alpha1 "open-cluster-management.io/api/cluster/v1alpha1"
clusterv1beta1 "open-cluster-management.io/api/cluster/v1beta1"
workapiv1 "open-cluster-management.io/api/work/v1"
)
Expand Down Expand Up @@ -484,15 +484,16 @@ func (pdl PlacementDecisionGetter) List(selector labels.Selector, namespace stri
// Get added and deleted clusters names
func GetClusters(client clusterlister.PlacementDecisionLister, placement *clusterv1beta1.Placement,
existingClusters sets.Set[string]) (sets.Set[string], sets.Set[string], error) {
existingClusterGroups := make(map[clusterv1beta1.GroupKey]sets.Set[string])
pdtracker := clusterv1beta1.NewPlacementDecisionClustersTracker(placement, PlacementDecisionGetter{Client: client}, existingClusters, existingClusterGroups)
pdtracker := GetPlacementTracker(client, placement, existingClusters)

return pdtracker.Get()
return pdtracker.GetClusterChanges()
}

func GetRollOutHandler(client clusterlister.PlacementDecisionLister, placement *clusterv1beta1.Placement, existingClusters sets.Set[string]) (*clusterv1alpha1.RolloutHandler, error) {
existingClusterGroups := make(map[clusterv1beta1.GroupKey]sets.Set[string])
pdtracker := clusterv1beta1.NewPlacementDecisionClustersTracker(placement, PlacementDecisionGetter{Client: client}, existingClusters, existingClusterGroups)

return clusterv1alpha1.NewRolloutHandler(pdtracker)
func GetPlacementTracker(client clusterlister.PlacementDecisionLister, placement *clusterv1beta1.Placement, existingClusters sets.Set[string]) *clusterv1beta1.PlacementDecisionClustersTracker {
return clusterv1beta1.NewPlacementDecisionClustersTracker(placement, PlacementDecisionGetter{Client: client}, existingClusters)
}
//func GetRollOutHandler(client clusterlister.PlacementDecisionLister, placement *clusterv1beta1.Placement, existingClusters sets.Set[string], statusFunc clusterv1alpha1.ClusterRolloutStatusFunc[workapiv1.ManifestWork]) (*clusterv1alpha1.RolloutHandler, error) {
// pdtracker := clusterv1beta1.NewPlacementDecisionClustersTracker(placement, PlacementDecisionGetter{Client: client}, existingClusters)
//
// return clusterv1alpha1.NewRolloutHandler(pdtracker, statusFunc)
//}
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,13 @@ import (

clusterlister "open-cluster-management.io/api/client/cluster/listers/cluster/v1beta1"
worklisterv1 "open-cluster-management.io/api/client/work/listers/work/v1"
clusterv1beta1 "open-cluster-management.io/api/cluster/v1beta1"
//clusterv1beta1 "open-cluster-management.io/api/cluster/v1beta1"
clusterv1alpha1 "open-cluster-management.io/api/cluster/v1alpha1"
"open-cluster-management.io/api/utils/work/v1/workapplier"
workv1 "open-cluster-management.io/api/work/v1"
workapiv1alpha1 "open-cluster-management.io/api/work/v1alpha1"

"open-cluster-management.io/ocm/pkg/common/helpers"
"open-cluster-management.io/ocm/pkg/work/helper"
)

// deployReconciler is to manage ManifestWork based on the placement.
Expand All @@ -31,84 +32,99 @@ type deployReconciler struct {
func (d *deployReconciler) reconcile(ctx context.Context, mwrSet *workapiv1alpha1.ManifestWorkReplicaSet,
) (*workapiv1alpha1.ManifestWorkReplicaSet, reconcileState, error) {
// Manifestwork create/update/delete logic.
var placements []*clusterv1beta1.Placement

//var placements []*clusterv1beta1.Placement
//existingClusters := make(map[string]sets.String)
// placementName as key and RolloutResult is the value
//placementsRolloutResults := make(map[string]clusterv1alpha1.RolloutResult)
var errs []error
count := 0

// Getting the placements and the created ManifestWorks related to each placement
for _, placementRef := range mwrSet.Spec.PlacementRefs {
var existingRolloutClsStatus []clusterv1alpha1.ClusterRolloutStatus
existingClusterNames := sets.New[string]()
placement, err := d.placementLister.Placements(mwrSet.Namespace).Get(placementRef.Name)

if errors.IsNotFound(err) {
apimeta.SetStatusCondition(&mwrSet.Status.Conditions, GetPlacementDecisionVerified(workapiv1alpha1.ReasonPlacementDecisionNotFound, ""))
return mwrSet, reconcileStop, nil
}

if err != nil {
return mwrSet, reconcileContinue, fmt.Errorf("failed get placement %w", err)
}
placements = append(placements, placement)
}

manifestWorks, err := listManifestWorksByManifestWorkReplicaSet(mwrSet, d.manifestWorkLister)
if err != nil {
return mwrSet, reconcileContinue, err
}
//placements = append(placements, placement)
manifestWorks, err := listManifestWorksByMWRSetPlacementRef(mwrSet, placementRef.Name, d.manifestWorkLister)
if err != nil {
return mwrSet, reconcileContinue, err
}

var errs []error
addedClusters, deletedClusters, existingClusters := sets.New[string](), sets.New[string](), sets.New[string]()
for _, mw := range manifestWorks {
existingClusters.Insert(mw.Namespace)
}
for _, mw := range manifestWorks {
existingClusterNames.Insert(mw.Namespace)
rolloutClusterStatus, err := d.clusterRolloutStatusFunc(mw.Namespace, *mw)

for _, placement := range placements {
added, deleted, err := helpers.GetClusterChanges(d.placeDecisionLister, placement, existingClusters)
if err != nil {
errs = append(errs, err)
continue
}
existingRolloutClsStatus = append(existingRolloutClsStatus, rolloutClusterStatus)
}

placeTracker := helper.GetPlacementTracker(d.placeDecisionLister, placement, existingClusterNames)
rolloutHandler, err := clusterv1alpha1.NewRolloutHandler(placeTracker, d.clusterRolloutStatusFunc)
if err != nil {
apimeta.SetStatusCondition(&mwrSet.Status.Conditions, GetPlacementDecisionVerified(workapiv1alpha1.ReasonNotAsExpected, ""))

return mwrSet, reconcileContinue, utilerrors.NewAggregate(errs)
}

addedClusters = addedClusters.Union(added)
deletedClusters = deletedClusters.Union(deleted)
}

// Create manifestWork for added clusters
for cls := range addedClusters {
mw, err := CreateManifestWork(mwrSet, cls)
_, rolloutResult, err := rolloutHandler.GetRolloutCluster(placementRef.RolloutStrategy, existingRolloutClsStatus)

if err != nil {
errs = append(errs, err)
apimeta.SetStatusCondition(&mwrSet.Status.Conditions, GetPlacementDecisionVerified(workapiv1alpha1.ReasonNotAsExpected, ""))

//return mwrSet, reconcileContinue, utilerrors.NewAggregate(errs)
continue
}

_, err = d.workApplier.Apply(ctx, mw)
if err != nil {
errs = append(errs, err)
// Create ManifestWorks
for _, rolloutStatue := range rolloutResult.ClustersToRollout {
if rolloutStatue.Status == clusterv1alpha1.ToApply {
mw, err := CreateManifestWork(mwrSet, rolloutStatue.ClusterName, placementRef.Name)
if err != nil {
errs = append(errs, err)
continue
}

_, err = d.workApplier.Apply(ctx, mw)
if err != nil {
errs = append(errs, err)
}
}
}
}

// Update manifestWorks in case there are changes at ManifestWork or ManifestWorkReplicaSet
for cls := range existingClusters {
// Delete manifestWork for deleted clusters
if deletedClusters.Has(cls) {
err = d.workApplier.Delete(ctx, cls, mwrSet.Name)
for _, cls := range rolloutResult.ClustersRemoved {
// Delete manifestWork for removed clusters
err = d.workApplier.Delete(ctx, cls.ClusterName, mwrSet.Name)
if err != nil {
errs = append(errs, err)
continue
}
continue
}

mw, err := CreateManifestWork(mwrSet, cls)
if err != nil {
errs = append(errs, err)
continue
}

_, err = d.workApplier.Apply(ctx, mw)
if err != nil {
errs = append(errs, err)
existingClusterNames.Delete(cls.ClusterName)
}
count = count + len(rolloutResult.ClustersToRollout) + len(existingClusterNames)
}

// Set the Summary
if mwrSet.Status.Summary == (workapiv1alpha1.ManifestWorkReplicaSetSummary{}) {
mwrSet.Status.Summary = workapiv1alpha1.ManifestWorkReplicaSetSummary{}
}
total := len(existingClusters) - len(deletedClusters) + len(addedClusters)
//total := len(existingClusters) - len(deletedClusters) + len(addedClusters)
total := count
if total < 0 {
total = 0
}
Expand All @@ -127,6 +143,14 @@ func (d *deployReconciler) reconcile(ctx context.Context, mwrSet *workapiv1alpha
return mwrSet, reconcileContinue, utilerrors.NewAggregate(errs)
}

// TODO: Implement clusterRolloutStatusFunc logic.
func (d *deployReconciler) clusterRolloutStatusFunc(clusterName string, manifestWork workv1.ManifestWork) (clusterv1alpha1.ClusterRolloutStatus, error) {
manifestWork.Namespace = clusterName
clsRolloutStatus := clusterv1alpha1.ClusterRolloutStatus{}

return clsRolloutStatus, nil
}

// GetManifestworkApplied return only True status if there all clusters have manifests applied as expected
func GetManifestworkApplied(reason string, message string) metav1.Condition {
if reason == workapiv1alpha1.ReasonAsExpected {
Expand Down Expand Up @@ -156,7 +180,7 @@ func getCondition(conditionType string, reason string, message string, status me
}
}

func CreateManifestWork(mwrSet *workapiv1alpha1.ManifestWorkReplicaSet, clusterNS string) (*workv1.ManifestWork, error) {
func CreateManifestWork(mwrSet *workapiv1alpha1.ManifestWorkReplicaSet, clusterNS string, placementRefName string) (*workv1.ManifestWork, error) {
if clusterNS == "" {
return nil, fmt.Errorf("invalid cluster namespace")
}
Expand All @@ -165,7 +189,8 @@ func CreateManifestWork(mwrSet *workapiv1alpha1.ManifestWorkReplicaSet, clusterN
ObjectMeta: metav1.ObjectMeta{
Name: mwrSet.Name,
Namespace: clusterNS,
Labels: map[string]string{ManifestWorkReplicaSetControllerNameLabelKey: manifestWorkReplicaSetKey(mwrSet)},
Labels: map[string]string{ManifestWorkReplicaSetControllerNameLabelKey: manifestWorkReplicaSetKey(mwrSet),
ManifestWorkReplicaSetPlacementNameLabelKey: placementRefName},
},
Spec: mwrSet.Spec.ManifestWorkTemplate}, nil
}
11 changes: 10 additions & 1 deletion pkg/work/hub/test/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"k8s.io/apimachinery/pkg/runtime"

clusterv1beta1 "open-cluster-management.io/api/cluster/v1beta1"
clusterv1alpha1 "open-cluster-management.io/api/cluster/v1alpha1"
workapiv1 "open-cluster-management.io/api/work/v1"
workapiv1alpha1 "open-cluster-management.io/api/work/v1alpha1"

Expand All @@ -17,7 +18,15 @@ import (
func CreateTestManifestWorkReplicaSet(name string, ns string, placementName string) *workapiv1alpha1.ManifestWorkReplicaSet {
obj := spoketesting.NewUnstructured("v1", "kind", "test-ns", "test-name")
mw, _ := spoketesting.NewManifestWork(0, obj)
placementRef := workapiv1alpha1.LocalPlacementReference{Name: placementName}
placementRef := workapiv1alpha1.LocalPlacementReference{
Name: placementName,
RolloutStrategy: clusterv1alpha1.RolloutStrategy {
Type: clusterv1alpha1.All,
All: &clusterv1alpha1.RolloutAll{
Timeout: clusterv1alpha1.Timeout{Timeout: "None"},
},
},
}

mwrs := &workapiv1alpha1.ManifestWorkReplicaSet{
ObjectMeta: metav1.ObjectMeta{
Expand Down
2 changes: 1 addition & 1 deletion vendor/modules.txt
Original file line number Diff line number Diff line change
Expand Up @@ -1428,7 +1428,7 @@ open-cluster-management.io/addon-framework/pkg/index
open-cluster-management.io/addon-framework/pkg/manager/controllers/addonconfiguration
open-cluster-management.io/addon-framework/pkg/manager/controllers/addonowner
open-cluster-management.io/addon-framework/pkg/utils
# open-cluster-management.io/api v0.12.0
# open-cluster-management.io/api v0.12.1-0.20230925140632-bf4f47ea90d1
## explicit; go 1.19
open-cluster-management.io/api/addon/v1alpha1
open-cluster-management.io/api/client/addon/clientset/versioned
Expand Down
Loading

0 comments on commit 3de8620

Please sign in to comment.