From f11c99062fd5d1a6582457edf34f882bf31e7abe Mon Sep 17 00:00:00 2001 From: BornChanger <97348524+BornChanger@users.noreply.github.com> Date: Sat, 11 Nov 2023 13:24:43 +0800 Subject: [PATCH] This is an automated cherry-pick of #48506 Signed-off-by: ti-chi-bot --- br/pkg/aws/ebs.go | 166 ++++++++++++++++++++++++++++++++ br/pkg/task/restore_ebs_meta.go | 10 ++ 2 files changed, 176 insertions(+) diff --git a/br/pkg/aws/ebs.go b/br/pkg/aws/ebs.go index ddea6b358f556..61c7b294db774 100644 --- a/br/pkg/aws/ebs.go +++ b/br/pkg/aws/ebs.go @@ -27,6 +27,7 @@ import ( const ( pollingPendingSnapshotInterval = 30 * time.Second errCodeTooManyPendingSnapshots = "PendingSnapshotLimitExceeded" + FsrApiSnapshotsThreshold = 10 ) type EC2Session struct { @@ -281,6 +282,171 @@ func (e *EC2Session) DeleteSnapshots(snapIDMap map[string]string) { log.Info("delete snapshot end", zap.Int("need-to-del", len(snapIDMap)), zap.Int32("deleted", deletedCnt.Load())) } +<<<<<<< HEAD +======= +// EnableDataFSR enables FSR for data volume snapshots +func (e *EC2Session) EnableDataFSR(meta *config.EBSBasedBRMeta, targetAZ string) (map[string][]*string, error) { + snapshotsIDsMap := fetchTargetSnapshots(meta, targetAZ) + + if len(snapshotsIDsMap) == 0 { + return snapshotsIDsMap, errors.Errorf("empty backup meta") + } + + eg, _ := errgroup.WithContext(context.Background()) + + for availableZone := range snapshotsIDsMap { + targetAZ := availableZone + // We have to control the batch size to avoid the error of "parameter SourceSnapshotIds must be less than or equal to 10" + for i := 0; i < len(snapshotsIDsMap[targetAZ]); i += FsrApiSnapshotsThreshold { + start := i + end := i + FsrApiSnapshotsThreshold + if end > len(snapshotsIDsMap[targetAZ]) { + end = len(snapshotsIDsMap[targetAZ]) + } + eg.Go(func() error { + log.Info("enable fsr for snapshots", zap.String("available zone", targetAZ), zap.Any("snapshots", snapshotsIDsMap[targetAZ][start:end])) + resp, err := e.ec2.EnableFastSnapshotRestores(&ec2.EnableFastSnapshotRestoresInput{ + AvailabilityZones: []*string{&targetAZ}, + SourceSnapshotIds: snapshotsIDsMap[targetAZ][start:end], + }) + + if err != nil { + return errors.Trace(err) + } + + if len(resp.Unsuccessful) > 0 { + log.Warn("not all snapshots enabled FSR") + return errors.Errorf("Some snapshot fails to enable FSR for available zone %s, such as %s, error code is %v", targetAZ, *resp.Unsuccessful[0].SnapshotId, resp.Unsuccessful[0].FastSnapshotRestoreStateErrors) + } + + return e.waitDataFSREnabled(snapshotsIDsMap[targetAZ][start:end], targetAZ) + }) + } + } + return snapshotsIDsMap, eg.Wait() +} + +// waitDataFSREnabled waits FSR for data volume snapshots are all enabled +func (e *EC2Session) waitDataFSREnabled(snapShotIDs []*string, targetAZ string) error { + // Create a map to store the strings as keys + pendingSnapshots := make(map[string]struct{}) + + // Populate the map with the strings from the array + for _, str := range snapShotIDs { + pendingSnapshots[*str] = struct{}{} + } + + log.Info("starts check fsr pending snapshots", zap.Any("snapshots", pendingSnapshots), zap.String("available zone", targetAZ)) + for { + if len(pendingSnapshots) == 0 { + log.Info("all snapshots in current batch fsr enablement is finished", zap.String("available zone", targetAZ), zap.Any("snapshots", snapShotIDs)) + return nil + } + + // check pending snapshots every 1 minute + time.Sleep(1 * time.Minute) + log.Info("check snapshots not fsr enabled", zap.Int("count", len(pendingSnapshots))) + input := &ec2.DescribeFastSnapshotRestoresInput{ + Filters: []*ec2.Filter{ + { + Name: aws.String("state"), + Values: []*string{aws.String("disabled"), aws.String("disabling"), aws.String("enabling"), aws.String("optimizing")}, + }, + { + Name: aws.String("availability-zone"), + Values: []*string{aws.String(targetAZ)}, + }, + }, + } + + result, err := e.ec2.DescribeFastSnapshotRestores(input) + if err != nil { + return errors.Trace(err) + } + + uncompletedSnapshots := make(map[string]struct{}) + for _, fastRestore := range result.FastSnapshotRestores { + _, found := pendingSnapshots[*fastRestore.SnapshotId] + if found { + // Detect some conflict states + if strings.EqualFold(*fastRestore.State, "disabled") || strings.EqualFold(*fastRestore.State, "disabling") { + log.Error("detect conflict status", zap.String("snapshot", *fastRestore.SnapshotId), zap.String("status", *fastRestore.State)) + return errors.Errorf("status of snapshot %s is %s ", *fastRestore.SnapshotId, *fastRestore.State) + } + uncompletedSnapshots[*fastRestore.SnapshotId] = struct{}{} + } + } + pendingSnapshots = uncompletedSnapshots + } +} + +// DisableDataFSR disables FSR for data volume snapshots +func (e *EC2Session) DisableDataFSR(snapshotsIDsMap map[string][]*string) error { + if len(snapshotsIDsMap) == 0 { + return nil + } + + eg, _ := errgroup.WithContext(context.Background()) + + for availableZone := range snapshotsIDsMap { + targetAZ := availableZone + // We have to control the batch size to avoid the error of "parameter SourceSnapshotIds must be less than or equal to 10" + for i := 0; i < len(snapshotsIDsMap[targetAZ]); i += FsrApiSnapshotsThreshold { + start := i + end := i + FsrApiSnapshotsThreshold + if end > len(snapshotsIDsMap[targetAZ]) { + end = len(snapshotsIDsMap[targetAZ]) + } + eg.Go(func() error { + resp, err := e.ec2.DisableFastSnapshotRestores(&ec2.DisableFastSnapshotRestoresInput{ + AvailabilityZones: []*string{&targetAZ}, + SourceSnapshotIds: snapshotsIDsMap[targetAZ][start:end], + }) + + if err != nil { + return errors.Trace(err) + } + + if len(resp.Unsuccessful) > 0 { + log.Warn("not all snapshots disabled FSR", zap.String("available zone", targetAZ)) + return errors.Errorf("Some snapshot fails to disable FSR for available zone %s, such as %s, error code is %v", targetAZ, *resp.Unsuccessful[0].SnapshotId, resp.Unsuccessful[0].FastSnapshotRestoreStateErrors) + } + + log.Info("Disable FSR issued", zap.String("available zone", targetAZ), zap.Any("snapshots", snapshotsIDsMap[targetAZ][start:end])) + + return nil + }) + } + } + return eg.Wait() +} + +func fetchTargetSnapshots(meta *config.EBSBasedBRMeta, specifiedAZ string) map[string][]*string { + var sourceSnapshotIDs = make(map[string][]*string) + + if len(meta.TiKVComponent.Stores) == 0 { + return sourceSnapshotIDs + } + + for i := range meta.TiKVComponent.Stores { + store := meta.TiKVComponent.Stores[i] + for j := range store.Volumes { + oldVol := store.Volumes[j] + // Handle data volume snapshots only + if strings.Compare(oldVol.Type, "storage.data-dir") == 0 { + if specifiedAZ != "" { + sourceSnapshotIDs[specifiedAZ] = append(sourceSnapshotIDs[specifiedAZ], &oldVol.SnapshotID) + } else { + sourceSnapshotIDs[oldVol.VolumeAZ] = append(sourceSnapshotIDs[oldVol.VolumeAZ], &oldVol.SnapshotID) + } + } + } + } + + return sourceSnapshotIDs +} + +>>>>>>> 2d45b7afe73 (ebs br: control the snapshots batch size for fsr enable/disable (#48506)) // CreateVolumes create volumes from snapshots // if err happens in the middle, return half-done result // returned map: store id -> old volume id -> new volume id diff --git a/br/pkg/task/restore_ebs_meta.go b/br/pkg/task/restore_ebs_meta.go index 53286505b5b9c..423581328fc24 100644 --- a/br/pkg/task/restore_ebs_meta.go +++ b/br/pkg/task/restore_ebs_meta.go @@ -236,6 +236,16 @@ func (h *restoreEBSMetaHelper) restoreVolumes(progress glue.Progress) (map[strin log.Error("failed to create all volumes, cleaning up created volume") ec2Session.DeleteVolumes(volumeIDMap) } +<<<<<<< HEAD +======= + + if h.cfg.UseFSR { + err = ec2Session.DisableDataFSR(snapshotsIDsMap) + if err != nil { + log.Error("disable fsr failed", zap.Error(err)) + } + } +>>>>>>> 2d45b7afe73 (ebs br: control the snapshots batch size for fsr enable/disable (#48506)) }() volumeIDMap, err = ec2Session.CreateVolumes(h.metaInfo, string(h.cfg.VolumeType), h.cfg.VolumeIOPS, h.cfg.VolumeThroughput, h.cfg.TargetAZ)