feat: specify node for rebuild instance ops (#8151)
cjc7373 authored Sep 18, 2024
1 parent 6e71071 commit 87e27f5
Showing 13 changed files with 272 additions and 41 deletions.
2 changes: 1 addition & 1 deletion apis/apps/v1alpha1/opsrequest_types.go
@@ -293,7 +293,7 @@ type Instance struct {
// +kubebuilder:validation:Required
Name string `json:"name"`

-    // The instance will rebuild on the specified node when the instance uses local PersistentVolume as the storage disk.
+    // The instance will rebuild on the specified node.
// If not set, it will rebuild on a random node.
// +optional
TargetNodeName string `json:"targetNodeName,omitempty"`
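For context, a minimal sketch of an OpsRequest that exercises the new field. All object names are invented, and the surrounding spec fields (ClusterName, Type, the embedded ComponentOps) follow a reading of the v1alpha1 API rather than anything shown in this diff:

package main

import (
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

    appsv1alpha1 "github.com/apecloud/kubeblocks/apis/apps/v1alpha1"
)

// rebuildOnNode sketches an OpsRequest that rebuilds one instance on a chosen node.
func rebuildOnNode() *appsv1alpha1.OpsRequest {
    return &appsv1alpha1.OpsRequest{
        ObjectMeta: metav1.ObjectMeta{Name: "rebuild-mycluster-mysql", Namespace: "default"},
        Spec: appsv1alpha1.OpsRequestSpec{
            ClusterName: "mycluster",
            Type:        appsv1alpha1.RebuildInstanceType,
            RebuildFrom: []appsv1alpha1.RebuildInstance{{
                ComponentOps: appsv1alpha1.ComponentOps{ComponentName: "mysql"},
                Instances: []appsv1alpha1.Instance{{
                    Name:           "mycluster-mysql-0",
                    TargetNodeName: "node-1", // the new field: pin the rebuilt instance to this node
                }},
            }},
        },
    }
}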
2 changes: 1 addition & 1 deletion config/crd/bases/apps.kubeblocks.io_opsrequests.yaml
@@ -4020,7 +4020,7 @@ spec:
type: string
targetNodeName:
description: |-
- The instance will rebuild on the specified node when the instance uses local PersistentVolume as the storage disk.
+ The instance will rebuild on the specified node.
If not set, it will rebuild on a random node.
type: string
required:
45 changes: 36 additions & 9 deletions controllers/apps/operations/rebuild_instance.go
@@ -28,12 +28,14 @@ import (
"golang.org/x/exp/slices"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/utils/pointer"
"sigs.k8s.io/controller-runtime/pkg/client"

appsv1 "github.com/apecloud/kubeblocks/apis/apps/v1"
appsv1alpha1 "github.com/apecloud/kubeblocks/apis/apps/v1alpha1"
dpv1alpha1 "github.com/apecloud/kubeblocks/apis/dataprotection/v1alpha1"
workloads "github.com/apecloud/kubeblocks/apis/workloads/v1"
"github.com/apecloud/kubeblocks/pkg/constant"
"github.com/apecloud/kubeblocks/pkg/controller/component"
"github.com/apecloud/kubeblocks/pkg/controller/instanceset"
@@ -297,8 +299,8 @@ func (r rebuildInstanceOpsHandler) rebuildInstancesWithHScaling(reqCtx intctrlut
)
if len(compStatus.ProgressDetails) == 0 {
// 1. scale out the required instances
-        r.scaleOutRequiredInstances(opsRes, rebuildInstance, compStatus)
-        return 0, 0, nil
+        err := r.scaleOutRequiredInstances(reqCtx, cli, opsRes, rebuildInstance, compStatus)
+        return 0, 0, err
}
for i := range opsRes.Cluster.Spec.ComponentSpecs {
compSpec := &opsRes.Cluster.Spec.ComponentSpecs[i]
@@ -321,9 +323,11 @@ func (r rebuildInstanceOpsHandler) rebuildInstancesWithHScaling(reqCtx intctrlut
return completedCount, failedCount, nil
}

-func (r rebuildInstanceOpsHandler) scaleOutRequiredInstances(opsRes *OpsResource,
+func (r rebuildInstanceOpsHandler) scaleOutRequiredInstances(reqCtx intctrlutil.RequestCtx,
+    cli client.Client,
+    opsRes *OpsResource,
    rebuildInstance appsv1alpha1.RebuildInstance,
-    compStatus *appsv1alpha1.OpsRequestComponentStatus) {
+    compStatus *appsv1alpha1.OpsRequestComponentStatus) error {
// 1. sort the instances
slices.SortFunc(rebuildInstance.Instances, func(a, b appsv1alpha1.Instance) bool {
return a.Name < b.Name
@@ -345,9 +349,9 @@ func (r rebuildInstanceOpsHandler) scaleOutRequiredInstances(opsRes *OpsResource
    opsRes.Recorder.Eventf(opsRes.OpsRequest, corev1.EventTypeWarning, reasonCompReplicasChanged, "the replicas of the component %s have been changed", compName)
continue
}
-        r.scaleOutCompReplicasAndSyncProgress(opsRes, compSpec, rebuildInstance, compStatus, rebuildInsWrapper)
-        break
+        return r.scaleOutCompReplicasAndSyncProgress(reqCtx, cli, opsRes, compSpec, rebuildInstance, compStatus, rebuildInsWrapper)
    }
+    return nil
}

// getRebuildInstanceWrapper assembles the corresponding replicas and instances based on the template
@@ -365,11 +369,13 @@ func (r rebuildInstanceOpsHandler) getRebuildInstanceWrapper(opsRes *OpsResource
return rebuildInsWrapper
}

-func (r rebuildInstanceOpsHandler) scaleOutCompReplicasAndSyncProgress(opsRes *OpsResource,
+func (r rebuildInstanceOpsHandler) scaleOutCompReplicasAndSyncProgress(reqCtx intctrlutil.RequestCtx,
+    cli client.Client,
+    opsRes *OpsResource,
    compSpec *appsv1.ClusterComponentSpec,
    rebuildInstance appsv1alpha1.RebuildInstance,
    compStatus *appsv1alpha1.OpsRequestComponentStatus,
-    rebuildInsWrapper map[string]*rebuildInstanceWrapper) {
+    rebuildInsWrapper map[string]*rebuildInstanceWrapper) error {
scaleOutInsMap := map[string]string{}
setScaleOutInsMap := func(workloadName, templateName string,
replicas int32, offlineInstances []string, wrapper *rebuildInstanceWrapper) {
@@ -393,16 +399,37 @@ func (r rebuildInstanceOpsHandler) scaleOutCompReplicasAndSyncProgress(opsRes *O
if wrapper, ok := rebuildInsWrapper[""]; ok {
setScaleOutInsMap(workloadName, "", compSpec.Replicas-allTemplateReplicas, compSpec.OfflineInstances, wrapper)
}
-    // set progress details
+
+    its := &workloads.InstanceSet{}
+    if err := cli.Get(reqCtx.Ctx, types.NamespacedName{Name: workloadName, Namespace: opsRes.OpsRequest.Namespace}, its); err != nil {
+        return err
+    }
+    itsUpdated := false
for _, ins := range rebuildInstance.Instances {
+        // set progress details
scaleOutInsName := scaleOutInsMap[ins.Name]
setComponentStatusProgressDetail(opsRes.Recorder, opsRes.OpsRequest, &compStatus.ProgressDetails,
appsv1alpha1.ProgressStatusDetail{
ObjectKey: getProgressObjectKey(constant.PodKind, ins.Name),
Status: appsv1alpha1.ProcessingProgressStatus,
Message: r.buildScalingOutPodMessage(scaleOutInsName, "Processing"),
})

+        // specify node to scale out
+        if ins.TargetNodeName != "" {
+            if err := instanceset.MergeNodeSelectorOnceAnnotation(its, map[string]string{scaleOutInsName: ins.TargetNodeName}); err != nil {
+                return err
+            }
+            itsUpdated = true
+        }
}

+    if itsUpdated {
+        if err := cli.Update(reqCtx.Ctx, its); err != nil {
+            return err
+        }
+    }
+    return nil
}

// checkProgressForScalingOutPods checks if the new pods are available.
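Since the node-selector-once annotation is keyed by pod name, the scale-out path above must resolve each rebuilt instance to the name of the pod that will be created for it. A small sketch of the naming, assuming GenerateWorkloadNamePattern joins the cluster and component names as "<cluster>-<component>" (object names invented):

package main

import (
    "fmt"

    "github.com/apecloud/kubeblocks/pkg/constant"
)

func main() {
    // Pod names append an ordinal to the InstanceSet name; these are the keys
    // that MergeNodeSelectorOnceAnnotation receives for a scale-out from 3 to 5 replicas.
    workloadName := constant.GenerateWorkloadNamePattern("mycluster", "mysql")
    for ordinal := 3; ordinal <= 4; ordinal++ {
        fmt.Printf("%s-%d\n", workloadName, ordinal) // mycluster-mysql-3, mycluster-mysql-4
    }
}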
21 changes: 20 additions & 1 deletion controllers/apps/operations/rebuild_instance_inplace.go
@@ -31,11 +31,13 @@ import (

appsv1alpha1 "github.com/apecloud/kubeblocks/apis/apps/v1alpha1"
dpv1alpha1 "github.com/apecloud/kubeblocks/apis/dataprotection/v1alpha1"
workloads "github.com/apecloud/kubeblocks/apis/workloads/v1"
"github.com/apecloud/kubeblocks/pkg/common"
"github.com/apecloud/kubeblocks/pkg/constant"
"github.com/apecloud/kubeblocks/pkg/controller/builder"
"github.com/apecloud/kubeblocks/pkg/controller/component"
"github.com/apecloud/kubeblocks/pkg/controller/factory"
"github.com/apecloud/kubeblocks/pkg/controller/instanceset"
intctrlutil "github.com/apecloud/kubeblocks/pkg/controllerutil"
dputils "github.com/apecloud/kubeblocks/pkg/dataprotection/utils"
viper "github.com/apecloud/kubeblocks/pkg/viperx"
@@ -357,7 +359,7 @@ func (inPlaceHelper *inplaceRebuildHelper) rebuildSourcePVCsAndRecreateInstance(
}
// set volumeName to tmp pvc, it will be used when recreating the source pvc.
tmpPVC.Spec.VolumeName = pv.Name
-    // 5. recreate the source pbc.
+    // 5. recreate the source pvc.
if err = inPlaceHelper.recreateSourcePVC(reqCtx, cli, tmpPVC, sourcePVCName, opsRequest.Name); err != nil {
return err
}
@@ -368,6 +370,23 @@ func (inPlaceHelper *inplaceRebuildHelper) rebuildSourcePVCsAndRecreateInstance(
if opsRequest.Spec.Force {
options = append(options, client.GracePeriodSeconds(0))
}

+    if inPlaceHelper.instance.TargetNodeName != "" {
+        // when cloud disks are used, the node selector must be set again to make sure
+        // the pod lands on the specified node
+        its := &workloads.InstanceSet{}
+        itsName := constant.GenerateWorkloadNamePattern(inPlaceHelper.synthesizedComp.ClusterName, inPlaceHelper.synthesizedComp.Name)
+        if err := cli.Get(reqCtx.Ctx, types.NamespacedName{Name: itsName, Namespace: inPlaceHelper.synthesizedComp.Namespace}, its); err != nil {
+            return err
+        }
+        if err := instanceset.MergeNodeSelectorOnceAnnotation(its, map[string]string{inPlaceHelper.targetPod.Name: inPlaceHelper.instance.TargetNodeName}); err != nil {
+            return err
+        }
+        if err := cli.Update(reqCtx.Ctx, its); err != nil {
+            return err
+        }
+    }

return intctrlutil.BackgroundDeleteObject(cli, reqCtx.Ctx, inPlaceHelper.targetPod, options...)
}

31 changes: 28 additions & 3 deletions controllers/apps/operations/rebuild_instance_test.go
@@ -33,9 +33,11 @@ import (
appsv1 "github.com/apecloud/kubeblocks/apis/apps/v1"
appsv1alpha1 "github.com/apecloud/kubeblocks/apis/apps/v1alpha1"
dpv1alpha1 "github.com/apecloud/kubeblocks/apis/dataprotection/v1alpha1"
workloads "github.com/apecloud/kubeblocks/apis/workloads/v1"
"github.com/apecloud/kubeblocks/pkg/common"
"github.com/apecloud/kubeblocks/pkg/constant"
"github.com/apecloud/kubeblocks/pkg/controller/component"
"github.com/apecloud/kubeblocks/pkg/controller/instanceset"
intctrlutil "github.com/apecloud/kubeblocks/pkg/controllerutil"
dptypes "github.com/apecloud/kubeblocks/pkg/dataprotection/types"
"github.com/apecloud/kubeblocks/pkg/generics"
@@ -50,6 +52,7 @@ var _ = Describe("OpsUtil functions", func() {
randomStr = testCtx.GetRandomStr()
compDefName = "test-compdef-" + randomStr
clusterName = "test-cluster-" + randomStr
+        targetNodeName       = "test-node-1"
rebuildInstanceCount = 2
)

@@ -70,6 +73,7 @@ var _ = Describe("OpsUtil functions", func() {
testapps.ClearResources(&testCtx, generics.OpsRequestSignature, inNS, ml)
testapps.ClearResources(&testCtx, generics.BackupSignature, inNS, ml)
testapps.ClearResources(&testCtx, generics.RestoreSignature, inNS, ml)
+        testapps.ClearResources(&testCtx, generics.InstanceSetSignature, inNS, ml)
// default GracePeriod is 30s
testapps.ClearResources(&testCtx, generics.PodSignature, inNS, ml, client.GracePeriodSeconds(0))
testapps.ClearResourcesWithRemoveFinalizerOption(&testCtx, generics.PersistentVolumeClaimSignature, true, inNS, ml)
@@ -90,7 +94,8 @@ var _ = Describe("OpsUtil functions", func() {
var instances []appsv1alpha1.Instance
for _, insName := range instanceNames {
instances = append(instances, appsv1alpha1.Instance{
-                Name: insName,
+                Name:           insName,
+                TargetNodeName: targetNodeName,
})
}
ops.Spec.RebuildFrom = []appsv1alpha1.RebuildInstance{
@@ -258,6 +263,7 @@ var _ = Describe("OpsUtil functions", func() {
It("test rebuild instance with no backup", func() {
By("init operations resources ")
opsRes := prepareOpsRes("", true)
+        its := testapps.MockInstanceSetComponent(&testCtx, clusterName, defaultCompName)
opsRes.OpsRequest.Status.Phase = appsv1alpha1.OpsRunningPhase
reqCtx := intctrlutil.RequestCtx{Ctx: testCtx.Ctx}

@@ -299,8 +305,18 @@ var _ = Describe("OpsUtil functions", func() {
}))

By("expect to clean up the tmp pods")
-        _, _ = GetOpsManager().Reconcile(reqCtx, k8sClient, opsRes)
+        _, err := GetOpsManager().Reconcile(reqCtx, k8sClient, opsRes)
+        Expect(err).NotTo(HaveOccurred())
Eventually(testapps.List(&testCtx, generics.PodSignature, matchingLabels, client.InNamespace(opsRes.OpsRequest.Namespace))).Should(HaveLen(0))

By("check its' schedule once annotation")
podPrefix := constant.GenerateWorkloadNamePattern(clusterName, defaultCompName)
Eventually(testapps.CheckObj(&testCtx, client.ObjectKeyFromObject(its), func(g Gomega, its *workloads.InstanceSet) {
mapping, err := instanceset.ParseNodeSelectorOnceAnnotation(its)
g.Expect(err).NotTo(HaveOccurred())
g.Expect(mapping).To(HaveKeyWithValue(podPrefix+"-0", targetNodeName))
g.Expect(mapping).To(HaveKeyWithValue(podPrefix+"-1", targetNodeName))
})).Should(Succeed())
})

testRebuildInstanceWithBackup := func(ignoreRoleCheck bool) {
@@ -326,6 +342,7 @@ var _ = Describe("OpsUtil functions", func() {
}
})).Should(Succeed())
opsRes := prepareOpsRes(backup.Name, true)
+        _ = testapps.MockInstanceSetComponent(&testCtx, clusterName, defaultCompName)
if ignoreRoleCheck {
Expect(testapps.ChangeObj(&testCtx, opsRes.OpsRequest, func(request *appsv1alpha1.OpsRequest) {
if request.Annotations == nil {
@@ -435,8 +452,16 @@ var _ = Describe("OpsUtil functions", func() {
_, _ = GetOpsManager().Reconcile(reqCtx, k8sClient, opsRes)
Expect(opsRes.Cluster.Spec.GetComponentByName(defaultCompName).Replicas).Should(BeEquivalentTo(5))

By("mock the new pods to available")
By("its have expected nodeSelector")
podPrefix := constant.GenerateWorkloadNamePattern(clusterName, defaultCompName)
+        Eventually(testapps.CheckObj(&testCtx, client.ObjectKeyFromObject(its), func(g Gomega, its *workloads.InstanceSet) {
+            mapping, err := instanceset.ParseNodeSelectorOnceAnnotation(its)
+            g.Expect(err).NotTo(HaveOccurred())
+            g.Expect(mapping).To(HaveKeyWithValue(podPrefix+"-3", targetNodeName))
+            g.Expect(mapping).To(HaveKeyWithValue(podPrefix+"-4", targetNodeName))
+        })).Should(Succeed())

+        By("mock the new pods to available")
testapps.MockInstanceSetPod(&testCtx, nil, clusterName, defaultCompName, podPrefix+"-3", "follower", "Readonly")
testapps.MockInstanceSetPod(&testCtx, nil, clusterName, defaultCompName, podPrefix+"-4", "follower", "Readonly")

2 changes: 1 addition & 1 deletion deploy/helm/crds/apps.kubeblocks.io_opsrequests.yaml
@@ -4020,7 +4020,7 @@ spec:
type: string
targetNodeName:
description: |-
- The instance will rebuild on the specified node when the instance uses local PersistentVolume as the storage disk.
+ The instance will rebuild on the specified node.
If not set, it will rebuild on a random node.
type: string
required:
2 changes: 1 addition & 1 deletion docs/developer_docs/api-reference/cluster.md
@@ -22677,7 +22677,7 @@ string
</td>
<td>
<em>(Optional)</em>
-<p>The instance will rebuild on the specified node when the instance uses local PersistentVolume as the storage disk.
+<p>The instance will rebuild on the specified node.
If not set, it will rebuild on a random node.</p>
</td>
</tr>
3 changes: 3 additions & 0 deletions pkg/constant/annotations.go
@@ -49,6 +49,9 @@ const (
// SkipImmutableCheckAnnotationKey specifies to skip the mutation check for the object.
// The mutation check is only applied to the fields that are declared as immutable.
SkipImmutableCheckAnnotationKey = "apps.kubeblocks.io/skip-immutable-check"

+    // NodeSelectorOnceAnnotationKey adds a nodeSelector to the pod spec of each listed pod, exactly once
+    NodeSelectorOnceAnnotationKey = "workloads.kubeblocks.io/node-selector-once"
)

// annotations for multi-cluster
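For reference, the annotation value is a JSON object mapping pod names to node names. A hedged sketch of what it might look like when set by hand (names invented; in practice the InstanceSet helpers in the next file maintain this value):

package main

import (
    workloads "github.com/apecloud/kubeblocks/apis/workloads/v1"
    "github.com/apecloud/kubeblocks/pkg/constant"
)

func main() {
    its := &workloads.InstanceSet{}
    // pod name -> node name, serialized as JSON
    its.Annotations = map[string]string{
        constant.NodeSelectorOnceAnnotationKey: `{"mycluster-mysql-0":"node-1"}`,
    }
}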
64 changes: 64 additions & 0 deletions pkg/controller/instanceset/instance_util.go
@@ -369,6 +369,58 @@ func ConvertOrdinalsToSortedList(ordinals workloads.Ordinals) ([]int32, error) {
return sortedOrdinalList, nil
}

+// ParseNodeSelectorOnceAnnotation parses the pod-to-node mapping stored in the node-selector-once annotation; the returned map is always non-nil
+func ParseNodeSelectorOnceAnnotation(its *workloads.InstanceSet) (map[string]string, error) {
+    podToNodeMapping := make(map[string]string)
+    data, ok := its.Annotations[constant.NodeSelectorOnceAnnotationKey]
+    if !ok {
+        return podToNodeMapping, nil
+    }
+    if err := json.Unmarshal([]byte(data), &podToNodeMapping); err != nil {
+        return nil, fmt.Errorf("can't unmarshal scheduling information: %w", err)
+    }
+    return podToNodeMapping, nil
+}
+
+// deleteNodeSelectorOnceAnnotation removes podName from the mapping and rewrites the annotation in place
+func deleteNodeSelectorOnceAnnotation(its *workloads.InstanceSet, podName string) error {
+    podToNodeMapping, err := ParseNodeSelectorOnceAnnotation(its)
+    if err != nil {
+        return err
+    }
+    delete(podToNodeMapping, podName)
+    if len(podToNodeMapping) == 0 {
+        delete(its.Annotations, constant.NodeSelectorOnceAnnotationKey)
+    } else {
+        data, err := json.Marshal(podToNodeMapping)
+        if err != nil {
+            return err
+        }
+        its.Annotations[constant.NodeSelectorOnceAnnotationKey] = string(data)
+    }
+    return nil
+}
+
+// MergeNodeSelectorOnceAnnotation merges podToNodeMapping into the InstanceSet's node-selector-once annotation in place
+func MergeNodeSelectorOnceAnnotation(its *workloads.InstanceSet, podToNodeMapping map[string]string) error {
+    origPodToNodeMapping, err := ParseNodeSelectorOnceAnnotation(its)
+    if err != nil {
+        return err
+    }
+    for k, v := range podToNodeMapping {
+        origPodToNodeMapping[k] = v
+    }
+    data, err := json.Marshal(origPodToNodeMapping)
+    if err != nil {
+        return err
+    }
+    if its.Annotations == nil {
+        its.Annotations = make(map[string]string)
+    }
+    its.Annotations[constant.NodeSelectorOnceAnnotationKey] = string(data)
+    return nil
+}

func buildInstanceByTemplate(name string, template *instanceTemplateExt, parent *workloads.InstanceSet, revision string) (*instance, error) {
// 1. build a pod from template
var err error
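The helpers above round-trip cleanly: Merge initializes the annotation if needed and accumulates entries, and Parse always hands back a usable map. A minimal usage sketch (object and node names hypothetical):

package main

import (
    "fmt"

    workloads "github.com/apecloud/kubeblocks/apis/workloads/v1"
    "github.com/apecloud/kubeblocks/pkg/controller/instanceset"
)

func main() {
    its := &workloads.InstanceSet{}
    _ = instanceset.MergeNodeSelectorOnceAnnotation(its, map[string]string{"mycluster-mysql-3": "node-1"})
    _ = instanceset.MergeNodeSelectorOnceAnnotation(its, map[string]string{"mycluster-mysql-4": "node-2"})
    mapping, err := instanceset.ParseNodeSelectorOnceAnnotation(its)
    fmt.Println(mapping, err) // map[mycluster-mysql-3:node-1 mycluster-mysql-4:node-2] <nil>
}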
@@ -391,6 +443,18 @@ func buildInstanceByTemplate(name string, template *instanceTemplateExt, parent
pod.Spec.Hostname = pod.Name
pod.Spec.Subdomain = getHeadlessSvcName(parent.Name)

+    podToNodeMapping, err := ParseNodeSelectorOnceAnnotation(parent)
+    if err != nil {
+        return nil, err
+    }
+    if nodeName, ok := podToNodeMapping[name]; ok {
+        // don't specify nodeName directly here, because it may affect WaitForFirstConsumer StorageClass
+        if pod.Spec.NodeSelector == nil {
+            pod.Spec.NodeSelector = make(map[string]string)
+        }
+        pod.Spec.NodeSelector[corev1.LabelHostname] = nodeName
+    }

// 2. build pvcs from template
pvcMap := make(map[string]*corev1.PersistentVolumeClaim)
pvcNameMap := make(map[string]string)
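Why a nodeSelector rather than spec.nodeName: setting nodeName would skip the scheduler entirely, which can break volume binding for WaitForFirstConsumer StorageClasses, while a kubernetes.io/hostname selector still routes the pod through the scheduler. A minimal sketch of the constraint the builder produces (node name hypothetical):

package main

import (
    "fmt"

    corev1 "k8s.io/api/core/v1"
)

func main() {
    pod := corev1.Pod{}
    // corev1.LabelHostname is "kubernetes.io/hostname"
    pod.Spec.NodeSelector = map[string]string{corev1.LabelHostname: "node-1"}
    fmt.Println(pod.Spec.NodeSelector)
}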