Skip to content

Commit

Permalink
Merge pull request #441 from Rakshith-R/df-bp
Browse files Browse the repository at this point in the history
DFBUGS-1199: [cephcsi] RBD images backing VolumeGroupSnapshots cannot be flattened
  • Loading branch information
openshift-merge-bot[bot] authored Dec 20, 2024
2 parents 1a68176 + 9a47054 commit 52a4d3e
Show file tree
Hide file tree
Showing 9 changed files with 139 additions and 100 deletions.
2 changes: 1 addition & 1 deletion e2e/cephfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -2480,7 +2480,7 @@ var _ = Describe(cephfsType, func() {

By("test volumeGroupSnapshot", func() {
scName := "csi-cephfs-sc"
snapshotter, err := newCephFSVolumeGroupSnapshot(f, f.UniqueName, scName, false, deployTimeout, 3)
snapshotter, err := newCephFSVolumeGroupSnapshot(f, f.UniqueName, scName, false, deployTimeout, 3, 0)
if err != nil {
framework.Failf("failed to create volumeGroupSnapshot Base: %v", err)
}
Expand Down
2 changes: 1 addition & 1 deletion e2e/rbd.go
Original file line number Diff line number Diff line change
Expand Up @@ -4866,7 +4866,7 @@ var _ = Describe("RBD", func() {
}

scName := "csi-rbd-sc"
snapshotter, err := newRBDVolumeGroupSnapshot(f, f.UniqueName, scName, false, deployTimeout, 3)
snapshotter, err := newRBDVolumeGroupSnapshot(f, f.UniqueName, scName, false, deployTimeout, 3, 5)
if err != nil {
framework.Failf("failed to create RBDVolumeGroupSnapshot: %v", err)
}
Expand Down
10 changes: 6 additions & 4 deletions e2e/volumegroupsnapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,10 @@ var _ VolumeGroupSnapshotter = &cephFSVolumeGroupSnapshot{}
func newCephFSVolumeGroupSnapshot(f *framework.Framework, namespace,
storageClass string,
blockPVC bool,
timeout, totalPVCCount int,
timeout, totalPVCCount, additionalVGSnapshotCount int,
) (VolumeGroupSnapshotter, error) {
base, err := newVolumeGroupSnapshotBase(f, namespace, storageClass, blockPVC, timeout, totalPVCCount)
base, err := newVolumeGroupSnapshotBase(f, namespace, storageClass, blockPVC,
timeout, totalPVCCount, additionalVGSnapshotCount)
if err != nil {
return nil, fmt.Errorf("failed to create volumeGroupSnapshotterBase: %w", err)
}
Expand Down Expand Up @@ -127,9 +128,10 @@ var _ VolumeGroupSnapshotter = &rbdVolumeGroupSnapshot{}
func newRBDVolumeGroupSnapshot(f *framework.Framework, namespace,
storageClass string,
blockPVC bool,
timeout, totalPVCCount int,
timeout, totalPVCCount, additionalVGSnapshotCount int,
) (VolumeGroupSnapshotter, error) {
base, err := newVolumeGroupSnapshotBase(f, namespace, storageClass, blockPVC, timeout, totalPVCCount)
base, err := newVolumeGroupSnapshotBase(f, namespace, storageClass, blockPVC,
timeout, totalPVCCount, additionalVGSnapshotCount)
if err != nil {
return nil, fmt.Errorf("failed to create volumeGroupSnapshotterBase: %w", err)
}
Expand Down
56 changes: 37 additions & 19 deletions e2e/volumegroupsnapshot_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,19 +73,20 @@ type VolumeGroupSnapshotter interface {
}

type volumeGroupSnapshotterBase struct {
timeout int
framework *framework.Framework
groupclient *groupsnapclient.GroupsnapshotV1alpha1Client
storageClassName string
blockPVC bool
totalPVCCount int
namespace string
timeout int
framework *framework.Framework
groupclient *groupsnapclient.GroupsnapshotV1alpha1Client
storageClassName string
blockPVC bool
totalPVCCount int
additionalVGSnapshotCount int
namespace string
}

func newVolumeGroupSnapshotBase(f *framework.Framework, namespace,
storageClass string,
blockPVC bool,
timeout, totalPVCCount int,
timeout, totalPVCCount, additionalVGSnapshotCount int,
) (*volumeGroupSnapshotterBase, error) {
config, err := framework.LoadConfig()
if err != nil {
Expand All @@ -97,13 +98,14 @@ func newVolumeGroupSnapshotBase(f *framework.Framework, namespace,
}

return &volumeGroupSnapshotterBase{
framework: f,
groupclient: c,
namespace: namespace,
storageClassName: storageClass,
blockPVC: blockPVC,
timeout: timeout,
totalPVCCount: totalPVCCount,
framework: f,
groupclient: c,
namespace: namespace,
storageClassName: storageClass,
blockPVC: blockPVC,
timeout: timeout,
totalPVCCount: totalPVCCount,
additionalVGSnapshotCount: additionalVGSnapshotCount,
}, err
}

Expand Down Expand Up @@ -263,7 +265,7 @@ func (v *volumeGroupSnapshotterBase) DeletePods(pods []*v1.Pod) error {
return nil
}

func (v volumeGroupSnapshotterBase) CreateVolumeGroupSnapshotClass(
func (v *volumeGroupSnapshotterBase) CreateVolumeGroupSnapshotClass(
groupSnapshotClass *groupsnapapi.VolumeGroupSnapshotClass,
) error {
return wait.PollUntilContextTimeout(
Expand All @@ -286,7 +288,7 @@ func (v volumeGroupSnapshotterBase) CreateVolumeGroupSnapshotClass(
})
}

func (v volumeGroupSnapshotterBase) CreateVolumeGroupSnapshot(name,
func (v *volumeGroupSnapshotterBase) CreateVolumeGroupSnapshot(name,
volumeGroupSnapshotClassName string, labels map[string]string,
) (*groupsnapapi.VolumeGroupSnapshot, error) {
namespace := v.namespace
Expand Down Expand Up @@ -356,7 +358,7 @@ func (v volumeGroupSnapshotterBase) CreateVolumeGroupSnapshot(name,
return groupSnapshot, nil
}

func (v volumeGroupSnapshotterBase) DeleteVolumeGroupSnapshot(volumeGroupSnapshotName string) error {
func (v *volumeGroupSnapshotterBase) DeleteVolumeGroupSnapshot(volumeGroupSnapshotName string) error {
namespace := v.namespace
ctx := context.TODO()
err := v.groupclient.VolumeGroupSnapshots(namespace).Delete(
Expand Down Expand Up @@ -395,7 +397,7 @@ func (v volumeGroupSnapshotterBase) DeleteVolumeGroupSnapshot(volumeGroupSnapsho
})
}

func (v volumeGroupSnapshotterBase) DeleteVolumeGroupSnapshotClass(groupSnapshotClassName string) error {
func (v *volumeGroupSnapshotterBase) DeleteVolumeGroupSnapshotClass(groupSnapshotClassName string) error {
ctx := context.TODO()
err := v.groupclient.VolumeGroupSnapshotClasses().Delete(
ctx, groupSnapshotClassName, metav1.DeleteOptions{})
Expand Down Expand Up @@ -456,6 +458,22 @@ func (v *volumeGroupSnapshotterBase) testVolumeGroupSnapshot(vol VolumeGroupSnap
return fmt.Errorf("failed to create volume group snapshot: %w", err)
}

// Create and delete additional group snapshots.
for i := range v.additionalVGSnapshotCount {
newVGSName := fmt.Sprintf("%s-%d", vgsName, i)
_, err = v.CreateVolumeGroupSnapshot(newVGSName, vgscName, pvcLabels)
if err != nil {
return fmt.Errorf("failed to create volume group snapshot %q: %w", newVGSName, err)
}
}
for i := range v.additionalVGSnapshotCount {
newVGSName := fmt.Sprintf("%s-%d", vgsName, i)
err = v.DeleteVolumeGroupSnapshot(newVGSName)
if err != nil {
return fmt.Errorf("failed to delete volume group snapshot %q: %w", newVGSName, err)
}
}

clonePVCs, err := v.CreatePVCClones(volumeGroupSnapshot)
if err != nil {
return fmt.Errorf("failed to create clones: %w", err)
Expand Down
37 changes: 26 additions & 11 deletions internal/rbd/controllerserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -573,7 +573,7 @@ func (cs *ControllerServer) repairExistingVolume(ctx context.Context, req *csi.C
// are more than the `minSnapshotOnImage` Add a task to flatten all the
// temporary cloned images.
func flattenTemporaryClonedImages(ctx context.Context, rbdVol *rbdVolume, cr *util.Credentials) error {
snaps, err := rbdVol.listSnapshots()
snaps, children, err := rbdVol.listSnapAndChildren()
if err != nil {
if errors.Is(err, ErrImageNotFound) {
return status.Error(codes.InvalidArgument, err.Error())
Expand All @@ -589,9 +589,19 @@ func flattenTemporaryClonedImages(ctx context.Context, rbdVol *rbdVolume, cr *ut
len(snaps),
rbdVol,
maxSnapshotsOnImage)

if len(children) == 0 {
// if none of the child images(are in trash) exist, we can't flatten them.
// return ResourceExhausted error message as we have reached the hard limit.
log.ErrorLog(ctx, "child images of image %q cannot be flatten", rbdVol)

return status.Errorf(codes.ResourceExhausted,
"rbd image %q has %d snapshots but child images cannot be flattened",
rbdVol, len(snaps))
}
err = flattenClonedRbdImages(
ctx,
snaps,
children,
rbdVol.Pool,
rbdVol.Monitors,
rbdVol.RbdImageName,
Expand All @@ -610,13 +620,23 @@ func flattenTemporaryClonedImages(ctx context.Context, rbdVol *rbdVolume, cr *ut
len(snaps),
rbdVol,
minSnapshotsOnImageToStartFlatten)
if len(children) == 0 {
// if none of the child images(are in trash) exist, we can't flatten them.
// return nil since we have only reach the soft limit.
log.DebugLog(ctx, "child images of image %q cannot be flatten", rbdVol)

return nil
}
// If we start flattening all the snapshots at one shot the volume
// creation time will be affected,so we will flatten only the extra
// snapshots.
snaps = snaps[minSnapshotsOnImageToStartFlatten-1:]
// snapshots. Use the min of the extra snapshots and the number of children
// to avoid scenario where number of children are less than the extra snapshots.
// This occurs when the child images are in trash and not yet deleted.
extraSnapshots := min((len(snaps) - int(minSnapshotsOnImageToStartFlatten)), len(children))
children = children[:extraSnapshots]
err = flattenClonedRbdImages(
ctx,
snaps,
children,
rbdVol.Pool,
rbdVol.Monitors,
rbdVol.RbdImageName,
Expand Down Expand Up @@ -1157,7 +1177,7 @@ func (cs *ControllerServer) CreateSnapshot(
return cloneFromSnapshot(ctx, rbdVol, rbdSnap, cr, req.GetParameters())
}

err = flattenTemporaryClonedImages(ctx, rbdVol, cr)
err = rbdVol.PrepareVolumeForSnapshot(ctx, cr)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -1376,11 +1396,6 @@ func (cs *ControllerServer) doSnapshotClone(
return cloneRbd, err
}

err = cloneRbd.flattenRbdImage(ctx, false, rbdHardMaxCloneDepth, rbdSoftMaxCloneDepth)
if err != nil {
return cloneRbd, err
}

return cloneRbd, nil
}

Expand Down
23 changes: 23 additions & 0 deletions internal/rbd/group_controllerserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ import (
// group snapshot and remove all images from the group again. This leaves the
// group and its snapshot around, the group snapshot can be inspected to list
// the snapshots of the images.
//
//nolint:gocyclo,cyclop // TODO: reduce complexity.
func (cs *ControllerServer) CreateVolumeGroupSnapshot(
ctx context.Context,
req *csi.CreateVolumeGroupSnapshotRequest,
Expand Down Expand Up @@ -132,6 +134,27 @@ func (cs *ControllerServer) CreateVolumeGroupSnapshot(
"failed to get existing one with name %q: %v", vgsName, err)
}

creds, err := util.NewUserCredentials(req.GetSecrets())
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
defer creds.DeleteCredentials()

errList := make([]error, 0)
for _, volume := range volumes {
err = volume.PrepareVolumeForSnapshot(ctx, creds)
if err != nil {
errList = append(errList, err)
}
}
if len(errList) > 0 {
// FIXME: we should probably choose a error code that has more priority.
return nil, status.Errorf(
status.Code(errList[0]),
"failed to prepare volumes for snapshot: %v",
errList)
}

// create a temporary VolumeGroup with a different name
vg, err = mgr.CreateVolumeGroup(ctx, vgName)
if err != nil {
Expand Down
Loading

0 comments on commit 52a4d3e

Please sign in to comment.