Skip to content

Commit

Permalink
use azcopy for volume cloning
Browse files Browse the repository at this point in the history
  • Loading branch information
umagnus committed Oct 18, 2023
1 parent dd006e4 commit fec2d23
Show file tree
Hide file tree
Showing 24 changed files with 1,269 additions and 10 deletions.
2 changes: 2 additions & 0 deletions .trivyignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
CVE-2023-39325
CVE-2023-44487
2 changes: 1 addition & 1 deletion deploy/csi-blob-controller.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ spec:
- "--csi-address=$(ADDRESS)"
- "--leader-election"
- "--leader-election-namespace=kube-system"
- "--timeout=120s"
- "--timeout=600s"
- "--extra-create-metadata=true"
- "--kube-api-qps=50"
- "--kube-api-burst=100"
Expand Down
16 changes: 16 additions & 0 deletions deploy/example/pvc-blob-csi-clone.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
---
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
name: pvc-blob-clone
namespace: default
spec:
accessModes:
- ReadWriteMany
resources:
requests:
storage: 100Gi
storageClassName: blob-fuse
dataSource:
kind: PersistentVolumeClaim
name: pvc-blob
15 changes: 15 additions & 0 deletions hack/boilerplate/boilerplate.generatego.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
/*
Copyright The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
15 changes: 15 additions & 0 deletions hack/boilerplate/boilerplate.gomock.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
// /*
// Copyright The Kubernetes Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// */
22 changes: 21 additions & 1 deletion hack/boilerplate/boilerplate.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,14 @@ def get_refs():

return refs

def is_generated_file(filename, data, regexs):
for d in skipped_ungenerated_files:
if d in filename:
return False

p = regexs["generated"]
return p.search(data)

def file_passes(filename, refs, regexs):
try:
f = open(filename, 'r')
Expand All @@ -75,15 +83,21 @@ def file_passes(filename, refs, regexs):
data = f.read()
f.close()

# determine if the file is automatically generated
generated = is_generated_file(filename, data, regexs)

basename = os.path.basename(filename)
extension = file_extension(filename)
if generated:
if extension == "go":
extension = "gomock"
if extension != "":
ref = refs[extension]
else:
ref = refs[basename]

# remove build tags from the top of Go files
if extension == "go":
if extension == "go" or extension == "gomock":
p = regexs["go_build_constraints"]
(data, found) = p.subn("", data, 1)
if is_autogenerated(data, regexs):
Expand Down Expand Up @@ -142,6 +156,10 @@ def file_extension(filename):
'cluster/env.sh', 'vendor', 'test/e2e/generated/bindata.go',
'repo-infra/verify/boilerplate/test', '.glide']

# list all the files contain 'DO NOT EDIT', but are not generated
skipped_ungenerated_files = [
'hack/boilerplate/boilerplate.py']

def normalize_files(files):
newfiles = []
for pathname in files:
Expand Down Expand Up @@ -191,6 +209,8 @@ def get_regexs():
regexs["go_build_constraints"] = re.compile(r"^(// \+build.*\n)+\n", re.MULTILINE)
# strip #!.* from shell scripts
regexs["shebang"] = re.compile(r"^(#!.*\n)\n*", re.MULTILINE)
# Search for generated files
regexs["generated"] = re.compile('DO NOT EDIT')
return regexs


Expand Down
28 changes: 28 additions & 0 deletions hack/update-mock.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
#!/bin/bash

# Copyright 2020 The Kubernetes Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

set -euo pipefail

REPO_ROOT=$(realpath $(dirname ${BASH_SOURCE})/..)
COPYRIGHT_FILE="${REPO_ROOT}/hack/boilerplate/boilerplate.generatego.txt"

if ! type mockgen &> /dev/null; then
echo "mockgen not exist, install it"
go install github.com/golang/mock/mockgen@v1.6.0
fi

echo "Updating mocks for util.go"
mockgen -copyright_file=$COPYRIGHT_FILE -source=pkg/util/util.go -package=util -destination=pkg/util/util_mock.go
8 changes: 8 additions & 0 deletions pkg/blob/blob.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,7 @@ type DriverOptions struct {
KubeAPIBurst int
EnableAznfsMount bool
VolStatsCacheExpireInMinutes int
SasTokenExpirationMinutes int
}

// Driver implements all interfaces of CSI drivers
Expand Down Expand Up @@ -211,6 +212,10 @@ type Driver struct {
accountSearchCache azcache.Resource
// a timed cache storing volume stats <volumeID, volumeStats>
volStatsCache azcache.Resource
// sas expiry time for azcopy in volume clone
sasTokenExpirationMinutes int
// azcopy for provide exec mock for ut
azcopy *util.Azcopy
}

// NewDriver Creates a NewCSIDriver object. Assumes vendor version is equal to driver version &
Expand All @@ -236,6 +241,8 @@ func NewDriver(options *DriverOptions) *Driver {
kubeAPIQPS: options.KubeAPIQPS,
kubeAPIBurst: options.KubeAPIBurst,
enableAznfsMount: options.EnableAznfsMount,
sasTokenExpirationMinutes: options.SasTokenExpirationMinutes,
azcopy: &util.Azcopy{},
}
d.Name = options.DriverName
d.Version = driverVersion
Expand Down Expand Up @@ -288,6 +295,7 @@ func (d *Driver) Run(endpoint, kubeconfig string, testBool bool) {
//csi.ControllerServiceCapability_RPC_LIST_SNAPSHOTS,
csi.ControllerServiceCapability_RPC_EXPAND_VOLUME,
csi.ControllerServiceCapability_RPC_SINGLE_NODE_MULTI_WRITER,
csi.ControllerServiceCapability_RPC_CLONE_VOLUME,
})
d.AddVolumeCapabilityAccessModes([]csi.VolumeCapability_AccessMode_Mode{
csi.VolumeCapability_AccessMode_SINGLE_NODE_WRITER,
Expand Down
136 changes: 132 additions & 4 deletions pkg/blob/controllerserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,18 @@ package blob
import (
"context"
"fmt"
"net/url"
"os/exec"
"strconv"
"strings"
"time"

"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/sas"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/service"
"github.com/Azure/azure-sdk-for-go/services/storage/mgmt/2021-09-01/storage"
azstorage "github.com/Azure/azure-sdk-for-go/storage"
"github.com/container-storage-interface/spec/lib/go/csi"
Expand All @@ -42,6 +48,9 @@ import (

const (
privateEndpoint = "privateendpoint"

waitForCopyInterval = 5 * time.Second
waitForCopyTimeout = 3 * time.Minute
)

// CreateVolume provisions a volume
Expand All @@ -61,6 +70,11 @@ func (d *Driver) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest)
}

if acquired := d.volumeLocks.TryAcquire(volName); !acquired {
// logging the job status if it's volume cloning
if req.GetVolumeContentSource() != nil {
jobState, percent, err := d.azcopy.GetAzcopyJob(volName)
klog.V(2).Infof("azcopy job status: %s, copy percent: %s%%, error: %v", jobState, percent, err)
}
return nil, status.Errorf(codes.Aborted, volumeOperationAlreadyExistsFmt, volName)
}
defer d.volumeLocks.Release(volName)
Expand Down Expand Up @@ -313,7 +327,16 @@ func (d *Driver) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest)
}

var volumeID string
mc := metrics.NewMetricContext(blobCSIDriverName, "controller_create_volume", d.cloud.ResourceGroup, d.cloud.SubscriptionID, d.Name)
requestName := "controller_create_volume"
if req.GetVolumeContentSource() != nil {
switch req.VolumeContentSource.Type.(type) {
case *csi.VolumeContentSource_Snapshot:
requestName = "controller_create_volume_from_snapshot"
case *csi.VolumeContentSource_Volume:
requestName = "controller_create_volume_from_volume"
}
}
mc := metrics.NewMetricContext(blobCSIDriverName, requestName, d.cloud.ResourceGroup, d.cloud.SubscriptionID, d.Name)
isOperationSucceeded := false
defer func() {
mc.ObserveOperationWithResult(isOperationSucceeded, VolumeID, volumeID)
Expand Down Expand Up @@ -387,9 +410,20 @@ func (d *Driver) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest)
setKeyValueInMap(parameters, containerNameField, validContainerName)
}

klog.V(2).Infof("begin to create container(%s) on account(%s) type(%s) subsID(%s) rg(%s) location(%s) size(%d)", validContainerName, accountName, storageAccountType, subsID, resourceGroup, location, requestGiB)
if err := d.CreateBlobContainer(ctx, subsID, resourceGroup, accountName, validContainerName, secrets); err != nil {
return nil, status.Errorf(codes.Internal, "failed to create container(%s) on account(%s) type(%s) rg(%s) location(%s) size(%d), error: %v", validContainerName, accountName, storageAccountType, resourceGroup, location, requestGiB, err)
if req.GetVolumeContentSource() != nil {
if accountKey == "" {
if _, accountKey, err = d.GetStorageAccesskey(ctx, accountOptions, secrets, secretName, secretNamespace); err != nil {
return nil, status.Errorf(codes.Internal, "failed to GetStorageAccesskey on account(%s) rg(%s), error: %v", accountOptions.Name, accountOptions.ResourceGroup, err)
}
}
if err := d.copyVolume(ctx, req, accountKey, validContainerName, storageEndpointSuffix); err != nil {
return nil, err
}
} else {
klog.V(2).Infof("begin to create container(%s) on account(%s) type(%s) subsID(%s) rg(%s) location(%s) size(%d)", validContainerName, accountName, storageAccountType, subsID, resourceGroup, location, requestGiB)
if err := d.CreateBlobContainer(ctx, subsID, resourceGroup, accountName, validContainerName, secrets); err != nil {
return nil, status.Errorf(codes.Internal, "failed to create container(%s) on account(%s) type(%s) rg(%s) location(%s) size(%d), error: %v", validContainerName, accountName, storageAccountType, resourceGroup, location, requestGiB, err)
}
}

if storeAccountKey && len(req.GetSecrets()) == 0 {
Expand Down Expand Up @@ -430,6 +464,7 @@ func (d *Driver) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest)
VolumeId: volumeID,
CapacityBytes: req.GetCapacityRange().GetRequiredBytes(),
VolumeContext: parameters,
ContentSource: req.GetVolumeContentSource(),
},
}, nil
}
Expand Down Expand Up @@ -675,6 +710,75 @@ func (d *Driver) DeleteBlobContainer(ctx context.Context, subsID, resourceGroupN
})
}

// CopyBlobContainer copies a blob container in the same storage account
func (d *Driver) copyBlobContainer(ctx context.Context, req *csi.CreateVolumeRequest, accountKey, dstContainerName, storageEndpointSuffix string) error {
var sourceVolumeID string
if req.GetVolumeContentSource() != nil && req.GetVolumeContentSource().GetVolume() != nil {
sourceVolumeID = req.GetVolumeContentSource().GetVolume().GetVolumeId()

}
resourceGroupName, accountName, srcContainerName, _, _, err := GetContainerInfo(sourceVolumeID) //nolint:dogsled
if err != nil {
return status.Error(codes.NotFound, err.Error())
}
if srcContainerName == "" || dstContainerName == "" {
return fmt.Errorf("srcContainerName(%s) or dstContainerName(%s) is empty", srcContainerName, dstContainerName)
}

klog.V(2).Infof("generate sas token for account(%s)", accountName)
accountSasToken, genErr := generateSASToken(accountName, accountKey, storageEndpointSuffix, d.sasTokenExpirationMinutes)
if genErr != nil {
return genErr
}

timeAfter := time.After(waitForCopyTimeout)
timeTick := time.Tick(waitForCopyInterval)
srcPath := fmt.Sprintf("https://%s.blob.%s/%s%s", accountName, storageEndpointSuffix, srcContainerName, accountSasToken)
dstPath := fmt.Sprintf("https://%s.blob.%s/%s%s", accountName, storageEndpointSuffix, dstContainerName, accountSasToken)

jobState, percent, err := d.azcopy.GetAzcopyJob(dstContainerName)
klog.V(2).Infof("azcopy job status: %s, copy percent: %s%%, error: %v", jobState, percent, err)
if jobState == util.AzcopyJobError || jobState == util.AzcopyJobCompleted {
return err
}
klog.V(2).Infof("begin to copy blob container %s to %s", srcContainerName, dstContainerName)
for {
select {
case <-timeTick:
jobState, percent, err := d.azcopy.GetAzcopyJob(dstContainerName)
klog.V(2).Infof("azcopy job status: %s, copy percent: %s%%, error: %v", jobState, percent, err)
switch jobState {
case util.AzcopyJobError, util.AzcopyJobCompleted:
return err
case util.AzcopyJobNotFound:
klog.V(2).Infof("copy blob container %s to %s", srcContainerName, dstContainerName)
out, copyErr := exec.Command("azcopy", "copy", srcPath, dstPath, "--recursive", "--check-length=false").CombinedOutput()
if copyErr != nil {
klog.Warningf("CopyBlobContainer(%s, %s, %s) failed with error(%v): %v", resourceGroupName, accountName, dstPath, copyErr, string(out))
} else {
klog.V(2).Infof("copied blob container %s to %s successfully", srcContainerName, dstContainerName)
}
return copyErr
}
case <-timeAfter:
return fmt.Errorf("timeout waiting for copy blob container %s to %s succeed", srcContainerName, dstContainerName)
}
}
}

// copyVolume copies a volume form volume or snapshot, snapshot is not supported now
func (d *Driver) copyVolume(ctx context.Context, req *csi.CreateVolumeRequest, accountKey, dstContainerName, storageEndpointSuffix string) error {
vs := req.VolumeContentSource
switch vs.Type.(type) {
case *csi.VolumeContentSource_Snapshot:
return status.Errorf(codes.InvalidArgument, "copy volume from volumeSnapshot is not supported")
case *csi.VolumeContentSource_Volume:
return d.copyBlobContainer(ctx, req, accountKey, dstContainerName, storageEndpointSuffix)
default:
return status.Errorf(codes.InvalidArgument, "%v is not a proper volume source", vs)
}
}

// isValidVolumeCapabilities validates the given VolumeCapability array is valid
func isValidVolumeCapabilities(volCaps []*csi.VolumeCapability) error {
if len(volCaps) == 0 {
Expand All @@ -699,3 +803,27 @@ func parseDays(dayStr string) (int32, error) {

return int32(days), nil
}

// generateSASToken generate a sas token for storage account
func generateSASToken(accountName, accountKey, storageEndpointSuffix string, expiryTime int) (string, error) {
credential, err := azblob.NewSharedKeyCredential(accountName, accountKey)
if err != nil {
return "", status.Errorf(codes.Internal, fmt.Sprintf("failed to generate sas token in creating new shared key credential, accountName: %s, err: %s", accountName, err.Error()))
}
serviceClient, err := service.NewClientWithSharedKeyCredential(fmt.Sprintf("https://%s.blob.%s/", accountName, storageEndpointSuffix), credential, nil)
if err != nil {
return "", status.Errorf(codes.Internal, fmt.Sprintf("failed to generate sas token in creating new client with shared key credential, accountName: %s, err: %s", accountName, err.Error()))
}
sasURL, err := serviceClient.GetSASURL(
sas.AccountResourceTypes{Object: true, Service: false, Container: true},
sas.AccountPermissions{Read: true, List: true, Write: true},
sas.AccountServices{Blob: true}, time.Now(), time.Now().Add(time.Duration(expiryTime)*time.Minute))
if err != nil {
return "", err
}
u, err := url.Parse(sasURL)
if err != nil {
return "", err
}
return "?" + u.RawQuery, nil
}
Loading

0 comments on commit fec2d23

Please sign in to comment.