Skip to content

Commit

Permalink
Adapt DataImportCron and DataSource to DV GC (#2441)
Browse files Browse the repository at this point in the history
* Adapt DataImportCron and DataSource to DV GC

Signed-off-by: Arnon Gilboa <agilboa@redhat.com>

* Add DIC test for GC

Signed-off-by: Arnon Gilboa <agilboa@redhat.com>

* CR fixes

Signed-off-by: Arnon Gilboa <agilboa@redhat.com>

Signed-off-by: Arnon Gilboa <agilboa@redhat.com>
  • Loading branch information
arnongilboa authored Oct 20, 2022
1 parent ff038fa commit a222359
Show file tree
Hide file tree
Showing 14 changed files with 275 additions and 140 deletions.
192 changes: 99 additions & 93 deletions pkg/controller/dataimportcron-controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,8 @@ const (
AnnNextCronTime = AnnAPIGroup + "/storage.import.nextCronTime"
// AnnLastCronTime is the cron last execution time stamp
AnnLastCronTime = AnnAPIGroup + "/storage.import.lastCronTime"
// AnnLastUseTime is the PVC last use time stamp
AnnLastUseTime = AnnAPIGroup + "/storage.import.lastUseTime"
// AnnLastAppliedConfig is the cron last applied configuration
AnnLastAppliedConfig = AnnAPIGroup + "/lastAppliedConfiguration"

Expand Down Expand Up @@ -295,55 +297,59 @@ func getCronRegistrySource(cron *cdiv1.DataImportCron) (*cdiv1.DataVolumeSourceR
}

func (r *DataImportCronReconciler) update(ctx context.Context, dataImportCron *cdiv1.DataImportCron) (reconcile.Result, error) {
log := r.log.WithName("update")
res := reconcile.Result{}

dv, pvc, err := r.getImportState(ctx, dataImportCron)
if err != nil {
return res, err
}

dataImportCronCopy := dataImportCron.DeepCopy()
importSucceeded := false
imports := dataImportCron.Status.CurrentImports
dataVolume := &cdiv1.DataVolume{}
dvFound := false
if len(imports) > 0 {
// Get the currently imported DataVolume
if err := r.client.Get(ctx, types.NamespacedName{Namespace: dataImportCron.Namespace, Name: imports[0].DataVolumeName}, dataVolume); err != nil {
if !k8serrors.IsNotFound(err) {
return res, err
}
log.Info("DataVolume not found, removing from current imports", "name", imports[0].DataVolumeName)
dataImportCron.Status.CurrentImports = imports[1:]
} else {
dvFound = true
}
}
if dvFound {
switch dataVolume.Status.Phase {
importSucceeded := false
if dv != nil {
switch dv.Status.Phase {
case cdiv1.Succeeded:
importSucceeded = true
if err := updateDataImportCronOnSuccess(dataImportCron); err != nil {
return res, err
}
updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronProgressing, corev1.ConditionFalse, "No current import", noImport)
if err := r.garbageCollectOldImports(ctx, dataImportCron); err != nil {
return res, err
}
case cdiv1.ImportScheduled:
updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronProgressing, corev1.ConditionFalse, "Import is scheduled", scheduled)
case cdiv1.ImportInProgress:
updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronProgressing, corev1.ConditionTrue, "Import is progressing", inProgress)
default:
dvPhase := string(dataVolume.Status.Phase)
dvPhase := string(dv.Status.Phase)
updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronProgressing, corev1.ConditionFalse, fmt.Sprintf("Import DataVolume phase %s", dvPhase), dvPhase)
}
} else if pvc != nil {
importSucceeded = true
pvcCopy := pvc.DeepCopy()
r.setDataImportCronResourceLabels(dataImportCron, pvc)
if !reflect.DeepEqual(pvc, pvcCopy) {
if err := r.client.Update(ctx, pvc); err != nil {
return res, err
}
}
} else {
if len(imports) > 0 {
dataImportCron.Status.CurrentImports = imports[1:]
}
updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronProgressing, corev1.ConditionFalse, "No current import", noImport)
}

if importSucceeded {
if err := updateDataImportCronOnSuccess(dataImportCron); err != nil {
return res, err
}
updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronProgressing, corev1.ConditionFalse, "No current import", noImport)
if err := r.garbageCollectOldImports(ctx, dataImportCron); err != nil {
return res, err
}
}

if err := r.updateDataSource(ctx, dataImportCron); err != nil {
return res, err
}

// We use the poller returned reconcile.Result for RequeueAfter if needed
var err error
if isImageStreamSource(dataImportCron) {
res, err = r.pollImageStreamDigest(ctx, dataImportCron)
if err != nil {
Expand All @@ -355,8 +361,8 @@ func (r *DataImportCronReconciler) update(ctx context.Context, dataImportCron *c
digestUpdated := desiredDigest != "" && (len(imports) == 0 || desiredDigest != imports[0].Digest)
if digestUpdated {
updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronUpToDate, corev1.ConditionFalse, "Source digest updated since last import", outdated)
if dvFound {
if err := r.deleteErroneousDataVolume(ctx, dataImportCron, dataVolume); err != nil {
if dv != nil {
if err := r.deleteErroneousDataVolume(ctx, dataImportCron, dv); err != nil {
return res, err
}
}
Expand Down Expand Up @@ -385,6 +391,30 @@ func (r *DataImportCronReconciler) update(ctx context.Context, dataImportCron *c
return res, nil
}

// Returns the current import DV if exists, otherwise returns the last imported PVC
func (r *DataImportCronReconciler) getImportState(ctx context.Context, cron *cdiv1.DataImportCron) (*cdiv1.DataVolume, *corev1.PersistentVolumeClaim, error) {
imports := cron.Status.CurrentImports
if len(imports) == 0 {
return nil, nil, nil
}

dvName := imports[0].DataVolumeName
dv := &cdiv1.DataVolume{}
if err := r.client.Get(ctx, types.NamespacedName{Namespace: cron.Namespace, Name: dvName}, dv); err == nil {
return dv, nil, nil
} else if !k8serrors.IsNotFound(err) {
return nil, nil, err
}

pvc := &corev1.PersistentVolumeClaim{}
if err := r.client.Get(ctx, types.NamespacedName{Namespace: cron.Namespace, Name: dvName}, pvc); err == nil {
return nil, pvc, nil
} else if !k8serrors.IsNotFound(err) {
return nil, nil, err
}
return nil, nil, nil
}

func (r *DataImportCronReconciler) deleteErroneousDataVolume(ctx context.Context, cron *cdiv1.DataImportCron, dv *cdiv1.DataVolume) error {
log := r.log.WithValues("name", dv.Name).WithValues("uid", dv.UID)
if cond := findConditionByType(cdiv1.DataVolumeRunning, dv.Status.Conditions); cond != nil {
Expand Down Expand Up @@ -490,13 +520,12 @@ func updateLastExecutionTimestamp(cron *cdiv1.DataImportCron) error {
return err
}
if ts := cron.Status.LastExecutionTimestamp; ts == nil || ts.Time != lastTime {
cron.Status.LastExecutionTimestamp = &metav1.Time{lastTime}
cron.Status.LastExecutionTimestamp = &metav1.Time{Time: lastTime}
}
return nil
}

func (r *DataImportCronReconciler) createImportDataVolume(ctx context.Context, dataImportCron *cdiv1.DataImportCron) error {
log := r.log.WithName("createImportDataVolume")
dataSourceName := dataImportCron.Spec.ManagedDataSource
digest := dataImportCron.Annotations[AnnSourceDesiredDigest]
if digest == "" {
Expand All @@ -506,70 +535,59 @@ func (r *DataImportCronReconciler) createImportDataVolume(ctx context.Context, d
if err != nil {
return err
}
dv := r.newSourceDataVolume(dataImportCron, dvName)
if err := r.client.Create(ctx, dv); err != nil {
if !k8serrors.IsAlreadyExists(err) {
// If PVC exists don't create DV
pvc := &corev1.PersistentVolumeClaim{}
if err = r.client.Get(ctx, types.NamespacedName{Namespace: dataImportCron.Namespace, Name: dvName}, pvc); err != nil {
if !k8serrors.IsNotFound(err) {
return err
}
if err := r.client.Get(ctx, types.NamespacedName{Namespace: dv.Namespace, Name: dv.Name}, dv); err != nil {
dv := r.newSourceDataVolume(dataImportCron, dvName)
if err := r.client.Create(ctx, dv); err != nil && !k8serrors.IsAlreadyExists(err) {
return err
}
// Touch the DV Ready condition heartbeat time, so DV wan't be garbage collected
if cond := findConditionByType(cdiv1.DataVolumeReady, dv.Status.Conditions); cond != nil {
cond.LastHeartbeatTime = metav1.Now()
if err := r.client.Update(ctx, dv); err != nil {
return err
}
}
log.Info("DataVolume already exists", "name", dv.Name, "uid", dv.UID)
} else {
log.Info("DataVolume created", "name", dv.Name, "uid", dv.UID)
AddAnnotation(pvc, AnnLastUseTime, time.Now().Format(time.RFC3339Nano))
if err := r.client.Update(ctx, pvc); err != nil {
return err
}
}
// Update references to current import
dataImportCron.Status.CurrentImports = []cdiv1.ImportStatus{{DataVolumeName: dvName, Digest: digest}}
return nil
}

func (r *DataImportCronReconciler) garbageCollectOldImports(ctx context.Context, dataImportCron *cdiv1.DataImportCron) error {
log := r.log.WithName("garbageCollectOldImports")
if dataImportCron.Spec.GarbageCollect != nil && *dataImportCron.Spec.GarbageCollect != cdiv1.DataImportCronGarbageCollectOutdated {
func (r *DataImportCronReconciler) garbageCollectOldImports(ctx context.Context, cron *cdiv1.DataImportCron) error {
if cron.Spec.GarbageCollect != nil && *cron.Spec.GarbageCollect != cdiv1.DataImportCronGarbageCollectOutdated {
return nil
}
selector, err := getSelector(map[string]string{common.DataImportCronLabel: dataImportCron.Name})
selector, err := getSelector(map[string]string{common.DataImportCronLabel: cron.Name})
if err != nil {
return err
}
dvList := &cdiv1.DataVolumeList{}
if err := r.client.List(ctx, dvList, &client.ListOptions{Namespace: dataImportCron.Namespace, LabelSelector: selector}); err != nil {
return err
}
maxDvs := defaultImportsToKeepPerCron
importsToKeep := dataImportCron.Spec.ImportsToKeep
if importsToKeep != nil && *importsToKeep >= 0 {
maxDvs = int(*importsToKeep)

maxImports := defaultImportsToKeepPerCron

if cron.Spec.ImportsToKeep != nil && *cron.Spec.ImportsToKeep >= 0 {
maxImports = int(*cron.Spec.ImportsToKeep)
}
if len(dvList.Items) <= maxDvs {
return nil

pvcList := &corev1.PersistentVolumeClaimList{}
if err := r.client.List(ctx, pvcList, &client.ListOptions{Namespace: cron.Namespace, LabelSelector: selector}); err != nil {
return err
}
sort.Slice(dvList.Items, func(i, j int) bool {
getDvTimestamp := func(dv cdiv1.DataVolume) time.Time {
if cond := findConditionByType(cdiv1.DataVolumeReady, dv.Status.Conditions); cond != nil {
return cond.LastHeartbeatTime.Time
if len(pvcList.Items) > maxImports {
sort.Slice(pvcList.Items, func(i, j int) bool {
return pvcList.Items[i].Annotations[AnnLastUseTime] > pvcList.Items[j].Annotations[AnnLastUseTime]
})
for _, pvc := range pvcList.Items[maxImports:] {
dv := cdiv1.DataVolume{ObjectMeta: metav1.ObjectMeta{Name: pvc.Name, Namespace: pvc.Namespace}}
if err := r.client.Delete(ctx, &dv); err == nil {
continue
} else if !k8serrors.IsNotFound(err) {
return err
}
return dv.CreationTimestamp.Time
}
return getDvTimestamp(dvList.Items[i]).After(getDvTimestamp(dvList.Items[j]))
})
for _, dv := range dvList.Items[maxDvs:] {
logDv := log.WithValues("name", dv.Name).WithValues("uid", dv.UID)
if err := r.client.Delete(ctx, &dv); err != nil {
if k8serrors.IsNotFound(err) {
logDv.Info("DataVolume not found for deletion")
} else {
logDv.Error(err, "Unable to delete DataVolume")
if err := r.client.Delete(ctx, &pvc); err != nil && !k8serrors.IsNotFound(err) {
return err
}
} else {
logDv.Info("DataVolume deleted")
}
}
return nil
Expand All @@ -581,31 +599,20 @@ func (r *DataImportCronReconciler) cleanup(ctx context.Context, cron types.Names
if err := r.deleteJobs(ctx, cron); err != nil {
return err
}

selector, err := getSelector(map[string]string{common.DataImportCronLabel: cron.Name, common.DataImportCronCleanupLabel: "true"})
if err != nil {
return err
}
dataSourceList := &cdiv1.DataSourceList{}
if err := r.client.List(ctx, dataSourceList, &client.ListOptions{Namespace: cron.Namespace, LabelSelector: selector}); err != nil {
opts := &client.DeleteAllOfOptions{ListOptions: client.ListOptions{Namespace: cron.Namespace, LabelSelector: selector}}
if err := r.client.DeleteAllOf(ctx, &cdiv1.DataSource{}, opts); err != nil {
return err
}
for _, dataSource := range dataSourceList.Items {
if err := r.client.Delete(ctx, &dataSource); IgnoreNotFound(err) != nil {
return err
}
}

dvList := &cdiv1.DataVolumeList{}
if err := r.client.List(ctx, dvList, &client.ListOptions{Namespace: cron.Namespace, LabelSelector: selector}); err != nil {
if err := r.client.DeleteAllOf(ctx, &cdiv1.DataVolume{}, opts); err != nil {
return err
}
for _, dv := range dvList.Items {
if err := r.client.Delete(ctx, &dv); IgnoreNotFound(err) != nil {
return err
}
if err := r.client.DeleteAllOf(ctx, &corev1.PersistentVolumeClaim{}, opts); err != nil {
return err
}

return nil
}

Expand Down Expand Up @@ -909,7 +916,6 @@ func (r *DataImportCronReconciler) newSourceDataVolume(cron *cdiv1.DataImportCro
r.setDataImportCronResourceLabels(cron, dv)
passCronAnnotationToDv(cron, dv, AnnImmediateBinding)
passCronAnnotationToDv(cron, dv, AnnPodRetainAfterCompletion)
AddAnnotation(dv, AnnDeleteAfterCompletion, "false")
return dv
}

Expand Down
Loading

0 comments on commit a222359

Please sign in to comment.