
Commit

regression introduced in 1.6 restore progress: fix CR restore (vmware-tanzu#3845)

Signed-off-by: Scott Seago <sseago@redhat.com>
sseago authored Jun 19, 2021
1 parent d871370 commit 962a957
Showing 3 changed files with 204 additions and 98 deletions.
1 change: 1 addition & 0 deletions changelogs/unreleased/3845-sseago
@@ -0,0 +1 @@
Fix CR restore regression introduced in 1.6 restore progress.
270 changes: 176 additions & 94 deletions pkg/restore/restore.go
@@ -377,6 +377,10 @@ func getOrderedResources(resourcePriorities []string, backupResources map[string
return append(resourcePriorities, orderedBackupResources...)
}

type progressUpdate struct {
totalItems, itemsRestored int
}

func (ctx *restoreContext) execute() (Result, Result) {
warnings, errs := Result{}, Result{}

@@ -409,14 +413,6 @@ func (ctx *restoreContext) execute() (Result, Result) {
}
}

selectedResourceCollection, w, e := ctx.getOrderedResourceCollection(backupResources)
warnings.Merge(&w)
errs.Merge(&e)

type progressUpdate struct {
totalItems, itemsRestored int
}

update := make(chan progressUpdate)

quit := make(chan struct{})
@@ -456,94 +452,69 @@ func (ctx *restoreContext) execute() (Result, Result) {
}()

// totalItems: previously discovered items, i: iteration counter.
totalItems, i, existingNamespaces := 0, 0, sets.NewString()
totalItems, processedItems, existingNamespaces := 0, 0, sets.NewString()

// First restore CRDs. This is needed so that they are available in the cluster
// when getOrderedResourceCollection is called again on the whole backup and
// needs to validate all resources listed.
crdResourceCollection, processedResources, w, e := ctx.getOrderedResourceCollection(
backupResources,
make([]restoreableResource, 0),
sets.NewString(),
[]string{"customresourcedefinitions"},
false,
)
warnings.Merge(&w)
errs.Merge(&e)

for _, selectedResource := range selectedResourceCollection {
for _, selectedResource := range crdResourceCollection {
totalItems += selectedResource.totalItems
}

for _, selectedResource := range selectedResourceCollection {
groupResource := schema.ParseGroupResource(selectedResource.resource)

for namespace, selectedItems := range selectedResource.selectedItemsByNamespace {
for _, selectedItem := range selectedItems {
// If we don't know whether this namespace exists yet, attempt to create
// it in order to ensure it exists. Try to get it from the backup tarball
// (in order to get any backed-up metadata), but if we don't find it there,
// create a blank one.
if namespace != "" && !existingNamespaces.Has(selectedItem.targetNamespace) {
logger := ctx.log.WithField("namespace", namespace)

ns := getNamespace(
logger,
archive.GetItemFilePath(ctx.restoreDir, "namespaces", "", namespace),
selectedItem.targetNamespace,
)
_, nsCreated, err := kube.EnsureNamespaceExistsAndIsReady(
ns,
ctx.namespaceClient,
ctx.resourceTerminatingTimeout,
)
if err != nil {
errs.AddVeleroError(err)
continue
}

// Add the newly created namespace to the list of restored items.
if nsCreated {
itemKey := velero.ResourceIdentifier{
GroupResource: kuberesource.Namespaces,
Namespace: ns.Namespace,
Name: ns.Name,
}
ctx.restoredItems[itemKey] = struct{}{}
}

// Keep track of namespaces that we know exist so we don't
// have to try to create them multiple times.
existingNamespaces.Insert(selectedItem.targetNamespace)
}
for _, selectedResource := range crdResourceCollection {
var w, e Result
// Restore this resource
processedItems, w, e = ctx.processSelectedResource(
selectedResource,
totalItems,
processedItems,
existingNamespaces,
update,
)
warnings.Merge(&w)
errs.Merge(&e)
}

obj, err := archive.Unmarshal(ctx.fileSystem, selectedItem.path)
if err != nil {
errs.Add(
selectedItem.targetNamespace,
fmt.Errorf(
"error decoding %q: %v",
strings.Replace(selectedItem.path, ctx.restoreDir+"/", "", -1),
err,
),
)
continue
}
// Restore everything else
selectedResourceCollection, _, w, e := ctx.getOrderedResourceCollection(
backupResources,
crdResourceCollection,
processedResources,
ctx.resourcePriorities,
true,
)
warnings.Merge(&w)
errs.Merge(&e)

w, e := ctx.restoreItem(obj, groupResource, selectedItem.targetNamespace)
warnings.Merge(&w)
errs.Merge(&e)
i++

// totalItems keeps the count of items previously known. There
// may be additional items restored by plugins. We want to include
// the additional items by looking at restoredItems at the same
// time, we don't want previously known items counted twice as
// they are present in both restoredItems and totalItems.
actualTotalItems := len(ctx.restoredItems) + (totalItems - i)
update <- progressUpdate{
totalItems: actualTotalItems,
itemsRestored: len(ctx.restoredItems),
}
}
}
	// Reset processedItems and totalItems before processing the full resource list.
processedItems = 0
totalItems = 0
for _, selectedResource := range selectedResourceCollection {
totalItems += selectedResource.totalItems
}

// If we just restored custom resource definitions (CRDs), refresh
// discovery because the restored CRDs may have created new APIs that
// didn't previously exist in the cluster, and we want to be able to
// resolve & restore instances of them in subsequent loop iterations.
if groupResource == kuberesource.CustomResourceDefinitions {
if err := ctx.discoveryHelper.Refresh(); err != nil {
warnings.Add("", errors.Wrap(err, "refresh discovery after restoring CRDs"))
}
}
for _, selectedResource := range selectedResourceCollection {
var w, e Result
// Restore this resource
processedItems, w, e = ctx.processSelectedResource(
selectedResource,
totalItems,
processedItems,
existingNamespaces,
update,
)
warnings.Merge(&w)
errs.Merge(&e)
}

// Close the progress update channel.
@@ -605,6 +576,107 @@ func (ctx *restoreContext) execute() (Result, Result) {
return warnings, errs
}

// processSelectedResource restores one restoreableResource from the backup and updates the
// restore progress metadata. At this point, the resource has already been validated and
// counted for inclusion in the expected total restore count.
func (ctx *restoreContext) processSelectedResource(
selectedResource restoreableResource,
totalItems int,
processedItems int,
existingNamespaces sets.String,
update chan progressUpdate,
) (int, Result, Result) {
warnings, errs := Result{}, Result{}
groupResource := schema.ParseGroupResource(selectedResource.resource)

for namespace, selectedItems := range selectedResource.selectedItemsByNamespace {
for _, selectedItem := range selectedItems {
// If we don't know whether this namespace exists yet, attempt to create
// it in order to ensure it exists. Try to get it from the backup tarball
// (in order to get any backed-up metadata), but if we don't find it there,
// create a blank one.
if namespace != "" && !existingNamespaces.Has(selectedItem.targetNamespace) {
logger := ctx.log.WithField("namespace", namespace)

ns := getNamespace(
logger,
archive.GetItemFilePath(ctx.restoreDir, "namespaces", "", namespace),
selectedItem.targetNamespace,
)
_, nsCreated, err := kube.EnsureNamespaceExistsAndIsReady(
ns,
ctx.namespaceClient,
ctx.resourceTerminatingTimeout,
)
if err != nil {
errs.AddVeleroError(err)
continue
}

// Add the newly created namespace to the list of restored items.
if nsCreated {
itemKey := velero.ResourceIdentifier{
GroupResource: kuberesource.Namespaces,
Namespace: ns.Namespace,
Name: ns.Name,
}
ctx.restoredItems[itemKey] = struct{}{}
}

// Keep track of namespaces that we know exist so we don't
// have to try to create them multiple times.
existingNamespaces.Insert(selectedItem.targetNamespace)
}

obj, err := archive.Unmarshal(ctx.fileSystem, selectedItem.path)
if err != nil {
errs.Add(
selectedItem.targetNamespace,
fmt.Errorf(
"error decoding %q: %v",
strings.Replace(selectedItem.path, ctx.restoreDir+"/", "", -1),
err,
),
)
continue
}

w, e := ctx.restoreItem(obj, groupResource, selectedItem.targetNamespace)
warnings.Merge(&w)
errs.Merge(&e)
processedItems++

// totalItems keeps the count of items previously known. There
// may be additional items restored by plugins. We want to include
// the additional items by looking at restoredItems at the same
// time, we don't want previously known items counted twice as
// they are present in both restoredItems and totalItems.
actualTotalItems := len(ctx.restoredItems) + (totalItems - processedItems)
update <- progressUpdate{
totalItems: actualTotalItems,
itemsRestored: len(ctx.restoredItems),
}
ctx.log.WithFields(map[string]interface{}{
"progress": "",
"resource": groupResource.String(),
"namespace": selectedItem.targetNamespace,
"name": selectedItem.name,
}).Infof("Restored %d items out of an estimated total of %d (estimate will change throughout the restore)", len(ctx.restoredItems), actualTotalItems)
}
}

// If we just restored custom resource definitions (CRDs), refresh
// discovery because the restored CRDs may have created new APIs that
// didn't previously exist in the cluster, and we want to be able to
// resolve & restore instances of them in subsequent loop iterations.
if groupResource == kuberesource.CustomResourceDefinitions {
if err := ctx.discoveryHelper.Refresh(); err != nil {
warnings.Add("", errors.Wrap(err, "refresh discovery after restoring CRDs"))
}
}
return processedItems, warnings, errs
}

// getNamespace returns a namespace API object that we should attempt to
// create before restoring anything into it. It will come from the backup
// tarball if it exists, else will be a new one. If from the tarball, it
@@ -1567,10 +1639,14 @@ type restoreableItem struct {
// identifiers, applies resource include/exclude criteria, and Kubernetes
// selectors to make a list of resources to be actually restored preserving the
// original order.
func (ctx *restoreContext) getOrderedResourceCollection(backupResources map[string]*archive.ResourceItems) ([]restoreableResource, Result, Result) {
func (ctx *restoreContext) getOrderedResourceCollection(
backupResources map[string]*archive.ResourceItems,
restoreResourceCollection []restoreableResource,
processedResources sets.String,
resourcePriorities []string,
includeAllResources bool,
) ([]restoreableResource, sets.String, Result, Result) {
var warnings, errs Result
processedResources := sets.NewString()
restoreResourceCollection := make([]restoreableResource, 0)
// Iterate through an ordered list of resources to restore, checking each
// one to see if it should be restored. Note that resources *may* be in this
// list twice, i.e. once due to being a prioritized resource, and once due
@@ -1585,7 +1661,13 @@ func (ctx *restoreContext) getOrderedResourceCollection(backupResources map[stri
// Since we keep track of the fully-resolved group-resources that we *have*
// restored, we won't try to restore a resource twice even if it's in the
// ordered list twice.
for _, resource := range getOrderedResources(ctx.resourcePriorities, backupResources) {
var resourceList []string
if includeAllResources {
resourceList = getOrderedResources(resourcePriorities, backupResources)
} else {
resourceList = resourcePriorities
}
for _, resource := range resourceList {
// try to resolve the resource via discovery to a complete group/version/resource
gvr, _, err := ctx.discoveryHelper.ResourceFor(schema.ParseGroupResource(resource).WithVersion(""))
if err != nil {
@@ -1658,7 +1740,7 @@ func (ctx *restoreContext) getOrderedResourceCollection(backupResources map[stri
// record that we've restored the resource
processedResources.Insert(groupResource.String())
}
return restoreResourceCollection, warnings, errs
return restoreResourceCollection, processedResources, warnings, errs
}

// getSelectedRestoreableItems applies Kubernetes selectors on individual items
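
For readers skimming the diff: the change replaces the single up-front call to getOrderedResourceCollection with two passes. The first call passes only the "customresourcedefinitions" priority and includeAllResources=false, so CRDs are collected and restored first; API discovery is refreshed once the CRD group has been processed, and a second call (seeded with the already-processed CRD collection and the processedResources set) then collects and restores everything else, with the progress counters reset against the full total. The snippet below is a loose, self-contained illustration of that two-pass flow and of the progress-estimate formula in processSelectedResource; the names (resource, restorePass) and types are hypothetical and simplified, not Velero's actual code.

package main

import "fmt"

// resource pairs an API resource name with the number of backed-up items.
type resource struct {
	name  string
	items int
}

// restorePass "restores" every item in the collection, recording each item in
// restoredItems (a set, so restoring the same item twice does not inflate the
// count) and printing a progress estimate after each one.
func restorePass(collection []resource, restoredItems map[string]bool, knownTotal int) {
	processedItems := 0
	for _, r := range collection {
		for i := 0; i < r.items; i++ {
			restoredItems[fmt.Sprintf("%s/%d", r.name, i)] = true
			processedItems++
			// Estimate = items actually restored (plugins may add items that
			// were never in the up-front count) + known items still pending.
			estimate := len(restoredItems) + (knownTotal - processedItems)
			fmt.Printf("restored %d of an estimated total of %d\n", len(restoredItems), estimate)
		}
	}
}

func main() {
	crds := []resource{{"customresourcedefinitions", 1}}
	everything := []resource{
		{"customresourcedefinitions", 1},
		{"namespaces", 1},
		{"rockbands.music.example.io", 2},
	}

	restored := map[string]bool{}

	// Pass 1: CRDs only, so the custom resources below can be resolved later.
	restorePass(crds, restored, 1)

	// (The real restore refreshes API discovery here before collecting the rest.)

	// Pass 2: the full collection, with counters reset against the full total.
	total := 0
	for _, r := range everything {
		total += r.items
	}
	restorePass(everything, restored, total)
}

In the actual patch the two passes additionally share processedResources, so the second getOrderedResourceCollection call does not re-validate the already-restored CRD group, and restoredItems is keyed by velero.ResourceIdentifier rather than a plain string.
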
31 changes: 27 additions & 4 deletions test/e2e/enable_api_group_versions_test.go
@@ -187,6 +187,24 @@ func runEnableAPIGroupVersionsTests(ctx context.Context, client testClient, reso
},
},
},
{
name: "Restore successful when CRD doesn't (yet) exist in target",
srcCrdYaml: "testdata/enable_api_group_versions/case-a-source.yaml",
srcCRs: map[string]string{
"v1": "testdata/enable_api_group_versions/music_v1_rockband.yaml",
},
tgtCrdYaml: "",
tgtVer: "v1",
cm: nil,
want: map[string]map[string]string{
"annotations": {
"rockbands.music.example.io/originalVersion": "v1",
},
"specs": {
"genre": "60s rock",
},
},
},
}

for i, tc := range tests {
@@ -242,9 +260,12 @@
}
}

if err := installCRD(ctx, tc.tgtCrdYaml); err != nil {
deleteNamespacesOnErr(ctx, tc.namespaces)
return errors.Wrapf(err, "install music-system CRD on target cluster")
// Install music-system CRD for target cluster.
if tc.tgtCrdYaml != "" {
if err := installCRD(ctx, tc.tgtCrdYaml); err != nil {
deleteNamespacesOnErr(ctx, tc.namespaces)
return errors.Wrapf(err, "install music-system CRD on target cluster")
}
}

// Apply config map if there is one.
@@ -323,7 +344,9 @@ func runEnableAPIGroupVersionsTests(ctx context.Context, client testClient, reso
deleteNamespace(ctx, ns)
}
_ = deleteCRD(ctx, tc.srcCrdYaml)
_ = deleteCRD(ctx, tc.tgtCrdYaml)
if tc.tgtCrdYaml != "" {
_ = deleteCRD(ctx, tc.tgtCrdYaml)
}
}

return nil
