Skip to content

Commit

Permalink
Remove snapshot related lister, informer and client from backup contr…
Browse files Browse the repository at this point in the history
…oller.

Signed-off-by: Xun Jiang <blackpiglet@gmail.com>
  • Loading branch information
Xun Jiang committed Sep 7, 2022
1 parent c7bd2b9 commit 9a4b816
Show file tree
Hide file tree
Showing 4 changed files with 88 additions and 132 deletions.
2 changes: 1 addition & 1 deletion pkg/backup/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ type Request struct {
VolumeSnapshots []*volume.Snapshot
PodVolumeBackups []*velerov1api.PodVolumeBackup
BackedUpItems map[itemKey]struct{}
CSISnapshots []*snapshotv1api.VolumeSnapshot
CSISnapshots []snapshotv1api.VolumeSnapshot
}

// BackupResourceList returns the list of backed up resources grouped by the API
Expand Down
37 changes: 0 additions & 37 deletions pkg/cmd/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ import (
snapshotv1api "github.com/kubernetes-csi/external-snapshotter/client/v4/apis/volumesnapshot/v1"
snapshotv1client "github.com/kubernetes-csi/external-snapshotter/client/v4/clientset/versioned"
snapshotv1informers "github.com/kubernetes-csi/external-snapshotter/client/v4/informers/externalversions"
snapshotv1listers "github.com/kubernetes-csi/external-snapshotter/client/v4/listers/volumesnapshot/v1"

"github.com/vmware-tanzu/velero/internal/credentials"
"github.com/vmware-tanzu/velero/internal/storage"
Expand Down Expand Up @@ -554,36 +553,6 @@ func (s *server) initRestic() error {
return nil
}

func (s *server) getCSISnapshotListers() (snapshotv1listers.VolumeSnapshotLister, snapshotv1listers.VolumeSnapshotContentLister, snapshotv1listers.VolumeSnapshotClassLister) {
// Make empty listers that will only be populated if CSI is properly enabled.
var vsLister snapshotv1listers.VolumeSnapshotLister
var vscLister snapshotv1listers.VolumeSnapshotContentLister
var vsClassLister snapshotv1listers.VolumeSnapshotClassLister
var err error

// If CSI is enabled, check for the CSI groups and generate the listers
// If CSI isn't enabled, return empty listers.
if features.IsEnabled(velerov1api.CSIFeatureFlag) {
_, err = s.discoveryClient.ServerResourcesForGroupVersion(snapshotv1api.SchemeGroupVersion.String())
switch {
case apierrors.IsNotFound(err):
// CSI is enabled, but the required CRDs aren't installed, so halt.
s.logger.Fatalf("The '%s' feature flag was specified, but CSI API group [%s] was not found.", velerov1api.CSIFeatureFlag, snapshotv1api.SchemeGroupVersion.String())
case err == nil:
// CSI is enabled, and the resources were found.
// Instantiate the listers fully
s.logger.Debug("Creating CSI listers")
// Access the wrapped factory directly here since we've already done the feature flag check above to know it's safe.
vsLister = s.csiSnapshotterSharedInformerFactory.factory.Snapshot().V1().VolumeSnapshots().Lister()
vscLister = s.csiSnapshotterSharedInformerFactory.factory.Snapshot().V1().VolumeSnapshotContents().Lister()
vsClassLister = s.csiSnapshotterSharedInformerFactory.factory.Snapshot().V1().VolumeSnapshotClasses().Lister()
case err != nil:
cmd.CheckError(err)
}
}
return vsLister, vscLister, vsClassLister
}

func (s *server) runControllers(defaultVolumeSnapshotLocations map[string]string) error {
s.logger.Info("Starting controllers")

Expand All @@ -608,8 +577,6 @@ func (s *server) runControllers(defaultVolumeSnapshotLocations map[string]string

backupStoreGetter := persistence.NewObjectBackupStoreGetter(s.credentialFileStore)

csiVSLister, csiVSCLister, csiVSClassLister := s.getCSISnapshotListers()

backupTracker := controller.NewBackupTracker()

backupControllerRunInfo := func() controllerRunInfo {
Expand Down Expand Up @@ -645,10 +612,6 @@ func (s *server) runControllers(defaultVolumeSnapshotLocations map[string]string
defaultVolumeSnapshotLocations,
s.metrics,
s.config.formatFlag.Parse(),
csiVSLister,
s.csiSnapshotClient,
csiVSCLister,
csiVSClassLister,
backupStoreGetter,
)

Expand Down
179 changes: 86 additions & 93 deletions pkg/controller/backup_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,6 @@ import (
"github.com/vmware-tanzu/velero/pkg/util/csi"

snapshotv1api "github.com/kubernetes-csi/external-snapshotter/client/v4/apis/volumesnapshot/v1"
snapshotterClientSet "github.com/kubernetes-csi/external-snapshotter/client/v4/clientset/versioned"
snapshotv1listers "github.com/kubernetes-csi/external-snapshotter/client/v4/listers/volumesnapshot/v1"

"github.com/vmware-tanzu/velero/internal/storage"
velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
Expand Down Expand Up @@ -76,28 +74,24 @@ import (

type backupController struct {
*genericController
discoveryHelper discovery.Helper
backupper pkgbackup.Backupper
lister velerov1listers.BackupLister
client velerov1client.BackupsGetter
kbClient kbclient.Client
clock clock.Clock
backupLogLevel logrus.Level
newPluginManager func(logrus.FieldLogger) clientmgmt.Manager
backupTracker BackupTracker
defaultBackupLocation string
defaultVolumesToRestic bool
defaultBackupTTL time.Duration
defaultCSISnapshotTimeout time.Duration
snapshotLocationLister velerov1listers.VolumeSnapshotLocationLister
defaultSnapshotLocations map[string]string
metrics *metrics.ServerMetrics
backupStoreGetter persistence.ObjectBackupStoreGetter
formatFlag logging.Format
volumeSnapshotLister snapshotv1listers.VolumeSnapshotLister
volumeSnapshotClient *snapshotterClientSet.Clientset
volumeSnapshotContentLister snapshotv1listers.VolumeSnapshotContentLister
volumeSnapshotClassLister snapshotv1listers.VolumeSnapshotClassLister
discoveryHelper discovery.Helper
backupper pkgbackup.Backupper
lister velerov1listers.BackupLister
client velerov1client.BackupsGetter
kbClient kbclient.Client
clock clock.Clock
backupLogLevel logrus.Level
newPluginManager func(logrus.FieldLogger) clientmgmt.Manager
backupTracker BackupTracker
defaultBackupLocation string
defaultVolumesToRestic bool
defaultBackupTTL time.Duration
defaultCSISnapshotTimeout time.Duration
snapshotLocationLister velerov1listers.VolumeSnapshotLocationLister
defaultSnapshotLocations map[string]string
metrics *metrics.ServerMetrics
backupStoreGetter persistence.ObjectBackupStoreGetter
formatFlag logging.Format
}

func NewBackupController(
Expand All @@ -118,36 +112,28 @@ func NewBackupController(
defaultSnapshotLocations map[string]string,
metrics *metrics.ServerMetrics,
formatFlag logging.Format,
volumeSnapshotLister snapshotv1listers.VolumeSnapshotLister,
volumeSnapshotClient *snapshotterClientSet.Clientset,
volumeSnapshotContentLister snapshotv1listers.VolumeSnapshotContentLister,
volumesnapshotClassLister snapshotv1listers.VolumeSnapshotClassLister,
backupStoreGetter persistence.ObjectBackupStoreGetter,
) Interface {
c := &backupController{
genericController: newGenericController(Backup, logger),
discoveryHelper: discoveryHelper,
backupper: backupper,
lister: backupInformer.Lister(),
client: client,
clock: &clock.RealClock{},
backupLogLevel: backupLogLevel,
newPluginManager: newPluginManager,
backupTracker: backupTracker,
kbClient: kbClient,
defaultBackupLocation: defaultBackupLocation,
defaultVolumesToRestic: defaultVolumesToRestic,
defaultBackupTTL: defaultBackupTTL,
defaultCSISnapshotTimeout: defaultCSISnapshotTimeout,
snapshotLocationLister: volumeSnapshotLocationLister,
defaultSnapshotLocations: defaultSnapshotLocations,
metrics: metrics,
formatFlag: formatFlag,
volumeSnapshotLister: volumeSnapshotLister,
volumeSnapshotClient: volumeSnapshotClient,
volumeSnapshotContentLister: volumeSnapshotContentLister,
volumeSnapshotClassLister: volumesnapshotClassLister,
backupStoreGetter: backupStoreGetter,
genericController: newGenericController(Backup, logger),
discoveryHelper: discoveryHelper,
backupper: backupper,
lister: backupInformer.Lister(),
client: client,
clock: &clock.RealClock{},
backupLogLevel: backupLogLevel,
newPluginManager: newPluginManager,
backupTracker: backupTracker,
kbClient: kbClient,
defaultBackupLocation: defaultBackupLocation,
defaultVolumesToRestic: defaultVolumesToRestic,
defaultBackupTTL: defaultBackupTTL,
defaultCSISnapshotTimeout: defaultCSISnapshotTimeout,
snapshotLocationLister: volumeSnapshotLocationLister,
defaultSnapshotLocations: defaultSnapshotLocations,
metrics: metrics,
formatFlag: formatFlag,
backupStoreGetter: backupStoreGetter,
}

c.syncHandler = c.processBackup
Expand Down Expand Up @@ -643,47 +629,51 @@ func (c *backupController) runBackup(backup *pkgbackup.Request) error {

// Empty slices here so that they can be passed in to the persistBackup call later, regardless of whether or not CSI's enabled.
// This way, we only make the Lister call if the feature flag's on.
var volumeSnapshots []*snapshotv1api.VolumeSnapshot
var volumeSnapshotContents []*snapshotv1api.VolumeSnapshotContent
var volumeSnapshotClasses []*snapshotv1api.VolumeSnapshotClass
var volumeSnapshots []snapshotv1api.VolumeSnapshot
var volumeSnapshotContents []snapshotv1api.VolumeSnapshotContent
var volumeSnapshotClasses []snapshotv1api.VolumeSnapshotClass
if features.IsEnabled(velerov1api.CSIFeatureFlag) {
selector := label.NewSelectorForBackup(backup.Name)
// Listers are wrapped in a nil check out of caution, since they may not be populated based on the
// EnableCSI feature flag. This is more to guard against programmer error, as they shouldn't be nil
// when EnableCSI is on.
if c.volumeSnapshotLister != nil {
volumeSnapshots, err = c.volumeSnapshotLister.List(selector)
if err != nil {
backupLog.Error(err)
}
vsList := &snapshotv1api.VolumeSnapshotList{}
vscList := &snapshotv1api.VolumeSnapshotContentList{}

err = c.checkVolumeSnapshotReadyToUse(context.Background(), volumeSnapshots, backup.Spec.CSISnapshotTimeout.Duration)
if err != nil {
backupLog.Errorf("fail to wait VolumeSnapshot change to Ready: %s", err.Error())
}

backup.CSISnapshots = volumeSnapshots
err = c.kbClient.List(context.Background(), vsList, &kbclient.ListOptions{LabelSelector: selector})
if err != nil {
backupLog.Error(err)
}
if len(vsList.Items) >= 0 {
volumeSnapshots = vsList.Items
}
err = c.checkVolumeSnapshotReadyToUse(context.Background(), volumeSnapshots, backup.Spec.CSISnapshotTimeout.Duration)
if err != nil {
backupLog.Errorf("fail to wait VolumeSnapshot change to Ready: %s", err.Error())
}
backup.CSISnapshots = volumeSnapshots

if c.volumeSnapshotContentLister != nil {
volumeSnapshotContents, err = c.volumeSnapshotContentLister.List(selector)
if err != nil {
backupLog.Error(err)
}
err = c.kbClient.List(context.Background(), vscList, &kbclient.ListOptions{LabelSelector: selector})
if err != nil {
backupLog.Error(err)
}
if len(vscList.Items) >= 0 {
volumeSnapshotContents = vscList.Items
}

vsClassSet := sets.NewString()
for _, vsc := range volumeSnapshotContents {
// persist the volumesnapshotclasses referenced by vsc
if c.volumeSnapshotClassLister != nil &&
vsc.Spec.VolumeSnapshotClassName != nil &&
!vsClassSet.Has(*vsc.Spec.VolumeSnapshotClassName) {
if vsClass, err := c.volumeSnapshotClassLister.Get(*vsc.Spec.VolumeSnapshotClassName); err != nil {
if vsc.Spec.VolumeSnapshotClassName != nil && !vsClassSet.Has(*vsc.Spec.VolumeSnapshotClassName) {
vsClass := &snapshotv1api.VolumeSnapshotClass{}
if err := c.kbClient.Get(context.TODO(), kbclient.ObjectKey{Name: *vsc.Spec.VolumeSnapshotClassName}, vsClass); err != nil {
backupLog.Error(err)
} else {
vsClassSet.Insert(*vsc.Spec.VolumeSnapshotClassName)
volumeSnapshotClasses = append(volumeSnapshotClasses, vsClass)
volumeSnapshotClasses = append(volumeSnapshotClasses, *vsClass)
}
}

if err := csi.ResetVolumeSnapshotContent(vsc); err != nil {
backupLog.Error(err)
}
Expand Down Expand Up @@ -787,9 +777,9 @@ func persistBackup(backup *pkgbackup.Request,
backupContents, backupLog *os.File,
backupStore persistence.BackupStore,
log logrus.FieldLogger,
csiVolumeSnapshots []*snapshotv1api.VolumeSnapshot,
csiVolumeSnapshotContents []*snapshotv1api.VolumeSnapshotContent,
csiVolumesnapshotClasses []*snapshotv1api.VolumeSnapshotClass,
csiVolumeSnapshots []snapshotv1api.VolumeSnapshot,
csiVolumeSnapshotContents []snapshotv1api.VolumeSnapshotContent,
csiVolumesnapshotClasses []snapshotv1api.VolumeSnapshotClass,
) []error {
persistErrs := []error{}
backupJSON := new(bytes.Buffer)
Expand Down Expand Up @@ -898,7 +888,7 @@ func encodeToJSONGzip(data interface{}, desc string) (*bytes.Buffer, []error) {
// using goroutine here instead of waiting in CSI plugin, because it's not easy to make BackupItemAction
// parallel by now. After BackupItemAction parallel is implemented, this logic should be moved to CSI plugin
// as https://github.com/vmware-tanzu/velero-plugin-for-csi/pull/100
func (c *backupController) checkVolumeSnapshotReadyToUse(ctx context.Context, volumesnapshots []*snapshotv1api.VolumeSnapshot,
func (c *backupController) checkVolumeSnapshotReadyToUse(ctx context.Context, volumesnapshots []snapshotv1api.VolumeSnapshot,
csiSnapshotTimeout time.Duration) error {
eg, _ := errgroup.WithContext(ctx)
timeout := csiSnapshotTimeout
Expand All @@ -908,7 +898,8 @@ func (c *backupController) checkVolumeSnapshotReadyToUse(ctx context.Context, vo
volumeSnapshot := vs
eg.Go(func() error {
err := wait.PollImmediate(interval, timeout, func() (bool, error) {
tmpVS, err := c.volumeSnapshotClient.SnapshotV1().VolumeSnapshots(volumeSnapshot.Namespace).Get(ctx, volumeSnapshot.Name, metav1.GetOptions{})
tmpVS := &snapshotv1api.VolumeSnapshot{}
err := c.kbClient.Get(ctx, kbclient.ObjectKey{Name: volumeSnapshot.Name, Namespace: volumeSnapshot.Namespace}, tmpVS)
if err != nil {
return false, errors.Wrapf(err, fmt.Sprintf("failed to get volumesnapshot %s/%s", volumeSnapshot.Namespace, volumeSnapshot.Name))
}
Expand All @@ -933,28 +924,29 @@ func (c *backupController) checkVolumeSnapshotReadyToUse(ctx context.Context, vo
// which will cause snapshot deletion on cloud provider, then backup cannot restore the PV.
// If DeletionPolicy is Retain, just delete it. If DeletionPolicy is Delete, need to
// change DeletionPolicy to Retain before deleting VS, then change DeletionPolicy back to Delete.
func (c *backupController) deleteVolumeSnapshot(volumeSnapshots []*snapshotv1api.VolumeSnapshot,
volumeSnapshotContents []*snapshotv1api.VolumeSnapshotContent,
func (c *backupController) deleteVolumeSnapshot(volumeSnapshots []snapshotv1api.VolumeSnapshot,
volumeSnapshotContents []snapshotv1api.VolumeSnapshotContent,
backup pkgbackup.Request, logger logrus.FieldLogger) {
var wg sync.WaitGroup
vscMap := make(map[string]*snapshotv1api.VolumeSnapshotContent)
vscMap := make(map[string]snapshotv1api.VolumeSnapshotContent)
for _, vsc := range volumeSnapshotContents {
vscMap[vsc.Name] = vsc
}

for _, vs := range volumeSnapshots {
wg.Add(1)
go func(vs *snapshotv1api.VolumeSnapshot) {
go func(vs snapshotv1api.VolumeSnapshot) {
defer wg.Done()
var vsc *snapshotv1api.VolumeSnapshotContent
var vsc snapshotv1api.VolumeSnapshotContent
modifyVSCFlag := false
if vs.Status.BoundVolumeSnapshotContentName != nil &&
len(*vs.Status.BoundVolumeSnapshotContentName) > 0 {
vsc = vscMap[*vs.Status.BoundVolumeSnapshotContentName]
if nil == vsc {
var found bool
if vsc, found = vscMap[*vs.Status.BoundVolumeSnapshotContentName]; !found {
logger.Errorf("Not find %s from the vscMap", vs.Status.BoundVolumeSnapshotContentName)
return
}

if vsc.Spec.DeletionPolicy == snapshotv1api.VolumeSnapshotContentDelete {
modifyVSCFlag = true
}
Expand All @@ -968,7 +960,7 @@ func (c *backupController) deleteVolumeSnapshot(volumeSnapshots []*snapshotv1api
logger.Debugf("Patching VolumeSnapshotContent %s", vsc.Name)
original := vsc.DeepCopy()
vsc.Spec.DeletionPolicy = snapshotv1api.VolumeSnapshotContentRetain
if err := c.kbClient.Patch(context.Background(), vsc, kbclient.MergeFrom(original)); err != nil {
if err := c.kbClient.Patch(context.Background(), &vsc, kbclient.MergeFrom(original)); err != nil {
logger.Errorf("fail to modify VolumeSnapshotContent %s DeletionPolicy to Retain: %s", vsc.Name, err.Error())
return
}
Expand All @@ -984,7 +976,7 @@ func (c *backupController) deleteVolumeSnapshot(volumeSnapshots []*snapshotv1api

// Delete VolumeSnapshot from cluster
logger.Debugf("Deleting VolumeSnapshotContent %s", vsc.Name)
err := c.volumeSnapshotClient.SnapshotV1().VolumeSnapshots(vs.Namespace).Delete(context.TODO(), vs.Name, metav1.DeleteOptions{})
err := c.kbClient.Delete(context.TODO(), &vs)
if err != nil {
logger.Errorf("fail to delete VolumeSnapshot %s/%s: %s", vs.Namespace, vs.Name, err.Error())
}
Expand All @@ -999,18 +991,19 @@ func (c *backupController) deleteVolumeSnapshot(volumeSnapshots []*snapshotv1api
// and Source. Source is updated to let csi-controller thinks the VSC is statically provsisioned with VS.
// Set VolumeSnapshotRef's UID to nil will let the csi-controller finds out the related VS is gone, then
// VSC can be deleted.
func (c *backupController) recreateVolumeSnapshotContent(vsc *snapshotv1api.VolumeSnapshotContent) error {
func (c *backupController) recreateVolumeSnapshotContent(vsc snapshotv1api.VolumeSnapshotContent) error {
timeout := 1 * time.Minute
interval := 1 * time.Second

err := c.volumeSnapshotClient.SnapshotV1().VolumeSnapshotContents().Delete(context.TODO(), vsc.Name, metav1.DeleteOptions{})
err := c.kbClient.Delete(context.TODO(), &vsc)
if err != nil {
return errors.Wrapf(err, "fail to delete VolumeSnapshotContent: %s", vsc.Name)
}

// Check VolumeSnapshotContents is already deleted, before re-creating it.
err = wait.PollImmediate(interval, timeout, func() (bool, error) {
_, err := c.volumeSnapshotClient.SnapshotV1().VolumeSnapshotContents().Get(context.TODO(), vsc.Name, metav1.GetOptions{})
tmpVSC := &snapshotv1api.VolumeSnapshotContent{}
err := c.kbClient.Get(context.TODO(), kbclient.ObjectKey{Name: vsc.Name}, tmpVSC)
if err != nil {
if apierrors.IsNotFound(err) {
return true, nil
Expand Down Expand Up @@ -1039,7 +1032,7 @@ func (c *backupController) recreateVolumeSnapshotContent(vsc *snapshotv1api.Volu
}
// ResourceVersion shouldn't exist for new creation.
vsc.ResourceVersion = ""
_, err = c.volumeSnapshotClient.SnapshotV1().VolumeSnapshotContents().Create(context.TODO(), vsc, metav1.CreateOptions{})
err = c.kbClient.Create(context.TODO(), &vsc)
if err != nil {
return errors.Wrapf(err, "fail to create VolumeSnapshotContent %s", vsc.Name)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/util/csi/reset.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (
// It will move the snapshot Handle to the source to avoid the snapshot-controller creating a snapshot when it's
// synced by the backup sync controller.
// It will return an error if the snapshot handle is not set, which should not happen when this func is called.
func ResetVolumeSnapshotContent(snapCont *snapshotv1api.VolumeSnapshotContent) error {
func ResetVolumeSnapshotContent(snapCont snapshotv1api.VolumeSnapshotContent) error {
if snapCont.Status != nil && snapCont.Status.SnapshotHandle != nil && len(*snapCont.Status.SnapshotHandle) > 0 {
v := *snapCont.Status.SnapshotHandle
snapCont.Spec.Source = snapshotv1api.VolumeSnapshotContentSource{
Expand Down

0 comments on commit 9a4b816

Please sign in to comment.