-
Notifications
You must be signed in to change notification settings - Fork 336
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Use Connection() from util package #234
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Original file line number | Diff line number | Diff line change | ||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
@@ -20,12 +20,12 @@ import ( | |||||||||||||
"context" | ||||||||||||||
"fmt" | ||||||||||||||
"math" | ||||||||||||||
"net" | ||||||||||||||
"os" | ||||||||||||||
"strings" | ||||||||||||||
"time" | ||||||||||||||
|
||||||||||||||
"github.com/container-storage-interface/spec/lib/go/csi" | ||||||||||||||
"github.com/kubernetes-csi/csi-lib-utils/connection" | ||||||||||||||
"github.com/kubernetes-csi/csi-lib-utils/protosanitizer" | ||||||||||||||
"github.com/kubernetes-csi/external-provisioner/pkg/features" | ||||||||||||||
snapapi "github.com/kubernetes-csi/external-snapshotter/pkg/apis/volumesnapshot/v1alpha1" | ||||||||||||||
|
@@ -46,7 +46,6 @@ import ( | |||||||||||||
"k8s.io/klog" | ||||||||||||||
|
||||||||||||||
"google.golang.org/grpc" | ||||||||||||||
"google.golang.org/grpc/connectivity" | ||||||||||||||
) | ||||||||||||||
|
||||||||||||||
type deprecatedSecretParamsMap struct { | ||||||||||||||
|
@@ -185,55 +184,18 @@ func logGRPC(ctx context.Context, method string, req, reply interface{}, cc *grp | |||||||||||||
return err | ||||||||||||||
} | ||||||||||||||
|
||||||||||||||
func Connect(address string, timeout time.Duration) (*grpc.ClientConn, error) { | ||||||||||||||
klog.V(2).Infof("Connecting to %s", address) | ||||||||||||||
dialOptions := []grpc.DialOption{ | ||||||||||||||
grpc.WithInsecure(), | ||||||||||||||
grpc.WithBackoffMaxDelay(time.Second), | ||||||||||||||
grpc.WithUnaryInterceptor(logGRPC), | ||||||||||||||
} | ||||||||||||||
if strings.HasPrefix(address, "/") { | ||||||||||||||
dialOptions = append(dialOptions, grpc.WithDialer(func(addr string, timeout time.Duration) (net.Conn, error) { | ||||||||||||||
return net.DialTimeout("unix", addr, timeout) | ||||||||||||||
})) | ||||||||||||||
} | ||||||||||||||
conn, err := grpc.Dial(address, dialOptions...) | ||||||||||||||
func Connect(address string) (*grpc.ClientConn, error) { | ||||||||||||||
return connection.Connect(address, connection.OnConnectionLoss(connection.ExitOnConnectionLoss())) | ||||||||||||||
} | ||||||||||||||
|
||||||||||||||
if err != nil { | ||||||||||||||
return nil, err | ||||||||||||||
} | ||||||||||||||
ctx, cancel := context.WithTimeout(context.Background(), timeout) | ||||||||||||||
defer cancel() | ||||||||||||||
for { | ||||||||||||||
if !conn.WaitForStateChange(ctx, conn.GetState()) { | ||||||||||||||
klog.V(4).Infof("Connection timed out") | ||||||||||||||
return conn, fmt.Errorf("Connection timed out") | ||||||||||||||
} | ||||||||||||||
if conn.GetState() == connectivity.Ready { | ||||||||||||||
klog.V(3).Infof("Connected") | ||||||||||||||
return conn, nil | ||||||||||||||
} | ||||||||||||||
klog.V(4).Infof("Still trying, connection is %s", conn.GetState()) | ||||||||||||||
} | ||||||||||||||
func Probe(conn *grpc.ClientConn, singleCallTimeout time.Duration) error { | ||||||||||||||
return connection.ProbeForever(conn, singleCallTimeout) | ||||||||||||||
} | ||||||||||||||
|
||||||||||||||
func GetDriverName(conn *grpc.ClientConn, timeout time.Duration) (string, error) { | ||||||||||||||
ctx, cancel := context.WithTimeout(context.Background(), timeout) | ||||||||||||||
defer cancel() | ||||||||||||||
|
||||||||||||||
client := csi.NewIdentityClient(conn) | ||||||||||||||
|
||||||||||||||
req := csi.GetPluginInfoRequest{} | ||||||||||||||
|
||||||||||||||
rsp, err := client.GetPluginInfo(ctx, &req) | ||||||||||||||
if err != nil { | ||||||||||||||
return "", err | ||||||||||||||
} | ||||||||||||||
name := rsp.GetName() | ||||||||||||||
if name == "" { | ||||||||||||||
return "", fmt.Errorf("name is empty") | ||||||||||||||
} | ||||||||||||||
return name, nil | ||||||||||||||
return connection.GetDriverName(ctx, conn) | ||||||||||||||
} | ||||||||||||||
|
||||||||||||||
func getDriverCapabilities(conn *grpc.ClientConn, timeout time.Duration) (sets.Int, error) { | ||||||||||||||
|
@@ -248,30 +210,16 @@ func getDriverCapabilities(conn *grpc.ClientConn, timeout time.Duration) (sets.I | |||||||||||||
} | ||||||||||||||
|
||||||||||||||
capabilities := make(sets.Int) | ||||||||||||||
for _, cap := range pluginCaps { | ||||||||||||||
if cap == nil { | ||||||||||||||
continue | ||||||||||||||
} | ||||||||||||||
service := cap.GetService() | ||||||||||||||
if service == nil { | ||||||||||||||
continue | ||||||||||||||
} | ||||||||||||||
switch service.GetType() { | ||||||||||||||
for cap := range pluginCaps { | ||||||||||||||
switch cap { | ||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is mixing plugin and controller capabilities into the same set correct? Could you have conflicts in values? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
external-provisioner/pkg/controller/controller.go Lines 160 to 165 in e63995f
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thanks for pointing that out. Is there a reason why it needs to have one more level of translation from CSI capabilities? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can we sort it out in a separate PR? I'd like to cache the capabilities so provisioner does not load them from the driver every time it needs them. Refactoring of the enum would be better there. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sure do you want to use csi-lib-utils 0.4.0-rc1 here? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Updated to 0.4.0-rc1 |
||||||||||||||
case csi.PluginCapability_Service_CONTROLLER_SERVICE: | ||||||||||||||
capabilities.Insert(PluginCapability_CONTROLLER_SERVICE) | ||||||||||||||
case csi.PluginCapability_Service_VOLUME_ACCESSIBILITY_CONSTRAINTS: | ||||||||||||||
capabilities.Insert(PluginCapability_ACCESSIBILITY_CONSTRAINTS) | ||||||||||||||
} | ||||||||||||||
} | ||||||||||||||
for _, cap := range controllerCaps { | ||||||||||||||
if cap == nil { | ||||||||||||||
continue | ||||||||||||||
} | ||||||||||||||
rpc := cap.GetRpc() | ||||||||||||||
if rpc == nil { | ||||||||||||||
continue | ||||||||||||||
} | ||||||||||||||
switch rpc.GetType() { | ||||||||||||||
for cap := range controllerCaps { | ||||||||||||||
switch cap { | ||||||||||||||
case csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME: | ||||||||||||||
capabilities.Insert(ControllerCapability_CREATE_DELETE_VOLUME) | ||||||||||||||
case csi.ControllerServiceCapability_RPC_CREATE_DELETE_SNAPSHOT: | ||||||||||||||
|
@@ -281,32 +229,16 @@ func getDriverCapabilities(conn *grpc.ClientConn, timeout time.Duration) (sets.I | |||||||||||||
return capabilities, nil | ||||||||||||||
} | ||||||||||||||
|
||||||||||||||
func getPluginCapabilities(conn *grpc.ClientConn, timeout time.Duration) ([]*csi.PluginCapability, error) { | ||||||||||||||
func getPluginCapabilities(conn *grpc.ClientConn, timeout time.Duration) (connection.PluginCapabilitySet, error) { | ||||||||||||||
ctx, cancel := context.WithTimeout(context.Background(), timeout) | ||||||||||||||
defer cancel() | ||||||||||||||
|
||||||||||||||
client := csi.NewIdentityClient(conn) | ||||||||||||||
req := csi.GetPluginCapabilitiesRequest{} | ||||||||||||||
|
||||||||||||||
rsp, err := client.GetPluginCapabilities(ctx, &req) | ||||||||||||||
if err != nil { | ||||||||||||||
return nil, err | ||||||||||||||
} | ||||||||||||||
return rsp.GetCapabilities(), nil | ||||||||||||||
return connection.GetPluginCapabilities(ctx, conn) | ||||||||||||||
} | ||||||||||||||
|
||||||||||||||
func getControllerCapabilities(conn *grpc.ClientConn, timeout time.Duration) ([]*csi.ControllerServiceCapability, error) { | ||||||||||||||
func getControllerCapabilities(conn *grpc.ClientConn, timeout time.Duration) (connection.ControllerCapabilitySet, error) { | ||||||||||||||
ctx, cancel := context.WithTimeout(context.Background(), timeout) | ||||||||||||||
defer cancel() | ||||||||||||||
|
||||||||||||||
client := csi.NewControllerClient(conn) | ||||||||||||||
req := csi.ControllerGetCapabilitiesRequest{} | ||||||||||||||
|
||||||||||||||
rsp, err := client.ControllerGetCapabilities(ctx, &req) | ||||||||||||||
if err != nil { | ||||||||||||||
return nil, err | ||||||||||||||
} | ||||||||||||||
return rsp.GetCapabilities(), nil | ||||||||||||||
return connection.GetControllerCapabilities(ctx, conn) | ||||||||||||||
} | ||||||||||||||
|
||||||||||||||
// NewCSIProvisioner creates new CSI provisioner | ||||||||||||||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
and update getcapabilities?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
added