Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support pause and resume reconciliation of a cluster #7435

Open
wants to merge 28 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
6deb80e
wip
yipeng1030 May 27, 2024
a64cc39
support cluster pause and resume
yipeng1030 May 28, 2024
42d509e
merge from main
yipeng1030 May 28, 2024
94c6462
pause configuration as well
yipeng1030 May 28, 2024
4ed5dd5
Merge remote-tracking branch 'upstream/main'
yipeng1030 May 28, 2024
a89df33
Merge remote-tracking branch 'upstream/main' into featue/support_clus…
yipeng1030 May 31, 2024
25b85f6
merge from main and improve some implements
yipeng1030 Jun 4, 2024
e4e0740
tidy import order
yipeng1030 Jun 4, 2024
8466b1d
Merge remote-tracking branch 'upstream/main'
yipeng1030 Jun 18, 2024
46b47c8
Merge branch 'main' into featue/support_cluster_pause_and_resume
yipeng1030 Jun 18, 2024
1cc3982
merge from main
yipeng1030 Jun 18, 2024
a42ebcf
merge from main
yipeng1030 Jun 25, 2024
578967e
fix pr
yipeng1030 Jun 25, 2024
4aeae0f
fix pr
yipeng1030 Jun 25, 2024
5398f89
fix pr
yipeng1030 Jun 25, 2024
72f8e43
Merge branch 'featue/support_cluster_pause_and_resume' of github.com:…
yipeng1030 Jul 15, 2024
1a588d2
fix several reviews
yipeng1030 Jul 16, 2024
419c5ba
fix
yipeng1030 Jul 16, 2024
d730712
Merge remote-tracking branch 'upstream/main' into featue/support_clus…
yipeng1030 Jul 16, 2024
77908af
fix
yipeng1030 Jul 16, 2024
55b118d
fix
yipeng1030 Jul 16, 2024
ad1ac48
fix
yipeng1030 Jul 16, 2024
23b44dc
add ut
yipeng1030 Jul 17, 2024
59d0c45
fix
yipeng1030 Jul 17, 2024
cef893c
fix
yipeng1030 Jul 17, 2024
c5e8e34
pause back up as well
yipeng1030 Jul 18, 2024
5bc432a
fix typo
yipeng1030 Jul 19, 2024
f7b3b48
make code flatter
yipeng1030 Jul 22, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions controllers/apps/cluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,8 @@ func (r *ClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct
AddTransformer(
// handle cluster deletion first
&clusterDeletionTransformer{},
// handle cluster pause and resume
&clusterPauseTransformer{Client: r.Client},
// check is recovering from halted cluster
&clusterHaltRecoveryTransformer{},
// update finalizer and cd&cv labels
Expand Down
2 changes: 2 additions & 0 deletions controllers/apps/component_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,8 @@ func (r *ComponentReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
AddTransformer(
// handle component deletion and pre-terminate
&componentDeletionTransformer{},
// handle component pause and resume
&componentPauseTransformer{},
// handle finalizers and referenced definition labels
&componentMetaTransformer{},
// validate referenced componentDefinition objects, and build synthesized component
Expand Down
8 changes: 8 additions & 0 deletions controllers/apps/configuration/configuration_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/log"

appsv1alpha1 "github.com/apecloud/kubeblocks/apis/apps/v1alpha1"
"github.com/apecloud/kubeblocks/controllers/extensions"
"github.com/apecloud/kubeblocks/pkg/constant"
"github.com/apecloud/kubeblocks/pkg/controller/component"
configctrl "github.com/apecloud/kubeblocks/pkg/controller/configuration"
Expand Down Expand Up @@ -113,6 +114,13 @@ func (r *ConfigurationReconciler) Reconcile(ctx context.Context, req ctrl.Reques
reqCtx.Log.Info("cluster is deleting, skip reconcile")
return intctrlutil.Reconciled()
}
// if the cluster is paused, pause reconfiguration as well
if fetcherTask.ClusterObj != nil {
yipeng1030 marked this conversation as resolved.
Show resolved Hide resolved
if val, ok := fetcherTask.ClusterObj.Annotations[extensions.ControllerPaused]; ok && val == "true" {
yipeng1030 marked this conversation as resolved.
Show resolved Hide resolved
reqCtx.Log.Info(fmt.Sprintf("cluster is paused, skip reconcile"))
return intctrlutil.Reconciled()
}
}
if fetcherTask.ClusterComObj == nil || fetcherTask.ComponentObj == nil {
return r.failWithInvalidComponent(config, reqCtx)
}
Expand Down
9 changes: 9 additions & 0 deletions controllers/apps/configuration/reconfigure_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/reconcile"

appsv1alpha1 "github.com/apecloud/kubeblocks/apis/apps/v1alpha1"
"github.com/apecloud/kubeblocks/controllers/extensions"
cfgcm "github.com/apecloud/kubeblocks/pkg/configuration/config_manager"
"github.com/apecloud/kubeblocks/pkg/configuration/core"
"github.com/apecloud/kubeblocks/pkg/constant"
Expand Down Expand Up @@ -200,6 +201,14 @@ func (r *ReconfigureReconciler) sync(reqCtx intctrlutil.RequestCtx, configMap *c
return intctrlutil.RequeueWithErrorAndRecordEvent(configMap, r.Recorder, err, reqCtx.Log)
}

// if the cluster is paused, pause reconfiguration as well
if reconcileContext.ClusterObj != nil {
if val, ok := reconcileContext.ClusterObj.Annotations[extensions.ControllerPaused]; ok && val == "true" {
yipeng1030 marked this conversation as resolved.
Show resolved Hide resolved
reqCtx.Log.Info(fmt.Sprintf("reconfigure is paused beacuse cluster %s is paused", resources.clusterName))
return intctrlutil.Reconciled()
}
}

// Assumption: It is required that the cluster must have a component.
if reconcileContext.ClusterComObj == nil {
reqCtx.Log.Info("not found component.")
Expand Down
142 changes: 142 additions & 0 deletions controllers/apps/transformer_cluster_pause.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
/*
Copyright (C) 2022-2024 ApeCloud Co., Ltd

This file is part of KubeBlocks project

This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.

This program is distributed in the hope that it will be useful
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.

You should have received a copy of the GNU Affero General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/

package apps

import (
appsv1alpha1 "github.com/apecloud/kubeblocks/apis/apps/v1alpha1"
"github.com/apecloud/kubeblocks/controllers/extensions"
"github.com/apecloud/kubeblocks/pkg/constant"
"github.com/apecloud/kubeblocks/pkg/controller/graph"
"github.com/apecloud/kubeblocks/pkg/controller/model"
corev1 "k8s.io/api/core/v1"
"sigs.k8s.io/controller-runtime/pkg/client"
yipeng1030 marked this conversation as resolved.
Show resolved Hide resolved
)

// clusterPauseTransformer is the cluster-level transformer responsible for
// pausing and resuming a cluster. It embeds a client.Client for its own
// lookups.
type clusterPauseTransformer struct {
	client.Client
}

// Compile-time check that *clusterPauseTransformer satisfies graph.Transformer.
var _ graph.Transformer = (*clusterPauseTransformer)(nil)

func (t *clusterPauseTransformer) Transform(ctx graph.TransformContext, dag *graph.DAG) error {
yipeng1030 marked this conversation as resolved.
Show resolved Hide resolved
transCtx, _ := ctx.(*clusterTransformContext)
cluster := transCtx.OrigCluster
graphCli, _ := transCtx.Client.(model.GraphClient)
if checkPaused(cluster) {
// set paused for all components
compList := &appsv1alpha1.ComponentList{}
yipeng1030 marked this conversation as resolved.
Show resolved Hide resolved
labels := constant.GetClusterWellKnownLabels(cluster.Name)
if err := transCtx.Client.List(transCtx.Context, compList, client.InNamespace(cluster.Namespace), client.MatchingLabels(labels)); err != nil {
return err
}
notPaused := false
yipeng1030 marked this conversation as resolved.
Show resolved Hide resolved
for _, comp := range compList.Items {
newComp := comp.DeepCopy()
yipeng1030 marked this conversation as resolved.
Show resolved Hide resolved
if !checkPaused(newComp) {
annotations := comp.GetAnnotations()
if annotations == nil {
annotations = make(map[string]string)
}
annotations[extensions.ControllerPaused] = "true"
newComp.SetAnnotations(annotations)
graphCli.Update(dag, comp.DeepCopy(), newComp)
notPaused = true
}

}
if notPaused {
transCtx.EventRecorder.Eventf(cluster, corev1.EventTypeNormal, "Paused",
"cluster is paused")
}
return graph.ErrPrematureStop

} else {
// set resumed for all components
compList := &appsv1alpha1.ComponentList{}
labels := constant.GetClusterWellKnownLabels(cluster.Name)
if err := transCtx.Client.List(transCtx.Context, compList, client.InNamespace(cluster.Namespace), client.MatchingLabels(labels)); err != nil {
return err
}
hasPaused := false
yipeng1030 marked this conversation as resolved.
Show resolved Hide resolved

// remove resumed annotation of configmaps
var cmList, err = t.listConfigMaps(transCtx, cluster)
yipeng1030 marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return err
}
for _, cm := range cmList.Items {
if val, ok := cm.GetAnnotations()[extensions.ControllerResumed]; ok && val == "true" {
yipeng1030 marked this conversation as resolved.
Show resolved Hide resolved
newCm := cm.DeepCopy()
delete(newCm.Annotations, extensions.ControllerResumed)
graphCli.Update(dag, cm.DeepCopy(), newCm)
}
}

for _, comp := range compList.Items {
newComp := comp.DeepCopy()
if checkPaused(newComp) {
delete(newComp.Annotations, extensions.ControllerPaused)
hasPaused = true
graphCli.Update(dag, comp.DeepCopy(), newComp)
}
}

if hasPaused {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hasPaused is calculated based on component objects, so it should set the dependencies for components to those CM objects explicitly.

// add annotations to all configmaps for reconfiguring after resumed
var cmList, err = t.listConfigMaps(transCtx, cluster)
if err != nil {
return err
}
for _, cm := range cmList.Items {
newCm := cm.DeepCopy()
annotations := cm.GetAnnotations()
if annotations == nil {
annotations = make(map[string]string)
}
annotations[extensions.ControllerResumed] = "true"
yipeng1030 marked this conversation as resolved.
Show resolved Hide resolved
newCm.SetAnnotations(annotations)
graphCli.Update(dag, cm.DeepCopy(), newCm)
}

// add event
transCtx.EventRecorder.Eventf(cluster, corev1.EventTypeNormal, "Resumed",
"cluster is resumed")
}
return nil
}
}

// listConfigMaps returns the ConfigMaps in the cluster's namespace that are
// labelled as managed by constant.AppName and as belonging to the instance
// named after the cluster. The lookup goes through the transformer's embedded
// client; on failure the list is nil and the client error is returned as is.
func (t *clusterPauseTransformer) listConfigMaps(transCtx *clusterTransformContext, cluster *appsv1alpha1.Cluster) (*corev1.ConfigMapList, error) {
	selector := client.MatchingLabels{
		constant.AppManagedByLabelKey: constant.AppName,
		constant.AppInstanceLabelKey:  cluster.Name,
	}
	configMaps := &corev1.ConfigMapList{}
	if err := t.Client.List(transCtx, configMaps, client.InNamespace(cluster.Namespace), selector); err != nil {
		return nil, err
	}
	return configMaps, nil
}
80 changes: 80 additions & 0 deletions controllers/apps/transformer_component_pause.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
/*
Copyright (C) 2022-2024 ApeCloud Co., Ltd

This file is part of KubeBlocks project

This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.

This program is distributed in the hope that it will be useful
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.

You should have received a copy of the GNU Affero General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/

package apps

import (
"fmt"
appsv1alpha1 "github.com/apecloud/kubeblocks/apis/apps/v1alpha1"
workloads "github.com/apecloud/kubeblocks/apis/workloads/v1alpha1"
"github.com/apecloud/kubeblocks/pkg/controller/graph"
"github.com/apecloud/kubeblocks/pkg/controller/model"
"github.com/pkg/errors"
"k8s.io/apimachinery/pkg/types"
)

// componentDeletionTransformer handles component deletion
yipeng1030 marked this conversation as resolved.
Show resolved Hide resolved
type componentPauseTransformer struct {
}

var _ graph.Transformer = &componentDeletionTransformer{}
yipeng1030 marked this conversation as resolved.
Show resolved Hide resolved

func (t *componentPauseTransformer) Transform(ctx graph.TransformContext, dag *graph.DAG) error {
transCtx, _ := ctx.(*componentTransformContext)

graphCli, _ := transCtx.Client.(model.GraphClient)
comp := transCtx.Component
// if paused
if checkPaused(comp) {
// get instanceSet and set paused
oldInstanceSet, err := t.getInstanceSet(transCtx, comp)
newInstanceSet := oldInstanceSet.DeepCopy()
if err != nil {
return err
}
newInstanceSet.Spec.Paused = true
yipeng1030 marked this conversation as resolved.
Show resolved Hide resolved
// update in dag
graphCli.Update(dag, oldInstanceSet, newInstanceSet)
return graph.ErrPrematureStop
} else {
yipeng1030 marked this conversation as resolved.
Show resolved Hide resolved
// get instanceSet and cancel paused
oldInstanceSet, err := t.getInstanceSet(transCtx, comp)
if model.IsReconciliationPaused(oldInstanceSet) {
newInstanceSet := oldInstanceSet.DeepCopy()
if err != nil {
return err
}
newInstanceSet.Spec.Paused = false
yipeng1030 marked this conversation as resolved.
Show resolved Hide resolved
// update in dag
graphCli.Update(dag, oldInstanceSet, newInstanceSet)
return nil
}
return nil
yipeng1030 marked this conversation as resolved.
Show resolved Hide resolved
}
}

// getInstanceSet fetches the InstanceSet that has the same name and namespace
// as comp, using the client and context carried by transCtx.
//
// On failure the returned InstanceSet is nil and the error wraps the client
// error, so callers can still inspect the underlying cause.
func (t *componentPauseTransformer) getInstanceSet(transCtx *componentTransformContext, comp *appsv1alpha1.Component) (*workloads.InstanceSet, error) {
	instanceSet := &workloads.InstanceSet{}
	key := types.NamespacedName{Name: comp.Name, Namespace: comp.Namespace}
	if err := transCtx.Client.Get(transCtx.Context, key, instanceSet); err != nil {
		// Wrap instead of building a brand-new error: the rendered message
		// ("failed to get instanceSet <name>: <cause>") is unchanged, but the
		// cause is no longer reduced to plain text.
		return nil, errors.Wrap(err, fmt.Sprintf("failed to get instanceSet %s", comp.Name))
	}
	return instanceSet, nil
}
12 changes: 12 additions & 0 deletions controllers/apps/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (

"sigs.k8s.io/controller-runtime/pkg/client"

"github.com/apecloud/kubeblocks/controllers/extensions"
"github.com/apecloud/kubeblocks/pkg/constant"
"github.com/apecloud/kubeblocks/pkg/controller/model"
"github.com/apecloud/kubeblocks/pkg/controller/multicluster"
Expand Down Expand Up @@ -84,3 +85,14 @@ func clientOption(v *model.ObjectVertex) *multicluster.ClientOption {
}
return multicluster.InControlContext()
}

func checkPaused(object client.Object) bool {
yipeng1030 marked this conversation as resolved.
Show resolved Hide resolved
annotations := object.GetAnnotations()
if annotations == nil {
return false
}
if val, ok := annotations[extensions.ControllerPaused]; ok && val == "true" {
return true
}
return false
}
1 change: 1 addition & 0 deletions controllers/extensions/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ const (

// annotation keys
ControllerPaused = "controller.kubeblocks.io/controller-paused"
ControllerResumed = "controller.kubeblocks.io/controller-resumed"
yipeng1030 marked this conversation as resolved.
Show resolved Hide resolved
SkipInstallableCheck = "extensions.kubeblocks.io/skip-installable-check"
NoDeleteJobs = "extensions.kubeblocks.io/no-delete-jobs"
AddonDefaultIsEmpty = "addons.extensions.kubeblocks.io/default-is-empty"
Expand Down
4 changes: 1 addition & 3 deletions pkg/controller/instanceset/reconciler_deletion.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,7 @@ func (r *deletionReconciler) PreCondition(tree *kubebuilderx.ObjectTree) *kubebu
if tree.GetRoot() == nil || !model.IsObjectDeleting(tree.GetRoot()) {
return kubebuilderx.ResultUnsatisfied
}
if model.IsReconciliationPaused(tree.GetRoot()) {
yipeng1030 marked this conversation as resolved.
Show resolved Hide resolved
return kubebuilderx.ResultUnsatisfied
}
// deletion needs to be handled even while paused
return kubebuilderx.ResultSatisfied
}

Expand Down
Loading