diff --git a/changelogs/unreleased/3845-sseago b/changelogs/unreleased/3845-sseago new file mode 100644 index 0000000000..0abe890fef --- /dev/null +++ b/changelogs/unreleased/3845-sseago @@ -0,0 +1 @@ +Fix CR restore regression introduced in 1.6 restore progress. diff --git a/pkg/restore/restore.go b/pkg/restore/restore.go index 41aee19389..bde7677737 100644 --- a/pkg/restore/restore.go +++ b/pkg/restore/restore.go @@ -377,6 +377,10 @@ func getOrderedResources(resourcePriorities []string, backupResources map[string return append(resourcePriorities, orderedBackupResources...) } +type progressUpdate struct { + totalItems, itemsRestored int +} + func (ctx *restoreContext) execute() (Result, Result) { warnings, errs := Result{}, Result{} @@ -409,14 +413,6 @@ func (ctx *restoreContext) execute() (Result, Result) { } } - selectedResourceCollection, w, e := ctx.getOrderedResourceCollection(backupResources) - warnings.Merge(&w) - errs.Merge(&e) - - type progressUpdate struct { - totalItems, itemsRestored int - } - update := make(chan progressUpdate) quit := make(chan struct{}) @@ -456,94 +452,69 @@ func (ctx *restoreContext) execute() (Result, Result) { }() // totalItems: previously discovered items, i: iteration counter. - totalItems, i, existingNamespaces := 0, 0, sets.NewString() + totalItems, processedItems, existingNamespaces := 0, 0, sets.NewString() + + // First restore CRDs. This is needed so that they are available in the cluster + // when getOrderedResourceCollection is called again on the whole backup and + // needs to validate all resources listed. + crdResourceCollection, processedResources, w, e := ctx.getOrderedResourceCollection( + backupResources, + make([]restoreableResource, 0), + sets.NewString(), + []string{"customresourcedefinitions"}, + false, + ) + warnings.Merge(&w) + errs.Merge(&e) - for _, selectedResource := range selectedResourceCollection { + for _, selectedResource := range crdResourceCollection { totalItems += selectedResource.totalItems } - for _, selectedResource := range selectedResourceCollection { - groupResource := schema.ParseGroupResource(selectedResource.resource) - - for namespace, selectedItems := range selectedResource.selectedItemsByNamespace { - for _, selectedItem := range selectedItems { - // If we don't know whether this namespace exists yet, attempt to create - // it in order to ensure it exists. Try to get it from the backup tarball - // (in order to get any backed-up metadata), but if we don't find it there, - // create a blank one. - if namespace != "" && !existingNamespaces.Has(selectedItem.targetNamespace) { - logger := ctx.log.WithField("namespace", namespace) - - ns := getNamespace( - logger, - archive.GetItemFilePath(ctx.restoreDir, "namespaces", "", namespace), - selectedItem.targetNamespace, - ) - _, nsCreated, err := kube.EnsureNamespaceExistsAndIsReady( - ns, - ctx.namespaceClient, - ctx.resourceTerminatingTimeout, - ) - if err != nil { - errs.AddVeleroError(err) - continue - } - - // Add the newly created namespace to the list of restored items. - if nsCreated { - itemKey := velero.ResourceIdentifier{ - GroupResource: kuberesource.Namespaces, - Namespace: ns.Namespace, - Name: ns.Name, - } - ctx.restoredItems[itemKey] = struct{}{} - } - - // Keep track of namespaces that we know exist so we don't - // have to try to create them multiple times. - existingNamespaces.Insert(selectedItem.targetNamespace) - } + for _, selectedResource := range crdResourceCollection { + var w, e Result + // Restore this resource + processedItems, w, e = ctx.processSelectedResource( + selectedResource, + totalItems, + processedItems, + existingNamespaces, + update, + ) + warnings.Merge(&w) + errs.Merge(&e) + } - obj, err := archive.Unmarshal(ctx.fileSystem, selectedItem.path) - if err != nil { - errs.Add( - selectedItem.targetNamespace, - fmt.Errorf( - "error decoding %q: %v", - strings.Replace(selectedItem.path, ctx.restoreDir+"/", "", -1), - err, - ), - ) - continue - } + // Restore everything else + selectedResourceCollection, _, w, e := ctx.getOrderedResourceCollection( + backupResources, + crdResourceCollection, + processedResources, + ctx.resourcePriorities, + true, + ) + warnings.Merge(&w) + errs.Merge(&e) - w, e := ctx.restoreItem(obj, groupResource, selectedItem.targetNamespace) - warnings.Merge(&w) - errs.Merge(&e) - i++ - - // totalItems keeps the count of items previously known. There - // may be additional items restored by plugins. We want to include - // the additional items by looking at restoredItems at the same - // time, we don't want previously known items counted twice as - // they are present in both restoredItems and totalItems. - actualTotalItems := len(ctx.restoredItems) + (totalItems - i) - update <- progressUpdate{ - totalItems: actualTotalItems, - itemsRestored: len(ctx.restoredItems), - } - } - } + // reset processedItems and totalItems before processing full resource list + processedItems = 0 + totalItems = 0 + for _, selectedResource := range selectedResourceCollection { + totalItems += selectedResource.totalItems + } - // If we just restored custom resource definitions (CRDs), refresh - // discovery because the restored CRDs may have created new APIs that - // didn't previously exist in the cluster, and we want to be able to - // resolve & restore instances of them in subsequent loop iterations. - if groupResource == kuberesource.CustomResourceDefinitions { - if err := ctx.discoveryHelper.Refresh(); err != nil { - warnings.Add("", errors.Wrap(err, "refresh discovery after restoring CRDs")) - } - } + for _, selectedResource := range selectedResourceCollection { + var w, e Result + // Restore this resource + processedItems, w, e = ctx.processSelectedResource( + selectedResource, + totalItems, + processedItems, + existingNamespaces, + update, + ) + warnings.Merge(&w) + errs.Merge(&e) } // Close the progress update channel. @@ -605,6 +576,107 @@ func (ctx *restoreContext) execute() (Result, Result) { return warnings, errs } +// Process and restore one restoreableResource from the backup and update restore progress +// metadata. At this point, the resource has already been validated and counted for inclusion +// in the expected total restore count. +func (ctx *restoreContext) processSelectedResource( + selectedResource restoreableResource, + totalItems int, + processedItems int, + existingNamespaces sets.String, + update chan progressUpdate, +) (int, Result, Result) { + warnings, errs := Result{}, Result{} + groupResource := schema.ParseGroupResource(selectedResource.resource) + + for namespace, selectedItems := range selectedResource.selectedItemsByNamespace { + for _, selectedItem := range selectedItems { + // If we don't know whether this namespace exists yet, attempt to create + // it in order to ensure it exists. Try to get it from the backup tarball + // (in order to get any backed-up metadata), but if we don't find it there, + // create a blank one. + if namespace != "" && !existingNamespaces.Has(selectedItem.targetNamespace) { + logger := ctx.log.WithField("namespace", namespace) + + ns := getNamespace( + logger, + archive.GetItemFilePath(ctx.restoreDir, "namespaces", "", namespace), + selectedItem.targetNamespace, + ) + _, nsCreated, err := kube.EnsureNamespaceExistsAndIsReady( + ns, + ctx.namespaceClient, + ctx.resourceTerminatingTimeout, + ) + if err != nil { + errs.AddVeleroError(err) + continue + } + + // Add the newly created namespace to the list of restored items. + if nsCreated { + itemKey := velero.ResourceIdentifier{ + GroupResource: kuberesource.Namespaces, + Namespace: ns.Namespace, + Name: ns.Name, + } + ctx.restoredItems[itemKey] = struct{}{} + } + + // Keep track of namespaces that we know exist so we don't + // have to try to create them multiple times. + existingNamespaces.Insert(selectedItem.targetNamespace) + } + + obj, err := archive.Unmarshal(ctx.fileSystem, selectedItem.path) + if err != nil { + errs.Add( + selectedItem.targetNamespace, + fmt.Errorf( + "error decoding %q: %v", + strings.Replace(selectedItem.path, ctx.restoreDir+"/", "", -1), + err, + ), + ) + continue + } + + w, e := ctx.restoreItem(obj, groupResource, selectedItem.targetNamespace) + warnings.Merge(&w) + errs.Merge(&e) + processedItems++ + + // totalItems keeps the count of items previously known. There + // may be additional items restored by plugins. We want to include + // the additional items by looking at restoredItems at the same + // time, we don't want previously known items counted twice as + // they are present in both restoredItems and totalItems. + actualTotalItems := len(ctx.restoredItems) + (totalItems - processedItems) + update <- progressUpdate{ + totalItems: actualTotalItems, + itemsRestored: len(ctx.restoredItems), + } + ctx.log.WithFields(map[string]interface{}{ + "progress": "", + "resource": groupResource.String(), + "namespace": selectedItem.targetNamespace, + "name": selectedItem.name, + }).Infof("Restored %d items out of an estimated total of %d (estimate will change throughout the restore)", len(ctx.restoredItems), actualTotalItems) + } + } + + // If we just restored custom resource definitions (CRDs), refresh + // discovery because the restored CRDs may have created new APIs that + // didn't previously exist in the cluster, and we want to be able to + // resolve & restore instances of them in subsequent loop iterations. + if groupResource == kuberesource.CustomResourceDefinitions { + if err := ctx.discoveryHelper.Refresh(); err != nil { + warnings.Add("", errors.Wrap(err, "refresh discovery after restoring CRDs")) + } + } + return processedItems, warnings, errs +} + // getNamespace returns a namespace API object that we should attempt to // create before restoring anything into it. It will come from the backup // tarball if it exists, else will be a new one. If from the tarball, it @@ -1567,10 +1639,14 @@ type restoreableItem struct { // identifiers, applies resource include/exclude criteria, and Kubernetes // selectors to make a list of resources to be actually restored preserving the // original order. -func (ctx *restoreContext) getOrderedResourceCollection(backupResources map[string]*archive.ResourceItems) ([]restoreableResource, Result, Result) { +func (ctx *restoreContext) getOrderedResourceCollection( + backupResources map[string]*archive.ResourceItems, + restoreResourceCollection []restoreableResource, + processedResources sets.String, + resourcePriorities []string, + includeAllResources bool, +) ([]restoreableResource, sets.String, Result, Result) { var warnings, errs Result - processedResources := sets.NewString() - restoreResourceCollection := make([]restoreableResource, 0) // Iterate through an ordered list of resources to restore, checking each // one to see if it should be restored. Note that resources *may* be in this // list twice, i.e. once due to being a prioritized resource, and once due @@ -1585,7 +1661,13 @@ func (ctx *restoreContext) getOrderedResourceCollection(backupResources map[stri // Since we keep track of the fully-resolved group-resources that we *have* // restored, we won't try to restore a resource twice even if it's in the // ordered list twice. - for _, resource := range getOrderedResources(ctx.resourcePriorities, backupResources) { + var resourceList []string + if includeAllResources { + resourceList = getOrderedResources(resourcePriorities, backupResources) + } else { + resourceList = resourcePriorities + } + for _, resource := range resourceList { // try to resolve the resource via discovery to a complete group/version/resource gvr, _, err := ctx.discoveryHelper.ResourceFor(schema.ParseGroupResource(resource).WithVersion("")) if err != nil { @@ -1658,7 +1740,7 @@ func (ctx *restoreContext) getOrderedResourceCollection(backupResources map[stri // record that we've restored the resource processedResources.Insert(groupResource.String()) } - return restoreResourceCollection, warnings, errs + return restoreResourceCollection, processedResources, warnings, errs } // getSelectedRestoreableItems applies Kubernetes selectors on individual items diff --git a/test/e2e/enable_api_group_versions_test.go b/test/e2e/enable_api_group_versions_test.go index b0913a54e7..1195639567 100644 --- a/test/e2e/enable_api_group_versions_test.go +++ b/test/e2e/enable_api_group_versions_test.go @@ -187,6 +187,24 @@ func runEnableAPIGroupVersionsTests(ctx context.Context, client testClient, reso }, }, }, + { + name: "Restore successful when CRD doesn't (yet) exist in target", + srcCrdYaml: "testdata/enable_api_group_versions/case-a-source.yaml", + srcCRs: map[string]string{ + "v1": "testdata/enable_api_group_versions/music_v1_rockband.yaml", + }, + tgtCrdYaml: "", + tgtVer: "v1", + cm: nil, + want: map[string]map[string]string{ + "annotations": { + "rockbands.music.example.io/originalVersion": "v1", + }, + "specs": { + "genre": "60s rock", + }, + }, + }, } for i, tc := range tests { @@ -242,9 +260,12 @@ func runEnableAPIGroupVersionsTests(ctx context.Context, client testClient, reso } } - if err := installCRD(ctx, tc.tgtCrdYaml); err != nil { - deleteNamespacesOnErr(ctx, tc.namespaces) - return errors.Wrapf(err, "install music-system CRD on target cluster") + // Install music-system CRD for target cluster. + if tc.tgtCrdYaml != "" { + if err := installCRD(ctx, tc.tgtCrdYaml); err != nil { + deleteNamespacesOnErr(ctx, tc.namespaces) + return errors.Wrapf(err, "install music-system CRD on target cluster") + } } // Apply config map if there is one. @@ -323,7 +344,9 @@ func runEnableAPIGroupVersionsTests(ctx context.Context, client testClient, reso deleteNamespace(ctx, ns) } _ = deleteCRD(ctx, tc.srcCrdYaml) - _ = deleteCRD(ctx, tc.tgtCrdYaml) + if tc.tgtCrdYaml != "" { + _ = deleteCRD(ctx, tc.tgtCrdYaml) + } } return nil