From f84f554301ce1e0300a1b8147f491fbf82391db8 Mon Sep 17 00:00:00 2001 From: Joe Betz Date: Fri, 20 Apr 2018 10:43:44 -0700 Subject: [PATCH] clientv3: Fix auth client to use endpoints instead of host when dialing, fix tests to block on dial when required. --- .../balancer/resolver/endpoint/endpoint.go | 13 +++++++++- clientv3/client.go | 24 ++++++++++++------- integration/cluster.go | 4 ++++ integration/v3_alarm_test.go | 2 ++ integration/v3_grpc_inflight_test.go | 6 +++-- integration/v3_grpc_test.go | 2 ++ 6 files changed, 39 insertions(+), 12 deletions(-) diff --git a/clientv3/balancer/resolver/endpoint/endpoint.go b/clientv3/balancer/resolver/endpoint/endpoint.go index 75a7e8b74b3..3f2d115e1f9 100644 --- a/clientv3/balancer/resolver/endpoint/endpoint.go +++ b/clientv3/balancer/resolver/endpoint/endpoint.go @@ -146,8 +146,19 @@ func (r *Resolver) Close() { bldr.removeResolver(r) } +// Target constructs a endpoint target with current resolver's clusterName. func (r *Resolver) Target(endpoint string) string { - return fmt.Sprintf("%s://%s/%s", scheme, r.clusterName, endpoint) + return Target(r.clusterName, endpoint) +} + +// Target constructs a endpoint resolver target. +func Target(clusterName, endpoint string) string { + return fmt.Sprintf("%s://%s/%s", scheme, clusterName, endpoint) +} + +// IsTarget checks if a given target string in an endpoint resolver target. +func IsTarget(target string) bool { + return strings.HasPrefix(target, "endpoint://") } // Parse endpoint parses a endpoint of the form (http|https)://*|(unix|unixs)://) and returns a diff --git a/clientv3/client.go b/clientv3/client.go index 008252f14a3..b513b4e7d9f 100644 --- a/clientv3/client.go +++ b/clientv3/client.go @@ -298,14 +298,13 @@ func (c *Client) getToken(ctx context.Context) error { for i := 0; i < len(c.cfg.Endpoints); i++ { endpoint := c.cfg.Endpoints[i] - host := getHost(endpoint) // use dial options without dopts to avoid reusing the client balancer var dOpts []grpc.DialOption dOpts, err = c.dialSetupOpts(c.resolver.Target(endpoint), c.cfg.DialOptions...) if err != nil { continue } - auth, err = newAuthenticator(ctx, host, dOpts, c) + auth, err = newAuthenticator(ctx, endpoint, dOpts, c) if err != nil { continue } @@ -327,8 +326,8 @@ func (c *Client) getToken(ctx context.Context) error { return err } -func (c *Client) dial(endpoint string, dopts ...grpc.DialOption) (*grpc.ClientConn, error) { - opts, err := c.dialSetupOpts(endpoint, dopts...) +func (c *Client) dial(ep string, dopts ...grpc.DialOption) (*grpc.ClientConn, error) { + opts, err := c.dialSetupOpts(ep, dopts...) if err != nil { return nil, err } @@ -367,11 +366,18 @@ func (c *Client) dial(endpoint string, dopts ...grpc.DialOption) (*grpc.ClientCo defer cancel() } - // TODO: The hosts check doesn't really make sense for a load balanced endpoint url for the new grpc load balancer interface. - // Is it safe/sane to use the provided endpoint here? - //host := getHost(endpoint) - //conn, err := grpc.DialContext(c.ctx, host, opts...) - conn, err := grpc.DialContext(dctx, endpoint, opts...) + // We pass a target to DialContext of the form: endpoint:/// that + // does not include scheme (http/https/unix/unixs) or path parts. + if endpoint.IsTarget(ep) { + clusterName, tep, err := endpoint.ParseTarget(ep) + if err != nil { + return nil, fmt.Errorf("failed to parse endpoint target '%s': %v", ep, err) + } + _, host, _ := endpoint.ParseEndpoint(tep) + ep = endpoint.Target(clusterName, host) + } + + conn, err := grpc.DialContext(dctx, ep, opts...) if err != nil { return nil, err } diff --git a/integration/cluster.go b/integration/cluster.go index e92aa95e5c7..6458b10724f 100644 --- a/integration/cluster.go +++ b/integration/cluster.go @@ -536,6 +536,7 @@ type member struct { PeerTLSInfo *transport.TLSInfo // ClientTLSInfo enables client TLS when set ClientTLSInfo *transport.TLSInfo + DialOptions []grpc.DialOption raftHandler *testutil.PauseableHandler s *etcdserver.EtcdServer @@ -744,6 +745,9 @@ func NewClientV3(m *member) (*clientv3.Client, error) { } cfg.TLS = tls } + if m.DialOptions != nil { + cfg.DialOptions = append(cfg.DialOptions, m.DialOptions...) + } return newClientV3(cfg) } diff --git a/integration/v3_alarm_test.go b/integration/v3_alarm_test.go index 01af6440689..049436bf6cd 100644 --- a/integration/v3_alarm_test.go +++ b/integration/v3_alarm_test.go @@ -191,10 +191,12 @@ func TestV3CorruptAlarm(t *testing.T) { } // Member 0 restarts into split brain. + clus.Members[0].WaitStarted(t) resp0, err0 := clus.Client(0).Get(context.TODO(), "abc") if err0 != nil { t.Fatal(err0) } + clus.Members[1].WaitStarted(t) resp1, err1 := clus.Client(1).Get(context.TODO(), "abc") if err1 != nil { t.Fatal(err1) diff --git a/integration/v3_grpc_inflight_test.go b/integration/v3_grpc_inflight_test.go index dd0d180cc0d..08c1a1f2369 100644 --- a/integration/v3_grpc_inflight_test.go +++ b/integration/v3_grpc_inflight_test.go @@ -25,6 +25,7 @@ import ( "github.com/coreos/etcd/pkg/testutil" "google.golang.org/grpc" + "google.golang.org/grpc/transport" ) // TestV3MaintenanceDefragmentInflightRange ensures inflight range requests @@ -81,8 +82,9 @@ func TestV3KVInflightRangeRequests(t *testing.T) { defer wg.Done() _, err := kvc.Range(ctx, &pb.RangeRequest{Key: []byte("foo"), Serializable: true}, grpc.FailFast(false)) if err != nil { - if err != nil && rpctypes.ErrorDesc(err) != context.Canceled.Error() { - t.Fatalf("inflight request should be canceld with %v, got %v", context.Canceled, err) + errDesc := rpctypes.ErrorDesc(err) + if err != nil && !(errDesc == context.Canceled.Error() || errDesc == transport.ErrConnClosing.Desc) { + t.Fatalf("inflight request should be canceled with '%v' or '%v', got '%v'", context.Canceled.Error(), transport.ErrConnClosing.Desc, errDesc) } } }() diff --git a/integration/v3_grpc_test.go b/integration/v3_grpc_test.go index c936703d5e3..b573881bb41 100644 --- a/integration/v3_grpc_test.go +++ b/integration/v3_grpc_test.go @@ -1570,6 +1570,7 @@ func TestTLSGRPCRejectSecureClient(t *testing.T) { defer clus.Terminate(t) clus.Members[0].ClientTLSInfo = &testTLSInfo + clus.Members[0].DialOptions = []grpc.DialOption{grpc.WithBlock()} client, err := NewClientV3(clus.Members[0]) if client != nil || err == nil { t.Fatalf("expected no client") @@ -1752,6 +1753,7 @@ func testTLSReload( continue } cli, cerr := clientv3.New(clientv3.Config{ + DialOptions: []grpc.DialOption{grpc.WithBlock()}, Endpoints: []string{clus.Members[0].GRPCAddr()}, DialTimeout: time.Second, TLS: cc,