Skip to content

Commit

Permalink
RAFT: client rpc reuse the same conn
Browse files Browse the repository at this point in the history
  • Loading branch information
moogacs committed Apr 17, 2024
1 parent 7d1189f commit c97c6e0
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 34 deletions.
1 change: 1 addition & 0 deletions cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ func (c *Service) Open(ctx context.Context, db store.Indexer) error {
func (c *Service) Close(ctx context.Context) (err error) {
err = c.Service.Close(ctx)
c.rpcService.Close()
c.client.Close()
return
}

Expand Down
75 changes: 41 additions & 34 deletions cluster/transport/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"fmt"
"net"
"strconv"
"sync"

cmd "github.com/weaviate/weaviate/cluster/proto/api"
"google.golang.org/grpc"
Expand Down Expand Up @@ -59,6 +60,10 @@ type rpcAddressResolver interface {
// Client is used for communication with remote nodes in a RAFT cluster.
type Client struct {
rpc rpcAddressResolver

connLock sync.Mutex
leaderAddr string
leaderConn *grpc.ClientConn
}

func NewClient(r rpcAddressResolver) *Client {
Expand All @@ -67,17 +72,11 @@ func NewClient(r rpcAddressResolver) *Client {

// Join joins this node to an existing cluster identified by its leader's address.
// If a new leader has been elected, the request is redirected to the new leader.
func (cl *Client) Join(ctx context.Context, leaderAddr string, req *cmd.JoinPeerRequest) (*cmd.JoinPeerResponse, error) {
addr, err := cl.rpc.Address(leaderAddr)
func (cl *Client) Join(ctx context.Context, leaderAddress string, req *cmd.JoinPeerRequest) (*cmd.JoinPeerResponse, error) {
conn, err := cl.getConn(leaderAddress)
if err != nil {
return nil, fmt.Errorf("resolve address: %w", err)
return nil, err
}

conn, err := grpc.DialContext(ctx, addr, grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
return nil, fmt.Errorf("dial: %w", err)
}
defer conn.Close()
c := cmd.NewClusterServiceClient(conn)
return c.JoinPeer(ctx, req)
}
Expand All @@ -101,59 +100,67 @@ func (cl *Client) Notify(ctx context.Context, rAddr string, req *cmd.NotifyPeerR
// Remove removes this node from an existing cluster identified by its leader's address.
// If a new leader has been elected, the request is redirected to the new leader.
func (cl *Client) Remove(ctx context.Context, leaderAddress string, req *cmd.RemovePeerRequest) (*cmd.RemovePeerResponse, error) {
addr, err := cl.rpc.Address(leaderAddress)
if err != nil {
return nil, fmt.Errorf("resolve address: %w", err)
}

conn, err := grpc.DialContext(ctx, addr, grpc.WithTransportCredentials(insecure.NewCredentials()))
conn, err := cl.getConn(leaderAddress)
if err != nil {
return nil, fmt.Errorf("dial: %w", err)
return nil, err
}
defer conn.Close()
c := cmd.NewClusterServiceClient(conn)
return c.RemovePeer(ctx, req)
}

func (cl *Client) Apply(leaderAddr string, req *cmd.ApplyRequest) (*cmd.ApplyResponse, error) {
ctx := context.Background()
addr, err := cl.rpc.Address(leaderAddr)
conn, err := cl.getConn(leaderAddr)
if err != nil {
return nil, fmt.Errorf("resolve address: %w", err)
return nil, err
}

conn, err := grpc.DialContext(
ctx,
addr,
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithDefaultServiceConfig(serviceConfig),
)
if err != nil {
return nil, fmt.Errorf("dial: %w", err)
}
defer conn.Close()
c := cmd.NewClusterServiceClient(conn)
return c.Apply(ctx, req)
}

func (cl *Client) Query(ctx context.Context, leaderAddress string, req *cmd.QueryRequest) (*cmd.QueryResponse, error) {
conn, err := cl.getConn(leaderAddress)
if err != nil {
return nil, err
}

c := cmd.NewClusterServiceClient(conn)
return c.Query(ctx, req)
}

func (cl *Client) Close() {
cl.leaderConn.Close()
}

func (cl *Client) getConn(leaderAddress string) (*grpc.ClientConn, error) {
cl.connLock.Lock()
defer cl.connLock.Unlock()

if cl.leaderConn != nil && leaderAddress == cl.leaderAddr {
return cl.leaderConn, nil
}

if cl.leaderConn != nil {
// close open conn if leader addr changed
cl.leaderConn.Close()
}

addr, err := cl.rpc.Address(leaderAddress)
if err != nil {
return nil, fmt.Errorf("resolve address: %w", err)
}

conn, err := grpc.DialContext(
ctx,
conn, err := grpc.Dial(
addr,
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithDefaultServiceConfig(serviceConfig),
)
if err != nil {
return nil, fmt.Errorf("dial: %w", err)
}
defer conn.Close()
c := cmd.NewClusterServiceClient(conn)
return c.Query(ctx, req)

return conn, nil
}

func NewRPCResolver(isLocalHost bool, rpcPort int) rpcAddressResolver {
Expand Down

0 comments on commit c97c6e0

Please sign in to comment.