Skip to content

Commit

Permalink
support direct scheme on rpc resolver
Browse files Browse the repository at this point in the history
  • Loading branch information
kevwan committed Aug 18, 2020
1 parent 0214161 commit f03cfb0
Show file tree
Hide file tree
Showing 11 changed files with 170 additions and 134 deletions.
32 changes: 26 additions & 6 deletions example/rpc/client/lb/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,41 @@ package main

import (
"context"
"flag"
"fmt"
"log"
"time"

"github.com/tal-tech/go-zero/core/discov"
"github.com/tal-tech/go-zero/example/rpc/remote/unary"
"github.com/tal-tech/go-zero/rpcx"
)

var lb = flag.String("t", "direct", "the load balancer type")

func main() {
cli := rpcx.MustNewClient(rpcx.RpcClientConf{
Etcd: discov.EtcdConf{
Hosts: []string{"localhost:2379"},
Key: "rpcx",
},
})
flag.Parse()

var cli rpcx.Client
switch *lb {
case "direct":
cli = rpcx.MustNewClient(rpcx.RpcClientConf{
Endpoints: []string{
"localhost:3456",
"localhost:3457",
},
})
case "discov":
cli = rpcx.MustNewClient(rpcx.RpcClientConf{
Etcd: discov.EtcdConf{
Hosts: []string{"localhost:2379"},
Key: "rpcx",
},
})
default:
log.Fatal("bad load balancing type")
}

greet := unary.NewGreeterClient(cli.Conn())
ticker := time.NewTicker(time.Second)
defer ticker.Stop()
Expand Down
12 changes: 8 additions & 4 deletions rpcx/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,10 @@ func NewClient(c RpcClientConf, options ...internal.ClientOption) (Client, error

var client Client
var err error
if len(c.Server) > 0 {
client, err = internal.NewDirectClient(c.Server, opts...)
if len(c.Endpoints) > 0 {
client, err = internal.NewClient(internal.BuildDirectTarget(c.Endpoints), opts...)
} else if err = c.Etcd.Validate(); err == nil {
client, err = internal.NewDiscovClient(c.Etcd.Hosts, c.Etcd.Key, opts...)
client, err = internal.NewClient(internal.BuildDiscovTarget(c.Etcd.Hosts, c.Etcd.Key), opts...)
}
if err != nil {
return nil, err
Expand All @@ -64,7 +64,7 @@ func NewClient(c RpcClientConf, options ...internal.ClientOption) (Client, error
}

func NewClientNoAuth(c discov.EtcdConf) (Client, error) {
client, err := internal.NewDiscovClient(c.Hosts, c.Key)
client, err := internal.NewClient(internal.BuildDiscovTarget(c.Hosts, c.Key))
if err != nil {
return nil, err
}
Expand All @@ -74,6 +74,10 @@ func NewClientNoAuth(c discov.EtcdConf) (Client, error) {
}, nil
}

func NewClientWithTarget(target string, opts ...internal.ClientOption) (Client, error) {
return internal.NewClient(target, opts...)
}

func (rc *RpcClient) Conn() *grpc.ClientConn {
return rc.client.Conn()
}
18 changes: 9 additions & 9 deletions rpcx/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,19 +21,19 @@ type (
}

RpcClientConf struct {
Etcd discov.EtcdConf `json:",optional"`
Server string `json:",optional=!Etcd"`
App string `json:",optional"`
Token string `json:",optional"`
Timeout int64 `json:",optional"`
Etcd discov.EtcdConf `json:",optional"`
Endpoints []string `json:",optional=!Etcd"`
App string `json:",optional"`
Token string `json:",optional"`
Timeout int64 `json:",optional"`
}
)

func NewDirectClientConf(server, app, token string) RpcClientConf {
func NewDirectClientConf(endpoints []string, app, token string) RpcClientConf {
return RpcClientConf{
Server: server,
App: app,
Token: token,
Endpoints: endpoints,
App: app,
Token: token,
}
}

Expand Down
24 changes: 24 additions & 0 deletions rpcx/internal/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,21 +5,45 @@ import (
"fmt"
"time"

"github.com/tal-tech/go-zero/rpcx/internal/balancer/p2c"
"github.com/tal-tech/go-zero/rpcx/internal/clientinterceptors"
"github.com/tal-tech/go-zero/rpcx/internal/resolver"
"google.golang.org/grpc"
)

const dialTimeout = time.Second * 3

func init() {
resolver.RegisterResolver()
}

type (
ClientOptions struct {
Timeout time.Duration
DialOptions []grpc.DialOption
}

ClientOption func(options *ClientOptions)

client struct {
conn *grpc.ClientConn
}
)

func NewClient(target string, opts ...ClientOption) (*client, error) {
opts = append(opts, WithDialOption(grpc.WithBalancerName(p2c.Name)))
conn, err := dial(target, opts...)
if err != nil {
return nil, err
}

return &client{conn: conn}, nil
}

func (c *client) Conn() *grpc.ClientConn {
return c.conn
}

func WithDialOption(opt grpc.DialOption) ClientOption {
return func(options *ClientOptions) {
options.DialOptions = append(options.DialOptions, opt)
Expand Down
26 changes: 0 additions & 26 deletions rpcx/internal/directclient.go

This file was deleted.

34 changes: 0 additions & 34 deletions rpcx/internal/discovclient.go

This file was deleted.

30 changes: 30 additions & 0 deletions rpcx/internal/resolver/directbuilder.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package resolver

import (
"strings"

"google.golang.org/grpc/resolver"
)

type directBuilder struct{}

func (d *directBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (
resolver.Resolver, error) {
var addrs []resolver.Address
endpoints := strings.Split(target.Endpoint, EndpointSep)

for _, val := range subset(endpoints, subsetSize) {
addrs = append(addrs, resolver.Address{
Addr: val,
})
}
cc.UpdateState(resolver.State{
Addresses: addrs,
})

return &nopResolver{cc: cc}, nil
}

func (d *directBuilder) Scheme() string {
return DirectScheme
}
39 changes: 39 additions & 0 deletions rpcx/internal/resolver/discovbuilder.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package resolver

import (
"strings"

"github.com/tal-tech/go-zero/core/discov"
"google.golang.org/grpc/resolver"
)

type discovBuilder struct{}

func (d *discovBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (
resolver.Resolver, error) {
hosts := strings.Split(target.Authority, EndpointSep)
sub, err := discov.NewSubscriber(hosts, target.Endpoint)
if err != nil {
return nil, err
}

update := func() {
var addrs []resolver.Address
for _, val := range subset(sub.Values(), subsetSize) {
addrs = append(addrs, resolver.Address{
Addr: val,
})
}
cc.UpdateState(resolver.State{
Addresses: addrs,
})
}
sub.AddListener(update)
update()

return &nopResolver{cc: cc}, nil
}

func (d *discovBuilder) Scheme() string {
return DiscovScheme
}
62 changes: 12 additions & 50 deletions rpcx/internal/resolver/resolver.go
Original file line number Diff line number Diff line change
@@ -1,68 +1,30 @@
package resolver

import (
"fmt"
"strings"

"github.com/tal-tech/go-zero/core/discov"
"google.golang.org/grpc/resolver"
)
import "google.golang.org/grpc/resolver"

const (
DirectScheme = "direct"
DiscovScheme = "discov"
EndpointSep = ","
subsetSize = 32
)

var builder discovBuilder

type discovBuilder struct{}

func (b *discovBuilder) Scheme() string {
return DiscovScheme
}

func (b *discovBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (
resolver.Resolver, error) {
if target.Scheme != DiscovScheme {
return nil, fmt.Errorf("bad scheme: %s", target.Scheme)
}

hosts := strings.Split(target.Authority, EndpointSep)
sub, err := discov.NewSubscriber(hosts, target.Endpoint)
if err != nil {
return nil, err
}

update := func() {
var addrs []resolver.Address
for _, val := range subset(sub.Values(), subsetSize) {
addrs = append(addrs, resolver.Address{
Addr: val,
})
}
cc.UpdateState(resolver.State{
Addresses: addrs,
})
}
sub.AddListener(update)
update()
var (
dirBuilder directBuilder
disBuilder discovBuilder
)

return &discovResolver{
cc: cc,
}, nil
func RegisterResolver() {
resolver.Register(&dirBuilder)
resolver.Register(&disBuilder)
}

type discovResolver struct {
type nopResolver struct {
cc resolver.ClientConn
}

func (r *discovResolver) Close() {
func (r *nopResolver) Close() {
}

func (r *discovResolver) ResolveNow(options resolver.ResolveNowOptions) {
}

func RegisterResolver() {
resolver.Register(&builder)
func (r *nopResolver) ResolveNow(options resolver.ResolveNowOptions) {
}
17 changes: 17 additions & 0 deletions rpcx/internal/target.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package internal

import (
"fmt"
"strings"

"github.com/tal-tech/go-zero/rpcx/internal/resolver"
)

func BuildDirectTarget(endpoints []string) string {
return fmt.Sprintf("%s:///%s", resolver.DirectScheme, strings.Join(endpoints, resolver.EndpointSep))
}

func BuildDiscovTarget(endpoints []string, key string) string {
return fmt.Sprintf("%s://%s/%s", resolver.DiscovScheme,
strings.Join(endpoints, resolver.EndpointSep), key)
}
Loading

0 comments on commit f03cfb0

Please sign in to comment.