Skip to content

Commit

Permalink
merge master, resolve conflicts
Browse files Browse the repository at this point in the history
  • Loading branch information
lyuxuan committed May 15, 2017
2 parents bdf9a64 + aacd01c commit 35d77ea
Show file tree
Hide file tree
Showing 21 changed files with 1,082 additions and 145 deletions.
5 changes: 5 additions & 0 deletions balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ package grpc

import (
"fmt"
"net"
"sync"

"golang.org/x/net/context"
Expand All @@ -60,6 +61,10 @@ type BalancerConfig struct {
// use to dial to a remote load balancer server. The Balancer implementations
// can ignore this if it does not need to talk to another party securely.
DialCreds credentials.TransportCredentials
// Dialer is the custom dialer the Balancer implementation can use to dial
// to a remote load balancer server. The Balancer implementations
// can ignore this if it doesn't need to talk to remote balancer.
Dialer func(context.Context, string) (net.Conn, error)
}

// BalancerGetOptions configures a Get call.
Expand Down
5 changes: 4 additions & 1 deletion call.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ func invoke(ctx context.Context, method string, args, reply interface{}, cc *Cli
ctx = newContextWithRPCInfo(ctx)
sh := cc.dopts.copts.StatsHandler
if sh != nil {
ctx = sh.TagRPC(ctx, &stats.RPCTagInfo{FullMethodName: method})
ctx = sh.TagRPC(ctx, &stats.RPCTagInfo{FullMethodName: method, FailFast: c.failFast})
begin := &stats.Begin{
Client: true,
BeginTime: time.Now(),
Expand Down Expand Up @@ -238,6 +238,9 @@ func invoke(ctx context.Context, method string, args, reply interface{}, cc *Cli
if cc.dopts.cp != nil {
callHdr.SendCompress = cc.dopts.cp.Type()
}
if c.creds != nil {
callHdr.Creds = c.creds
}

gopts := BalancerGetOptions{
BlockingWait: !c.failFast,
Expand Down
1 change: 1 addition & 0 deletions clientconn.go
Original file line number Diff line number Diff line change
Expand Up @@ -407,6 +407,7 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
}
config := BalancerConfig{
DialCreds: credsClone,
Dialer: cc.dopts.copts.Dialer,
}
if err := cc.dopts.balancer.Start(target, config); err != nil {
waitC <- err
Expand Down
37 changes: 25 additions & 12 deletions grpclb.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ type balancer struct {
func (b *balancer) watchAddrUpdates(w naming.Watcher, ch chan []remoteBalancerInfo) error {
updates, err := w.Next()
if err != nil {
grpclog.Printf("grpclb: failed to get next addr update from watcher: %v", err)
return err
}
b.mu.Lock()
Expand Down Expand Up @@ -306,6 +307,7 @@ func (b *balancer) sendLoadReport(s *balanceLoadClientStream, interval time.Dura
ClientStats: &stats,
},
}); err != nil {
grpclog.Printf("grpclb: failed to send load report: %v", err)
return
}
}
Expand All @@ -316,7 +318,7 @@ func (b *balancer) callRemoteBalancer(lbc *loadBalancerClient, seq int) (retry b
defer cancel()
stream, err := lbc.BalanceLoad(ctx)
if err != nil {
grpclog.Printf("Failed to perform RPC to the remote balancer %v", err)
grpclog.Printf("grpclb: failed to perform RPC to the remote balancer %v", err)
return
}
b.mu.Lock()
Expand All @@ -333,17 +335,19 @@ func (b *balancer) callRemoteBalancer(lbc *loadBalancerClient, seq int) (retry b
},
}
if err := stream.Send(initReq); err != nil {
grpclog.Printf("grpclb: failed to send init request: %v", err)
// TODO: backoff on retry?
return true
}
reply, err := stream.Recv()
if err != nil {
grpclog.Printf("grpclb: failed to recv init response: %v", err)
// TODO: backoff on retry?
return true
}
initResp := reply.GetInitialResponse()
if initResp == nil {
grpclog.Println("Failed to receive the initial response from the remote balancer.")
grpclog.Println("grpclb: reply from remote balancer did not include initial response.")
return
}
// TODO: Support delegation.
Expand All @@ -364,6 +368,7 @@ func (b *balancer) callRemoteBalancer(lbc *loadBalancerClient, seq int) (retry b
for {
reply, err := stream.Recv()
if err != nil {
grpclog.Printf("grpclb: failed to recv server list: %v", err)
break
}
b.mu.Lock()
Expand Down Expand Up @@ -397,6 +402,7 @@ func (b *balancer) Start(target string, config BalancerConfig) error {
w, err := b.r.Resolve(target)
if err != nil {
b.mu.Unlock()
grpclog.Printf("grpclb: failed to resolve address: %v, err: %v", target, err)
return err
}
b.w = w
Expand All @@ -406,7 +412,7 @@ func (b *balancer) Start(target string, config BalancerConfig) error {
go func() {
for {
if err := b.watchAddrUpdates(w, balancerAddrsCh); err != nil {
grpclog.Printf("grpc: the naming watcher stops working due to %v.\n", err)
grpclog.Printf("grpclb: the naming watcher stops working due to %v.\n", err)
close(balancerAddrsCh)
return
}
Expand Down Expand Up @@ -490,22 +496,29 @@ func (b *balancer) Start(target string, config BalancerConfig) error {
cc.Close()
}
// Talk to the remote load balancer to get the server list.
var err error
creds := config.DialCreds
ccError = make(chan struct{})
if creds == nil {
cc, err = Dial(rb.addr, WithInsecure())
} else {
var (
err error
dopts []DialOption
)
if creds := config.DialCreds; creds != nil {
if rb.name != "" {
if err := creds.OverrideServerName(rb.name); err != nil {
grpclog.Printf("Failed to override the server name in the credentials: %v", err)
grpclog.Printf("grpclb: failed to override the server name in the credentials: %v", err)
continue
}
}
cc, err = Dial(rb.addr, WithTransportCredentials(creds))
dopts = append(dopts, WithTransportCredentials(creds))
} else {
dopts = append(dopts, WithInsecure())
}
if dialer := config.Dialer; dialer != nil {
// WithDialer takes a different type of function, so we instead use a special DialOption here.
dopts = append(dopts, func(o *dialOptions) { o.copts.Dialer = dialer })
}
ccError = make(chan struct{})
cc, err = Dial(rb.addr, dopts...)
if err != nil {
grpclog.Printf("Failed to setup a connection to the remote balancer %v: %v", rb.addr, err)
grpclog.Printf("grpclb: failed to setup a connection to the remote balancer %v: %v", rb.addr, err)
close(ccError)
continue
}
Expand Down
51 changes: 35 additions & 16 deletions grpclb/grpclb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
*
*/

// Package grpclb is currently used only for grpclb testing.
package grpclb

import (
Expand Down Expand Up @@ -59,6 +60,11 @@ var (
lbsn = "bar.com"
besn = "foo.com"
lbToken = "iamatoken"

// Resolver replaces 127.0.0.1 with fakeName in Next().
// Dialer replaces fakeName with 127.0.0.1 when dialing.
// This will test that custom dialer is passed from Dial to grpclb.
fakeName = "fake.Name"
)

type testWatcher struct {
Expand All @@ -81,6 +87,9 @@ func (w *testWatcher) Next() (updates []*naming.Update, err error) {
break
}
if u != nil {
// Resolver replaces 127.0.0.1 with fakeName in Next().
// Custom dialer will replace fakeName with 127.0.0.1 when dialing.
u.Addr = strings.Replace(u.Addr, "127.0.0.1", fakeName, 1)
updates = append(updates, u)
}
}
Expand Down Expand Up @@ -171,6 +180,13 @@ func (c *serverNameCheckCreds) OverrideServerName(s string) error {
return nil
}

// fakeNameDialer replaces fakeName with 127.0.0.1 when dialing.
// This will test that custom dialer is passed from Dial to grpclb.
func fakeNameDialer(addr string, timeout time.Duration) (net.Conn, error) {
addr = strings.Replace(addr, fakeName, "127.0.0.1", 1)
return net.DialTimeout("tcp", addr, timeout)
}

type remoteBalancer struct {
sls []*lbpb.ServerList
intervals []time.Duration
Expand Down Expand Up @@ -387,9 +403,9 @@ func TestGRPCLB(t *testing.T) {
}
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
cc, err := grpc.DialContext(ctx, besn, grpc.WithBalancer(grpc.NewGRPCLBBalancer(&testNameResolver{
addrs: []string{tss.lbAddr},
})), grpc.WithBlock(), grpc.WithTransportCredentials(&creds))
cc, err := grpc.DialContext(ctx, besn,
grpc.WithBalancer(grpc.NewGRPCLBBalancer(&testNameResolver{addrs: []string{tss.lbAddr}})),
grpc.WithBlock(), grpc.WithTransportCredentials(&creds), grpc.WithDialer(fakeNameDialer))
if err != nil {
t.Fatalf("Failed to dial to the backend %v", err)
}
Expand Down Expand Up @@ -425,9 +441,9 @@ func TestDropRequest(t *testing.T) {
}
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
cc, err := grpc.DialContext(ctx, besn, grpc.WithBalancer(grpc.NewGRPCLBBalancer(&testNameResolver{
addrs: []string{tss.lbAddr},
})), grpc.WithBlock(), grpc.WithTransportCredentials(&creds))
cc, err := grpc.DialContext(ctx, besn,
grpc.WithBalancer(grpc.NewGRPCLBBalancer(&testNameResolver{addrs: []string{tss.lbAddr}})),
grpc.WithBlock(), grpc.WithTransportCredentials(&creds), grpc.WithDialer(fakeNameDialer))
if err != nil {
t.Fatalf("Failed to dial to the backend %v", err)
}
Expand Down Expand Up @@ -476,9 +492,9 @@ func TestDropRequestFailedNonFailFast(t *testing.T) {
}
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
cc, err := grpc.DialContext(ctx, besn, grpc.WithBalancer(grpc.NewGRPCLBBalancer(&testNameResolver{
addrs: []string{tss.lbAddr},
})), grpc.WithBlock(), grpc.WithTransportCredentials(&creds))
cc, err := grpc.DialContext(ctx, besn,
grpc.WithBalancer(grpc.NewGRPCLBBalancer(&testNameResolver{addrs: []string{tss.lbAddr}})),
grpc.WithBlock(), grpc.WithTransportCredentials(&creds), grpc.WithDialer(fakeNameDialer))
if err != nil {
t.Fatalf("Failed to dial to the backend %v", err)
}
Expand Down Expand Up @@ -528,9 +544,9 @@ func TestServerExpiration(t *testing.T) {
}
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
cc, err := grpc.DialContext(ctx, besn, grpc.WithBalancer(grpc.NewGRPCLBBalancer(&testNameResolver{
addrs: []string{tss.lbAddr},
})), grpc.WithBlock(), grpc.WithTransportCredentials(&creds))
cc, err := grpc.DialContext(ctx, besn,
grpc.WithBalancer(grpc.NewGRPCLBBalancer(&testNameResolver{addrs: []string{tss.lbAddr}})),
grpc.WithBlock(), grpc.WithTransportCredentials(&creds), grpc.WithDialer(fakeNameDialer))
if err != nil {
t.Fatalf("Failed to dial to the backend %v", err)
}
Expand Down Expand Up @@ -589,7 +605,9 @@ func TestBalancerDisconnects(t *testing.T) {
resolver := &testNameResolver{
addrs: lbAddrs[:2],
}
cc, err := grpc.DialContext(ctx, besn, grpc.WithBalancer(grpc.NewGRPCLBBalancer(resolver)), grpc.WithBlock(), grpc.WithTransportCredentials(&creds))
cc, err := grpc.DialContext(ctx, besn,
grpc.WithBalancer(grpc.NewGRPCLBBalancer(resolver)),
grpc.WithBlock(), grpc.WithTransportCredentials(&creds), grpc.WithDialer(fakeNameDialer))
if err != nil {
t.Fatalf("Failed to dial to the backend %v", err)
}
Expand Down Expand Up @@ -681,9 +699,10 @@ func runAndGetStats(t *testing.T, dropForLoadBalancing, dropForRateLimiting bool

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
cc, err := grpc.DialContext(ctx, besn, grpc.WithBalancer(grpc.NewGRPCLBBalancer(&testNameResolver{
addrs: []string{tss.lbAddr},
})), grpc.WithBlock(), grpc.WithTransportCredentials(&creds), grpc.WithPerRPCCredentials(failPreRPCCred{}))
cc, err := grpc.DialContext(ctx, besn,
grpc.WithBalancer(grpc.NewGRPCLBBalancer(&testNameResolver{addrs: []string{tss.lbAddr}})),
grpc.WithTransportCredentials(&creds), grpc.WithPerRPCCredentials(failPreRPCCred{}),
grpc.WithBlock(), grpc.WithDialer(fakeNameDialer))
if err != nil {
t.Fatalf("Failed to dial to the backend %v", err)
}
Expand Down
4 changes: 2 additions & 2 deletions proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,15 +133,15 @@ func (p *proxyServer) stop() {
}

func TestHTTPConnect(t *testing.T) {
plis, err := net.Listen("tcp", ":0")
plis, err := net.Listen("tcp", "localhost:0")
if err != nil {
t.Fatalf("failed to listen: %v", err)
}
p := &proxyServer{t: t, lis: plis}
go p.run()
defer p.stop()

blis, err := net.Listen("tcp", ":0")
blis, err := net.Listen("tcp", "localhost:0")
if err != nil {
t.Fatalf("failed to listen: %v", err)
}
Expand Down
Loading

0 comments on commit 35d77ea

Please sign in to comment.