Skip to content

Commit

Permalink
fix: volume expand the volumeClaimTemplates of the instance template …
Browse files Browse the repository at this point in the history
…failed (#8277)
  • Loading branch information
wangyelei authored Oct 15, 2024
1 parent 4b8e658 commit b2e90cc
Show file tree
Hide file tree
Showing 6 changed files with 139 additions and 35 deletions.
19 changes: 16 additions & 3 deletions apis/operations/v1alpha1/opsrequest_validation.go
Original file line number Diff line number Diff line change
Expand Up @@ -694,10 +694,17 @@ func (r *OpsRequest) checkStorageClassAllowExpansion(ctx context.Context,
// getSCNameByPvcAndCheckStorageSize gets the storageClassName by pvc and checks if the storage size is valid.
func (r *OpsRequest) getSCNameByPvcAndCheckStorageSize(ctx context.Context,
cli client.Client,
componentName,
key,
vctName string,
isShardingComponent bool,
requestStorage resource.Quantity) (*string, error) {
componentName := key
targetInsTPLName := ""
if strings.Contains(key, ".") {
keyStrs := strings.Split(key, ".")
componentName = keyStrs[0]
targetInsTPLName = keyStrs[1]
}
matchingLabels := client.MatchingLabels{
constant.AppInstanceLabelKey: r.Spec.GetClusterName(),
constant.VolumeClaimTemplateNameLabelKey: vctName,
Expand All @@ -711,10 +718,16 @@ func (r *OpsRequest) getSCNameByPvcAndCheckStorageSize(ctx context.Context,
if err := cli.List(ctx, pvcList, client.InNamespace(r.Namespace), matchingLabels); err != nil {
return nil, err
}
if len(pvcList.Items) == 0 {
var pvc *corev1.PersistentVolumeClaim
for _, pvcItem := range pvcList.Items {
if targetInsTPLName == pvcItem.Labels[constant.KBAppComponentInstanceTemplateLabelKey] {
pvc = &pvcItem
break
}
}
if pvc == nil {
return nil, nil
}
pvc := pvcList.Items[0]
previousValue := *pvc.Status.Capacity.Storage()
if requestStorage.Cmp(previousValue) < 0 {
return nil, fmt.Errorf(`requested storage size of volumeClaimTemplate "%s" can not less than status.capacity.storage "%s" `,
Expand Down
76 changes: 56 additions & 20 deletions controllers/apps/component_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -375,8 +375,8 @@ var _ = Describe("Component Controller", func() {
})).Should(Succeed())
}

getPVCName := func(vctName, compName string, i int) string {
return fmt.Sprintf("%s-%s-%s-%d", vctName, clusterKey.Name, compName, i)
getPVCName := func(vctName, compAndTPLName string, i int) string {
return fmt.Sprintf("%s-%s-%s-%d", vctName, clusterKey.Name, compAndTPLName, i)
}

createPVC := func(clusterName, pvcName, compName, storageSize, storageClassName string) {
Expand Down Expand Up @@ -705,11 +705,15 @@ var _ = Describe("Component Controller", func() {

testVolumeExpansion := func(compDef *kbappsv1.ComponentDefinition, compName string, storageClass *storagev1.StorageClass) {
var (
replicas = 3
volumeSize = "1Gi"
newVolumeSize = "2Gi"
volumeQuantity = resource.MustParse(volumeSize)
newVolumeQuantity = resource.MustParse(newVolumeSize)
insTPLName = "foo"
replicas = 3
volumeSize = "1Gi"
newVolumeSize = "2Gi"
newFooVolumeSize = "3Gi"
volumeQuantity = resource.MustParse(volumeSize)
newVolumeQuantity = resource.MustParse(newVolumeSize)
newFooVolumeQuantity = resource.MustParse(newFooVolumeSize)
compAndTPLName = fmt.Sprintf("%s-%s", compName, insTPLName)
)

By("Mock a StorageClass which allows resize")
Expand All @@ -724,20 +728,40 @@ var _ = Describe("Component Controller", func() {
f.SetReplicas(int32(replicas)).
SetServiceVersion(compDef.Spec.ServiceVersion).
AddVolumeClaimTemplate(testapps.DataVolumeName, pvcSpec).
AddVolumeClaimTemplate(testapps.LogVolumeName, pvcSpec)
AddVolumeClaimTemplate(testapps.LogVolumeName, pvcSpec).
AddInstances(compName, kbappsv1.InstanceTemplate{
Name: insTPLName,
Replicas: pointer.Int32(1),
VolumeClaimTemplates: []kbappsv1.ClusterComponentVolumeClaimTemplate{
{Name: testapps.DataVolumeName, Spec: pvcSpec},
{Name: testapps.LogVolumeName, Spec: pvcSpec},
},
})
})

By("Checking the replicas")
itsList := testk8s.ListAndCheckInstanceSet(&testCtx, clusterKey)
its := &itsList.Items[0]
Expect(*its.Spec.Replicas).Should(BeEquivalentTo(replicas))

pvcName := func(vctName string, index int) string {
pvcName := getPVCName(vctName, compName, index)
if index == replicas-1 {
pvcName = getPVCName(vctName, compAndTPLName, 0)
}
return pvcName
}
newVolumeQuantityF := func(index int) resource.Quantity {
if index == replicas-1 {
return newFooVolumeQuantity
}
return newVolumeQuantity
}
By("Mock PVCs in Bound Status")
for i := 0; i < replicas; i++ {
for _, vctName := range []string{testapps.DataVolumeName, testapps.LogVolumeName} {
pvc := &corev1.PersistentVolumeClaim{
ObjectMeta: metav1.ObjectMeta{
Name: getPVCName(vctName, compName, i),
Name: pvcName(vctName, i),
Namespace: clusterKey.Namespace,
Labels: map[string]string{
constant.AppManagedByLabelKey: constant.AppName,
Expand All @@ -746,6 +770,9 @@ var _ = Describe("Component Controller", func() {
}},
Spec: pvcSpec.ToV1PersistentVolumeClaimSpec(),
}
if i == replicas-1 {
pvc.Labels[constant.KBAppComponentInstanceTemplateLabelKey] = insTPLName
}
Expect(testCtx.CreateObj(testCtx.Ctx, pvc)).Should(Succeed())
patch := client.MergeFrom(pvc.DeepCopy())
pvc.Status.Phase = corev1.ClaimBound // only bound pvc allows resize
Expand All @@ -770,9 +797,18 @@ var _ = Describe("Component Controller", func() {
By("Updating data PVC storage size")
Expect(testapps.GetAndChangeObj(&testCtx, clusterKey, func(cluster *kbappsv1.Cluster) {
comp := &cluster.Spec.ComponentSpecs[0]
for i, vct := range comp.VolumeClaimTemplates {
if vct.Name == testapps.DataVolumeName {
comp.VolumeClaimTemplates[i].Spec.Resources.Requests[corev1.ResourceStorage] = newVolumeQuantity
expandVolume := func(vcts []kbappsv1.ClusterComponentVolumeClaimTemplate, quantity resource.Quantity) {
for i, vct := range vcts {
if vct.Name == testapps.DataVolumeName {
vcts[i].Spec.Resources.Requests[corev1.ResourceStorage] = quantity
}
}
}
expandVolume(comp.VolumeClaimTemplates, newVolumeQuantity)
for i, insTPL := range comp.Instances {
if insTPL.Name == insTPLName {
expandVolume(comp.Instances[i].VolumeClaimTemplates, newFooVolumeQuantity)
break
}
}
})()).ShouldNot(HaveOccurred())
Expand All @@ -785,23 +821,23 @@ var _ = Describe("Component Controller", func() {
pvc := &corev1.PersistentVolumeClaim{}
pvcKey := types.NamespacedName{
Namespace: clusterKey.Namespace,
Name: getPVCName(testapps.DataVolumeName, compName, i),
Name: pvcName(testapps.DataVolumeName, i),
}
Eventually(func(g Gomega) {
g.Expect(k8sClient.Get(testCtx.Ctx, pvcKey, pvc)).Should(Succeed())
g.Expect(pvc.Spec.Resources.Requests[corev1.ResourceStorage]).To(Equal(newVolumeQuantity))
g.Expect(pvc.Status.Capacity[corev1.ResourceStorage]).To(Equal(volumeQuantity))
g.Expect(pvc.Spec.Resources.Requests[corev1.ResourceStorage]).To(Equal(newVolumeQuantityF(i)))
}).Should(Succeed())
}

By("Mock resizing of data volumes finished")
for i := 0; i < replicas; i++ {
pvcKey := types.NamespacedName{
Namespace: clusterKey.Namespace,
Name: getPVCName(testapps.DataVolumeName, compName, i),
Name: pvcName(testapps.DataVolumeName, i),
}
Expect(testapps.GetAndChangeObjStatus(&testCtx, pvcKey, func(pvc *corev1.PersistentVolumeClaim) {
pvc.Status.Capacity[corev1.ResourceStorage] = newVolumeQuantity
pvc.Status.Capacity[corev1.ResourceStorage] = newVolumeQuantityF(i)
})()).ShouldNot(HaveOccurred())
}

Expand All @@ -817,10 +853,10 @@ var _ = Describe("Component Controller", func() {
for i := 0; i < replicas; i++ {
pvcKey := types.NamespacedName{
Namespace: clusterKey.Namespace,
Name: getPVCName(testapps.DataVolumeName, compName, i),
Name: pvcName(testapps.DataVolumeName, i),
}
Eventually(testapps.CheckObj(&testCtx, pvcKey, func(g Gomega, pvc *corev1.PersistentVolumeClaim) {
g.Expect(pvc.Status.Capacity[corev1.ResourceStorage]).To(Equal(newVolumeQuantity))
g.Expect(pvc.Status.Capacity[corev1.ResourceStorage]).To(Equal(newVolumeQuantityF(i)))
})).Should(Succeed())
}

Expand All @@ -829,7 +865,7 @@ var _ = Describe("Component Controller", func() {
pvc := &corev1.PersistentVolumeClaim{}
pvcKey := types.NamespacedName{
Namespace: clusterKey.Namespace,
Name: getPVCName(testapps.LogVolumeName, compName, i),
Name: pvcName(testapps.LogVolumeName, i),
}
Expect(k8sClient.Get(testCtx.Ctx, pvcKey, pvc)).Should(Succeed())
Expect(pvc.Spec.Resources.Requests[corev1.ResourceStorage]).To(Equal(volumeQuantity))
Expand Down
58 changes: 50 additions & 8 deletions controllers/apps/transformer_component_workload.go
Original file line number Diff line number Diff line change
Expand Up @@ -408,13 +408,12 @@ func checkNRollbackProtoImages(itsObj, itsProto *workloads.InstanceSet) {
}
}

// expandVolume handles workload expand volume
func (r *componentWorkloadOps) expandVolume() error {
for _, vct := range r.runningITS.Spec.VolumeClaimTemplates {
func (r *componentWorkloadOps) expandVolumeClaimTemplates(runningVCTs []corev1.PersistentVolumeClaim, protoVCTs []corev1.PersistentVolumeClaimTemplate, insTPLName string) error {
for _, vct := range runningVCTs {
var proto *corev1.PersistentVolumeClaimTemplate
for i, v := range r.synthesizeComp.VolumeClaimTemplates {
for i, v := range protoVCTs {
if v.Name == vct.Name {
proto = &r.synthesizeComp.VolumeClaimTemplates[i]
proto = &protoVCTs[i]
break
}
}
Expand All @@ -423,7 +422,48 @@ func (r *componentWorkloadOps) expandVolume() error {
continue
}

if err := r.expandVolumes(vct.Name, proto); err != nil {
if err := r.expandVolumes(insTPLName, vct.Name, proto); err != nil {
return err
}
}
return nil
}

// expandVolume handles workload expand volume
func (r *componentWorkloadOps) expandVolume() error {
// 1. expand the volumes without instance template name.
if err := r.expandVolumeClaimTemplates(r.runningITS.Spec.VolumeClaimTemplates, r.synthesizeComp.VolumeClaimTemplates, ""); err != nil {
return err
}
if len(r.runningITS.Spec.Instances) == 0 {
return nil
}
// 2. expand the volumes with instance template name.
for i := range r.runningITS.Spec.Instances {
runningInsSpec := r.runningITS.Spec.DeepCopy()
runningInsTPL := runningInsSpec.Instances[i]
intctrlutil.MergeList(&runningInsTPL.VolumeClaimTemplates, &runningInsSpec.VolumeClaimTemplates,
func(item corev1.PersistentVolumeClaim) func(corev1.PersistentVolumeClaim) bool {
return func(claim corev1.PersistentVolumeClaim) bool {
return claim.Name == item.Name
}
})

var protoVCTs []corev1.PersistentVolumeClaimTemplate
protoVCTs = append(protoVCTs, r.synthesizeComp.VolumeClaimTemplates...)
for _, v := range r.synthesizeComp.Instances {
if runningInsTPL.Name == v.Name {
insVCTs := component.ToVolumeClaimTemplates(v.VolumeClaimTemplates)
intctrlutil.MergeList(&insVCTs, &protoVCTs,
func(item corev1.PersistentVolumeClaimTemplate) func(corev1.PersistentVolumeClaimTemplate) bool {
return func(claim corev1.PersistentVolumeClaimTemplate) bool {
return claim.Name == item.Name
}
})
break
}
}
if err := r.expandVolumeClaimTemplates(runningInsSpec.VolumeClaimTemplates, protoVCTs, runningInsTPL.Name); err != nil {
return err
}
}
Expand Down Expand Up @@ -679,7 +719,7 @@ func (r *componentWorkloadOps) deletePVCs4ScaleIn(itsObj *workloads.InstanceSet)
return nil
}

func (r *componentWorkloadOps) expandVolumes(vctName string, proto *corev1.PersistentVolumeClaimTemplate) error {
func (r *componentWorkloadOps) expandVolumes(insTPLName string, vctName string, proto *corev1.PersistentVolumeClaimTemplate) error {
for _, pod := range r.runningItsPodNames {
pvc := &corev1.PersistentVolumeClaim{}
pvcKey := types.NamespacedName{
Expand All @@ -694,7 +734,9 @@ func (r *componentWorkloadOps) expandVolumes(vctName string, proto *corev1.Persi
return err
}
}

if insTPLName != pvc.Labels[constant.KBAppComponentInstanceTemplateLabelKey] {
continue
}
if !pvcNotFound {
quantity := pvc.Spec.Resources.Requests.Storage()
newQuantity := proto.Spec.Resources.Requests.Storage()
Expand Down
6 changes: 3 additions & 3 deletions pkg/controller/component/synthesize_component.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,7 @@ func buildSchedulingPolicy(synthesizedComp *SynthesizedComponent, comp *appsv1.C

func buildVolumeClaimTemplates(synthesizeComp *SynthesizedComponent, comp *appsv1.Component) {
if comp.Spec.VolumeClaimTemplates != nil {
synthesizeComp.VolumeClaimTemplates = toVolumeClaimTemplates(&comp.Spec)
synthesizeComp.VolumeClaimTemplates = ToVolumeClaimTemplates(comp.Spec.VolumeClaimTemplates)
}
}

Expand Down Expand Up @@ -336,7 +336,7 @@ func limitSharedMemoryVolumeSize(synthesizeComp *SynthesizedComponent, comp *app
}
}

func toVolumeClaimTemplates(compSpec *appsv1.ComponentSpec) []corev1.PersistentVolumeClaimTemplate {
func ToVolumeClaimTemplates(vcts []appsv1.ClusterComponentVolumeClaimTemplate) []corev1.PersistentVolumeClaimTemplate {
storageClassName := func(spec appsv1.PersistentVolumeClaimSpec, defaultStorageClass string) *string {
if spec.StorageClassName != nil && *spec.StorageClassName != "" {
return spec.StorageClassName
Expand All @@ -347,7 +347,7 @@ func toVolumeClaimTemplates(compSpec *appsv1.ComponentSpec) []corev1.PersistentV
return nil
}
var ts []corev1.PersistentVolumeClaimTemplate
for _, t := range compSpec.VolumeClaimTemplates {
for _, t := range vcts {
ts = append(ts, corev1.PersistentVolumeClaimTemplate{
ObjectMeta: metav1.ObjectMeta{
Name: t.Name,
Expand Down
4 changes: 3 additions & 1 deletion pkg/operations/volume_expansion.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@ func (ve volumeExpansionOpsHandler) ReconcileAction(reqCtx intctrlutil.RequestCt
expectCount: int(ins.GetReplicas()),
vctName: vct.Name,
offlineInstanceNames: compSpec.OfflineInstances,
templateName: ins.Name,
})
}
}
Expand Down Expand Up @@ -239,7 +240,7 @@ func (ve volumeExpansionOpsHandler) SaveLastConfiguration(reqCtx intctrlutil.Req
getLastVCTs := func(vcts []appsv1.ClusterComponentVolumeClaimTemplate, templateName string) []appsv1.ClusterComponentVolumeClaimTemplate {
lastVCTs := make([]appsv1.ClusterComponentVolumeClaimTemplate, 0)
for _, vct := range vcts {
key := getComponentVCTKey(comOps.GetComponentName(), comOps.GetComponentName(), templateName)
key := getComponentVCTKey(comOps.GetComponentName(), templateName, vct.Name)
if _, ok := storageMap[key]; !ok {
continue
}
Expand All @@ -256,6 +257,7 @@ func (ve volumeExpansionOpsHandler) SaveLastConfiguration(reqCtx intctrlutil.Req
continue
}
instanceTemplates = append(instanceTemplates, appsv1.InstanceTemplate{
Name: v.Name,
VolumeClaimTemplates: getLastVCTs(ins.VolumeClaimTemplates, ins.Name),
})
}
Expand Down
11 changes: 11 additions & 0 deletions pkg/testutil/apps/cluster_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,17 @@ func (factory *MockClusterFactory) AddMultipleTemplateComponent(compName string,
return factory
}

func (factory *MockClusterFactory) AddInstances(compName string, instance appsv1.InstanceTemplate) *MockClusterFactory {
for i, compSpec := range factory.Get().Spec.ComponentSpecs {
if compSpec.Name != compName {
continue
}
factory.Get().Spec.ComponentSpecs[i].Instances = append(factory.Get().Spec.ComponentSpecs[i].Instances, instance)
break
}
return factory
}

func (factory *MockClusterFactory) AddService(service appsv1.ClusterService) *MockClusterFactory {
services := factory.Get().Spec.Services
if len(services) == 0 {
Expand Down

0 comments on commit b2e90cc

Please sign in to comment.