Skip to content

Commit

Permalink
Add placement rule status controller to watch for placement status ch…
Browse files Browse the repository at this point in the history
…ange to generate placement decisions accordingly (#135)

Signed-off-by: Mike Ng <ming@redhat.com>
  • Loading branch information
mikeshng authored Mar 24, 2022
1 parent 530c23d commit f4955b1
Show file tree
Hide file tree
Showing 8 changed files with 166 additions and 73 deletions.
7 changes: 2 additions & 5 deletions pkg/controller/mcmhub/mcmhub_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -295,12 +295,9 @@ func (mapper *placementDecisionMapper) Map(obj client.Object) []reconcile.Reques
placementDecision := &clusterapi.PlacementDecision{}
err := mapper.Get(context.TODO(), types.NamespacedName{Name: obj.GetName(), Namespace: obj.GetNamespace()}, placementDecision)

if err != nil {
if err != nil && !k8serrors.IsNotFound(err) {
klog.Error("failed to get placementdecision error:", err)
continue
}

if len(placementDecision.Status.Decisions) == 0 {
continue
}

Expand Down Expand Up @@ -351,7 +348,7 @@ func add(mgr manager.Manager, r reconcile.Reconciler) error {
pdMapper := &placementDecisionMapper{mgr.GetClient()}
err = c.Watch(
&source.Kind{Type: &clusterapi.PlacementDecision{}},
handler.EnqueueRequestsFromMapFunc(pdMapper.Map))
handler.EnqueueRequestsFromMapFunc(pdMapper.Map), utils.PlacementDecisionPredicateFunctions)

if err != nil {
return err
Expand Down
2 changes: 1 addition & 1 deletion pkg/placementrule/controller/add_placementrule.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,5 +20,5 @@ import (

func init() {
// AddToManagerFuncs is a list of functions to create controllers and add them to a manager.
AddToManagerFuncs = append(AddToManagerFuncs, placementrule.Add)
AddToManagerFuncs = append(AddToManagerFuncs, placementrule.Add, placementrule.AddStatusRec)
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ const (

// syncPlacementDecisions create/update/delete placementdecisions based on PlacementRule's status.decisions
// based on https://github.com/open-cluster-management-io/placement/blob/v0.2.0/pkg/controllers/scheduling/scheduling_controller.go#L339
func (r *ReconcilePlacementRule) syncPlacementDecisions(ctx context.Context,
func (r *ReconcilePlacementRuleStatus) syncPlacementDecisions(ctx context.Context,
placementRule placementruleapi.PlacementRule) error {
klog.Info("syncPlacementDecisions placementrule ", placementRule.Namespace, "/", placementRule.Name)

Expand Down Expand Up @@ -139,7 +139,7 @@ func (r *ReconcilePlacementRule) syncPlacementDecisions(ctx context.Context,

// createOrUpdatePlacementDecision updates or creates a new PlacementDecision if it does not exist
// based on https://github.com/open-cluster-management-io/placement/blob/v0.2.0/pkg/controllers/scheduling/scheduling_controller.go#L419
func (r *ReconcilePlacementRule) createOrUpdatePlacementDecision(
func (r *ReconcilePlacementRuleStatus) createOrUpdatePlacementDecision(
ctx context.Context,
placementRule placementruleapi.PlacementRule,
placementDecisionName string,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -227,11 +227,6 @@ func (r *ReconcilePlacementRule) Reconcile(ctx context.Context, request reconcil
klog.V(1).Info("Status update", request.NamespacedName, " with err:", err)
}

err = r.syncPlacementDecisions(ctx, *instance)
if err != nil {
klog.Warning("err:", err)
}

klog.V(1).Info("Reconciling - finished.", request.NamespacedName)

return reconcile.Result{}, nil
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

clusterapi "open-cluster-management.io/api/cluster/v1beta1"
appv1alpha1 "open-cluster-management.io/multicloud-operators-subscription/pkg/apis/apps/placementrule/v1"
)

Expand All @@ -41,10 +40,6 @@ var (
Name: prulename,
Namespace: prulens,
}
pdkey = types.NamespacedName{
Name: prulename + "-decision-1",
Namespace: prulens,
}
)

var expectedRequest = reconcile.Request{NamespacedName: prulekey}
Expand Down Expand Up @@ -170,15 +165,6 @@ func TestClusterNames(t *testing.T) {
if len(result.Status.Decisions) != 1 || result.Status.Decisions[0].ClusterName != clusters[0].Name {
t.Errorf("Failed to get cluster by name, placementrule: %v", result)
}

decision := &clusterapi.PlacementDecision{}

err = c.Get(ctx, pdkey, decision)
g.Expect(err).NotTo(gomega.HaveOccurred())

if len(decision.Status.Decisions) != 1 || decision.Status.Decisions[0].ClusterName != clusters[0].Name {
t.Errorf("Failed to get cluster by name, placementdecision: %v", decision)
}
}

func TestClusterLabels(t *testing.T) {
Expand Down Expand Up @@ -246,15 +232,6 @@ func TestClusterLabels(t *testing.T) {
if len(result.Status.Decisions) != 1 || result.Status.Decisions[0].ClusterName != clusters[1].Name {
t.Errorf("Failed to get cluster by label, placementrule: %v", result)
}

decision := &clusterapi.PlacementDecision{}

err = c.Get(ctx, pdkey, decision)
g.Expect(err).NotTo(gomega.HaveOccurred())

if len(decision.Status.Decisions) != 1 || decision.Status.Decisions[0].ClusterName != clusters[1].Name {
t.Errorf("Failed to get cluster by label, placementdecision: %v", result)
}
}

func TestAllClusters(t *testing.T) {
Expand Down Expand Up @@ -350,20 +327,6 @@ func TestAllClusters(t *testing.T) {
if result.Status.Decisions[0].ClusterName == "clusteralpha" {
t.Errorf("Failed to sort cluster properly, placementrule: %v", result)
}

decision := &clusterapi.PlacementDecision{}

err = c.Get(ctx, pdkey, decision)
g.Expect(err).NotTo(gomega.HaveOccurred())

if len(decision.Status.Decisions) != 2 {
t.Errorf("Failed to get all clusters, placementdecision: %v", result)
}

// expect order of first clusterbeta "8" then second clusteralpha "10500m" for asc cpu sort
if decision.Status.Decisions[0].ClusterName == "clusteralpha" {
t.Errorf("Failed to sort cluster properly, placementdecision: %v", result)
}
}

func TestClusterReplica(t *testing.T) {
Expand Down Expand Up @@ -423,15 +386,6 @@ func TestClusterReplica(t *testing.T) {
if len(result.Status.Decisions) != 1 {
t.Errorf("Failed to get 1 from all clusters, placementrule: %v", result)
}

decision := &clusterapi.PlacementDecision{}

err = c.Get(ctx, pdkey, decision)
g.Expect(err).NotTo(gomega.HaveOccurred())

if len(decision.Status.Decisions) != 1 {
t.Errorf("Failed to get 1 from all clusters, placementdecision: %v", result)
}
}

func TestClusterChange(t *testing.T) {
Expand All @@ -448,6 +402,9 @@ func TestClusterChange(t *testing.T) {
recFn, requests := SetupTestReconcile(newReconciler(mgr))
g.Expect(add(mgr, recFn)).NotTo(gomega.HaveOccurred())

recFn2, _ := SetupTestReconcile(genReconciler(mgr))
g.Expect(add(mgr, recFn2)).NotTo(gomega.HaveOccurred())

ctx, cancel := context.WithTimeout(context.TODO(), 5*time.Minute)
mgrStopped := StartTestManager(ctx, mgr, g)

Expand Down Expand Up @@ -486,15 +443,6 @@ func TestClusterChange(t *testing.T) {
t.Errorf("Failed to get all(1) clusters, placementrule: %v", result)
}

decision := &clusterapi.PlacementDecision{}

err = c.Get(ctx, pdkey, decision)
g.Expect(err).NotTo(gomega.HaveOccurred())

if len(decision.Status.Decisions) != 1 {
t.Errorf("Failed to get all(1) clusters, placementdecision: %v", result)
}

clinstance = clusters[1].DeepCopy()
err = c.Create(context.TODO(), clinstance)

Expand All @@ -513,10 +461,23 @@ func TestClusterChange(t *testing.T) {
t.Errorf("Failed to get all(2) clusters, placementrule: %v", result)
}

err = c.Get(ctx, pdkey, decision)
result.Spec.SchedulerName = "test-scheduler"
err = c.Update(context.TODO(), result)
g.Expect(err).NotTo(gomega.HaveOccurred())

if len(decision.Status.Decisions) != 2 {
t.Errorf("Failed to get all(2) clusters, placementdecision: %v", decision)
var prDecs = []appv1alpha1.PlacementDecision{}
prDecs = append(prDecs, result.Status.Decisions[0])
result.Status.Decisions = prDecs

err = c.Status().Update(context.TODO(), result)
g.Expect(err).NotTo(gomega.HaveOccurred())

time.Sleep(5 * time.Second)

err = c.Get(context.TODO(), prulekey, result)
g.Expect(err).NotTo(gomega.HaveOccurred())

if len(result.Status.Decisions) != 1 {
t.Errorf("Failed to get all(1) clusters, placementrule: %v", result)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
// Copyright 2022 The Kubernetes Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package placementrule

import (
"context"
"reflect"

plrv1 "open-cluster-management.io/multicloud-operators-subscription/pkg/apis/apps/placementrule/v1"

"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/klog/v2"

"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"sigs.k8s.io/controller-runtime/pkg/source"
)

// PlacementRuleStatusPredicateFunctions filters PlacementRule status decisions update
var placementRuleStatusPredicateFunctions = predicate.Funcs{
UpdateFunc: func(e event.UpdateEvent) bool {
newPlr := e.ObjectNew.(*plrv1.PlacementRule)
oldPlr := e.ObjectOld.(*plrv1.PlacementRule)

return !reflect.DeepEqual(newPlr.Status.Decisions, oldPlr.Status.Decisions)
},
CreateFunc: func(e event.CreateEvent) bool {
return true
},

DeleteFunc: func(e event.DeleteEvent) bool {
return true
},
}

func AddStatusRec(mgr manager.Manager) error {
return addRec(mgr, genReconciler(mgr))
}

func genReconciler(mgr manager.Manager) reconcile.Reconciler {
authCfg := mgr.GetConfig()
authCfg.QPS = 100.0
authCfg.Burst = 200

return &ReconcilePlacementRuleStatus{Client: mgr.GetClient()}
}

func addRec(mgr manager.Manager, r reconcile.Reconciler) error {
c, err := controller.New("placementrule-status-controller", mgr, controller.Options{
Reconciler: r,
MaxConcurrentReconciles: 10,
})
if err != nil {
return err
}

// Watch for changes to PlacementRule Status
err = c.Watch(&source.Kind{Type: &plrv1.PlacementRule{}}, &handler.EnqueueRequestForObject{},
placementRuleStatusPredicateFunctions)
if err != nil {
return err
}

klog.Info("Successfully added placementrule-status-controller watching PlacementRule Status changes")

return nil
}

var _ reconcile.Reconciler = &ReconcilePlacementRuleStatus{}

type ReconcilePlacementRuleStatus struct {
client.Client
}

// Reconcile reads that state of the cluster for a PlacementRule object and makes changes based on the state read
// and what is in the PlacementRule.Status
// a Deployment as an example
// Automatically generate RBAC rules to allow the Controller to read and write Deployments
// +kubebuilder:rbac:groups=multicloud-apps.io,resources=placementrules,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=multicloud.io,resources=placementrules/status,verbs=get;update;patch
func (r *ReconcilePlacementRuleStatus) Reconcile(ctx context.Context, request reconcile.Request) (reconcile.Result, error) {
// Fetch the PlacementRule instance
instance := &plrv1.PlacementRule{}
err := r.Get(ctx, request.NamespacedName, instance)

klog.Info("Reconciling Status:", request.NamespacedName, " with Get err:", err)

if err != nil {
if errors.IsNotFound(err) {
// Object not found, return. Created objects are automatically garbage collected.
// For additional cleanup logic use finalizers.
return reconcile.Result{}, nil
}
// Error reading the object - requeue the request.
return reconcile.Result{}, err
}

err = r.syncPlacementDecisions(ctx, *instance)
if err != nil {
klog.Error("err:", err)
}

klog.V(1).Info("Reconciling Status - finished.", request.NamespacedName)

return reconcile.Result{}, err
}
17 changes: 17 additions & 0 deletions pkg/utils/subscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,23 @@ const (
addonServiceAccountNamespace = "open-cluster-management-agent-addon"
)

// PlacementDecisionPredicateFunctions filters PlacementDecision status decisions update
var PlacementDecisionPredicateFunctions = predicate.Funcs{
UpdateFunc: func(e event.UpdateEvent) bool {
newPd := e.ObjectNew.(*clusterapi.PlacementDecision)
oldPd := e.ObjectOld.(*clusterapi.PlacementDecision)

return !reflect.DeepEqual(newPd.Status.Decisions, oldPd.Status.Decisions)
},
CreateFunc: func(e event.CreateEvent) bool {
return true
},

DeleteFunc: func(e event.DeleteEvent) bool {
return true
},
}

func IsSubscriptionResourceChanged(oSub, nSub *appv1.Subscription) bool {
if IsSubscriptionBasicChanged(oSub, nSub) {
return true
Expand Down
4 changes: 2 additions & 2 deletions sonar-project.properties
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
sonar.projectKey=open-cluster-management_multicloud-operators-subscription
sonar.projectName=multicloud-operators-subscription
sonar.sources=.
sonar.exclusions=**/*_test.go,**/*_generated*.go,**/*_generated/**,**/vendor/**,**/vbh/**,**/build/**,**/build-harness/**,**/build-harness-extensions/**
sonar.exclusions=**/*_test.go,**/*_generated*.go,**/*_generated/**,**/vendor/**,**/vbh/**,**/build/**,**/build-harness/**,**/build-harness-extensions/**,**/bindata/**
sonar.tests=.
sonar.test.inclusions=**/*_test.go
sonar.test.exclusions=**/*_generated*.go,**/*_generated/**,**/vendor/**,**/vbh/**,**/build/**,**/build-harness/**,**/build-harness-extensions/**,**/e2e/**
sonar.test.exclusions=**/*_generated*.go,**/*_generated/**,**/vendor/**,**/vbh/**,**/build/**,**/build-harness/**,**/build-harness-extensions/**,**/e2e/**,**/bindata/**
sonar.go.tests.reportPaths=report.json
sonar.go.coverage.reportPaths=coverage.out
sonar.externalIssuesReportPaths=gosec.json

0 comments on commit f4955b1

Please sign in to comment.