Skip to content

Commit

Permalink
fix volume cloning and add e2e
Browse files Browse the repository at this point in the history
  • Loading branch information
umagnus committed Aug 1, 2024
1 parent 4152d89 commit d39b8b7
Show file tree
Hide file tree
Showing 3 changed files with 252 additions and 60 deletions.
4 changes: 4 additions & 0 deletions pkg/blob/blob.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,10 @@ var (
supportedProtocolList = []string{Fuse, Fuse2, NFS, AZNFS}
retriableErrors = []string{accountNotProvisioned, tooManyRequests, statusCodeNotFound, containerBeingDeletedDataplaneAPIError, containerBeingDeletedManagementAPIError, clientThrottled}
supportedFSGroupChangePolicyList = []string{FSGroupChangeNone, string(v1.FSGroupChangeAlways), string(v1.FSGroupChangeOnRootMismatch)}

// azcopyCloneVolumeOptions used in volume cloning between different storage account and --check-length to false because volume data may be in changing state, copy volume is not same as current source volume,
// set --s2s-preserve-access-tier=false to avoid BlobAccessTierNotSupportedForAccountType error in azcopy
azcopyCloneVolumeOptions = []string{"--recursive", "--check-length=false", "--s2s-preserve-access-tier=false"}
)

// DriverOptions defines driver parameters specified in driver deployment
Expand Down
103 changes: 59 additions & 44 deletions pkg/blob/controllerserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -366,36 +366,12 @@ func (d *Driver) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest)
defer d.volumeLocks.Release(volName)

requestName := "controller_create_volume"

var srcAzcopyAuthEnv []string
var srcSubscriptionID, srcResourceGroupName, srcAccountName, srcContainerName, srcPath, srcAccountSASToken string
if volContentSource != nil {
switch volContentSource.Type.(type) {
case *csi.VolumeContentSource_Snapshot:
return nil, status.Errorf(codes.InvalidArgument, "VolumeContentSource Snapshot is not yet implemented")
case *csi.VolumeContentSource_Volume:
requestName = "controller_create_volume_from_volume"
var srcVolumeID string
if volContentSource.GetVolume() != nil {
srcVolumeID = volContentSource.GetVolume().GetVolumeId()
}
srcResourceGroupName, srcAccountName, srcContainerName, _, srcSubscriptionID, err = GetContainerInfo(srcVolumeID)
if err != nil {
return nil, status.Error(codes.NotFound, err.Error())
}
srcAccountOptions := &azure.AccountOptions{
Name: srcAccountName,
SubscriptionID: srcSubscriptionID,
ResourceGroup: srcResourceGroupName,
GetLatestAccountKey: getLatestAccountKey,
}
srcAccountSASToken, srcAzcopyAuthEnv, err = d.getAzcopyAuth(ctx, srcAccountName, "", storageEndpointSuffix, srcAccountOptions, secrets, secretName, secretNamespace)
if err != nil {
return nil, status.Errorf(codes.Internal, "failed to getAzcopyAuth on account(%s) rg(%s), error: %v", accountOptions.Name, accountOptions.ResourceGroup, err)
}
srcPath = fmt.Sprintf("https://%s.blob.%s/%s", srcAccountName, storageEndpointSuffix, srcContainerName)
default:
return nil, status.Errorf(codes.InvalidArgument, "VolumeContentSource is not recognized: %v", volContentSource)
}
}

Expand Down Expand Up @@ -466,16 +442,11 @@ func (d *Driver) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest)
return nil, status.Errorf(codes.Internal, "failed to create container(%s) on account(%s) type(%s) rg(%s) location(%s) size(%d), error: %v", validContainerName, accountName, storageAccountType, resourceGroup, location, requestGiB, err)
}
if volContentSource != nil {
dstAzcopyAuthEnv := srcAzcopyAuthEnv
dstAccountSASToken := srcAccountSASToken
if srcAccountName != accountName {
if dstAccountSASToken, dstAzcopyAuthEnv, err = d.getAzcopyAuth(ctx, accountName, accountKey, storageEndpointSuffix, accountOptions, secrets, secretName, secretNamespace); err != nil {
return nil, status.Errorf(codes.Internal, "failed to getAzcopyAuth on account(%s) rg(%s), error: %v", accountOptions.Name, accountOptions.ResourceGroup, err)
}
accountSASToken, authAzcopyEnv, err := d.getAzcopyAuth(ctx, accountName, accountKey, storageEndpointSuffix, accountOptions, secrets, secretName, secretNamespace)
if err != nil {
return nil, status.Errorf(codes.Internal, "failed to getAzcopyAuth on account(%s) rg(%s), error: %v", accountOptions.Name, accountOptions.ResourceGroup, err)
}

dstPath := fmt.Sprintf("https://%s.blob.%s/%s", accountName, storageEndpointSuffix, validContainerName)
if err := d.copyBlobContainer(dstAzcopyAuthEnv, srcPath, srcAccountSASToken, dstPath, dstAccountSASToken, validContainerName); err != nil {
if err := d.copyVolume(ctx, req, accountName, accountSASToken, authAzcopyEnv, validContainerName, secretNamespace, accountOptions, storageEndpointSuffix); err != nil {
return nil, err
}
}
Expand Down Expand Up @@ -786,11 +757,36 @@ func (d *Driver) DeleteBlobContainer(ctx context.Context, subsID, resourceGroupN
}

// copyBlobContainer copies source volume content into a destination volume
func (d *Driver) copyBlobContainer(authAzcopyEnv []string, srcPath string, srcAccountSASToken string, dstPath string, dstAccountSASToken string, dstContainerName string) error {
func (d *Driver) copyBlobContainer(ctx context.Context, req *csi.CreateVolumeRequest, dstAccountName string, dstAccountSasToken string, authAzcopyEnv []string, dstContainerName string, secretNamespace string, accountOptions *azure.AccountOptions, storageEndpointSuffix string) error {
var sourceVolumeID string
if req.GetVolumeContentSource() != nil && req.GetVolumeContentSource().GetVolume() != nil {
sourceVolumeID = req.GetVolumeContentSource().GetVolume().GetVolumeId()

if srcPath == "" || dstPath == "" || dstContainerName == "" {
return fmt.Errorf("srcPath(%s) or dstPath(%s) or dstContainerName(%s) is empty", srcPath, dstPath, dstContainerName)
}
srcResourceGroupName, srcAccountName, srcContainerName, _, srcSubscriptionID, err := GetContainerInfo(sourceVolumeID) //nolint:dogsled
if err != nil {
return status.Error(codes.NotFound, err.Error())
}
if dstAccountName == "" {
dstAccountName = srcAccountName
}
if srcAccountName == "" || srcContainerName == "" || dstContainerName == "" {
return fmt.Errorf("srcAccountName(%s) or srcContainerName(%s) or dstContainerName(%s) is empty", srcAccountName, srcContainerName, dstContainerName)
}
srcAccountSasToken := dstAccountSasToken
if srcAccountName != dstAccountName && dstAccountSasToken != "" {
srcAccountOptions := &azure.AccountOptions{
Name: srcAccountName,
ResourceGroup: srcResourceGroupName,
SubscriptionID: srcSubscriptionID,
GetLatestAccountKey: accountOptions.GetLatestAccountKey,
}
if srcAccountSasToken, _, err = d.getAzcopyAuth(ctx, srcAccountName, "", storageEndpointSuffix, srcAccountOptions, nil, "", secretNamespace); err != nil {
return err
}
}
srcPath := fmt.Sprintf("https://%s.blob.%s/%s%s", srcAccountName, storageEndpointSuffix, srcContainerName, srcAccountSasToken)
dstPath := fmt.Sprintf("https://%s.blob.%s/%s%s", dstAccountName, storageEndpointSuffix, dstContainerName, dstAccountSasToken)

jobState, percent, err := d.azcopy.GetAzcopyJob(dstContainerName, authAzcopyEnv)
klog.V(2).Infof("azcopy job status: %s, copy percent: %s%%, error: %v", jobState, percent, err)
Expand All @@ -800,13 +796,9 @@ func (d *Driver) copyBlobContainer(authAzcopyEnv []string, srcPath string, srcAc
case util.AzcopyJobRunning:
return fmt.Errorf("wait for the existing AzCopy job to complete, current copy percentage is %s%%", percent)
case util.AzcopyJobNotFound:
klog.V(2).Infof("copy blob container %s to %s", srcPath, dstContainerName)
klog.V(2).Infof("copy blob container %s:%s to %s:%s", srcAccountName, srcContainerName, dstAccountName, dstContainerName)
execFunc := func() error {
cmd := exec.Command("azcopy", "copy", srcPath+srcAccountSASToken, dstPath+dstAccountSASToken, "--recursive", "--check-length=false", "--s2s-preserve-access-tier=false")
if len(authAzcopyEnv) > 0 {
cmd.Env = append(os.Environ(), authAzcopyEnv...)
}
if out, err := cmd.CombinedOutput(); err != nil {
if out, err := d.execAzcopyCopy(srcPath, dstPath, azcopyCloneVolumeOptions, authAzcopyEnv); err != nil {
return fmt.Errorf("exec error: %v, output: %v", err, string(out))
}
return nil
Expand All @@ -817,15 +809,38 @@ func (d *Driver) copyBlobContainer(authAzcopyEnv []string, srcPath string, srcAc
}
copyErr := util.WaitUntilTimeout(time.Duration(d.waitForAzCopyTimeoutMinutes)*time.Minute, execFunc, timeoutFunc)
if copyErr != nil {
klog.Warningf("CopyBlobContainer(%s, %s, %s) failed with error: %v", srcPath, dstPath, dstContainerName, copyErr)
klog.Warningf("CopyBlobContainer(%s, %s, %s) failed with error: %v", accountOptions.ResourceGroup, dstAccountName, dstContainerName, copyErr)
} else {
klog.V(2).Infof("copied blob container %s to %s successfully", srcPath, dstContainerName)
klog.V(2).Infof("copied blob container %s to %s successfully", srcContainerName, dstContainerName)
}
return copyErr
}
return err
}

// copyVolume copies a volume form volume or snapshot, snapshot is not supported now
func (d *Driver) copyVolume(ctx context.Context, req *csi.CreateVolumeRequest, accountName string, accountSASToken string, authAzcopyEnv []string, dstContainerName, secretNamespace string, accountOptions *azure.AccountOptions, storageEndpointSuffix string) error {
vs := req.VolumeContentSource
switch vs.Type.(type) {
case *csi.VolumeContentSource_Snapshot:
return status.Errorf(codes.InvalidArgument, "VolumeContentSource Snapshot is not yet implemented")
case *csi.VolumeContentSource_Volume:
return d.copyBlobContainer(ctx, req, accountName, accountSASToken, authAzcopyEnv, dstContainerName, secretNamespace, accountOptions, storageEndpointSuffix)
default:
return status.Errorf(codes.InvalidArgument, "%v is not a proper volume source", vs)
}
}

// execAzcopyCopy exec azcopy copy command
func (d *Driver) execAzcopyCopy(srcPath, dstPath string, azcopyCopyOptions, authAzcopyEnv []string) ([]byte, error) {
cmd := exec.Command("azcopy", "copy", srcPath, dstPath)
cmd.Args = append(cmd.Args, azcopyCopyOptions...)
if len(authAzcopyEnv) > 0 {
cmd.Env = append(os.Environ(), authAzcopyEnv...)
}
return cmd.CombinedOutput()
}

// authorizeAzcopyWithIdentity returns auth env for azcopy using cluster identity
func (d *Driver) authorizeAzcopyWithIdentity() ([]string, error) {
azureAuthConfig := d.cloud.Config.AzureAuthConfig
Expand Down
Loading

0 comments on commit d39b8b7

Please sign in to comment.