Skip to content

Commit

Permalink
br: use AWS CreateSnapshots API to automic create snapshots for multi…
Browse files Browse the repository at this point in the history
…ple volumes (#43591)

close #43433
  • Loading branch information
BornChanger authored May 11, 2023
1 parent 3b02511 commit 5909400
Show file tree
Hide file tree
Showing 6 changed files with 73 additions and 50 deletions.
4 changes: 2 additions & 2 deletions DEPS.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -240,8 +240,8 @@ def go_deps():
name = "com_github_aws_aws_sdk_go",
build_file_proto_mode = "disable_global",
importpath = "github.com/aws/aws-sdk-go",
sum = "h1:jLDC9RsNoYMLFlKpB8LdqUnoDdC2yvkS4QbuyPQJ8+M=",
version = "v1.44.48",
sum = "h1:7yDn1dcv4DZFMKpu+2exIH5O6ipNj9qXrKfdMUaIJwY=",
version = "v1.44.259",
)
go_repository(
name = "com_github_axw_gocov",
Expand Down
109 changes: 66 additions & 43 deletions br/pkg/aws/ebs.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ package aws
import (
"context"
"fmt"
"sort"
"strings"
"sync"
"time"
Expand All @@ -32,11 +31,11 @@ type EC2Session struct {

type VolumeAZs map[string]string

func NewEC2Session(concurrency uint) (*EC2Session, error) {
func NewEC2Session(concurrency uint, region string) (*EC2Session, error) {
// aws-sdk has builtin exponential backoff retry mechanism, see:
// https://github.com/aws/aws-sdk-go/blob/db4388e8b9b19d34dcde76c492b17607cd5651e2/aws/client/default_retryer.go#L12-L16
// with default retryer & max-retry=9, we will wait for at least 30s in total
awsConfig := aws.NewConfig().WithMaxRetries(9)
awsConfig := aws.NewConfig().WithMaxRetries(9).WithRegion(region)
// TiDB Operator need make sure we have the correct permission to call aws api(through aws env variables)
// we may change this behaviour in the future.
sessionOptions := session.Options{Config: *awsConfig}
Expand All @@ -49,66 +48,90 @@ func NewEC2Session(concurrency uint) (*EC2Session, error) {
}

// CreateSnapshots is the mainly steps to control the data volume snapshots.
// It will do the following works.
// 1. determine the order of volume snapshot.
// 2. send snapshot requests to aws.
func (e *EC2Session) CreateSnapshots(backupInfo *config.EBSBasedBRMeta) (map[string]string, VolumeAZs, error) {
snapIDMap := make(map[string]string)
volumeIDs := []*string{}
var volumeIDs []*string

var mutex sync.Mutex
eg, _ := errgroup.WithContext(context.Background())
fillResult := func(snap *ec2.Snapshot, volume *config.EBSVolume) {
fillResult := func(createOutput *ec2.CreateSnapshotsOutput) {
mutex.Lock()
defer mutex.Unlock()
snapIDMap[volume.ID] = *snap.SnapshotId
for j := range createOutput.Snapshots {
snapshot := createOutput.Snapshots[j]
snapIDMap[aws.StringValue(snapshot.VolumeId)] = aws.StringValue(snapshot.SnapshotId)
}
}

workerPool := utils.NewWorkerPool(e.concurrency, "create snapshot")
workerPool := utils.NewWorkerPool(e.concurrency, "create snapshots")
for i := range backupInfo.TiKVComponent.Stores {
store := backupInfo.TiKVComponent.Stores[i]
volumes := store.Volumes
if len(volumes) > 1 {
// if one store has multiple volume, we should respect the order
// raft log/engine first, then kv db. then wal
sort.SliceStable(volumes, func(i, j int) bool {
if strings.Contains(volumes[i].Type, "raft") {
return true
}
if strings.Contains(volumes[j].Type, "raft") {
return false
if len(volumes) >= 1 {
log.Info("fetch EC2 instance id using first volume")
var targetVolumeIDs []*string
for j := range volumes {
volume := volumes[j]
targetVolumeIDs = append(targetVolumeIDs, &volume.ID)
volumeIDs = append(volumeIDs, &volume.ID)
}

// determine the ec2 instance id
resp, err := e.ec2.DescribeVolumes(&ec2.DescribeVolumesInput{VolumeIds: targetVolumeIDs[0:1]})
if err != nil {
return snapIDMap, nil, errors.Trace(err)
}
if resp.Volumes[0].Attachments[0] == nil || resp.Volumes[0].Attachments[0].InstanceId == nil {
return snapIDMap, nil, errors.Errorf("specified volume %s is not attached", volumes[0].ID)
}
ec2InstanceId := resp.Volumes[0].Attachments[0].InstanceId
log.Info("EC2 instance id is", zap.Stringp("id", ec2InstanceId))

// determine the exclude volume list
var excludedVolumeIDs []*string
resp1, err := e.ec2.DescribeInstances(&ec2.DescribeInstancesInput{InstanceIds: []*string{ec2InstanceId}})
if err != nil {
return snapIDMap, nil, errors.Trace(err)
}
for j := range resp1.Reservations[0].Instances[0].BlockDeviceMappings {
device := resp1.Reservations[0].Instances[0].BlockDeviceMappings[j]
// skip root volume
if aws.StringValue(device.DeviceName) == aws.StringValue(resp1.Reservations[0].Instances[0].RootDeviceName) {
continue
}
if strings.Contains(volumes[i].Type, "storage") {
return true
toInclude := false
for k := range targetVolumeIDs {
targetVolumeID := targetVolumeIDs[k]
if aws.StringValue(targetVolumeID) == aws.StringValue(device.Ebs.VolumeId) {
toInclude = true
break
}
}
if strings.Contains(volumes[j].Type, "storage") {
return true
if !toInclude {
excludedVolumeIDs = append(excludedVolumeIDs, device.Ebs.VolumeId)
}
return true
})
}
}

log.Info("exclude volume list", zap.Stringp("ec2", ec2InstanceId), zap.Any("exclude volume list", excludedVolumeIDs))

for j := range volumes {
volume := volumes[j]
volumeIDs = append(volumeIDs, &volume.ID)
// create snapshots for volumes on this ec2 instance
workerPool.ApplyOnErrorGroup(eg, func() error {
log.Debug("starts snapshot", zap.Any("volume", volume))
resp, err := e.ec2.CreateSnapshot(&ec2.CreateSnapshotInput{
VolumeId: &volume.ID,
TagSpecifications: []*ec2.TagSpecification{
{
ResourceType: aws.String(ec2.ResourceTypeSnapshot),
Tags: []*ec2.Tag{
ec2Tag("TiDBCluster-BR", "old"),
},
},
},
})
// Prepare for aws requests
instanceSpecification := ec2.InstanceSpecification{}
createSnapshotInput := ec2.CreateSnapshotsInput{}

instanceSpecification.SetInstanceId(*ec2InstanceId)
instanceSpecification.SetExcludeBootVolume(true)
instanceSpecification.SetExcludeDataVolumeIds(excludedVolumeIDs)

createSnapshotInput.SetCopyTagsFromSource("volume")
createSnapshotInput.SetInstanceSpecification(&instanceSpecification)

resp, err := e.ec2.CreateSnapshots(&createSnapshotInput)
if err != nil {
return errors.Trace(err)
}
log.Info("snapshot creating", zap.Stringer("snap", resp))
fillResult(resp, volume)
fillResult(resp)
return nil
})
}
Expand Down
2 changes: 1 addition & 1 deletion br/pkg/task/backup_ebs.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ func RunBackupEBS(c context.Context, g glue.Glue, cfg *BackupConfig) error {
progress := g.StartProgress(ctx, "backup", int64(storeCount)*100, !cfg.LogProgress)
go progressFileWriterRoutine(ctx, progress, int64(storeCount)*100, cfg.ProgressFile)

ec2Session, err := aws.NewEC2Session(cfg.CloudAPIConcurrency)
ec2Session, err := aws.NewEC2Session(cfg.CloudAPIConcurrency, cfg.S3.Region)
if err != nil {
return errors.Trace(err)
}
Expand Down
2 changes: 1 addition & 1 deletion br/pkg/task/restore_ebs_meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ func (h *restoreEBSMetaHelper) restoreVolumes(progress glue.Progress) (map[strin
err error
totalSize int64
)
ec2Session, err = aws.NewEC2Session(h.cfg.CloudAPIConcurrency)
ec2Session, err = aws.NewEC2Session(h.cfg.CloudAPIConcurrency, h.cfg.S3.Region)
if err != nil {
return nil, 0, errors.Trace(err)
}
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ require (
github.com/aliyun/alibaba-cloud-sdk-go v1.61.1581
github.com/apache/skywalking-eyes v0.4.0
github.com/ashanbrown/makezero v1.1.1
github.com/aws/aws-sdk-go v1.44.48
github.com/aws/aws-sdk-go v1.44.259
github.com/bazelbuild/buildtools v0.0.0-20230317132445-9c3c1fc0106e
github.com/blacktear23/go-proxyprotocol v1.0.6
github.com/butuzov/mirror v0.1.2
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -127,8 +127,8 @@ github.com/ashanbrown/makezero v1.1.1 h1:iCQ87C0V0vSyO+M9E/FZYbu65auqH0lnsOkf5Fc
github.com/ashanbrown/makezero v1.1.1/go.mod h1:i1bJLCRSCHOcOa9Y6MyF2FTfMZMFdHvxKHxgO5Z1axI=
github.com/aws/aws-sdk-go v1.15.24/go.mod h1:mFuSZ37Z9YOHbQEwBWztmVzqXrEkub65tZoCYDt7FT0=
github.com/aws/aws-sdk-go v1.30.19/go.mod h1:5zCpMtNQVjRREroY7sYe8lOMRSxkhG6MZveU8YkpAk0=
github.com/aws/aws-sdk-go v1.44.48 h1:jLDC9RsNoYMLFlKpB8LdqUnoDdC2yvkS4QbuyPQJ8+M=
github.com/aws/aws-sdk-go v1.44.48/go.mod h1:y4AeaBuwd2Lk+GepC1E9v0qOiTws0MIWAX4oIKwKHZo=
github.com/aws/aws-sdk-go v1.44.259 h1:7yDn1dcv4DZFMKpu+2exIH5O6ipNj9qXrKfdMUaIJwY=
github.com/aws/aws-sdk-go v1.44.259/go.mod h1:aVsgQcEevwlmQ7qHE9I3h+dtQgpqhFB+i8Phjh7fkwI=
github.com/aymerick/raymond v2.0.3-0.20180322193309-b565731e1464+incompatible/go.mod h1:osfaiScAUVup+UC9Nfq76eWqDhXlp+4UYaA8uhTBO6g=
github.com/bazelbuild/buildtools v0.0.0-20230317132445-9c3c1fc0106e h1:XmPu4mXICgdGnC5dXGjUGbwUD/kUmS0l5Aop3LaevBM=
github.com/bazelbuild/buildtools v0.0.0-20230317132445-9c3c1fc0106e/go.mod h1:689QdV3hBP7Vo9dJMmzhoYIyo/9iMhEmHkJcnaPRCbo=
Expand Down

0 comments on commit 5909400

Please sign in to comment.