Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support pause and resume reconciliation of a cluster #7435

Open
wants to merge 28 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
6deb80e
wip
yipeng1030 May 27, 2024
a64cc39
support cluster pause and resume
yipeng1030 May 28, 2024
42d509e
merge from main
yipeng1030 May 28, 2024
94c6462
pause configuration as well
yipeng1030 May 28, 2024
4ed5dd5
Merge remote-tracking branch 'upstream/main'
yipeng1030 May 28, 2024
a89df33
Merge remote-tracking branch 'upstream/main' into featue/support_clus…
yipeng1030 May 31, 2024
25b85f6
merge from main and improve some implements
yipeng1030 Jun 4, 2024
e4e0740
tidy import order
yipeng1030 Jun 4, 2024
8466b1d
Merge remote-tracking branch 'upstream/main'
yipeng1030 Jun 18, 2024
46b47c8
Merge branch 'main' into featue/support_cluster_pause_and_resume
yipeng1030 Jun 18, 2024
1cc3982
merge from main
yipeng1030 Jun 18, 2024
a42ebcf
merge from main
yipeng1030 Jun 25, 2024
578967e
fix pr
yipeng1030 Jun 25, 2024
4aeae0f
fix pr
yipeng1030 Jun 25, 2024
5398f89
fix pr
yipeng1030 Jun 25, 2024
72f8e43
Merge branch 'featue/support_cluster_pause_and_resume' of github.com:…
yipeng1030 Jul 15, 2024
1a588d2
fix several reviews
yipeng1030 Jul 16, 2024
419c5ba
fix
yipeng1030 Jul 16, 2024
d730712
Merge remote-tracking branch 'upstream/main' into featue/support_clus…
yipeng1030 Jul 16, 2024
77908af
fix
yipeng1030 Jul 16, 2024
55b118d
fix
yipeng1030 Jul 16, 2024
ad1ac48
fix
yipeng1030 Jul 16, 2024
23b44dc
add ut
yipeng1030 Jul 17, 2024
59d0c45
fix
yipeng1030 Jul 17, 2024
cef893c
fix
yipeng1030 Jul 17, 2024
c5e8e34
pause back up as well
yipeng1030 Jul 18, 2024
5bc432a
fix typo
yipeng1030 Jul 19, 2024
f7b3b48
make code flatter
yipeng1030 Jul 22, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions controllers/apps/cluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,8 @@ func (r *ClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct
&clusterHaltTransformer{},
// handle cluster deletion
&clusterDeletionTransformer{},
// handle cluster pause and resume
&clusterPauseTransformer{},
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should the pause and resume operations be executed before all transformers? According to your design, can a cluster that is being deleted be paused?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Deletion has a higher priority than pause in my design, which is refer to the design of rollout pause of the k8s deployment.

// check is recovering from halted cluster
&clusterHaltRecoveryTransformer{},
// update finalizer and cd&cv labels
Expand Down
154 changes: 153 additions & 1 deletion controllers/apps/cluster_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"

"github.com/sethvargo/go-password/password"
"golang.org/x/exp/slices"
corev1 "k8s.io/api/core/v1"
Expand All @@ -36,9 +35,11 @@ import (

appsv1alpha1 "github.com/apecloud/kubeblocks/apis/apps/v1alpha1"
dpv1alpha1 "github.com/apecloud/kubeblocks/apis/dataprotection/v1alpha1"
"github.com/apecloud/kubeblocks/apis/workloads/v1alpha1"
"github.com/apecloud/kubeblocks/pkg/constant"
"github.com/apecloud/kubeblocks/pkg/controller/builder"
"github.com/apecloud/kubeblocks/pkg/controller/component"
"github.com/apecloud/kubeblocks/pkg/controller/model"
"github.com/apecloud/kubeblocks/pkg/controller/scheduling"
"github.com/apecloud/kubeblocks/pkg/generics"
testapps "github.com/apecloud/kubeblocks/pkg/testutil/apps"
Expand Down Expand Up @@ -1087,6 +1088,140 @@ var _ = Describe("Cluster Controller", func() {
deleteClusterWithBackup(appsv1alpha1.WipeOut, backupRetainPolicy)
}

testDeletePausedCluster := func(compName, compDefName string) {
By("creating and checking a cluster with multi component")
clusterObj = testapps.NewClusterFactory(testCtx.DefaultNamespace, clusterName, clusterDefObj.Name, "").
AddComponent(compName, compDefName).SetReplicas(1).
AddComponent(multiConsensusCompName, compDefName).SetReplicas(1).
WithRandomName().
Create(&testCtx).
GetObject()
clusterKey = client.ObjectKeyFromObject(clusterObj)

By("waiting for the cluster controller to create resources completely")
waitForCreatingResourceCompletely(clusterKey, compName, multiConsensusCompName)

By("pause the cluster")
Expect(testapps.GetAndChangeObj(&testCtx, clusterKey, func(cluster *appsv1alpha1.Cluster) {
SetPauseAnnotation(cluster)
})()).ShouldNot(HaveOccurred())

By("waiting for resources of cluster turn paused")
kindsToPause := []client.ObjectList{
&appsv1alpha1.ComponentList{},
&v1alpha1.InstanceSetList{},
&appsv1alpha1.ConfigurationList{},
}
Eventually(func() bool {
owningObjects, err := getOwningNamespacedObjects(ctx, testCtx.Cli, clusterObj.Namespace, getAppInstanceML(*clusterObj), kindsToPause)
if len(owningObjects) == 0 || err != nil {
return false
}
for _, obj := range owningObjects {
if !model.IsReconciliationPaused(obj) {
return false
}
}
return true
}).Should(Equal(true))

By("delete the cluster")
testapps.DeleteObject(&testCtx, clusterKey, &appsv1alpha1.Cluster{})

By("wait for the cluster to terminate")
Eventually(testapps.CheckObjExists(&testCtx, clusterKey, &appsv1alpha1.Cluster{}, false)).Should(Succeed())

By("check all other resources deleted")
transCtx := &clusterTransformContext{
Context: testCtx.Ctx,
Client: testCtx.Cli,
}
var namespacedKinds, clusteredKinds []client.ObjectList
kindsToDelete := append(namespacedKinds, clusteredKinds...)
otherObjs, err := getOwningNamespacedObjects(transCtx.Context, transCtx.Client, clusterObj.Namespace, getAppInstanceML(*clusterObj), kindsToDelete)
Expect(err).Should(Succeed())
Expect(otherObjs).Should(HaveLen(0))
}

testScaleInPausedCluster := func(compName, compDefName string) {
By("creating and checking a cluster with multi component")
clusterObj = testapps.NewClusterFactory(testCtx.DefaultNamespace, clusterName, clusterDefObj.Name, "").
AddComponent(compName, compDefName).SetReplicas(3).
WithRandomName().
Create(&testCtx).
GetObject()
clusterKey = client.ObjectKeyFromObject(clusterObj)

By("waiting for the cluster controller to create resources completely")
waitForCreatingResourceCompletely(clusterKey, compName)

By("pause the cluster")
Expect(testapps.GetAndChangeObj(&testCtx, clusterKey, func(cluster *appsv1alpha1.Cluster) {
SetPauseAnnotation(cluster)
})()).ShouldNot(HaveOccurred())

By("waiting for resources of cluster turn paused")
kindsToPause := []client.ObjectList{
&appsv1alpha1.ComponentList{},
&v1alpha1.InstanceSetList{},
&appsv1alpha1.ConfigurationList{},
}
instanceSetName := ""
Eventually(func() bool {
owningObjects, err := getOwningNamespacedObjects(ctx, testCtx.Cli, clusterObj.Namespace, getAppInstanceML(*clusterObj), kindsToPause)
if len(owningObjects) == 0 || err != nil {
return false
}
for _, obj := range owningObjects {
instanceSet, ok := obj.(*v1alpha1.InstanceSet)
if ok {
instanceSetName = instanceSet.Name
}
if !model.IsReconciliationPaused(obj) {
return false
}
}
return true
}).Should(Equal(true))

By("scale in")
Expect(testapps.GetAndChangeObj(&testCtx, clusterKey, func(cluster *appsv1alpha1.Cluster) {
cluster.Spec.ComponentSpecs[0].Replicas = 1
})()).ShouldNot(HaveOccurred())

By("InstanceSet is not changed after paused")
instanceSetNamespacedName := types.NamespacedName{
Namespace: clusterObj.Namespace,
Name: instanceSetName,
}
Eventually(testapps.CheckObj(&testCtx, instanceSetNamespacedName, func(g Gomega, instanceSet *v1alpha1.InstanceSet) {
g.Expect(int(*instanceSet.Spec.Replicas)).Should(Equal(1))
})).ShouldNot(Succeed())

By("resume the cluster")
Expect(testapps.GetAndChangeObj(&testCtx, clusterKey, func(cluster *appsv1alpha1.Cluster) {
RemovePauseAnnotation(cluster)
})()).ShouldNot(HaveOccurred())

By("waiting for resources of cluster turn resumed")
Eventually(func() bool {
owningObjects, err := getOwningNamespacedObjects(ctx, testCtx.Cli, clusterObj.Namespace, getAppInstanceML(*clusterObj), kindsToPause)
if len(owningObjects) == 0 || err != nil {
return false
}
for _, obj := range owningObjects {
if model.IsReconciliationPaused(obj) {
return false
}
}
return true
}).Should(Equal(true))

By("InstanceSet is changed after resumed")
Eventually(testapps.CheckObj(&testCtx, instanceSetNamespacedName, func(g Gomega, instanceSet *v1alpha1.InstanceSet) {
g.Expect(int(*instanceSet.Spec.Replicas)).Should(Equal(1))
})).Should(Succeed())
}
Context("cluster provisioning", func() {
BeforeEach(func() {
createAllWorkloadTypesClusterDef()
Expand Down Expand Up @@ -1559,6 +1694,23 @@ var _ = Describe("Cluster Controller", func() {
})).Should(Succeed())
})
})
Context("cluster pause and resume", func() {
BeforeEach(func() {
createAllWorkloadTypesClusterDef()
})

AfterEach(func() {
cleanEnv()
})

It("delete paused cluster", func() {
testDeletePausedCluster(consensusCompName, consensusCompDefName)
})

It("scale in component when cluster is paused and resumed", func() {
testScaleInPausedCluster(consensusCompName, consensusCompDefName)
})
})
})

func createBackupPolicyTpl(clusterDefObj *appsv1alpha1.ClusterDefinition, compDef string, mappingClusterVersions ...string) {
Expand Down
2 changes: 2 additions & 0 deletions controllers/apps/component_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,8 @@ func (r *ComponentReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
&componentMetaTransformer{},
// validate referenced componentDefinition objects, and build synthesized component
&componentLoadResourcesTransformer{},
// handle component pause and resume
&componentPauseTransformer{Client: r.Client},
// do validation for the spec & definition consistency
&componentValidationTransformer{},
// handle sidecar container
Expand Down
5 changes: 5 additions & 0 deletions controllers/apps/configuration/configuration_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
"github.com/apecloud/kubeblocks/pkg/constant"
"github.com/apecloud/kubeblocks/pkg/controller/component"
configctrl "github.com/apecloud/kubeblocks/pkg/controller/configuration"
"github.com/apecloud/kubeblocks/pkg/controller/model"
"github.com/apecloud/kubeblocks/pkg/controller/multicluster"
intctrlutil "github.com/apecloud/kubeblocks/pkg/controllerutil"
viper "github.com/apecloud/kubeblocks/pkg/viperx"
Expand Down Expand Up @@ -113,6 +114,10 @@ func (r *ConfigurationReconciler) Reconcile(ctx context.Context, req ctrl.Reques
reqCtx.Log.Info("cluster is deleting, skip reconcile")
return intctrlutil.Reconciled()
}
if model.IsReconciliationPaused(config) {
reqCtx.Log.Info("cluster is paused, skip reconcile")
return intctrlutil.Reconciled()
}
if fetcherTask.ClusterComObj == nil || fetcherTask.ComponentObj == nil {
return r.failWithInvalidComponent(config, reqCtx)
}
Expand Down
6 changes: 6 additions & 0 deletions controllers/apps/configuration/reconfigure_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ import (
"github.com/apecloud/kubeblocks/pkg/configuration/core"
"github.com/apecloud/kubeblocks/pkg/constant"
configctrl "github.com/apecloud/kubeblocks/pkg/controller/configuration"
"github.com/apecloud/kubeblocks/pkg/controller/model"
"github.com/apecloud/kubeblocks/pkg/controller/multicluster"
intctrlutil "github.com/apecloud/kubeblocks/pkg/controllerutil"
viper "github.com/apecloud/kubeblocks/pkg/viperx"
Expand Down Expand Up @@ -200,6 +201,11 @@ func (r *ReconfigureReconciler) sync(reqCtx intctrlutil.RequestCtx, configMap *c
return intctrlutil.RequeueWithErrorAndRecordEvent(configMap, r.Recorder, err, reqCtx.Log)
}

if model.IsReconciliationPaused(configMap) {
reqCtx.Log.Info(fmt.Sprintf("reconfigure is paused because cluster %s is paused", resources.clusterName))
return intctrlutil.Reconciled()
}

// Assumption: It is required that the cluster must have a component.
if reconcileContext.ClusterComObj == nil {
reqCtx.Log.Info("not found component.")
Expand Down
67 changes: 67 additions & 0 deletions controllers/apps/transform_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/discovery"
"k8s.io/client-go/tools/record"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand All @@ -38,6 +39,8 @@ import (

appsv1alpha1 "github.com/apecloud/kubeblocks/apis/apps/v1alpha1"
workloads "github.com/apecloud/kubeblocks/apis/workloads/v1alpha1"
"github.com/apecloud/kubeblocks/controllers/extensions"
cfgcore "github.com/apecloud/kubeblocks/pkg/configuration/core"
"github.com/apecloud/kubeblocks/pkg/constant"
"github.com/apecloud/kubeblocks/pkg/controller/graph"
"github.com/apecloud/kubeblocks/pkg/controller/model"
Expand Down Expand Up @@ -258,3 +261,67 @@ func isOwnedByInstanceSet(obj client.Object) bool {
}
return false
}

func SetPauseAnnotation(object client.Object) (client.Object, bool) {
if !model.IsReconciliationPaused(object) {
annotations := object.GetAnnotations()
if annotations == nil {
annotations = make(map[string]string)
}
if val, ok := annotations[extensions.ControllerPaused]; !ok || val != trueVal {
annotations[extensions.ControllerPaused] = trueVal
object.SetAnnotations(annotations)
return object, true
}
}
return object, false
}

func RemovePauseAnnotation(object client.Object) (client.Object, bool) {
if model.IsReconciliationPaused(object) {
annotations := object.GetAnnotations()
if _, ok := annotations[extensions.ControllerPaused]; ok {
delete(object.GetAnnotations(), extensions.ControllerPaused)
return object, true
}
}
return object, false
}

func getInstanceSet(transCtx *componentTransformContext) *workloads.InstanceSet {
instanceName := transCtx.Component.Name
instanceSet := &workloads.InstanceSet{}
err := transCtx.Client.Get(transCtx.Context, types.NamespacedName{Name: instanceName, Namespace: transCtx.Component.Namespace}, instanceSet)
if err != nil {
return nil
}
return instanceSet
}

func getConfiguration(transCtx *componentTransformContext) *appsv1alpha1.Configuration {
configuration := &appsv1alpha1.Configuration{}
configurationName := cfgcore.GenerateComponentConfigurationName(transCtx.SynthesizeComponent.ClusterName, transCtx.SynthesizeComponent.Name)
configurationNamespacedName := &types.NamespacedName{
Name: configurationName,
Namespace: transCtx.Component.Namespace,
}
if err := transCtx.Client.Get(transCtx.Context, *configurationNamespacedName, configuration); err != nil {
return nil
}
return configuration
}

func listConfigMaps(transCtx *componentTransformContext) *corev1.ConfigMapList {
cmList := &corev1.ConfigMapList{}
ml := constant.GetComponentWellKnownLabels(transCtx.Component.Labels[constant.AppInstanceLabelKey], transCtx.Component.Labels[constant.KBAppComponentLabelKey])

listOpts := []client.ListOption{
client.InNamespace(transCtx.Component.Namespace),
client.MatchingLabels(ml),
}
err := transCtx.Client.List(transCtx, cmList, listOpts...)
if err != nil {
return nil
}
return cmList
}
Loading
Loading