diff --git a/.trivyignore b/.trivyignore new file mode 100644 index 000000000..b01025946 --- /dev/null +++ b/.trivyignore @@ -0,0 +1,2 @@ +CVE-2023-39325 +CVE-2023-44487 \ No newline at end of file diff --git a/deploy/csi-blob-controller.yaml b/deploy/csi-blob-controller.yaml index 4087458f1..4f5212a94 100644 --- a/deploy/csi-blob-controller.yaml +++ b/deploy/csi-blob-controller.yaml @@ -40,7 +40,7 @@ spec: - "--csi-address=$(ADDRESS)" - "--leader-election" - "--leader-election-namespace=kube-system" - - "--timeout=120s" + - "--timeout=600s" - "--extra-create-metadata=true" - "--kube-api-qps=50" - "--kube-api-burst=100" diff --git a/deploy/example/pvc-blob-csi-clone.yaml b/deploy/example/pvc-blob-csi-clone.yaml new file mode 100644 index 000000000..d24da9016 --- /dev/null +++ b/deploy/example/pvc-blob-csi-clone.yaml @@ -0,0 +1,16 @@ +--- +apiVersion: v1 +kind: PersistentVolumeClaim +metadata: + name: pvc-blob-clone + namespace: default +spec: + accessModes: + - ReadWriteMany + resources: + requests: + storage: 100Gi + storageClassName: blob-fuse + dataSource: + kind: PersistentVolumeClaim + name: pvc-blob diff --git a/hack/boilerplate/boilerplate.generatego.txt b/hack/boilerplate/boilerplate.generatego.txt new file mode 100644 index 000000000..0926592d3 --- /dev/null +++ b/hack/boilerplate/boilerplate.generatego.txt @@ -0,0 +1,15 @@ +/* +Copyright The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ diff --git a/hack/boilerplate/boilerplate.gomock.txt b/hack/boilerplate/boilerplate.gomock.txt new file mode 100644 index 000000000..b66f79dd4 --- /dev/null +++ b/hack/boilerplate/boilerplate.gomock.txt @@ -0,0 +1,15 @@ +// /* +// Copyright The Kubernetes Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+// */ diff --git a/hack/boilerplate/boilerplate.py b/hack/boilerplate/boilerplate.py index 1c1b5a744..58e5f4214 100755 --- a/hack/boilerplate/boilerplate.py +++ b/hack/boilerplate/boilerplate.py @@ -65,6 +65,14 @@ def get_refs(): return refs +def is_generated_file(filename, data, regexs): + for d in skipped_ungenerated_files: + if d in filename: + return False + + p = regexs["generated"] + return p.search(data) + def file_passes(filename, refs, regexs): try: f = open(filename, 'r') @@ -75,15 +83,21 @@ def file_passes(filename, refs, regexs): data = f.read() f.close() + # determine if the file is automatically generated + generated = is_generated_file(filename, data, regexs) + basename = os.path.basename(filename) extension = file_extension(filename) + if generated: + if extension == "go": + extension = "gomock" if extension != "": ref = refs[extension] else: ref = refs[basename] # remove build tags from the top of Go files - if extension == "go": + if extension == "go" or extension == "gomock": p = regexs["go_build_constraints"] (data, found) = p.subn("", data, 1) if is_autogenerated(data, regexs): @@ -142,6 +156,10 @@ def file_extension(filename): 'cluster/env.sh', 'vendor', 'test/e2e/generated/bindata.go', 'repo-infra/verify/boilerplate/test', '.glide'] + # list all the files that contain 'DO NOT EDIT' but are not generated +skipped_ungenerated_files = [ + 'hack/boilerplate/boilerplate.py'] + def normalize_files(files): newfiles = [] for pathname in files: @@ -191,6 +209,8 @@ def get_regexs(): regexs["go_build_constraints"] = re.compile(r"^(// \+build.*\n)+\n", re.MULTILINE) # strip #!.* from shell scripts regexs["shebang"] = re.compile(r"^(#!.*\n)\n*", re.MULTILINE) + # Search for generated files + regexs["generated"] = re.compile('DO NOT EDIT') return regexs diff --git a/hack/update-mock.sh b/hack/update-mock.sh new file mode 100644 index 000000000..29010a696 --- /dev/null +++ b/hack/update-mock.sh @@ -0,0 +1,29 @@ +#!/bin/bash + +# Copyright 2020 The Kubernetes Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +set -euo pipefail + +REPO_ROOT=$(realpath $(dirname ${BASH_SOURCE})/..) +COPYRIGHT_FILE="${REPO_ROOT}/hack/boilerplate/boilerplate.generatego.txt" +
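+# install mockgen v1.6.0 on demand, then regenerate pkg/util/util_mock.go from the EXEC interface in pkg/util/util.go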
+if ! type mockgen &> /dev/null; then + echo "mockgen does not exist, installing it" + go install github.com/golang/mock/mockgen@v1.6.0 +fi + +echo "Updating mocks for util.go" +mockgen -copyright_file=$COPYRIGHT_FILE -source=pkg/util/util.go -package=util -destination=pkg/util/util_mock.go diff --git a/pkg/blob/blob.go b/pkg/blob/blob.go index 7271bdd97..ec038106a 100644 --- a/pkg/blob/blob.go +++ b/pkg/blob/blob.go @@ -171,6 +171,7 @@ type DriverOptions struct { KubeAPIBurst int EnableAznfsMount bool VolStatsCacheExpireInMinutes int + SasTokenExpirationMinutes int } // Driver implements all interfaces of CSI drivers @@ -211,6 +212,10 @@ type Driver struct { accountSearchCache azcache.Resource // a timed cache storing volume stats volStatsCache azcache.Resource + // SAS token expiry time used by azcopy during volume cloning + sasTokenExpirationMinutes int + // azcopy wraps the azcopy exec calls so that unit tests can mock them + azcopy *util.Azcopy } // NewDriver Creates a NewCSIDriver object. Assumes vendor version is equal to driver version & @@ -236,6 +241,8 @@ func NewDriver(options *DriverOptions) *Driver { kubeAPIQPS: options.KubeAPIQPS, kubeAPIBurst: options.KubeAPIBurst, enableAznfsMount: options.EnableAznfsMount, + sasTokenExpirationMinutes: options.SasTokenExpirationMinutes, + azcopy: &util.Azcopy{}, } d.Name = options.DriverName d.Version = driverVersion @@ -288,6 +295,7 @@ func (d *Driver) Run(endpoint, kubeconfig string, testBool bool) { //csi.ControllerServiceCapability_RPC_LIST_SNAPSHOTS, csi.ControllerServiceCapability_RPC_EXPAND_VOLUME, csi.ControllerServiceCapability_RPC_SINGLE_NODE_MULTI_WRITER, + csi.ControllerServiceCapability_RPC_CLONE_VOLUME, }) d.AddVolumeCapabilityAccessModes([]csi.VolumeCapability_AccessMode_Mode{ csi.VolumeCapability_AccessMode_SINGLE_NODE_WRITER, diff --git a/pkg/blob/controllerserver.go b/pkg/blob/controllerserver.go index 6a70b39d7..e862558f6 100644 --- a/pkg/blob/controllerserver.go +++ b/pkg/blob/controllerserver.go @@ -19,12 +19,18 @@ package blob import ( "context" "fmt" + "net/url" + "os/exec" "strconv" "strings" + "time" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" + "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob" + "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/sas" + "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/service" "github.com/Azure/azure-sdk-for-go/services/storage/mgmt/2021-09-01/storage" azstorage "github.com/Azure/azure-sdk-for-go/storage" "github.com/container-storage-interface/spec/lib/go/csi" @@ -42,6 +48,9 @@ import ( const ( privateEndpoint = "privateendpoint" + + waitForCopyInterval = 5 * time.Second + waitForCopyTimeout = 3 * time.Minute ) // CreateVolume provisions a volume @@ -61,6 +70,11 @@ func (d *Driver) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest) } if acquired := d.volumeLocks.TryAcquire(volName); !acquired { + // if this is a volume cloning request, log the azcopy job status + if req.GetVolumeContentSource() != nil { + jobState, percent, err := d.azcopy.GetAzcopyJob(volName) + klog.V(2).Infof("azcopy job status: %s, copy percent: %s%%, error: %v", jobState, percent, err) + } return nil, status.Errorf(codes.Aborted, volumeOperationAlreadyExistsFmt, volName) } defer d.volumeLocks.Release(volName) @@ -313,7 +327,16 @@ func (d *Driver) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest) } var volumeID string - mc := metrics.NewMetricContext(blobCSIDriverName, "controller_create_volume", d.cloud.ResourceGroup, d.cloud.SubscriptionID, d.Name) + requestName := "controller_create_volume" + if req.GetVolumeContentSource() !=
nil { + switch req.VolumeContentSource.Type.(type) { + case *csi.VolumeContentSource_Snapshot: + requestName = "controller_create_volume_from_snapshot" + case *csi.VolumeContentSource_Volume: + requestName = "controller_create_volume_from_volume" + } + } + mc := metrics.NewMetricContext(blobCSIDriverName, requestName, d.cloud.ResourceGroup, d.cloud.SubscriptionID, d.Name) isOperationSucceeded := false defer func() { mc.ObserveOperationWithResult(isOperationSucceeded, VolumeID, volumeID) @@ -387,9 +410,20 @@ func (d *Driver) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest) setKeyValueInMap(parameters, containerNameField, validContainerName) } - klog.V(2).Infof("begin to create container(%s) on account(%s) type(%s) subsID(%s) rg(%s) location(%s) size(%d)", validContainerName, accountName, storageAccountType, subsID, resourceGroup, location, requestGiB) - if err := d.CreateBlobContainer(ctx, subsID, resourceGroup, accountName, validContainerName, secrets); err != nil { - return nil, status.Errorf(codes.Internal, "failed to create container(%s) on account(%s) type(%s) rg(%s) location(%s) size(%d), error: %v", validContainerName, accountName, storageAccountType, resourceGroup, location, requestGiB, err) + if req.GetVolumeContentSource() != nil { + if accountKey == "" { + if _, accountKey, err = d.GetStorageAccesskey(ctx, accountOptions, secrets, secretName, secretNamespace); err != nil { + return nil, status.Errorf(codes.Internal, "failed to GetStorageAccesskey on account(%s) rg(%s), error: %v", accountOptions.Name, accountOptions.ResourceGroup, err) + } + } + if err := d.copyVolume(ctx, req, accountKey, validContainerName, storageEndpointSuffix); err != nil { + return nil, err + } + } else { + klog.V(2).Infof("begin to create container(%s) on account(%s) type(%s) subsID(%s) rg(%s) location(%s) size(%d)", validContainerName, accountName, storageAccountType, subsID, resourceGroup, location, requestGiB) + if err := d.CreateBlobContainer(ctx, subsID, resourceGroup, accountName, validContainerName, secrets); err != nil { + return nil, status.Errorf(codes.Internal, "failed to create container(%s) on account(%s) type(%s) rg(%s) location(%s) size(%d), error: %v", validContainerName, accountName, storageAccountType, resourceGroup, location, requestGiB, err) + } } if storeAccountKey && len(req.GetSecrets()) == 0 { @@ -430,6 +464,7 @@ func (d *Driver) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest) VolumeId: volumeID, CapacityBytes: req.GetCapacityRange().GetRequiredBytes(), VolumeContext: parameters, + ContentSource: req.GetVolumeContentSource(), }, }, nil } @@ -675,6 +710,75 @@ func (d *Driver) DeleteBlobContainer(ctx context.Context, subsID, resourceGroupN }) } +// copyBlobContainer copies a blob container in the same storage account +func (d *Driver) copyBlobContainer(ctx context.Context, req *csi.CreateVolumeRequest, accountKey, dstContainerName, storageEndpointSuffix string) error { + var sourceVolumeID string + if req.GetVolumeContentSource() != nil && req.GetVolumeContentSource().GetVolume() != nil { + sourceVolumeID = req.GetVolumeContentSource().GetVolume().GetVolumeId() + } + resourceGroupName, accountName, srcContainerName, _, _, err := GetContainerInfo(sourceVolumeID) //nolint:dogsled + if err != nil { + return status.Error(codes.NotFound, err.Error()) + } + if srcContainerName == "" || dstContainerName == "" { + return fmt.Errorf("srcContainerName(%s) or dstContainerName(%s) is empty", srcContainerName, dstContainerName) + } +
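+ // generate an account-level SAS so that azcopy can authenticate to both the source and destination containers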
+ klog.V(2).Infof("generate sas token for account(%s)", accountName) + accountSasToken, genErr := generateSASToken(accountName, accountKey, storageEndpointSuffix, d.sasTokenExpirationMinutes) + if genErr != nil { + return genErr + } + + timeAfter := time.After(waitForCopyTimeout) + timeTick := time.Tick(waitForCopyInterval) + srcPath := fmt.Sprintf("https://%s.blob.%s/%s%s", accountName, storageEndpointSuffix, srcContainerName, accountSasToken) + dstPath := fmt.Sprintf("https://%s.blob.%s/%s%s", accountName, storageEndpointSuffix, dstContainerName, accountSasToken) + + jobState, percent, err := d.azcopy.GetAzcopyJob(dstContainerName) + klog.V(2).Infof("azcopy job status: %s, copy percent: %s%%, error: %v", jobState, percent, err) + if jobState == util.AzcopyJobError || jobState == util.AzcopyJobCompleted { + return err + } + klog.V(2).Infof("begin to copy blob container %s to %s", srcContainerName, dstContainerName) + for { + select { + case <-timeTick: + jobState, percent, err := d.azcopy.GetAzcopyJob(dstContainerName) + klog.V(2).Infof("azcopy job status: %s, copy percent: %s%%, error: %v", jobState, percent, err) + switch jobState { + case util.AzcopyJobError, util.AzcopyJobCompleted: + return err + case util.AzcopyJobNotFound: + klog.V(2).Infof("copy blob container %s to %s", srcContainerName, dstContainerName) + out, copyErr := exec.Command("azcopy", "copy", srcPath, dstPath, "--recursive", "--check-length=false").CombinedOutput() + if copyErr != nil { + klog.Warningf("CopyBlobContainer(%s, %s, %s) failed with error(%v): %v", resourceGroupName, accountName, dstPath, copyErr, string(out)) + } else { + klog.V(2).Infof("copied blob container %s to %s successfully", srcContainerName, dstContainerName) + } + return copyErr + } + case <-timeAfter: + return fmt.Errorf("timeout waiting for copy of blob container %s to %s to succeed", srcContainerName, dstContainerName) + } + } +} + +// copyVolume copies a volume from a volume or a snapshot; copying from a snapshot is not supported yet +func (d *Driver) copyVolume(ctx context.Context, req *csi.CreateVolumeRequest, accountKey, dstContainerName, storageEndpointSuffix string) error { + vs := req.VolumeContentSource + switch vs.Type.(type) { + case *csi.VolumeContentSource_Snapshot: + return status.Errorf(codes.InvalidArgument, "copy volume from volumeSnapshot is not supported") + case *csi.VolumeContentSource_Volume: + return d.copyBlobContainer(ctx, req, accountKey, dstContainerName, storageEndpointSuffix) + default: + return status.Errorf(codes.InvalidArgument, "%v is not a proper volume source", vs) + } +} + // isValidVolumeCapabilities validates the given VolumeCapability array is valid func isValidVolumeCapabilities(volCaps []*csi.VolumeCapability) error { if len(volCaps) == 0 { @@ -699,3 +803,27 @@ func parseDays(dayStr string) (int32, error) { return int32(days), nil } + +// generateSASToken generates a SAS token for the storage account +func generateSASToken(accountName, accountKey, storageEndpointSuffix string, expiryTime int) (string, error) { + credential, err := azblob.NewSharedKeyCredential(accountName, accountKey) + if err != nil { + return "", status.Errorf(codes.Internal, fmt.Sprintf("failed to generate sas token in creating new shared key credential, accountName: %s, err: %s", accountName, err.Error())) + } + serviceClient, err := service.NewClientWithSharedKeyCredential(fmt.Sprintf("https://%s.blob.%s/", accountName, storageEndpointSuffix), credential, nil) + if err != nil { + return "", status.Errorf(codes.Internal, fmt.Sprintf("failed to generate sas token in creating new client with
shared key credential, accountName: %s, err: %s", accountName, err.Error())) + } + sasURL, err := serviceClient.GetSASURL( + sas.AccountResourceTypes{Object: true, Service: false, Container: true}, + sas.AccountPermissions{Read: true, List: true, Write: true}, + sas.AccountServices{Blob: true}, time.Now(), time.Now().Add(time.Duration(expiryTime)*time.Minute)) + if err != nil { + return "", err + } + u, err := url.Parse(sasURL) + if err != nil { + return "", err + } + return "?" + u.RawQuery, nil +} diff --git a/pkg/blob/controllerserver_test.go b/pkg/blob/controllerserver_test.go index bcf43d66b..6cb3952f3 100644 --- a/pkg/blob/controllerserver_test.go +++ b/pkg/blob/controllerserver_test.go @@ -32,6 +32,7 @@ import ( "google.golang.org/grpc/status" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/utils/pointer" + "sigs.k8s.io/blob-csi-driver/pkg/util" "sigs.k8s.io/cloud-provider-azure/pkg/azureclients/blobclient" "sigs.k8s.io/cloud-provider-azure/pkg/azureclients/storageaccountclient/mockstorageaccountclient" azure "sigs.k8s.io/cloud-provider-azure/pkg/provider" @@ -751,6 +752,116 @@ func TestCreateVolume(t *testing.T) { } }, }, + { + name: "create volume from copy volumesnapshot is not supported", + testFunc: func(t *testing.T) { + d := NewFakeDriver() + d.cloud = &azure.Cloud{} + d.cloud.SubscriptionID = "subID" + + keyList := make([]storage.AccountKey, 1) + fakeKey := "fakeKey" + fakeValue := "fakeValue" + keyList[0] = (storage.AccountKey{ + KeyName: &fakeKey, + Value: &fakeValue, + }) + d.cloud.StorageAccountClient = NewMockSAClient(context.Background(), gomock.NewController(t), "subID", "unit-test", "unit-test", &keyList) + + errorType := NULL + d.cloud.BlobClient = &mockBlobClient{errorType: &errorType} + + mp := make(map[string]string) + mp[protocolField] = "fuse" + mp[skuNameField] = "unit-test" + mp[storageAccountTypeField] = "unit-test" + mp[locationField] = "unit-test" + mp[storageAccountField] = "unittest" + mp[resourceGroupField] = "unit-test" + mp[containerNameField] = "unit-test" + mp[mountPermissionsField] = "0750" + + volumeSnapshotSource := &csi.VolumeContentSource_SnapshotSource{ + SnapshotId: "unit-test", + } + volumeContentSourceSnapshotSource := &csi.VolumeContentSource_Snapshot{ + Snapshot: volumeSnapshotSource, + } + volumecontensource := csi.VolumeContentSource{ + Type: volumeContentSourceSnapshotSource, + } + req := &csi.CreateVolumeRequest{ + Name: "unit-test", + VolumeCapabilities: stdVolumeCapabilities, + Parameters: mp, + VolumeContentSource: &volumecontensource, + } + d.Cap = []*csi.ControllerServiceCapability{ + controllerServiceCapability, + } + + expectedErr := status.Errorf(codes.InvalidArgument, "copy volume from volumeSnapshot is not supported") + _, err := d.CreateVolume(context.Background(), req) + if !reflect.DeepEqual(err, expectedErr) { + t.Errorf("Unexpected error: %v", err) + } + }, + }, + { + name: "create volume from copy volume not found", + testFunc: func(t *testing.T) { + d := NewFakeDriver() + d.cloud = &azure.Cloud{} + d.cloud.SubscriptionID = "subID" + + keyList := make([]storage.AccountKey, 1) + fakeKey := "fakeKey" + fakeValue := "fakeValue" + keyList[0] = (storage.AccountKey{ + KeyName: &fakeKey, + Value: &fakeValue, + }) + d.cloud.StorageAccountClient = NewMockSAClient(context.Background(), gomock.NewController(t), "subID", "unit-test", "unit-test", &keyList) + + errorType := NULL + d.cloud.BlobClient = &mockBlobClient{errorType: &errorType} + + mp := make(map[string]string) + mp[protocolField] = "fuse" + mp[skuNameField] = "unit-test" + 
mp[storageAccountTypeField] = "unit-test" + mp[locationField] = "unit-test" + mp[storageAccountField] = "unittest" + mp[resourceGroupField] = "unit-test" + mp[containerNameField] = "unit-test" + mp[mountPermissionsField] = "0750" + + volumeSource := &csi.VolumeContentSource_VolumeSource{ + VolumeId: "unit-test", + } + volumeContentSourceVolumeSource := &csi.VolumeContentSource_Volume{ + Volume: volumeSource, + } + volumecontensource := csi.VolumeContentSource{ + Type: volumeContentSourceVolumeSource, + } + req := &csi.CreateVolumeRequest{ + Name: "unit-test", + VolumeCapabilities: stdVolumeCapabilities, + Parameters: mp, + VolumeContentSource: &volumecontensource, + } + d.Cap = []*csi.ControllerServiceCapability{ + controllerServiceCapability, + } + + expectedErr := status.Errorf(codes.NotFound, "error parsing volume id: \"unit-test\", should at least contain two #") + _, err := d.CreateVolume(context.Background(), req) + if !reflect.DeepEqual(err, expectedErr) { + t.Errorf("Unexpected error: %v", err) + } + }, + }, } for _, tc := range testCases { t.Run(tc.name, tc.testFunc) @@ -1386,6 +1497,242 @@ func TestDeleteBlobContainer(t *testing.T) { } } +func TestCopyVolume(t *testing.T) { + stdVolumeCapability := &csi.VolumeCapability{ + AccessType: &csi.VolumeCapability_Mount{ + Mount: &csi.VolumeCapability_MountVolume{}, + }, + } + stdVolumeCapabilities := []*csi.VolumeCapability{ + stdVolumeCapability, + } + testCases := []struct { + name string + testFunc func(t *testing.T) + }{ + { + name: "copy volume from volumeSnapshot is not supported", + testFunc: func(t *testing.T) { + d := NewFakeDriver() + mp := map[string]string{} + + volumeSnapshotSource := &csi.VolumeContentSource_SnapshotSource{ + SnapshotId: "unit-test", + } + volumeContentSourceSnapshotSource := &csi.VolumeContentSource_Snapshot{ + Snapshot: volumeSnapshotSource, + } + volumecontensource := csi.VolumeContentSource{ + Type: volumeContentSourceSnapshotSource, + } + req := &csi.CreateVolumeRequest{ + Name: "unit-test", + VolumeCapabilities: stdVolumeCapabilities, + Parameters: mp, + VolumeContentSource: &volumecontensource, + } + + ctx := context.Background() + + expectedErr := status.Errorf(codes.InvalidArgument, "copy volume from volumeSnapshot is not supported") + err := d.copyVolume(ctx, req, "", "", "core.windows.net") + if !reflect.DeepEqual(err, expectedErr) { + t.Errorf("Unexpected error: %v", err) + } + }, + }, + { + name: "copy volume from volume not found", + testFunc: func(t *testing.T) { + d := NewFakeDriver() + mp := map[string]string{} + + volumeSource := &csi.VolumeContentSource_VolumeSource{ + VolumeId: "unit-test", + } + volumeContentSourceVolumeSource := &csi.VolumeContentSource_Volume{ + Volume: volumeSource, + } + volumecontensource := csi.VolumeContentSource{ + Type: volumeContentSourceVolumeSource, + } + + req := &csi.CreateVolumeRequest{ + Name: "unit-test", + VolumeCapabilities: stdVolumeCapabilities, + Parameters: mp, + VolumeContentSource: &volumecontensource, + } + + ctx := context.Background() + + expectedErr := status.Errorf(codes.NotFound, "error parsing volume id: \"unit-test\", should at least contain two #") + err := d.copyVolume(ctx, req, "", "dstContainer", "core.windows.net") + if !reflect.DeepEqual(err, expectedErr) { + t.Errorf("Unexpected error: %v", err) + } + }, + }, + { + name: "src blob container is empty", + testFunc: func(t *testing.T) { + d := NewFakeDriver() + mp := map[string]string{} + + volumeSource := &csi.VolumeContentSource_VolumeSource{ + VolumeId: "rg#unit-test##", + } + 
volumeContentSourceVolumeSource := &csi.VolumeContentSource_Volume{ + Volume: volumeSource, + } + volumecontensource := csi.VolumeContentSource{ + Type: volumeContentSourceVolumeSource, + } + + req := &csi.CreateVolumeRequest{ + Name: "unit-test", + VolumeCapabilities: stdVolumeCapabilities, + Parameters: mp, + VolumeContentSource: &volumecontensource, + } + + ctx := context.Background() + + expectedErr := fmt.Errorf("srcContainerName() or dstContainerName(dstContainer) is empty") + err := d.copyVolume(ctx, req, "", "dstContainer", "core.windows.net") + if !reflect.DeepEqual(err, expectedErr) { + t.Errorf("Unexpected error: %v", err) + } + }, + }, + { + name: "dst blob container is empty", + testFunc: func(t *testing.T) { + d := NewFakeDriver() + mp := map[string]string{} + + volumeSource := &csi.VolumeContentSource_VolumeSource{ + VolumeId: "vol_1#f5713de20cde511e8ba4900#fileshare#", + } + volumeContentSourceVolumeSource := &csi.VolumeContentSource_Volume{ + Volume: volumeSource, + } + volumecontensource := csi.VolumeContentSource{ + Type: volumeContentSourceVolumeSource, + } + + req := &csi.CreateVolumeRequest{ + Name: "unit-test", + VolumeCapabilities: stdVolumeCapabilities, + Parameters: mp, + VolumeContentSource: &volumecontensource, + } + + ctx := context.Background() + + expectedErr := fmt.Errorf("srcContainerName(fileshare) or dstContainerName() is empty") + err := d.copyVolume(ctx, req, "", "", "core.windows.net") + if !reflect.DeepEqual(err, expectedErr) { + t.Errorf("Unexpected error: %v", err) + } + }, + }, + { + name: "azcopy job is already completed", + testFunc: func(t *testing.T) { + d := NewFakeDriver() + mp := map[string]string{} + + volumeSource := &csi.VolumeContentSource_VolumeSource{ + VolumeId: "vol_1#f5713de20cde511e8ba4900#fileshare#", + } + volumeContentSourceVolumeSource := &csi.VolumeContentSource_Volume{ + Volume: volumeSource, + } + volumecontensource := csi.VolumeContentSource{ + Type: volumeContentSourceVolumeSource, + } + + req := &csi.CreateVolumeRequest{ + Name: "unit-test", + VolumeCapabilities: stdVolumeCapabilities, + Parameters: mp, + VolumeContentSource: &volumecontensource, + } + + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + m := util.NewMockEXEC(ctrl) + listStr := "JobId: ed1c3833-eaff-fe42-71d7-513fb065a9d9\nStart Time: Monday, 07-Aug-23 03:29:54 UTC\nStatus: Completed\nCommand: copy https://{accountName}.file.core.windows.net/{srcFileshare}{SAStoken} https://{accountName}.file.core.windows.net/{dstFileshare}{SAStoken} --recursive --check-length=false" + m.EXPECT().RunCommand(gomock.Eq("azcopy jobs list | grep dstContainer -B 3")).Return(listStr, nil) + // if test.enableShow { + // m.EXPECT().RunCommand(gomock.Not("azcopy jobs list | grep dstContainer -B 3")).Return(test.showStr, test.showErr) + // } + + d.azcopy.ExecCmd = m + + ctx := context.Background() + + var expectedErr error + err := d.copyVolume(ctx, req, "", "dstContainer", "core.windows.net") + if !reflect.DeepEqual(err, expectedErr) { + t.Errorf("Unexpected error: %v", err) + } + }, + }, + { + name: "azcopy job is first in progress and then be completed", + testFunc: func(t *testing.T) { + d := NewFakeDriver() + mp := map[string]string{} + + volumeSource := &csi.VolumeContentSource_VolumeSource{ + VolumeId: "vol_1#f5713de20cde511e8ba4900#fileshare#", + } + volumeContentSourceVolumeSource := &csi.VolumeContentSource_Volume{ + Volume: volumeSource, + } + volumecontensource := csi.VolumeContentSource{ + Type: volumeContentSourceVolumeSource, + } + + req := 
&csi.CreateVolumeRequest{ + Name: "unit-test", + VolumeCapabilities: stdVolumeCapabilities, + Parameters: mp, + VolumeContentSource: &volumecontensource, + } + + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + m := util.NewMockEXEC(ctrl) + listStr1 := "JobId: ed1c3833-eaff-fe42-71d7-513fb065a9d9\nStart Time: Monday, 07-Aug-23 03:29:54 UTC\nStatus: InProgress\nCommand: copy https://{accountName}.file.core.windows.net/{srcFileshare}{SAStoken} https://{accountName}.file.core.windows.net/{dstFileshare}{SAStoken} --recursive --check-length=false" + listStr2 := "JobId: ed1c3833-eaff-fe42-71d7-513fb065a9d9\nStart Time: Monday, 07-Aug-23 03:29:54 UTC\nStatus: Completed\nCommand: copy https://{accountName}.file.core.windows.net/{srcFileshare}{SAStoken} https://{accountName}.file.core.windows.net/{dstFileshare}{SAStoken} --recursive --check-length=false" + o1 := m.EXPECT().RunCommand(gomock.Eq("azcopy jobs list | grep dstContainer -B 3")).Return(listStr1, nil).Times(1) + m.EXPECT().RunCommand(gomock.Not("azcopy jobs list | grep dstBlobContainer -B 3")).Return("Percent Complete (approx): 50.0", nil) + o2 := m.EXPECT().RunCommand(gomock.Eq("azcopy jobs list | grep dstContainer -B 3")).Return(listStr2, nil) + gomock.InOrder(o1, o2) + + d.azcopy.ExecCmd = m + + ctx := context.Background() + + var expectedErr error + err := d.copyVolume(ctx, req, "", "dstContainer", "core.windows.net") + if !reflect.DeepEqual(err, expectedErr) { + t.Errorf("Unexpected error: %v", err) + } + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, tc.testFunc) + } +} + func Test_parseDays(t *testing.T) { type args struct { dayStr string @@ -1434,3 +1781,41 @@ func Test_parseDays(t *testing.T) { }) } } + +func Test_generateSASToken(t *testing.T) { + storageEndpointSuffix := "core.windows.net" + tests := []struct { + name string + accountName string + accountKey string + want string + expectedErr error + }{ + { + name: "accountName nil", + accountName: "", + accountKey: "", + want: "se=", + expectedErr: nil, + }, + { + name: "account key illegal", + accountName: "unit-test", + accountKey: "fakeValue", + want: "", + expectedErr: status.Errorf(codes.Internal, fmt.Sprintf("failed to generate sas token in creating new shared key credential, accountName: %s, err: %s", "unit-test", "decode account key: illegal base64 data at input byte 8")), + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + sas, err := generateSASToken(tt.accountName, tt.accountKey, storageEndpointSuffix, 30) + if !reflect.DeepEqual(err, tt.expectedErr) { + t.Errorf("generateSASToken error = %v, expectedErr %v, sas token = %v, want %v", err, tt.expectedErr, sas, tt.want) + return + } + if !strings.Contains(sas, tt.want) { + t.Errorf("sas token = %v, want %v", sas, tt.want) + } + }) + } +} diff --git a/pkg/blobplugin/Dockerfile b/pkg/blobplugin/Dockerfile index 1b5ab5c0a..c5bb8f636 100644 --- a/pkg/blobplugin/Dockerfile +++ b/pkg/blobplugin/Dockerfile @@ -29,7 +29,7 @@ RUN chmod +x /blobfuse-proxy/init.sh && \ chmod +x /blobfuse-proxy/blobfuse-proxy # packages that are only needed by aznfs: procps conntrack iptables bind9-host iproute2 bash netcat sysvinit-utils. 
-RUN apt update && apt upgrade -y && apt-mark unhold libcap2 && clean-install ca-certificates uuid-dev util-linux mount udev wget e2fsprogs nfs-common netbase procps conntrack iptables bind9-host iproute2 bash netcat sysvinit-utils +RUN apt update && apt upgrade -y && apt-mark unhold libcap2 && clean-install ca-certificates uuid-dev util-linux mount udev wget e2fsprogs nfs-common netbase procps conntrack iptables bind9-host iproute2 bash netcat sysvinit-utils curl # install aznfs RUN if [ "$ARCH" = "amd64" ] ; then \ @@ -38,6 +38,16 @@ else \ wget -O aznfs.tar.gz https://github.com/Azure/AZNFS-mount/releases/download/1.0.8/aznfs-1.0.8-1.arm64.tar.gz;fi RUN tar xvzf aznfs.tar.gz -C / && rm aznfs.tar.gz +# install azcopy +ARG azcopyURL=https://aka.ms/downloadazcopy-v10-linux +RUN if [ "$ARCH" = "arm64" ] ; then \ + azcopyURL=${azcopyURL}"-arm64"; fi +RUN curl -sL ${azcopyURL} | tar -xz && \ + mv ./azcopy_linux_${ARCH}_*/azcopy /usr/local/bin/azcopy && \ + rm -rf ./azcopy_linux_${ARCH}_* +RUN chmod +x /usr/local/bin/azcopy +RUN apt remove curl -y + RUN if [ "$ARCH" = "amd64" ] ; then \ clean-install libcurl4-gnutls-dev && \ wget -O /blobfuse-proxy/packages-microsoft-prod-22.04.deb https://packages.microsoft.com/config/ubuntu/22.04/packages-microsoft-prod.deb && \ diff --git a/pkg/blobplugin/main.go b/pkg/blobplugin/main.go index 27198dceb..d9f8e0e1d 100644 --- a/pkg/blobplugin/main.go +++ b/pkg/blobplugin/main.go @@ -56,6 +56,7 @@ var ( appendMountErrorHelpLink = flag.Bool("append-mount-error-help-link", true, "Whether to include a link for help with mount errors when a mount error occurs.") enableAznfsMount = flag.Bool("enable-aznfs-mount", false, "replace nfs mount with aznfs mount") volStatsCacheExpireInMinutes = flag.Int("vol-stats-cache-expire-in-minutes", 10, "The cache expire time in minutes for volume stats cache") + sasTokenExpirationMinutes = flag.Int("sas-token-expiration-minutes", 1440, "sas token expiration minutes during volume cloning") ) func main() { @@ -98,6 +99,7 @@ func handle() { KubeAPIBurst: *kubeAPIBurst, EnableAznfsMount: *enableAznfsMount, VolStatsCacheExpireInMinutes: *volStatsCacheExpireInMinutes, + SasTokenExpirationMinutes: *sasTokenExpirationMinutes, } driver := blob.NewDriver(&driverOptions) if driver == nil { diff --git a/pkg/csi-common/driver_test.go b/pkg/csi-common/driver_test.go index fcad5c197..170c91dd8 100644 --- a/pkg/csi-common/driver_test.go +++ b/pkg/csi-common/driver_test.go @@ -105,6 +105,7 @@ func TestValidateControllerServiceRequest(t *testing.T) { csi.ControllerServiceCapability_RPC_PUBLISH_UNPUBLISH_VOLUME, csi.ControllerServiceCapability_RPC_GET_CAPACITY, csi.ControllerServiceCapability_RPC_LIST_VOLUMES, + csi.ControllerServiceCapability_RPC_CLONE_VOLUME, }) // Test controller service publish/unpublish is supported @@ -123,6 +124,10 @@ err = d.ValidateControllerServiceRequest(csi.ControllerServiceCapability_RPC_GET_CAPACITY) assert.NoError(t, err) + // Test controller service clone volumes is supported + err = d.ValidateControllerServiceRequest(csi.ControllerServiceCapability_RPC_CLONE_VOLUME) + assert.NoError(t, err) + } func TestAddNodeServiceCapabilities(t *testing.T) { diff --git a/pkg/csi-common/utils_test.go b/pkg/csi-common/utils_test.go index a347e994b..6769cf18f 100644 --- a/pkg/csi-common/utils_test.go +++ b/pkg/csi-common/utils_test.go @@ -156,6 +156,9 @@ func TestNewControllerServiceCapability(t *testing.T) { { cap: csi.ControllerServiceCapability_RPC_GET_CAPACITY, }, + { +
cap: csi.ControllerServiceCapability_RPC_CLONE_VOLUME, + }, } for _, test := range tests { resp := NewControllerServiceCapability(test.cap) diff --git a/pkg/util/util.go b/pkg/util/util.go index 6afc44600..352891d6d 100644 --- a/pkg/util/util.go +++ b/pkg/util/util.go @@ -19,6 +19,7 @@ package util import ( "fmt" "os" + "os/exec" "regexp" "strings" "sync" @@ -35,6 +36,15 @@ const ( tagKeyValueDelimiter = "=" ) +type AzcopyJobState string + +const ( + AzcopyJobError AzcopyJobState = "Error" + AzcopyJobNotFound AzcopyJobState = "NotFound" + AzcopyJobRunning AzcopyJobState = "Running" + AzcopyJobCompleted AzcopyJobState = "Completed" +) + // RoundUpBytes rounds up the volume size in bytes up to multiplications of GiB // in the unit of Bytes func RoundUpBytes(volumeSizeBytes int64) int64 { @@ -190,3 +200,109 @@ func TrimDuplicatedSpace(s string) string { s = reg.ReplaceAllString(s, " ") return s } + +type EXEC interface { + RunCommand(string) (string, error) +} + +type ExecCommand struct { +} + +func (ec *ExecCommand) RunCommand(cmd string) (string, error) { + out, err := exec.Command("sh", "-c", cmd).CombinedOutput() + return string(out), err } + +type Azcopy struct { + ExecCmd EXEC +} + +// GetAzcopyJob gets the azcopy job status if the job exists +func (ac *Azcopy) GetAzcopyJob(dstBlobContainer string) (AzcopyJobState, string, error) { + cmdStr := fmt.Sprintf("azcopy jobs list | grep %s -B 3", dstBlobContainer) + // cmd output example: + // JobId: ed1c3833-eaff-fe42-71d7-513fb065a9d9 + // Start Time: Monday, 07-Aug-23 03:29:54 UTC + // Status: Completed (or Cancelled, InProgress) + // Command: copy https://{accountName}.blob.core.windows.net/{srcBlobContainer}{SAStoken} https://{accountName}.blob.core.windows.net/{dstBlobContainer}{SAStoken} --recursive --check-length=false + // -- + // JobId: b598cce3-9aa9-9640-7793-c2bf3c385a9a + // Start Time: Wednesday, 09-Aug-23 09:09:03 UTC + // Status: Cancelled + // Command: copy https://{accountName}.blob.core.windows.net/{srcBlobContainer}{SAStoken} https://{accountName}.blob.core.windows.net/{dstBlobContainer}{SAStoken} --recursive --check-length=false + if ac.ExecCmd == nil { + ac.ExecCmd = &ExecCommand{} + } + out, err := ac.ExecCmd.RunCommand(cmdStr) + // if the grep command matches nothing, the exec returns an "exit status 1" error, so filter that error out + if err != nil && err.Error() != "exit status 1" { + klog.Warningf("failed to get azcopy job with error: %v, jobState: %v", err, AzcopyJobError) + return AzcopyJobError, "", fmt.Errorf("couldn't list jobs in azcopy %v", err) + } + jobid, jobState, err := parseAzcopyJobList(out, dstBlobContainer) + if err != nil || jobState == AzcopyJobError { + klog.Warningf("failed to get azcopy job with error: %v, jobState: %v", err, jobState) + return AzcopyJobError, "", fmt.Errorf("couldn't parse azcopy job list in azcopy %v", err) + } + if jobState == AzcopyJobCompleted { + return jobState, "100.0", err + } + if jobid == "" { + return jobState, "", err + } + cmdPercentStr := fmt.Sprintf("azcopy jobs show %s | grep Percent", jobid) + // cmd out example: + // Percent Complete (approx): 100.0 + summary, err := ac.ExecCmd.RunCommand(cmdPercentStr) + if err != nil { + klog.Warningf("failed to get azcopy job with error: %v, jobState: %v", err, AzcopyJobError) + return AzcopyJobError, "", fmt.Errorf("couldn't show jobs summary in azcopy %v", err) + } + jobState, percent, err := parseAzcopyJobShow(summary) + if err != nil || jobState == AzcopyJobError { + klog.Warningf("failed to get azcopy job with error: %v, jobState: %v", err, jobState) + return AzcopyJobError, "", fmt.Errorf("couldn't parse azcopy job show in azcopy %v", err) + } + return jobState, percent, nil +} + +// parseAzcopyJobList parses the output of 'azcopy jobs list' and returns the id and state of the job copying to dstBlobContainer +func parseAzcopyJobList(joblist string, dstBlobContainer string) (string, AzcopyJobState, error) { + jobid := "" + jobSegments := strings.Split(joblist, "JobId: ") + if len(jobSegments) < 2 { + return jobid, AzcopyJobNotFound, nil + } + jobSegments = jobSegments[1:]
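+ // each job entry has the form JobId / Start Time / Status / Command, so the Status field is the third line of the segment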
%v", err, jobState) + return AzcopyJobError, "", fmt.Errorf("couldn't parse azcopy job show in azcopy %v", err) + } + return jobState, percent, nil +} + +// parseAzcopyJobList parse command azcopy jobs list, get jobid and state from joblist containing dstBlobContainer +func parseAzcopyJobList(joblist string, dstBlobContainer string) (string, AzcopyJobState, error) { + jobid := "" + jobSegments := strings.Split(joblist, "JobId: ") + if len(jobSegments) < 2 { + return jobid, AzcopyJobNotFound, nil + } + jobSegments = jobSegments[1:] + for _, job := range jobSegments { + segments := strings.Split(job, "\n") + if len(segments) < 4 { + return jobid, AzcopyJobError, fmt.Errorf("error parsing jobs list: %s", job) + } + statusSegments := strings.Split(segments[2], ": ") + if len(statusSegments) < 2 { + return jobid, AzcopyJobError, fmt.Errorf("error parsing jobs list status: %s", segments[2]) + } + status := statusSegments[1] + switch status { + case "InProgress": + jobid = segments[0] + case "Completed": + return jobid, AzcopyJobCompleted, nil + } + } + if jobid == "" { + return jobid, AzcopyJobNotFound, nil + } + return jobid, AzcopyJobRunning, nil +} + +// parseAzcopyJobShow parse command azcopy jobs show jobid, get job state and copy percent +func parseAzcopyJobShow(jobshow string) (AzcopyJobState, string, error) { + segments := strings.Split(jobshow, ": ") + if len(segments) < 2 { + return AzcopyJobError, "", fmt.Errorf("error parsing jobs summary: %s in Percent Complete (approx)", jobshow) + } + return AzcopyJobRunning, strings.ReplaceAll(segments[1], "\n", ""), nil +} diff --git a/pkg/util/util_mock.go b/pkg/util/util_mock.go new file mode 100644 index 000000000..f381ec968 --- /dev/null +++ b/pkg/util/util_mock.go @@ -0,0 +1,66 @@ +// /* +// Copyright The Kubernetes Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// */ +// + +// Code generated by MockGen. DO NOT EDIT. +// Source: pkg/util/util.go + +// Package util is a generated GoMock package. +package util + +import ( + reflect "reflect" + + gomock "github.com/golang/mock/gomock" +) + +// MockEXEC is a mock of EXEC interface. +type MockEXEC struct { + ctrl *gomock.Controller + recorder *MockEXECMockRecorder +} + +// MockEXECMockRecorder is the mock recorder for MockEXEC. +type MockEXECMockRecorder struct { + mock *MockEXEC +} + +// NewMockEXEC creates a new mock instance. +func NewMockEXEC(ctrl *gomock.Controller) *MockEXEC { + mock := &MockEXEC{ctrl: ctrl} + mock.recorder = &MockEXECMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockEXEC) EXPECT() *MockEXECMockRecorder { + return m.recorder +} + +// RunCommand mocks base method. +func (m *MockEXEC) RunCommand(arg0 string) (string, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "RunCommand", arg0) + ret0, _ := ret[0].(string) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// RunCommand indicates an expected call of RunCommand. 
+func (mr *MockEXECMockRecorder) RunCommand(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RunCommand", reflect.TypeOf((*MockEXEC)(nil).RunCommand), arg0) +} diff --git a/pkg/util/util_test.go b/pkg/util/util_test.go index 77eab65b0..bbef8755b 100644 --- a/pkg/util/util_test.go +++ b/pkg/util/util_test.go @@ -23,6 +23,7 @@ import ( "testing" "time" + "github.com/golang/mock/gomock" "github.com/stretchr/testify/assert" ) @@ -320,3 +321,207 @@ func TestTrimDuplicatedSpace(t *testing.T) { }) } } + +func TestGetAzcopyJob(t *testing.T) { + tests := []struct { + desc string + listStr string + listErr error + enableShow bool + showStr string + showErr error + expectedJobState AzcopyJobState + expectedPercent string + expectedErr error + }{ + { + desc: "run exec get error", + listStr: "", + listErr: fmt.Errorf("error"), + enableShow: false, + showStr: "", + showErr: nil, + expectedJobState: AzcopyJobError, + expectedPercent: "", + expectedErr: fmt.Errorf("couldn't list jobs in azcopy error"), + }, + { + desc: "run exec parse azcopy job list get error", + listStr: "JobId: ed1c3833-eaff-fe42-71d7-513fb065a9d9\nStart Time: Monday, 07-Aug-23 03:29:54 UTC", + listErr: nil, + enableShow: false, + showStr: "", + showErr: nil, + expectedJobState: AzcopyJobError, + expectedPercent: "", + expectedErr: fmt.Errorf("couldn't parse azcopy job list in azcopy error parsing jobs list: ed1c3833-eaff-fe42-71d7-513fb065a9d9\nStart Time: Monday, 07-Aug-23 03:29:54 UTC"), + }, + { + desc: "run exec parse azcopy job not found jobid when Status is Canceled", + listStr: "JobId: ed1c3833-eaff-fe42-71d7-513fb065a9d9\nStart Time: Monday, 07-Aug-23 03:29:54 UTC\nStatus: Cancelled\nCommand: copy https://{accountName}.file.core.windows.net/{srcFileshare}{SAStoken} https://{accountName}.file.core.windows.net/{dstFileshare}{SAStoken} --recursive --check-length=false", + listErr: nil, + enableShow: false, + showStr: "", + showErr: nil, + expectedJobState: AzcopyJobNotFound, + expectedPercent: "", + expectedErr: nil, + }, + { + desc: "run exec parse azcopy job Completed", + listStr: "JobId: ed1c3833-eaff-fe42-71d7-513fb065a9d9\nStart Time: Monday, 07-Aug-23 03:29:54 UTC\nStatus: Completed\nCommand: copy https://{accountName}.file.core.windows.net/{srcFileshare}{SAStoken} https://{accountName}.file.core.windows.net/{dstFileshare}{SAStoken} --recursive --check-length=false", + listErr: nil, + enableShow: false, + showStr: "", + showErr: nil, + expectedJobState: AzcopyJobCompleted, + expectedPercent: "100.0", + expectedErr: nil, + }, + { + desc: "run exec get error in azcopy jobs show", + listStr: "JobId: ed1c3833-eaff-fe42-71d7-513fb065a9d9\nStart Time: Monday, 07-Aug-23 03:29:54 UTC\nStatus: InProgress\nCommand: copy https://{accountName}.file.core.windows.net/{srcFileshare}{SAStoken} https://{accountName}.file.core.windows.net/{dstFileshare}{SAStoken} --recursive --check-length=false", + listErr: nil, + enableShow: true, + showStr: "", + showErr: fmt.Errorf("error"), + expectedJobState: AzcopyJobError, + expectedPercent: "", + expectedErr: fmt.Errorf("couldn't show jobs summary in azcopy error"), + }, + { + desc: "run exec parse azcopy job show error", + listStr: "JobId: ed1c3833-eaff-fe42-71d7-513fb065a9d9\nStart Time: Monday, 07-Aug-23 03:29:54 UTC\nStatus: InProgress\nCommand: copy https://{accountName}.file.core.windows.net/{srcFileshare}{SAStoken} https://{accountName}.file.core.windows.net/{dstFileshare}{SAStoken} --recursive --check-length=false", + 
listErr: nil, + enableShow: true, + showStr: "", + showErr: nil, + expectedJobState: AzcopyJobError, + expectedPercent: "", + expectedErr: fmt.Errorf("couldn't parse azcopy job show in azcopy error parsing jobs summary: in Percent Complete (approx)"), + }, + { + desc: "run exec parse azcopy job show succeed", + listStr: "JobId: ed1c3833-eaff-fe42-71d7-513fb065a9d9\nStart Time: Monday, 07-Aug-23 03:29:54 UTC\nStatus: InProgress\nCommand: copy https://{accountName}.file.core.windows.net/{srcFileshare}{SAStoken} https://{accountName}.file.core.windows.net/{dstFileshare}{SAStoken} --recursive --check-length=false", + listErr: nil, + enableShow: true, + showStr: "Percent Complete (approx): 50.0", + showErr: nil, + expectedJobState: AzcopyJobRunning, + expectedPercent: "50.0", + expectedErr: nil, + }, + } + for _, test := range tests { + dstBlobContainer := "dstBlobContainer" + + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + m := NewMockEXEC(ctrl) + m.EXPECT().RunCommand(gomock.Eq("azcopy jobs list | grep dstBlobContainer -B 3")).Return(test.listStr, test.listErr) + if test.enableShow { + m.EXPECT().RunCommand(gomock.Not("azcopy jobs list | grep dstBlobContainer -B 3")).Return(test.showStr, test.showErr) + } + + azcopyFunc := &Azcopy{} + azcopyFunc.ExecCmd = m + jobState, percent, err := azcopyFunc.GetAzcopyJob(dstBlobContainer) + if jobState != test.expectedJobState || percent != test.expectedPercent || !reflect.DeepEqual(err, test.expectedErr) { + t.Errorf("test[%s]: unexpected jobState: %v, percent: %v, err: %v, expected jobState: %v, percent: %v, err: %v", test.desc, jobState, percent, err, test.expectedJobState, test.expectedPercent, test.expectedErr) + } + } +} + +func TestParseAzcopyJobList(t *testing.T) { + tests := []struct { + desc string + str string + expectedJobid string + expectedJobState AzcopyJobState + expectedErr error + }{ + { + desc: "azcopy job not found", + str: "", + expectedJobid: "", + expectedJobState: AzcopyJobNotFound, + expectedErr: nil, + }, + { + desc: "parse azcopy job list error", + str: "JobId: ed1c3833-eaff-fe42-71d7-513fb065a9d9\nStart Time: Monday, 07-Aug-23 03:29:54 UTC", + expectedJobid: "", + expectedJobState: AzcopyJobError, + expectedErr: fmt.Errorf("error parsing jobs list: ed1c3833-eaff-fe42-71d7-513fb065a9d9\nStart Time: Monday, 07-Aug-23 03:29:54 UTC"), + }, + { + desc: "parse azcopy job list status error", + str: "JobId: ed1c3833-eaff-fe42-71d7-513fb065a9d9\nStart Time: Monday, 07-Aug-23 03:29:54 UTC\nStatus Cancelled\nCommand: copy https://{accountName}.file.core.windows.net/{srcFileshare}{SAStoken} https://{accountName}.file.core.windows.net/{dstFileshare}{SAStoken} --recursive --check-length=false", + expectedJobid: "", + expectedJobState: AzcopyJobError, + expectedErr: fmt.Errorf("error parsing jobs list status: Status Cancelled"), + }, + { + desc: "parse azcopy job not found jobid when Status is Canceled", + str: "JobId: ed1c3833-eaff-fe42-71d7-513fb065a9d9\nStart Time: Monday, 07-Aug-23 03:29:54 UTC\nStatus: Cancelled\nCommand: copy https://{accountName}.file.core.windows.net/{srcFileshare}{SAStoken} https://{accountName}.file.core.windows.net/{dstFileshare}{SAStoken} --recursive --check-length=false", + expectedJobid: "", + expectedJobState: AzcopyJobNotFound, + expectedErr: nil, + }, + { + desc: "parse azcopy job Completed", + str: "JobId: ed1c3833-eaff-fe42-71d7-513fb065a9d9\nStart Time: Monday, 07-Aug-23 03:29:54 UTC\nStatus: Completed\nCommand: copy https://{accountName}.file.core.windows.net/{srcFileshare}{SAStoken} 
https://{accountName}.file.core.windows.net/{dstFileshare}{SAStoken} --recursive --check-length=false", + expectedJobid: "", + expectedJobState: AzcopyJobCompleted, + expectedErr: nil, + }, + { + desc: "parse azcopy job InProgress", + str: "JobId: ed1c3833-eaff-fe42-71d7-513fb065a9d9\nStart Time: Monday, 07-Aug-23 03:29:54 UTC\nStatus: InProgress\nCommand: copy https://{accountName}.file.core.windows.net/{srcFileshare}{SAStoken} https://{accountName}.file.core.windows.net/{dstFileshare}{SAStoken} --recursive --check-length=false", + expectedJobid: "ed1c3833-eaff-fe42-71d7-513fb065a9d9", + expectedJobState: AzcopyJobRunning, + expectedErr: nil, + }, + } + + for _, test := range tests { + dstBlobContainer := "dstBlobContainer" + jobid, jobState, err := parseAzcopyJobList(test.str, dstBlobContainer) + if jobid != test.expectedJobid || jobState != test.expectedJobState || !reflect.DeepEqual(err, test.expectedErr) { + t.Errorf("test[%s]: unexpected jobid: %v, jobState: %v, err: %v, expected jobid: %v, jobState: %v, err: %v", test.desc, jobid, jobState, err, test.expectedJobid, test.expectedJobState, test.expectedErr) + } + } +} + +func TestParseAzcopyJobShow(t *testing.T) { + tests := []struct { + desc string + str string + expectedJobState AzcopyJobState + expectedPercent string + expectedErr error + }{ + { + desc: "error parse azcopy job show", + str: "", + expectedJobState: AzcopyJobError, + expectedPercent: "", + expectedErr: fmt.Errorf("error parsing jobs summary: in Percent Complete (approx)"), + }, + { + desc: "parse azcopy job show succeed", + str: "Percent Complete (approx): 50.0", + expectedJobState: AzcopyJobRunning, + expectedPercent: "50.0", + expectedErr: nil, + }, + } + + for _, test := range tests { + jobState, percent, err := parseAzcopyJobShow(test.str) + if jobState != test.expectedJobState || percent != test.expectedPercent || !reflect.DeepEqual(err, test.expectedErr) { + t.Errorf("test[%s]: unexpected jobState: %v, percent: %v, err: %v, expected jobState: %v, percent: %v, err: %v", test.desc, jobState, percent, err, test.expectedJobState, test.expectedPercent, test.expectedErr) + } + } +} diff --git a/test/e2e/dynamic_provisioning_test.go b/test/e2e/dynamic_provisioning_test.go index 1495f993a..0f2ee5697 100644 --- a/test/e2e/dynamic_provisioning_test.go +++ b/test/e2e/dynamic_provisioning_test.go @@ -841,4 +841,132 @@ var _ = ginkgo.Describe("[blob-csi-e2e] Dynamic Provisioning", func() { } test.Run(ctx, cs, ns) }) + + ginkgo.It("should clone a volume from an existing NFSv3 volume [nfs]", func(ctx ginkgo.SpecContext) { + pod := testsuites.PodDetails{ + Cmd: "echo 'hello world' > /mnt/test-1/data && grep 'hello world' /mnt/test-1/data", + Volumes: []testsuites.VolumeDetails{ + { + ClaimSize: "10Gi", + MountOptions: []string{ + "nconnect=8", + }, + VolumeMount: testsuites.VolumeMountDetails{ + NameGenerate: "test-volume-", + MountPathGenerate: "/mnt/test-", + }, + }, + }, + } + podWithClonedVolume := testsuites.PodDetails{ + Cmd: "grep 'hello world' /mnt/test-1/data", + } + test := testsuites.DynamicallyProvisionedVolumeCloningTest{ + CSIDriver: testDriver, + Pod: pod, + PodWithClonedVolume: podWithClonedVolume, + StorageClassParameters: map[string]string{ + "skuName": "Premium_LRS", + "protocol": "nfs", + "mountPermissions": "0755", + }, + } + test.Run(ctx, cs, ns) + }) + + ginkgo.It("should clone a large size volume from an existing NFSv3 volume [nfs]", func(ctx ginkgo.SpecContext) { + pod := testsuites.PodDetails{ + Cmd: "echo 'hello world' > /mnt/test-1/data && grep 
'hello world' /mnt/test-1/data && dd if=/dev/zero of=/mnt/test-1/test bs=99G count=5", + Volumes: []testsuites.VolumeDetails{ + { + ClaimSize: "100Gi", + MountOptions: []string{ + "nconnect=8", + }, + VolumeMount: testsuites.VolumeMountDetails{ + NameGenerate: "test-volume-", + MountPathGenerate: "/mnt/test-", + }, + }, + }, + } + podWithClonedVolume := testsuites.PodDetails{ + Cmd: "grep 'hello world' /mnt/test-1/data", + } + test := testsuites.DynamicallyProvisionedVolumeCloningTest{ + CSIDriver: testDriver, + Pod: pod, + PodWithClonedVolume: podWithClonedVolume, + StorageClassParameters: map[string]string{ + "skuName": "Premium_LRS", + "protocol": "nfs", + "mountPermissions": "0755", + }, + } + test.Run(ctx, cs, ns) + }) + + ginkgo.It("should clone a volume from an existing blobfuse2 volume [fuse2]", func(ctx ginkgo.SpecContext) { + pod := testsuites.PodDetails{ + Cmd: "echo 'hello world' > /mnt/test-1/data && grep 'hello world' /mnt/test-1/data", + Volumes: []testsuites.VolumeDetails{ + { + ClaimSize: "10Gi", + MountOptions: []string{ + "-o allow_other", + "--virtual-directory=true", // blobfuse2 mount options + }, + VolumeMount: testsuites.VolumeMountDetails{ + NameGenerate: "test-volume-", + MountPathGenerate: "/mnt/test-", + }, + }, + }, + } + podWithClonedVolume := testsuites.PodDetails{ + Cmd: "grep 'hello world' /mnt/test-1/data", + } + test := testsuites.DynamicallyProvisionedVolumeCloningTest{ + CSIDriver: testDriver, + Pod: pod, + PodWithClonedVolume: podWithClonedVolume, + StorageClassParameters: map[string]string{ + "skuName": "Standard_LRS", + "protocol": "fuse2", + }, + } + test.Run(ctx, cs, ns) + }) + + ginkgo.It("should clone a large size volume from an existing blobfuse2 volume [fuse2]", func(ctx ginkgo.SpecContext) { + pod := testsuites.PodDetails{ + Cmd: "echo 'hello world' > /mnt/test-1/data && grep 'hello world' /mnt/test-1/data && dd if=/dev/zero of=/mnt/test-1/test bs=99G count=5", + Volumes: []testsuites.VolumeDetails{ + { + ClaimSize: "100Gi", + MountOptions: []string{ + "-o allow_other", + "--virtual-directory=true", // blobfuse2 mount options + }, + VolumeMount: testsuites.VolumeMountDetails{ + NameGenerate: "test-volume-", + MountPathGenerate: "/mnt/test-", + }, + }, + }, + } + podWithClonedVolume := testsuites.PodDetails{ + Cmd: "grep 'hello world' /mnt/test-1/data", + } + test := testsuites.DynamicallyProvisionedVolumeCloningTest{ + CSIDriver: testDriver, + Pod: pod, + PodWithClonedVolume: podWithClonedVolume, + StorageClassParameters: map[string]string{ + "skuName": "Standard_LRS", + "protocol": "fuse2", + }, + } + test.Run(ctx, cs, ns) + }) }) diff --git a/test/e2e/testsuites/dynamically_provisioned_volume_cloning_tester.go b/test/e2e/testsuites/dynamically_provisioned_volume_cloning_tester.go new file mode 100644 index 000000000..955926078 --- /dev/null +++ b/test/e2e/testsuites/dynamically_provisioned_volume_cloning_tester.go @@ -0,0 +1,82 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package testsuites + +import ( + "context" + "time" + + "github.com/onsi/ginkgo/v2" + v1 "k8s.io/api/core/v1" + clientset "k8s.io/client-go/kubernetes" + "sigs.k8s.io/blob-csi-driver/test/e2e/driver" +) + +// DynamicallyProvisionedVolumeCloningTest will provision required StorageClass(es), PVC(s) and Pod(s) +// ClonedVolumeSize is optional; set it to test a cloned volume with a different size from the original volume +type DynamicallyProvisionedVolumeCloningTest struct { + CSIDriver driver.DynamicPVTestDriver + Pod PodDetails + PodWithClonedVolume PodDetails + ClonedVolumeSize string + StorageClassParameters map[string]string +} + +func (t *DynamicallyProvisionedVolumeCloningTest) Run(ctx context.Context, client clientset.Interface, namespace *v1.Namespace) { + // create the storageClass + tsc, tscCleanup := t.Pod.Volumes[0].CreateStorageClass(ctx, client, namespace, t.CSIDriver, t.StorageClassParameters) + defer tscCleanup(ctx) + + // create the pod + t.Pod.Volumes[0].StorageClass = tsc.storageClass + tpod, cleanups := t.Pod.SetupWithDynamicVolumes(ctx, client, namespace, t.CSIDriver, t.StorageClassParameters) + for i := range cleanups { + defer cleanups[i](ctx) + } + + ginkgo.By("deploying the pod") + tpod.Create(ctx) + defer tpod.Cleanup(ctx) + ginkgo.By("checking that the pod's command exits with no error") + tpod.WaitForSuccess(ctx) + ginkgo.By("sleep 5s and then clone volume") + time.Sleep(5 * time.Second) + + ginkgo.By("cloning existing volume") + clonedVolume := t.Pod.Volumes[0] + clonedVolume.DataSource = &DataSource{ + Name: tpod.pod.Spec.Volumes[0].VolumeSource.PersistentVolumeClaim.ClaimName, + Kind: VolumePVCKind, + } + clonedVolume.StorageClass = tsc.storageClass + + if t.ClonedVolumeSize != "" { + clonedVolume.ClaimSize = t.ClonedVolumeSize + } + + t.PodWithClonedVolume.Volumes = []VolumeDetails{clonedVolume} + tpod, cleanups = t.PodWithClonedVolume.SetupWithDynamicVolumes(ctx, client, namespace, t.CSIDriver, t.StorageClassParameters) + for i := range cleanups { + defer cleanups[i](ctx) + } + + ginkgo.By("deploying a second pod with cloned volume") + tpod.Create(ctx) + defer tpod.Cleanup(ctx) + ginkgo.By("checking that the pod's command exits with no error") + tpod.WaitForSuccess(ctx) +} diff --git a/test/e2e/testsuites/specs.go b/test/e2e/testsuites/specs.go index 006e1e8ed..e785decc6 100644 --- a/test/e2e/testsuites/specs.go +++ b/test/e2e/testsuites/specs.go @@ -50,7 +50,9 @@ type VolumeDetails struct { // Optional, used with pre-provisioned volumes VolumeID string // Optional, used with PVCs created from snapshots - DataSource *DataSource + DataSource *DataSource + // Optional, used with a specified StorageClass + StorageClass *storagev1.StorageClass NodeStageSecretRef string Attrib map[string]string } @@ -63,6 +65,7 @@ const ( ) const ( + VolumePVCKind = "PersistentVolumeClaim" APIVersionv1alpha1 = "v1alpha1" ) @@ -84,6 +87,7 @@ type VolumeDeviceDetails struct { } type DataSource struct { + Kind string Name string } @@ -186,6 +190,7 @@ func (volume *VolumeDetails) SetupDynamicPersistentVolumeClaim(ctx context.Conte if volume.DataSource != nil { dataSource := &v1.TypedLocalObjectReference{ Name: volume.DataSource.Name, + Kind: volume.DataSource.Kind, } tpvc = NewTestPersistentVolumeClaimWithDataSource(client, namespace, volume.ClaimSize, volume.VolumeMode, &createdStorageClass, dataSource) } else { @@ -220,3 +225,11 @@ func (volume *VolumeDetails) SetupPreProvisionedPersistentVolumeClaim(ctx contex return tpvc, cleanupFuncs } + +func (volume *VolumeDetails)
CreateStorageClass(ctx context.Context, client clientset.Interface, namespace *v1.Namespace, csiDriver driver.DynamicPVTestDriver, storageClassParameters map[string]string) (*TestStorageClass, func(ctx context.Context)) { + ginkgo.By("setting up the StorageClass") + storageClass := csiDriver.GetProvisionStorageClass(storageClassParameters, volume.MountOptions, volume.ReclaimPolicy, volume.VolumeBindingMode, volume.AllowedTopologyValues, namespace.Name) + tsc := NewTestStorageClass(client, namespace, storageClass) + tsc.Create(ctx) + return tsc, tsc.Cleanup +} diff --git a/test/external-e2e/run.sh b/test/external-e2e/run.sh index bc760ee43..64caa2b2e 100755 --- a/test/external-e2e/run.sh +++ b/test/external-e2e/run.sh @@ -65,7 +65,7 @@ if [ ! -z ${EXTERNAL_E2E_TEST_BLOBFUSE_v2} ]; then cp deploy/example/storageclass-blobfuse2.yaml /tmp/csi/storageclass.yaml # achieve close-to-open cache consistency like in NFSv3 sed -i 's/file-cache-timeout-in-seconds=120/file-cache-timeout-in-seconds=0/g' /tmp/csi/storageclass.yaml - ginkgo -p -v --fail-fast --flake-attempts 2 -focus="External.Storage.*$DRIVER.csi.azure.com" \ + ginkgo -p -v --fail-fast --flake-attempts 8 -focus="External.Storage.*$DRIVER.csi.azure.com" \ -skip='\[Disruptive\]|allow exec of files on the volume|unmount after the subpath directory is deleted|should concurrently access the single volume from pods on different node|pod created with an initial fsgroup, volume contents ownership changed via chgrp in first pod, new pod with same fsgroup skips ownership changes to the volume contents|should provision storage with any volume data source|should mount multiple PV pointing to the same storage on the same node' kubernetes/test/bin/e2e.test -- \ -storage.testdriver=$PROJECT_ROOT/test/external-e2e/testdriver-blobfuse.yaml \ --kubeconfig=$KUBECONFIG diff --git a/test/external-e2e/testdriver-blobfuse.yaml b/test/external-e2e/testdriver-blobfuse.yaml index d1ee6a017..debd66558 100644 --- a/test/external-e2e/testdriver-blobfuse.yaml +++ b/test/external-e2e/testdriver-blobfuse.yaml @@ -17,3 +17,4 @@ DriverInfo: nodeExpansion: true volumeLimits: false snapshotDataSource: false + pvcDataSource: true diff --git a/test/external-e2e/testdriver-nfs.yaml b/test/external-e2e/testdriver-nfs.yaml index 2bbfa33b5..8a7b07ca7 100644 --- a/test/external-e2e/testdriver-nfs.yaml +++ b/test/external-e2e/testdriver-nfs.yaml @@ -17,3 +17,4 @@ DriverInfo: nodeExpansion: true volumeLimits: false snapshotDataSource: false + pvcDataSource: true diff --git a/test/sanity/run-test.sh b/test/sanity/run-test.sh index b4ba71a12..d82fb6b56 100755 --- a/test/sanity/run-test.sh +++ b/test/sanity/run-test.sh @@ -32,9 +32,20 @@ if [[ "$#" -gt 0 ]] && [[ -n "$1" ]]; then nodeid="$1" fi +azcopyPath="/usr/local/bin/azcopy" +if [ ! -f "$azcopyPath" ]; then + azcopyVersion=azcopy_linux_amd64_10.18.1 + echo 'Downloading azcopy...' + wget -c https://azcopyvnext.azureedge.net/release20230420/$azcopyVersion.tar.gz + tar -zxvf $azcopyVersion.tar.gz + mv ./$azcopyVersion/azcopy /usr/local/bin/azcopy + rm -rf ./$azcopyVersion* + chmod +x /usr/local/bin/azcopy +fi + _output/amd64/blobplugin --endpoint "$controllerendpoint" -v=5 & _output/amd64/blobplugin --endpoint "$nodeendpoint" --nodeid "$nodeid" --enable-blob-mock-mount -v=5 & echo "Begin to run sanity test..." 
readonly CSI_SANITY_BIN='csi-sanity' -"$CSI_SANITY_BIN" --ginkgo.v --csi.endpoint=$nodeendpoint --csi.controllerendpoint=$controllerendpoint -ginkgo.skip="should fail when requesting to create a volume with already existing name and different capacity" +"$CSI_SANITY_BIN" --ginkgo.v --csi.endpoint=$nodeendpoint --csi.controllerendpoint=$controllerendpoint -ginkgo.skip="should fail when requesting to create a volume with already existing name and different capacity|should create volume from an existing source snapshot"
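Note: the following is a minimal sketch (not part of this diff) of how the new util.Azcopy helper behaves on its own, assuming the azcopy binary is on PATH (the Dockerfile change above installs it into the image); the destination container name is hypothetical.

package main

import (
	"fmt"

	"sigs.k8s.io/blob-csi-driver/pkg/util"
)

func main() {
	// with ExecCmd left nil, GetAzcopyJob falls back to the real shell executor
	ac := &util.Azcopy{}
	// "dstContainer" is a hypothetical destination container name; GetAzcopyJob
	// greps 'azcopy jobs list' for it, and for an in-progress job it runs
	// 'azcopy jobs show <jobid>' to read the copy percent
	jobState, percent, err := ac.GetAzcopyJob("dstContainer")
	fmt.Printf("jobState=%s, percent=%s, err=%v\n", jobState, percent, err)
}

This is the same helper CreateVolume relies on: while the volume lock is held by a previous clone request it logs the job state, and copyBlobContainer polls it every waitForCopyInterval (5s) until the job completes, errors, or waitForCopyTimeout (3 minutes) expires.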