Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ require (
golang.org/x/crypto v0.17.0 // indirect
golang.org/x/exp v0.0.0-20230905200255-921286631fa9 // indirect
golang.org/x/oauth2 v0.11.0 // indirect
golang.org/x/sync v0.5.0 // indirect
golang.org/x/sync v0.5.0
golang.org/x/sys v0.15.0 // indirect
golang.org/x/term v0.15.0 // indirect
golang.org/x/text v0.14.0 // indirect
Expand Down
36 changes: 29 additions & 7 deletions pkg/blob/blob.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package blob

import (
"context"
"errors"
"flag"
"fmt"
"os"
Expand All @@ -31,8 +32,9 @@ import (
az "github.com/Azure/go-autorest/autorest/azure"
"github.com/container-storage-interface/spec/lib/go/csi"
"github.com/pborman/uuid"
"google.golang.org/grpc"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
apierror "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/klog/v2"
Expand Down Expand Up @@ -301,17 +303,37 @@ func NewDriver(options *DriverOptions, kubeClient kubernetes.Interface, cloud *p
}

// Run driver initialization
func (d *Driver) Run(endpoint string, testBool bool) {
func (d *Driver) Run(ctx context.Context, endpoint string) error {
versionMeta, err := GetVersionYAML(d.Name)
if err != nil {
klog.Fatalf("%v", err)
}
klog.Infof("\nDRIVER INFORMATION:\n-------------------\n%s\n\nStreaming logs below:", versionMeta)

s := csicommon.NewNonBlockingGRPCServer()
grpcInterceptor := grpc.UnaryInterceptor(csicommon.LogGRPC)
opts := []grpc.ServerOption{
grpcInterceptor,
}
s := grpc.NewServer(opts...)
csi.RegisterIdentityServer(s, d)
csi.RegisterControllerServer(s, d)
csi.RegisterNodeServer(s, d)

go func() {
//graceful shutdown
<-ctx.Done()
s.GracefulStop()
}()
// Driver d act as IdentityServer, ControllerServer and NodeServer
s.Start(endpoint, d, d, d, testBool)
s.Wait()
listener, err := csicommon.Listen(ctx, endpoint)
if err != nil {
klog.Fatalf("failed to listen to endpoint, error: %v", err)
}
err = s.Serve(listener)
if errors.Is(err, grpc.ErrServerStopped) {
klog.Infof("gRPC server stopped serving")
return nil
}
return err
}

// GetContainerInfo get container info according to volume id
Expand Down Expand Up @@ -797,7 +819,7 @@ func setAzureCredentials(ctx context.Context, kubeClient kubernetes.Interface, a
Type: "Opaque",
}
_, err := kubeClient.CoreV1().Secrets(secretNamespace).Create(ctx, secret, metav1.CreateOptions{})
if errors.IsAlreadyExists(err) {
if apierror.IsAlreadyExists(err) {
err = nil
}
if err != nil {
Expand Down
21 changes: 19 additions & 2 deletions pkg/blob/blob_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,12 @@ import (
"sort"
"strings"
"testing"
"time"

"github.com/Azure/azure-sdk-for-go/services/storage/mgmt/2021-09-01/storage"
"github.com/golang/mock/gomock"
"github.com/stretchr/testify/assert"
"golang.org/x/sync/errgroup"
v1api "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
Expand Down Expand Up @@ -132,7 +134,15 @@ func TestRun(t *testing.T) {
os.Setenv(DefaultAzureCredentialFileEnv, fakeCredFile)

d := NewFakeDriver()
d.Run("tcp://127.0.0.1:0", true)

ctx, cancelFn := context.WithCancel(context.Background())
var routines errgroup.Group
routines.Go(func() error { return d.Run(ctx, "tcp://127.0.0.1:0") })
time.Sleep(time.Millisecond * 500)
cancelFn()
time.Sleep(time.Millisecond * 500)
err := routines.Wait()
assert.Nil(t, err)
},
},
{
Expand All @@ -159,7 +169,14 @@ func TestRun(t *testing.T) {
d := NewFakeDriver()
d.cloud = &azure.Cloud{}
d.NodeID = ""
d.Run("tcp://127.0.0.1:0", true)
ctx, cancelFn := context.WithCancel(context.Background())
var routines errgroup.Group
routines.Go(func() error { return d.Run(ctx, "tcp://127.0.0.1:0") })
time.Sleep(time.Millisecond * 500)
cancelFn()
time.Sleep(time.Millisecond * 500)
err := routines.Wait()
assert.Nil(t, err)
},
},
}
Expand Down
4 changes: 3 additions & 1 deletion pkg/blobplugin/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,9 @@ func handle() {
if driver == nil {
klog.Fatalln("Failed to initialize Azure Blob Storage CSI driver")
}
driver.Run(*endpoint, false)
if err := driver.Run(context.Background(), *endpoint); err != nil {
klog.Fatalf("Failed to run Azure Blob Storage CSI driver: %v", err)
}
}

func exportMetrics() {
Expand Down
118 changes: 0 additions & 118 deletions pkg/csi-common/server.go

This file was deleted.

67 changes: 0 additions & 67 deletions pkg/csi-common/server_test.go

This file was deleted.

30 changes: 29 additions & 1 deletion pkg/csi-common/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ package csicommon

import (
"fmt"
"net"
"os"
"runtime"
"strings"

"github.com/container-storage-interface/spec/lib/go/csi"
Expand All @@ -37,6 +40,31 @@ func ParseEndpoint(ep string) (string, string, error) {
return "", "", fmt.Errorf("Invalid endpoint: %v", ep)
}

func Listen(ctx context.Context, endpoint string) (net.Listener, error) {
proto, addr, err := ParseEndpoint(endpoint)
if err != nil {
klog.Errorf(err.Error())
return nil, err
}

if proto == "unix" {
if runtime.GOOS != "windows" {
addr = "/" + addr
}
if err := os.Remove(addr); err != nil && !os.IsNotExist(err) {
klog.Errorf("Failed to remove %s, error: %s", addr, err.Error())
return nil, err
}
}
listenConfig := net.ListenConfig{}
listener, err := listenConfig.Listen(ctx, proto, addr)
if err != nil {
klog.Errorf("Failed to listen: %v", err)
return nil, err
}
return listener, nil
}

func NewVolumeCapabilityAccessMode(mode csi.VolumeCapability_AccessMode_Mode) *csi.VolumeCapability_AccessMode {
return &csi.VolumeCapability_AccessMode{Mode: mode}
}
Expand Down Expand Up @@ -70,7 +98,7 @@ func getLogLevel(method string) int32 {
return 2
}

func logGRPC(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
func LogGRPC(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
level := klog.Level(getLogLevel(info.FullMethod))
klog.V(level).Infof("GRPC call: %s", info.FullMethod)
klog.V(level).Infof("GRPC request: %s", protosanitizer.StripSecrets(req))
Expand Down
Loading