Skip to content

Commit

Permalink
Add snapshot copy func to Az disk (kanisterio#563)
Browse files Browse the repository at this point in the history
* Add snapshot copy func to Az disk

* generate storageAccountID

* generate unique blob names

* nit: compilation error fix

* Add SnapshotCopyWithArgs func to AD

* Address review comments and fix ci

* address review suggestion

* fix lint errors

* nit fix

* undo changes to client_test
  • Loading branch information
SupriyaKasten authored Mar 11, 2020
1 parent bb9d142 commit 6bc0459
Show file tree
Hide file tree
Showing 10 changed files with 220 additions and 21 deletions.
4 changes: 4 additions & 0 deletions pkg/blockstorage/awsebs/awsebs.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,10 @@ func (s *ebsStorage) SnapshotCopy(ctx context.Context, from, to blockstorage.Sna
return rs, nil
}

func (s *ebsStorage) SnapshotCopyWithArgs(ctx context.Context, from blockstorage.Snapshot, to blockstorage.Snapshot, args map[string]string) (*blockstorage.Snapshot, error) {
return nil, errors.New("Copy Snapshot with Args not implemented")
}

func (s *ebsStorage) SnapshotCreate(ctx context.Context, volume blockstorage.Volume, tags map[string]string) (*blockstorage.Snapshot, error) {
// Snapshot the EBS volume
csi := (&ec2.CreateSnapshotInput{}).SetVolumeId(volume.ID)
Expand Down
4 changes: 4 additions & 0 deletions pkg/blockstorage/awsefs/awsefs.go
Original file line number Diff line number Diff line change
Expand Up @@ -336,6 +336,10 @@ func (e *efs) SnapshotCopy(ctx context.Context, from blockstorage.Snapshot, to b
return nil, errors.New("Not implemented")
}

func (e *efs) SnapshotCopyWithArgs(ctx context.Context, from blockstorage.Snapshot, to blockstorage.Snapshot, args map[string]string) (*blockstorage.Snapshot, error) {
return nil, errors.New("Copy Snapshot with Args not implemented")
}

func (e *efs) SnapshotCreate(ctx context.Context, volume blockstorage.Volume, tags map[string]string) (*blockstorage.Snapshot, error) {
err := e.createK10DefaultBackupVault()
if err != nil {
Expand Down
146 changes: 144 additions & 2 deletions pkg/blockstorage/azure/azuredisk.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@ import (
"fmt"
"regexp"
"strings"
"time"

azcompute "github.com/Azure/azure-sdk-for-go/services/compute/mgmt/2019-07-01/compute"
"github.com/Azure/azure-sdk-for-go/storage"
azto "github.com/Azure/go-autorest/autorest/to"
"github.com/pkg/errors"
uuid "github.com/satori/go.uuid"
Expand All @@ -17,15 +19,18 @@ import (
"github.com/kanisterio/kanister/pkg/field"
"github.com/kanisterio/kanister/pkg/kube"
"github.com/kanisterio/kanister/pkg/log"
"github.com/kanisterio/kanister/pkg/poll"
)

var _ blockstorage.Provider = (*adStorage)(nil)

var _ zone.Mapper = (*adStorage)(nil)

const (
volumeNameFmt = "vol-%s"
snapshotNameFmt = "snap-%s"
volumeNameFmt = "vol-%s"
snapshotNameFmt = "snap-%s"
copyContainerName = "vhdscontainer"
copyBlobName = "copy-blob-%s.vhd"
)

type adStorage struct {
Expand Down Expand Up @@ -116,6 +121,143 @@ func (s *adStorage) SnapshotCopy(ctx context.Context, from blockstorage.Snapshot
return nil, errors.New("Copy Snapshot not implemented")
}

// SnapshotCopyWithArgs func: args map should contain non-empty StorageAccountName(AZURE_MIGRATE_STORAGE_ACCOUNT_NAME)
// and StorageKey(AZURE_MIGRATE_STORAGE_ACCOUNT_KEY)
func (s *adStorage) SnapshotCopyWithArgs(ctx context.Context, from blockstorage.Snapshot, to blockstorage.Snapshot, args map[string]string) (*blockstorage.Snapshot, error) {
migrateStorageAccount := args[blockstorage.AzureMigrateStorageAccount]
migrateStorageKey := args[blockstorage.AzureMigrateStorageKey]
if migrateStorageAccount == "" || migrateStorageKey == "" {
return nil, errors.Errorf("Required args %s and %s for snapshot copy not available", blockstorage.AzureMigrateStorageAccount, blockstorage.AzureMigrateStorageKey)
}

storageCli, err := storage.NewBasicClient(migrateStorageAccount, migrateStorageKey)
if err != nil {
return nil, errors.Wrap(err, "Cannot get storage service client")
}
storageAccountID := "/subscriptions/" + s.azCli.SubscriptionID + "/resourceGroups/" + s.azCli.ResourceGroup + "/providers/Microsoft.Storage/storageAccounts/" + migrateStorageAccount

_, rg, name, err := parseSnapshotID(from.ID)
if err != nil {
return nil, errors.Wrapf(err, "SnapshotsClient.Copy: Failure in parsing snapshot ID %s", from.ID)
}
_, err = s.azCli.SnapshotsClient.Get(ctx, rg, name)
if err != nil {
return nil, errors.Wrapf(err, "SnapshotsClient.Copy: Failed to get snapshot with ID %s", from.ID)
}

duration := int32(3600)
gad := azcompute.GrantAccessData{
Access: azcompute.Read,
DurationInSeconds: &duration,
}

snapshotsGrantAccessFuture, err := s.azCli.SnapshotsClient.GrantAccess(ctx, rg, name, gad)
if err != nil {
return nil, errors.Wrapf(err, "Failed to grant read access to snapshot: %s", from.ID)
}
defer s.revokeAccess(ctx, rg, name, from.ID)

err = poll.Wait(ctx, func(ctx context.Context) (bool, error) {
_, err := snapshotsGrantAccessFuture.Result(*s.azCli.SnapshotsClient)
if err != nil {
if strings.Contains(err.Error(), "asynchronous operation has not completed") {
return false, nil
}
return false, err
}
return true, nil
})
if err != nil {
return nil, errors.Wrap(err, "SnapshotsClient.Copy failure to grant snapshot access")
}

accessURI, err := snapshotsGrantAccessFuture.Result(*s.azCli.SnapshotsClient)
if err != nil {
return nil, errors.Wrap(err, "SnapshotsClient.Copy failure to grant snapshot access")
}
blobStorageClient := storageCli.GetBlobService()
container := blobStorageClient.GetContainerReference(copyContainerName)
_, err = container.CreateIfNotExists(nil)
if err != nil {
return nil, err
}
blobName := fmt.Sprintf(copyBlobName, name)
blob := container.GetBlobReference(blobName)
defer deleteBlob(blob, blobName)

var copyOptions *storage.CopyOptions
if t, ok := ctx.Deadline(); ok {
time := time.Until(t).Seconds()
if time <= 0 {
return nil, errors.New("Context deadline exceeded, cannot copy snapshot")
}
copyOptions = &storage.CopyOptions{
Timeout: uint(time),
}
}
err = blob.Copy(*accessURI.AccessSAS, copyOptions)
if err != nil {
return nil, errors.Wrapf(err, "Failed to copy disk to blob")
}
blobURI := blob.GetURL()

snapName := fmt.Sprintf(snapshotNameFmt, uuid.NewV1().String())
var tags = make(map[string]string)
for _, tag := range from.Volume.Tags {
if _, found := tags[tag.Key]; !found {
tags[tag.Key] = tag.Value
}
}
tags = blockstorage.SanitizeTags(ktags.GetTags(tags))

createSnap := azcompute.Snapshot{
Name: azto.StringPtr(snapName),
Location: azto.StringPtr(to.Region),
Tags: *azto.StringMapPtr(tags),
SnapshotProperties: &azcompute.SnapshotProperties{
CreationData: &azcompute.CreationData{
CreateOption: azcompute.Import,
StorageAccountID: azto.StringPtr(storageAccountID),
SourceURI: azto.StringPtr(blobURI),
},
},
}

result, err := s.azCli.SnapshotsClient.CreateOrUpdate(ctx, s.azCli.ResourceGroup, snapName, createSnap)
if err != nil {
return nil, errors.Wrapf(err, "Failed to copy snapshot from source snapshot %v", from)
}
err = result.WaitForCompletionRef(ctx, s.azCli.SnapshotsClient.Client)
if err != nil {
return nil, errors.Wrapf(err, "Failed to copy snapshot from source snapshot %v", from)
}
rs, err := result.Result(*s.azCli.SnapshotsClient)
if err != nil {
return nil, errors.Wrapf(err, "Error in getting result of Snapshot copy operation, snaphotName %s", snapName)
}

snap, err := s.SnapshotGet(ctx, azto.String(rs.ID))
if err != nil {
return nil, errors.Wrapf(err, "Failed to Get Snapshot after create, snaphotName %s", snapName)
}
*snap.Volume = *from.Volume
return snap, nil
}

func (s *adStorage) revokeAccess(ctx context.Context, rg, name, ID string) {
_, err := s.azCli.SnapshotsClient.RevokeAccess(ctx, rg, name)
if err != nil {
log.Print("Failed to revoke access from snapshot", field.M{"snapshot": ID})
}
}

func deleteBlob(blob *storage.Blob, blobName string) {
_, err := blob.DeleteIfExists(nil)
if err != nil {
log.Print("Failed to delete blob", field.M{"blob": blobName})
}
}

func (s *adStorage) SnapshotCreate(ctx context.Context, volume blockstorage.Volume, tags map[string]string) (*blockstorage.Snapshot, error) {
snapName := fmt.Sprintf(snapshotNameFmt, uuid.NewV1().String())
tags = blockstorage.SanitizeTags(ktags.GetTags(tags))
Expand Down
5 changes: 5 additions & 0 deletions pkg/blockstorage/blockstorage.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,11 @@ type Provider interface {
VolumeGet(ctx context.Context, id string, zone string) (*Volume, error)
// Snapshot operations
SnapshotCopy(ctx context.Context, from Snapshot, to Snapshot) (*Snapshot, error)
// SnapshotCopyWithArgs func is invoked to perform migration when there is a need to provide
// additional params such as creds of target cluster to carry out the
// Snapshot copy action, use SnapshotCopy func otherwise.
// Currently used by Azure only.
SnapshotCopyWithArgs(ctx context.Context, from Snapshot, to Snapshot, args map[string]string) (*Snapshot, error)
SnapshotCreate(ctx context.Context, volume Volume, tags map[string]string) (*Snapshot, error)
SnapshotCreateWaitForCompletion(context.Context, *Snapshot) error
SnapshotDelete(context.Context, *Snapshot) error
Expand Down
41 changes: 33 additions & 8 deletions pkg/blockstorage/blockstorage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ type BlockStorageProviderSuite struct {
provider blockstorage.Provider
volumes []*blockstorage.Volume
snapshots []*blockstorage.Snapshot
args map[string]string
}

var _ = Suite(&BlockStorageProviderSuite{storageType: blockstorage.TypeEBS, storageRegion: clusterRegionAWS, storageAZ: "us-west-2b"})
Expand All @@ -55,6 +56,7 @@ var _ = Suite(&BlockStorageProviderSuite{storageType: blockstorage.TypeAD, stora

func (s *BlockStorageProviderSuite) SetUpSuite(c *C) {
var err error
s.args = make(map[string]string)
config := s.getConfig(c, s.storageRegion)
s.provider, err = getter.New().Get(s.storageType, config)
c.Assert(err, IsNil)
Expand Down Expand Up @@ -144,16 +146,35 @@ func (s *BlockStorageProviderSuite) TestCreateSnapshot(c *C) {
func (s *BlockStorageProviderSuite) TestSnapshotCopy(c *C) {
c.Skip("Sometimes, snapcopy takes over 10 minutes. go test declares failure if tests are that slow.")

var snap *blockstorage.Snapshot
var err error
srcSnapshot := s.createSnapshot(c)
dstSnapshot := &blockstorage.Snapshot{
Type: srcSnapshot.Type,
Encrypted: false,
Size: srcSnapshot.Size,
Region: "us-east-1",
Volume: nil,
var dstSnapshot *blockstorage.Snapshot
switch s.storageType {
case blockstorage.TypeEBS:
dstSnapshot = &blockstorage.Snapshot{
Type: srcSnapshot.Type,
Encrypted: false,
Size: srcSnapshot.Size,
Region: "us-east-1",
Volume: nil,
}
case blockstorage.TypeAD:
dstSnapshot = &blockstorage.Snapshot{
Type: srcSnapshot.Type,
Encrypted: false,
Size: srcSnapshot.Size,
Region: "westus2",
Volume: nil,
}
snap, err = s.provider.SnapshotCopyWithArgs(context.TODO(), *srcSnapshot, *dstSnapshot, s.args)
c.Assert(err, IsNil)
}

if s.storageType != blockstorage.TypeAD {
snap, err = s.provider.SnapshotCopy(context.TODO(), *srcSnapshot, *dstSnapshot)
c.Assert(err, IsNil)
}
snap, err := s.provider.SnapshotCopy(context.TODO(), *srcSnapshot, *dstSnapshot)
c.Assert(err, IsNil)

log.Print("Snapshot copied", field.M{"FromSnapshotID": srcSnapshot.ID, "ToSnapshotID": snap.ID})

Expand All @@ -169,6 +190,8 @@ func (s *BlockStorageProviderSuite) TestSnapshotCopy(c *C) {

err = provider.SnapshotDelete(context.TODO(), snap)
c.Assert(err, IsNil)
err = provider.SnapshotDelete(context.TODO(), srcSnapshot)
c.Assert(err, IsNil)
}

func (s *BlockStorageProviderSuite) testVolumesList(c *C) {
Expand Down Expand Up @@ -279,6 +302,8 @@ func (s *BlockStorageProviderSuite) getConfig(c *C, region string) map[string]st
config[blockstorage.AzureCientID] = envconfig.GetEnvOrSkip(c, blockstorage.AzureCientID)
config[blockstorage.AzureClentSecret] = envconfig.GetEnvOrSkip(c, blockstorage.AzureClentSecret)
config[blockstorage.AzureResurceGroup] = envconfig.GetEnvOrSkip(c, blockstorage.AzureResurceGroup)
s.args[blockstorage.AzureMigrateStorageAccount] = envconfig.GetEnvOrSkip(c, blockstorage.AzureMigrateStorageAccount)
s.args[blockstorage.AzureMigrateStorageKey] = envconfig.GetEnvOrSkip(c, blockstorage.AzureMigrateStorageKey)
default:
c.Errorf("Unknown blockstorage storage type %s", s.storageType)
}
Expand Down
4 changes: 4 additions & 0 deletions pkg/blockstorage/gcepd/gcepd.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,10 @@ func (s *gpdStorage) SnapshotCopy(ctx context.Context, from blockstorage.Snapsho
return nil, errors.Errorf("Not implemented")
}

func (s *gpdStorage) SnapshotCopyWithArgs(ctx context.Context, from blockstorage.Snapshot, to blockstorage.Snapshot, args map[string]string) (*blockstorage.Snapshot, error) {
return nil, errors.New("Copy Snapshot with Args not implemented")
}

func (s *gpdStorage) SnapshotCreate(ctx context.Context, volume blockstorage.Volume, tags map[string]string) (*blockstorage.Snapshot, error) {
rb := &compute.Snapshot{
Name: fmt.Sprintf(snapshotNameFmt, uuid.NewV1().String()),
Expand Down
24 changes: 13 additions & 11 deletions pkg/blockstorage/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,17 +22,19 @@ import (

// Google Cloud environment variable names
const (
GoogleCloudZone = "CLOUDSDK_COMPUTE_ZONE"
GoogleCloudCreds = "GOOGLE_APPLICATION_CREDENTIALS"
GoogleProjectID = "projectID"
GoogleServiceKey = "serviceKey"
AzureStorageAccount = "AZURE_STORAGE_ACCOUNT_NAME"
AzureStorageKey = "AZURE_STORAGE_ACCOUNT_KEY"
AzureSubscriptionID = "AZURE_SUBSCRIPTION_ID"
AzureTenantID = "AZURE_TENANT_ID"
AzureCientID = "AZURE_CLIENT_ID"
AzureClentSecret = "AZURE_CLIENT_SECRET"
AzureResurceGroup = "AZURE_RESOURCE_GROUP"
GoogleCloudZone = "CLOUDSDK_COMPUTE_ZONE"
GoogleCloudCreds = "GOOGLE_APPLICATION_CREDENTIALS"
GoogleProjectID = "projectID"
GoogleServiceKey = "serviceKey"
AzureStorageAccount = "AZURE_STORAGE_ACCOUNT_NAME"
AzureStorageKey = "AZURE_STORAGE_ACCOUNT_KEY"
AzureSubscriptionID = "AZURE_SUBSCRIPTION_ID"
AzureTenantID = "AZURE_TENANT_ID"
AzureCientID = "AZURE_CLIENT_ID"
AzureClentSecret = "AZURE_CLIENT_SECRET"
AzureResurceGroup = "AZURE_RESOURCE_GROUP"
AzureMigrateStorageAccount = "AZURE_MIGRATE_STORAGE_ACCOUNT_NAME"
AzureMigrateStorageKey = "AZURE_MIGRATE_STORAGE_ACCOUNT_KEY"
)

// SanitizeTags are used to sanitize the tags
Expand Down
4 changes: 4 additions & 0 deletions pkg/blockstorage/ibm/ibmcloud.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,10 @@ func (s *ibmCloud) SnapshotCopy(ctx context.Context, from, to blockstorage.Snaps
return nil, errors.New("Not implemented")
}

func (s *ibmCloud) SnapshotCopyWithArgs(ctx context.Context, from blockstorage.Snapshot, to blockstorage.Snapshot, args map[string]string) (*blockstorage.Snapshot, error) {
return nil, errors.New("Copy Snapshot with Args not implemented")
}

func (s *ibmCloud) SnapshotCreate(ctx context.Context, volume blockstorage.Volume, tags map[string]string) (*blockstorage.Snapshot, error) {
alltags := ktags.GetTags(tags)
ibmvol, err := s.cli.Service.GetVolume(volume.ID)
Expand Down
4 changes: 4 additions & 0 deletions pkg/blockstorage/vmware/vmware.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,10 @@ func (p *fcdProvider) SnapshotCopy(ctx context.Context, from blockstorage.Snapsh
return nil, errors.New("Not implemented")
}

func (p *fcdProvider) SnapshotCopyWithArgs(ctx context.Context, from blockstorage.Snapshot, to blockstorage.Snapshot, args map[string]string) (*blockstorage.Snapshot, error) {
return nil, errors.New("Copy Snapshot with Args not implemented")
}

func (p *fcdProvider) SnapshotCreate(ctx context.Context, volume blockstorage.Volume, tags map[string]string) (*blockstorage.Snapshot, error) {
task, err := p.gom.CreateSnapshot(ctx, vimID(volume.ID), noDescription)
if err != nil {
Expand Down
5 changes: 5 additions & 0 deletions pkg/testutil/mockblockstorage/mockblockstorage.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,11 @@ func (p *Provider) SnapshotCopy(ctx context.Context, from, to blockstorage.Snaps
return p.MockSnapshot(), nil
}

// SnapshotCopyWithArgs mock
func (p *Provider) SnapshotCopyWithArgs(ctx context.Context, from, to blockstorage.Snapshot, args map[string]string) (*blockstorage.Snapshot, error) {
return p.MockSnapshot(), nil
}

// SnapshotCreate mock
func (p *Provider) SnapshotCreate(ctx context.Context, volume blockstorage.Volume, tags map[string]string) (*blockstorage.Snapshot, error) {
return p.MockSnapshot(), nil
Expand Down

0 comments on commit 6bc0459

Please sign in to comment.