diff --git a/apis/apps/v1alpha1/opsrequest_types.go b/apis/apps/v1alpha1/opsrequest_types.go index fb04c283e08..97298c98e59 100644 --- a/apis/apps/v1alpha1/opsrequest_types.go +++ b/apis/apps/v1alpha1/opsrequest_types.go @@ -293,7 +293,7 @@ type Instance struct { // +kubebuilder:validation:Required Name string `json:"name"` - // The instance will rebuild on the specified node when the instance uses local PersistentVolume as the storage disk. + // The instance will rebuild on the specified node. // If not set, it will rebuild on a random node. // +optional TargetNodeName string `json:"targetNodeName,omitempty"` diff --git a/config/crd/bases/apps.kubeblocks.io_opsrequests.yaml b/config/crd/bases/apps.kubeblocks.io_opsrequests.yaml index 19ade2dca1e..e1dc71b8afa 100644 --- a/config/crd/bases/apps.kubeblocks.io_opsrequests.yaml +++ b/config/crd/bases/apps.kubeblocks.io_opsrequests.yaml @@ -4020,7 +4020,7 @@ spec: type: string targetNodeName: description: |- - The instance will rebuild on the specified node when the instance uses local PersistentVolume as the storage disk. + The instance will rebuild on the specified node. If not set, it will rebuild on a random node. type: string required: diff --git a/controllers/apps/operations/rebuild_instance.go b/controllers/apps/operations/rebuild_instance.go index 8f462bfdbe4..e5f1b4c4887 100644 --- a/controllers/apps/operations/rebuild_instance.go +++ b/controllers/apps/operations/rebuild_instance.go @@ -28,12 +28,14 @@ import ( "golang.org/x/exp/slices" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" "k8s.io/utils/pointer" "sigs.k8s.io/controller-runtime/pkg/client" appsv1 "github.com/apecloud/kubeblocks/apis/apps/v1" appsv1alpha1 "github.com/apecloud/kubeblocks/apis/apps/v1alpha1" dpv1alpha1 "github.com/apecloud/kubeblocks/apis/dataprotection/v1alpha1" + workloads "github.com/apecloud/kubeblocks/apis/workloads/v1" "github.com/apecloud/kubeblocks/pkg/constant" "github.com/apecloud/kubeblocks/pkg/controller/component" "github.com/apecloud/kubeblocks/pkg/controller/instanceset" @@ -297,8 +299,8 @@ func (r rebuildInstanceOpsHandler) rebuildInstancesWithHScaling(reqCtx intctrlut ) if len(compStatus.ProgressDetails) == 0 { // 1. scale out the required instances - r.scaleOutRequiredInstances(opsRes, rebuildInstance, compStatus) - return 0, 0, nil + err := r.scaleOutRequiredInstances(reqCtx, cli, opsRes, rebuildInstance, compStatus) + return 0, 0, err } for i := range opsRes.Cluster.Spec.ComponentSpecs { compSpec := &opsRes.Cluster.Spec.ComponentSpecs[i] @@ -321,9 +323,11 @@ func (r rebuildInstanceOpsHandler) rebuildInstancesWithHScaling(reqCtx intctrlut return completedCount, failedCount, nil } -func (r rebuildInstanceOpsHandler) scaleOutRequiredInstances(opsRes *OpsResource, +func (r rebuildInstanceOpsHandler) scaleOutRequiredInstances(reqCtx intctrlutil.RequestCtx, + cli client.Client, + opsRes *OpsResource, rebuildInstance appsv1alpha1.RebuildInstance, - compStatus *appsv1alpha1.OpsRequestComponentStatus) { + compStatus *appsv1alpha1.OpsRequestComponentStatus) error { // 1. 
sort the instances slices.SortFunc(rebuildInstance.Instances, func(a, b appsv1alpha1.Instance) bool { return a.Name < b.Name @@ -345,9 +349,9 @@ func (r rebuildInstanceOpsHandler) scaleOutRequiredInstances(opsRes *OpsResource opsRes.Recorder.Eventf(opsRes.OpsRequest, corev1.EventTypeWarning, reasonCompReplicasChanged, "then replicas of the component %s has been changed", compName) continue } - r.scaleOutCompReplicasAndSyncProgress(opsRes, compSpec, rebuildInstance, compStatus, rebuildInsWrapper) - break + return r.scaleOutCompReplicasAndSyncProgress(reqCtx, cli, opsRes, compSpec, rebuildInstance, compStatus, rebuildInsWrapper) } + return nil } // getRebuildInstanceWrapper assembles the corresponding replicas and instances based on the template @@ -365,11 +369,13 @@ func (r rebuildInstanceOpsHandler) getRebuildInstanceWrapper(opsRes *OpsResource return rebuildInsWrapper } -func (r rebuildInstanceOpsHandler) scaleOutCompReplicasAndSyncProgress(opsRes *OpsResource, +func (r rebuildInstanceOpsHandler) scaleOutCompReplicasAndSyncProgress(reqCtx intctrlutil.RequestCtx, + cli client.Client, + opsRes *OpsResource, compSpec *appsv1.ClusterComponentSpec, rebuildInstance appsv1alpha1.RebuildInstance, compStatus *appsv1alpha1.OpsRequestComponentStatus, - rebuildInsWrapper map[string]*rebuildInstanceWrapper) { + rebuildInsWrapper map[string]*rebuildInstanceWrapper) error { scaleOutInsMap := map[string]string{} setScaleOutInsMap := func(workloadName, templateName string, replicas int32, offlineInstances []string, wrapper *rebuildInstanceWrapper) { @@ -393,8 +399,14 @@ func (r rebuildInstanceOpsHandler) scaleOutCompReplicasAndSyncProgress(opsRes *O if wrapper, ok := rebuildInsWrapper[""]; ok { setScaleOutInsMap(workloadName, "", compSpec.Replicas-allTemplateReplicas, compSpec.OfflineInstances, wrapper) } - // set progress details + + its := &workloads.InstanceSet{} + if err := cli.Get(reqCtx.Ctx, types.NamespacedName{Name: workloadName, Namespace: opsRes.OpsRequest.Namespace}, its); err != nil { + return err + } + itsUpdated := false for _, ins := range rebuildInstance.Instances { + // set progress details scaleOutInsName := scaleOutInsMap[ins.Name] setComponentStatusProgressDetail(opsRes.Recorder, opsRes.OpsRequest, &compStatus.ProgressDetails, appsv1alpha1.ProgressStatusDetail{ @@ -402,7 +414,22 @@ func (r rebuildInstanceOpsHandler) scaleOutCompReplicasAndSyncProgress(opsRes *O Status: appsv1alpha1.ProcessingProgressStatus, Message: r.buildScalingOutPodMessage(scaleOutInsName, "Processing"), }) + + // specify node to scale out + if ins.TargetNodeName != "" { + if err := instanceset.MergeNodeSelectorOnceAnnotation(its, map[string]string{scaleOutInsName: ins.TargetNodeName}); err != nil { + return err + } + itsUpdated = true + } } + + if itsUpdated { + if err := cli.Update(reqCtx.Ctx, its); err != nil { + return err + } + } + return nil } // checkProgressForScalingOutPods checks if the new pods are available. 
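For orientation (not part of the patch): the scale-out path above records the requested placement as a JSON pod-name-to-node-name map in the InstanceSet's node-selector-once annotation before calling cli.Update. A minimal, standalone Go sketch of that payload; the pod and node names below are hypothetical:

package main

import (
	"encoding/json"
	"fmt"
)

func main() {
	// pod name -> target node name, as assembled by the scale-out handler
	mapping := map[string]string{
		"mycluster-mycomp-3": "node-a",
		"mycluster-mycomp-4": "node-b",
	}
	data, err := json.Marshal(mapping)
	if err != nil {
		panic(err)
	}
	// this JSON string is what ends up under "workloads.kubeblocks.io/node-selector-once"
	fmt.Println(string(data)) // {"mycluster-mycomp-3":"node-a","mycluster-mycomp-4":"node-b"}
}
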
diff --git a/controllers/apps/operations/rebuild_instance_inplace.go b/controllers/apps/operations/rebuild_instance_inplace.go index fd71bc072e1..404240d715a 100644 --- a/controllers/apps/operations/rebuild_instance_inplace.go +++ b/controllers/apps/operations/rebuild_instance_inplace.go @@ -31,11 +31,13 @@ import ( appsv1alpha1 "github.com/apecloud/kubeblocks/apis/apps/v1alpha1" dpv1alpha1 "github.com/apecloud/kubeblocks/apis/dataprotection/v1alpha1" + workloads "github.com/apecloud/kubeblocks/apis/workloads/v1" "github.com/apecloud/kubeblocks/pkg/common" "github.com/apecloud/kubeblocks/pkg/constant" "github.com/apecloud/kubeblocks/pkg/controller/builder" "github.com/apecloud/kubeblocks/pkg/controller/component" "github.com/apecloud/kubeblocks/pkg/controller/factory" + "github.com/apecloud/kubeblocks/pkg/controller/instanceset" intctrlutil "github.com/apecloud/kubeblocks/pkg/controllerutil" dputils "github.com/apecloud/kubeblocks/pkg/dataprotection/utils" viper "github.com/apecloud/kubeblocks/pkg/viperx" @@ -357,7 +359,7 @@ func (inPlaceHelper *inplaceRebuildHelper) rebuildSourcePVCsAndRecreateInstance( } // set volumeName to tmp pvc, it will be used when recreating the source pvc. tmpPVC.Spec.VolumeName = pv.Name - // 5. recreate the source pbc. + // 5. recreate the source pvc. if err = inPlaceHelper.recreateSourcePVC(reqCtx, cli, tmpPVC, sourcePVCName, opsRequest.Name); err != nil { return err } @@ -368,6 +370,23 @@ func (inPlaceHelper *inplaceRebuildHelper) rebuildSourcePVCsAndRecreateInstance( if opsRequest.Spec.Force { options = append(options, client.GracePeriodSeconds(0)) } + + if inPlaceHelper.instance.TargetNodeName != "" { + // under the circumstance of using cloud disks, need to set node selector again to make sure pod + // goes to the specified node + its := &workloads.InstanceSet{} + itsName := constant.GenerateWorkloadNamePattern(inPlaceHelper.synthesizedComp.ClusterName, inPlaceHelper.synthesizedComp.Name) + if err := cli.Get(reqCtx.Ctx, types.NamespacedName{Name: itsName, Namespace: inPlaceHelper.synthesizedComp.Namespace}, its); err != nil { + return err + } + if err := instanceset.MergeNodeSelectorOnceAnnotation(its, map[string]string{inPlaceHelper.targetPod.Name: inPlaceHelper.instance.TargetNodeName}); err != nil { + return err + } + if err := cli.Update(reqCtx.Ctx, its); err != nil { + return err + } + } + return intctrlutil.BackgroundDeleteObject(cli, reqCtx.Ctx, inPlaceHelper.targetPod, options...) 
} diff --git a/controllers/apps/operations/rebuild_instance_test.go b/controllers/apps/operations/rebuild_instance_test.go index fec78ad841a..84eae959a09 100644 --- a/controllers/apps/operations/rebuild_instance_test.go +++ b/controllers/apps/operations/rebuild_instance_test.go @@ -33,9 +33,11 @@ import ( appsv1 "github.com/apecloud/kubeblocks/apis/apps/v1" appsv1alpha1 "github.com/apecloud/kubeblocks/apis/apps/v1alpha1" dpv1alpha1 "github.com/apecloud/kubeblocks/apis/dataprotection/v1alpha1" + workloads "github.com/apecloud/kubeblocks/apis/workloads/v1" "github.com/apecloud/kubeblocks/pkg/common" "github.com/apecloud/kubeblocks/pkg/constant" "github.com/apecloud/kubeblocks/pkg/controller/component" + "github.com/apecloud/kubeblocks/pkg/controller/instanceset" intctrlutil "github.com/apecloud/kubeblocks/pkg/controllerutil" dptypes "github.com/apecloud/kubeblocks/pkg/dataprotection/types" "github.com/apecloud/kubeblocks/pkg/generics" @@ -50,6 +52,7 @@ var _ = Describe("OpsUtil functions", func() { randomStr = testCtx.GetRandomStr() compDefName = "test-compdef-" + randomStr clusterName = "test-cluster-" + randomStr + targetNodeName = "test-node-1" rebuildInstanceCount = 2 ) @@ -70,6 +73,7 @@ var _ = Describe("OpsUtil functions", func() { testapps.ClearResources(&testCtx, generics.OpsRequestSignature, inNS, ml) testapps.ClearResources(&testCtx, generics.BackupSignature, inNS, ml) testapps.ClearResources(&testCtx, generics.RestoreSignature, inNS, ml) + testapps.ClearResources(&testCtx, generics.InstanceSetSignature, inNS, ml) // default GracePeriod is 30s testapps.ClearResources(&testCtx, generics.PodSignature, inNS, ml, client.GracePeriodSeconds(0)) testapps.ClearResourcesWithRemoveFinalizerOption(&testCtx, generics.PersistentVolumeClaimSignature, true, inNS, ml) @@ -90,7 +94,8 @@ var _ = Describe("OpsUtil functions", func() { var instances []appsv1alpha1.Instance for _, insName := range instanceNames { instances = append(instances, appsv1alpha1.Instance{ - Name: insName, + Name: insName, + TargetNodeName: targetNodeName, }) } ops.Spec.RebuildFrom = []appsv1alpha1.RebuildInstance{ @@ -258,6 +263,7 @@ var _ = Describe("OpsUtil functions", func() { It("test rebuild instance with no backup", func() { By("init operations resources ") opsRes := prepareOpsRes("", true) + its := testapps.MockInstanceSetComponent(&testCtx, clusterName, defaultCompName) opsRes.OpsRequest.Status.Phase = appsv1alpha1.OpsRunningPhase reqCtx := intctrlutil.RequestCtx{Ctx: testCtx.Ctx} @@ -299,8 +305,18 @@ var _ = Describe("OpsUtil functions", func() { })) By("expect to clean up the tmp pods") - _, _ = GetOpsManager().Reconcile(reqCtx, k8sClient, opsRes) + _, err := GetOpsManager().Reconcile(reqCtx, k8sClient, opsRes) + Expect(err).NotTo(HaveOccurred()) Eventually(testapps.List(&testCtx, generics.PodSignature, matchingLabels, client.InNamespace(opsRes.OpsRequest.Namespace))).Should(HaveLen(0)) + + By("check its' schedule once annotation") + podPrefix := constant.GenerateWorkloadNamePattern(clusterName, defaultCompName) + Eventually(testapps.CheckObj(&testCtx, client.ObjectKeyFromObject(its), func(g Gomega, its *workloads.InstanceSet) { + mapping, err := instanceset.ParseNodeSelectorOnceAnnotation(its) + g.Expect(err).NotTo(HaveOccurred()) + g.Expect(mapping).To(HaveKeyWithValue(podPrefix+"-0", targetNodeName)) + g.Expect(mapping).To(HaveKeyWithValue(podPrefix+"-1", targetNodeName)) + })).Should(Succeed()) }) testRebuildInstanceWithBackup := func(ignoreRoleCheck bool) { @@ -326,6 +342,7 @@ var _ = Describe("OpsUtil 
functions", func() { } })).Should(Succeed()) opsRes := prepareOpsRes(backup.Name, true) + _ = testapps.MockInstanceSetComponent(&testCtx, clusterName, defaultCompName) if ignoreRoleCheck { Expect(testapps.ChangeObj(&testCtx, opsRes.OpsRequest, func(request *appsv1alpha1.OpsRequest) { if request.Annotations == nil { @@ -435,8 +452,16 @@ var _ = Describe("OpsUtil functions", func() { _, _ = GetOpsManager().Reconcile(reqCtx, k8sClient, opsRes) Expect(opsRes.Cluster.Spec.GetComponentByName(defaultCompName).Replicas).Should(BeEquivalentTo(5)) - By("mock the new pods to available") + By("its have expected nodeSelector") podPrefix := constant.GenerateWorkloadNamePattern(clusterName, defaultCompName) + Eventually(testapps.CheckObj(&testCtx, client.ObjectKeyFromObject(its), func(g Gomega, its *workloads.InstanceSet) { + mapping, err := instanceset.ParseNodeSelectorOnceAnnotation(its) + g.Expect(err).NotTo(HaveOccurred()) + g.Expect(mapping).To(HaveKeyWithValue(podPrefix+"-3", targetNodeName)) + g.Expect(mapping).To(HaveKeyWithValue(podPrefix+"-4", targetNodeName)) + })).Should(Succeed()) + + By("mock the new pods to available") testapps.MockInstanceSetPod(&testCtx, nil, clusterName, defaultCompName, podPrefix+"-3", "follower", "Readonly") testapps.MockInstanceSetPod(&testCtx, nil, clusterName, defaultCompName, podPrefix+"-4", "follower", "Readonly") diff --git a/deploy/helm/crds/apps.kubeblocks.io_opsrequests.yaml b/deploy/helm/crds/apps.kubeblocks.io_opsrequests.yaml index 19ade2dca1e..e1dc71b8afa 100644 --- a/deploy/helm/crds/apps.kubeblocks.io_opsrequests.yaml +++ b/deploy/helm/crds/apps.kubeblocks.io_opsrequests.yaml @@ -4020,7 +4020,7 @@ spec: type: string targetNodeName: description: |- - The instance will rebuild on the specified node when the instance uses local PersistentVolume as the storage disk. + The instance will rebuild on the specified node. If not set, it will rebuild on a random node. type: string required: diff --git a/docs/developer_docs/api-reference/cluster.md b/docs/developer_docs/api-reference/cluster.md index 35b2d5306c1..80e00b8d380 100644 --- a/docs/developer_docs/api-reference/cluster.md +++ b/docs/developer_docs/api-reference/cluster.md @@ -22677,7 +22677,7 @@ string (Optional) -

The instance will rebuild on the specified node when the instance uses local PersistentVolume as the storage disk. + The instance will rebuild on the specified node. If not set, it will rebuild on a random node.
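To illustrate the user-facing field documented above, here is a hedged sketch of the rebuildFrom entry that pins a rebuilt instance to a node; the cluster, component, and node names are hypothetical, and only the Name/TargetNodeName fields come from this patch (other RebuildInstance fields are omitted):

package main

import (
	"fmt"

	appsv1alpha1 "github.com/apecloud/kubeblocks/apis/apps/v1alpha1"
)

func main() {
	rebuildFrom := []appsv1alpha1.RebuildInstance{{
		// component name, backup source, etc. are omitted in this sketch
		Instances: []appsv1alpha1.Instance{{
			Name:           "mycluster-mycomp-0",
			TargetNodeName: "node-a", // omit to rebuild on a random node
		}},
	}}
	fmt.Printf("%+v\n", rebuildFrom)
}
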
diff --git a/pkg/constant/annotations.go b/pkg/constant/annotations.go index f80ed431312..eaf30423941 100644 --- a/pkg/constant/annotations.go +++ b/pkg/constant/annotations.go @@ -49,6 +49,9 @@ const ( // SkipImmutableCheckAnnotationKey specifies to skip the mutation check for the object. // The mutation check is only applied to the fields that are declared as immutable. SkipImmutableCheckAnnotationKey = "apps.kubeblocks.io/skip-immutable-check" + + // NodeSelectorOnceAnnotationKey adds nodeSelector in podSpec for one pod exactly once + NodeSelectorOnceAnnotationKey = "workloads.kubeblocks.io/node-selector-once" ) // annotations for multi-cluster diff --git a/pkg/controller/instanceset/instance_util.go b/pkg/controller/instanceset/instance_util.go index 7d9163465c8..c0861ff99ec 100644 --- a/pkg/controller/instanceset/instance_util.go +++ b/pkg/controller/instanceset/instance_util.go @@ -369,6 +369,58 @@ func ConvertOrdinalsToSortedList(ordinals workloads.Ordinals) ([]int32, error) { return sortedOrdinalList, nil } +// ParseNodeSelectorOnceAnnotation will return a non-nil map +func ParseNodeSelectorOnceAnnotation(its *workloads.InstanceSet) (map[string]string, error) { + podToNodeMapping := make(map[string]string) + data, ok := its.Annotations[constant.NodeSelectorOnceAnnotationKey] + if !ok { + return podToNodeMapping, nil + } + if err := json.Unmarshal([]byte(data), &podToNodeMapping); err != nil { + return nil, fmt.Errorf("can't unmarshal scheduling information: %w", err) + } + return podToNodeMapping, nil +} + +// sets annotation in place +func deleteNodeSelectorOnceAnnotation(its *workloads.InstanceSet, podName string) error { + podToNodeMapping, err := ParseNodeSelectorOnceAnnotation(its) + if err != nil { + return err + } + delete(podToNodeMapping, podName) + if len(podToNodeMapping) == 0 { + delete(its.Annotations, constant.NodeSelectorOnceAnnotationKey) + } else { + data, err := json.Marshal(podToNodeMapping) + if err != nil { + return err + } + its.Annotations[constant.NodeSelectorOnceAnnotationKey] = string(data) + } + return nil +} + +// MergeNodeSelectorOnceAnnotation merges its's nodeSelectorOnce annotation in place +func MergeNodeSelectorOnceAnnotation(its *workloads.InstanceSet, podToNodeMapping map[string]string) error { + origPodToNodeMapping, err := ParseNodeSelectorOnceAnnotation(its) + if err != nil { + return err + } + for k, v := range podToNodeMapping { + origPodToNodeMapping[k] = v + } + data, err := json.Marshal(origPodToNodeMapping) + if err != nil { + return err + } + if its.Annotations == nil { + its.Annotations = make(map[string]string) + } + its.Annotations[constant.NodeSelectorOnceAnnotationKey] = string(data) + return nil +} + func buildInstanceByTemplate(name string, template *instanceTemplateExt, parent *workloads.InstanceSet, revision string) (*instance, error) { // 1. build a pod from template var err error @@ -391,6 +443,18 @@ func buildInstanceByTemplate(name string, template *instanceTemplateExt, parent pod.Spec.Hostname = pod.Name pod.Spec.Subdomain = getHeadlessSvcName(parent.Name) + podToNodeMapping, err := ParseNodeSelectorOnceAnnotation(parent) + if err != nil { + return nil, err + } + if nodeName, ok := podToNodeMapping[name]; ok { + // don't specify nodeName directly here, because it may affect WaitForFirstConsumer StorageClass + if pod.Spec.NodeSelector == nil { + pod.Spec.NodeSelector = make(map[string]string) + } + pod.Spec.NodeSelector[corev1.LabelHostname] = nodeName + } + // 2. 
build pvcs from template pvcMap := make(map[string]*corev1.PersistentVolumeClaim) pvcNameMap := make(map[string]string) diff --git a/pkg/controller/instanceset/instance_util_test.go b/pkg/controller/instanceset/instance_util_test.go index c0980223e4d..80bd3fcd20a 100644 --- a/pkg/controller/instanceset/instance_util_test.go +++ b/pkg/controller/instanceset/instance_util_test.go @@ -213,6 +213,34 @@ var _ = Describe("instance util test", func() { Expect(instance.pvcs[0].Labels[constant.VolumeClaimTemplateNameLabelKey]).Should(Equal(volumeClaimTemplates[0].Name)) Expect(instance.pvcs[0].Spec.Resources).Should(Equal(volumeClaimTemplates[0].Spec.Resources)) }) + + It("adds nodeSelector according to annotation", func() { + itsExt, err := buildInstanceSetExt(its, nil) + Expect(err).Should(BeNil()) + nameTemplate, err := buildInstanceName2TemplateMap(itsExt) + Expect(err).Should(BeNil()) + name := name + "-0" + Expect(nameTemplate).Should(HaveKey(name)) + template := nameTemplate[name] + + node := "test-node-1" + Expect(MergeNodeSelectorOnceAnnotation(its, map[string]string{name: node})).To(Succeed()) + instance, err := buildInstanceByTemplate(name, template, its, "") + Expect(err).NotTo(HaveOccurred()) + Expect(instance.pod.Spec.NodeSelector[corev1.LabelHostname]).To(Equal(node)) + + By("test with an already existing annotation") + delete(its.Annotations, constant.NodeSelectorOnceAnnotationKey) + Expect(MergeNodeSelectorOnceAnnotation(its, map[string]string{"other-pod": "other-node"})).To(Succeed()) + Expect(MergeNodeSelectorOnceAnnotation(its, map[string]string{name: node})).To(Succeed()) + mapping, err := ParseNodeSelectorOnceAnnotation(its) + Expect(err).NotTo(HaveOccurred()) + Expect(mapping).To(HaveKeyWithValue("other-pod", "other-node")) + Expect(mapping).To(HaveKeyWithValue(name, node)) + instance, err = buildInstanceByTemplate(name, template, its, "") + Expect(err).NotTo(HaveOccurred()) + Expect(instance.pod.Spec.NodeSelector[corev1.LabelHostname]).To(Equal(node)) + }) }) Context("buildInstancePVCByTemplate", func() { diff --git a/pkg/controller/instanceset/reconciler_instance_alignment_test.go b/pkg/controller/instanceset/reconciler_instance_alignment_test.go index 53d2b93784c..9b89563c9ce 100644 --- a/pkg/controller/instanceset/reconciler_instance_alignment_test.go +++ b/pkg/controller/instanceset/reconciler_instance_alignment_test.go @@ -161,5 +161,26 @@ var _ = Describe("replicas alignment reconciler test", func() { })).Should(BeNumerically(">=", 0)) } }) + + It("handles nodeSelectorOnce Annotation", func() { + tree := kubebuilderx.NewObjectTree() + tree.SetRoot(its) + name := "bar-1" + node := "test-1" + Expect(MergeNodeSelectorOnceAnnotation(its, map[string]string{name: node})).To(Succeed()) + + res, err := reconciler.Reconcile(tree) + Expect(err).Should(BeNil()) + Expect(res).Should(Equal(kubebuilderx.Continue)) + pods := tree.List(&corev1.Pod{}) + for _, obj := range pods { + pod := obj.(*corev1.Pod) + if pod.Name == name { + Expect(pod.Spec.NodeSelector).To(Equal(map[string]string{ + corev1.LabelHostname: node, + })) + } + } + }) }) }) diff --git a/pkg/controller/instanceset/reconciler_status.go b/pkg/controller/instanceset/reconciler_status.go index 88cd62c1ff9..32b4e27054b 100644 --- a/pkg/controller/instanceset/reconciler_status.go +++ b/pkg/controller/instanceset/reconciler_status.go @@ -84,6 +84,11 @@ func (r *statusReconciler) Reconcile(tree *kubebuilderx.ObjectTree) (kubebuilder template2TotalReplicas[template.Name] = templateReplicas } + podToNodeMapping, err := 
ParseNodeSelectorOnceAnnotation(its) + if err != nil { + return kubebuilderx.Continue, err + } + for _, pod := range podList { parentName, _ := ParseParentNameAndOrdinal(pod.Name) templateName, _ := strings.CutPrefix(parentName, its.Name) @@ -126,6 +131,15 @@ func (r *statusReconciler) Reconcile(tree *kubebuilderx.ObjectTree) (kubebuilder template2TemplatesStatus[templateName].UpdatedReplicas++ } } + + if nodeName, ok := podToNodeMapping[pod.Name]; ok { + // there's chance that a pod is currently running and wait to be deleted so that it can be rescheduled + if pod.Spec.NodeName == nodeName { + if err := deleteNodeSelectorOnceAnnotation(its, pod.Name); err != nil { + return kubebuilderx.Continue, err + } + } + } } its.Status.Replicas = replicas its.Status.ReadyReplicas = readyReplicas diff --git a/pkg/controller/instanceset/reconciler_status_test.go b/pkg/controller/instanceset/reconciler_status_test.go index 054827e55fe..ddb2f652725 100644 --- a/pkg/controller/instanceset/reconciler_status_test.go +++ b/pkg/controller/instanceset/reconciler_status_test.go @@ -31,6 +31,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" workloads "github.com/apecloud/kubeblocks/apis/workloads/v1" + "github.com/apecloud/kubeblocks/pkg/constant" "github.com/apecloud/kubeblocks/pkg/controller/builder" "github.com/apecloud/kubeblocks/pkg/controller/kubebuilderx" ) @@ -50,6 +51,32 @@ var _ = Describe("status reconciler test", func() { }) Context("PreCondition & Reconcile", func() { + reconcilePods := func(tree *kubebuilderx.ObjectTree) { + By("fix meta") + reconciler = NewFixMetaReconciler() + res, err := reconciler.Reconcile(tree) + Expect(err).Should(BeNil()) + Expect(res).Should(Equal(kubebuilderx.Commit)) + + By("update revisions") + reconciler = NewRevisionUpdateReconciler() + res, err = reconciler.Reconcile(tree) + Expect(err).Should(BeNil()) + Expect(res).Should(Equal(kubebuilderx.Continue)) + + By("assistant object") + reconciler = NewAssistantObjectReconciler() + res, err = reconciler.Reconcile(tree) + Expect(err).Should(BeNil()) + Expect(res).Should(Equal(kubebuilderx.Continue)) + + By("replicas alignment") + reconciler = NewReplicasAlignmentReconciler() + res, err = reconciler.Reconcile(tree) + Expect(err).Should(BeNil()) + Expect(res).Should(Equal(kubebuilderx.Continue)) + } + It("should work well", func() { By("PreCondition") its.Generation = 1 @@ -74,34 +101,12 @@ var _ = Describe("status reconciler test", func() { its.Spec.Instances = append(its.Spec.Instances, instanceFoo) // prepare for update - By("fix meta") - reconciler = NewFixMetaReconciler() - res, err := reconciler.Reconcile(tree) - Expect(err).Should(BeNil()) - Expect(res).Should(Equal(kubebuilderx.Commit)) - - By("update revisions") - reconciler = NewRevisionUpdateReconciler() - res, err = reconciler.Reconcile(tree) - Expect(err).Should(BeNil()) - Expect(res).Should(Equal(kubebuilderx.Continue)) - - By("assistant object") - reconciler = NewAssistantObjectReconciler() - res, err = reconciler.Reconcile(tree) - Expect(err).Should(BeNil()) - Expect(res).Should(Equal(kubebuilderx.Continue)) - - By("replicas alignment") - reconciler = NewReplicasAlignmentReconciler() - res, err = reconciler.Reconcile(tree) - Expect(err).Should(BeNil()) - Expect(res).Should(Equal(kubebuilderx.Continue)) + reconcilePods(tree) By("all pods are not ready") reconciler = NewStatusReconciler() Expect(reconciler.PreCondition(tree)).Should(Equal(kubebuilderx.ConditionSatisfied)) - res, err = reconciler.Reconcile(tree) + res, err := reconciler.Reconcile(tree) 
Expect(err).Should(BeNil()) Expect(res).Should(Equal(kubebuilderx.Continue)) Expect(its.Status.Replicas).Should(BeEquivalentTo(0)) @@ -245,6 +250,31 @@ var _ = Describe("status reconciler test", func() { Expect(its.Status.Conditions[2].Reason).Should(BeEquivalentTo(workloads.ReasonInstanceFailure)) Expect(its.Status.Conditions[2].Message).Should(BeEquivalentTo(message)) }) + + It("updates nodeSelectorOnce Annotation", func() { + tree := kubebuilderx.NewObjectTree() + tree.SetRoot(its) + reconcilePods(tree) + + name := "bar-0" + nodeName := "foo" + Expect(MergeNodeSelectorOnceAnnotation(its, map[string]string{name: nodeName})).To(Succeed()) + By("mock pod's nodeName") + pods := tree.List(&corev1.Pod{}) + for _, podObj := range pods { + pod := podObj.(*corev1.Pod) + if pod.Name != name { + continue + } + pod.Spec.NodeName = nodeName + break + } + reconciler = NewStatusReconciler() + res, err := reconciler.Reconcile(tree) + Expect(err).Should(BeNil()) + Expect(res).Should(Equal(kubebuilderx.Continue)) + Expect(its.Annotations[constant.NodeSelectorOnceAnnotationKey]).To(BeEmpty()) + }) }) Context("setMembersStatus function", func() {
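
As a closing note on the mechanics added in instance_util.go and reconciler_status.go above: the once-annotation is applied to the rebuilt pod as a kubernetes.io/hostname nodeSelector rather than spec.nodeName, so the scheduler and WaitForFirstConsumer volume binding still run normally, and the status reconciler drops the mapping entry once the pod is observed on the requested node. A minimal sketch of the resulting pod constraint; the node name is hypothetical:

package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
)

func main() {
	pod := &corev1.Pod{}
	if pod.Spec.NodeSelector == nil {
		pod.Spec.NodeSelector = map[string]string{}
	}
	// constrain scheduling via the hostname label instead of setting spec.nodeName directly
	pod.Spec.NodeSelector[corev1.LabelHostname] = "node-a"
	fmt.Println(pod.Spec.NodeSelector)
}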