Skip to content

Commit

Permalink
Update xrpc RpcServer
Browse files Browse the repository at this point in the history
  • Loading branch information
onanying committed Jun 30, 2023
1 parent 1fd038c commit bfa5e7e
Show file tree
Hide file tree
Showing 5 changed files with 138 additions and 53 deletions.
42 changes: 26 additions & 16 deletions src/xrpc/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -92,15 +92,19 @@ The necessary functions are encapsulated internally for unified management

```go
s := &xrpc.RpcServer{
GrpcAddr: "0.0.0.0:50000",
GatewayAddr: "0.0.0.0:50001",
Logger: &RpcLogger{SugaredLogger: zapLogger},
GrpcRegistrar: func(s *grpc.Server) {
pb.RegisterOrderServer(s, &service{})
Grpc: &xrpc.Grpc{
Addr: "0.0.0.0:50000",
Registrar: func(s *grpc.Server) {
pb.RegisterOrderServer(s, &service{})
},
},
GatewayRegistrar: func(mux *runtime.ServeMux, conn *grpc.ClientConn) {
pb.RegisterOrderHandler(context.Background(), mux, conn)
Gateway: &xrpc.Gateway{ // Optional
Addr: "0.0.0.0:50001",
Registrar: func(mux *runtime.ServeMux, conn *grpc.ClientConn) {
pb.RegisterOrderHandler(context.Background(), mux, conn)
},
},
Logger: &RpcLogger{SugaredLogger: zapLogger},
}
s.Serve()
```
Expand All @@ -125,17 +129,21 @@ if err != nil {
log.Fatal(err)
}
s := &xrpc.RpcServer{
GrpcAddr: "0.0.0.0:50000",
GatewayAddr: "0.0.0.0:50001",
Logger: &RpcLogger{SugaredLogger: zapLogger},
GrpcRegistrar: func(s *grpc.Server) {
pb.RegisterOrderServer(s, &service{})
Grpc: &xrpc.Grpc{
Addr: "0.0.0.0:50000",
Registrar: func(s *grpc.Server) {
pb.RegisterOrderServer(s, &service{})
},
},
GatewayRegistrar: func(mux *runtime.ServeMux, conn *grpc.ClientConn) {
pb.RegisterOrderHandler(context.Background(), mux, conn)
Gateway: &xrpc.Gateway{ // Optional
Addr: "0.0.0.0:50001",
Registrar: func(mux *runtime.ServeMux, conn *grpc.ClientConn) {
pb.RegisterOrderHandler(context.Background(), mux, conn)
},
},
Logger: &RpcLogger{SugaredLogger: zapLogger},
TLSConfig: tlsConf,
TLSClientConfig: tlsCliConf, // not empty, gateway requires
TLSClientConfig: tlsCliConf,
}
s.Serve()
```
Expand All @@ -157,7 +165,9 @@ Loggable Events

```go
s := &xrpc.RpcServer{
LoggableEvents: []logging.LoggableEvent{logging.StartCall, logging.FinishCall},
Grpc: &xrpc.Grpc{
LoggableEvents: []logging.LoggableEvent{logging.StartCall, logging.FinishCall},
}
}
```

Expand Down
2 changes: 2 additions & 0 deletions src/xrpc/rpcclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package xrpc
import (
"context"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/keepalive"
"time"
)
Expand All @@ -18,6 +19,7 @@ func NewGrpcClient(addr string, opts ...grpc.DialOption) (*grpc.ClientConn, erro
defer cancel()
dialOpts := []grpc.DialOption{
grpc.WithBlock(),
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithKeepaliveParams(keepalive.ClientParameters{
Time: 10 * time.Second,
Timeout: time.Second,
Expand Down
16 changes: 13 additions & 3 deletions src/xrpc/rpcclient_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@ import (
"context"
"fmt"
pb "github.com/mix-go/xrpc/testdata"
"io"
"log"
"net/http"
"os"
"strings"
"testing"
)
Expand All @@ -21,11 +23,18 @@ func TestNewGrpcClient(t *testing.T) {
resp, err := client.RequestForRelease(ctx, &pb.ReleaseRequest{
OrderNumber: "123456789",
})
fmt.Println(resp)
fmt.Println(resp, err)
}

func TestNewGatewayClient(t *testing.T) {
tlsConf, err := LoadTLSClientConfig("/certificates/ca.pem", "/certificates/client.pem", "/certificates/client.key")
client := &http.Client{}
resp, err := client.Post("http://127.0.0.1:50001/v1/request_for_release", "application/json", strings.NewReader(`{"order_number":"123456789"}`))
fmt.Println(resp.Body, err)
}

func TestNewGatewayTLSClient(t *testing.T) {
dir, _ := os.Getwd()
tlsConf, err := LoadTLSClientConfig(dir+"/certificates/ca.pem", dir+"/certificates/client.pem", dir+"/certificates/client.key")
if err != nil {
log.Fatal(err)
}
Expand All @@ -35,5 +44,6 @@ func TestNewGatewayClient(t *testing.T) {
},
}
resp, err := client.Post("http://127.0.0.1:50001/v1/request_for_release", "application/json", strings.NewReader(`{"order_number":"123456789"}`))
fmt.Println(resp.Body)
b, _ := io.ReadAll(resp.Body)
fmt.Println(string(b), err)
}
81 changes: 54 additions & 27 deletions src/xrpc/rpcserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,37 +16,58 @@ import (
)

type RpcServer struct {
GrpcAddr string
GatewayAddr string
// Required
*Grpc

// Optional
*Gateway

// Optional
Logger logging.Logger

// Optional, Use xrpc.NewTLSConfig or xrpc.LoadTLSConfig to create
TLSConfig *tls.Config

// Optional, Use xrpc.NewTLSClientConfig or xrpc.LoadTLSClientConfig to create
TLSClientConfig *tls.Config
}

type Gateway struct {
// Required
Addr string

// Required
Registrar func(mux *runtime.ServeMux, conn *grpc.ClientConn)

Server *http.Server
}

type Grpc struct {
// Required
Addr string

// No content: logging.StartCall, logging.FinishCall
// With content: logging.PayloadReceived, logging.PayloadSent
LoggableEvents []logging.LoggableEvent

GrpcRegistrar func(server *grpc.Server)
GatewayRegistrar func(mux *runtime.ServeMux, conn *grpc.ClientConn)
// Required
Registrar func(server *grpc.Server)

GrpcServer *grpc.Server
GatewayServer *http.Server
Listener net.Listener

// Use xrpc.NewTLSConfig or xrpc.LoadTLSConfig to create
TLSConfig *tls.Config
// Use xrpc.NewTLSClientConfig or xrpc.LoadTLSClientConfig to create
// Not empty, gateway require
TLSClientConfig *tls.Config
Server *grpc.Server

// Additional server config
// Optional, Additional server config
ServerOptions []grpc.ServerOption
}

func (t *RpcServer) Serve() error {
// listen
listen, err := net.Listen("tcp", t.GrpcAddr)
listen, err := net.Listen("tcp", t.Grpc.Addr)
if err != nil {
return err
}
t.Grpc.Listener = listen

// server
srvOpts := []grpc.ServerOption{
Expand All @@ -57,7 +78,7 @@ func (t *RpcServer) Serve() error {
}
if t.Logger != nil {
logOpts := []logging.Option{
logging.WithLogOnEvents(t.LoggableEvents...),
logging.WithLogOnEvents(t.Grpc.LoggableEvents...),
}
srvOpts = append(srvOpts,
grpc.ChainUnaryInterceptor(
Expand All @@ -70,16 +91,21 @@ func (t *RpcServer) Serve() error {
if t.TLSConfig != nil {
srvOpts = append(srvOpts, grpc.Creds(credentials.NewTLS(t.TLSConfig)))
}
if len(t.ServerOptions) > 0 {
srvOpts = append(srvOpts, t.ServerOptions...)
if len(t.Grpc.ServerOptions) > 0 {
srvOpts = append(srvOpts, t.Grpc.ServerOptions...)
}
s := grpc.NewServer(srvOpts...)
t.GrpcRegistrar(s)
go func() {
t.Grpc.Registrar(s)
serve := func() {
if err := s.Serve(listen); err != nil && !strings.Contains(err.Error(), "use of closed network connection") {
panic(err)
}
}()
}
if t.Gateway == nil {
serve()
return nil
}
go serve()

// gRPC-Gateway 就是通过它来代理请求(将HTTP请求转为RPC请求)
dialOpts := []grpc.DialOption{
Expand All @@ -94,25 +120,26 @@ func (t *RpcServer) Serve() error {
if t.TLSClientConfig != nil {
dialOpts = append(dialOpts, grpc.WithTransportCredentials(credentials.NewTLS(t.TLSClientConfig)))
}
addr := strings.ReplaceAll(t.GrpcAddr, "0.0.0.0", "127.0.0.1")
addr := strings.ReplaceAll(t.Grpc.Addr, "0.0.0.0", "127.0.0.1")
conn, err := grpc.Dial(addr, dialOpts...)
if err != nil {
return err
}

mux := runtime.NewServeMux()
t.GatewayRegistrar(mux, conn)
gwServer := &http.Server{
Addr: t.GatewayAddr,
t.Gateway.Registrar(mux, conn)
gateway := &http.Server{
Addr: t.Gateway.Addr,
Handler: mux,
}
if t.TLSConfig != nil {
gwServer.TLSConfig = t.TLSConfig
gateway.TLSConfig = t.TLSConfig
}
return gwServer.ListenAndServe()
return gateway.ListenAndServe()
}

func (t *RpcServer) Shutdown() error {
t.GrpcServer.Stop()
return t.GatewayServer.Shutdown(context.Background())
t.Grpc.Server.Stop()
_ = t.Grpc.Listener.Close()
return t.Gateway.Server.Shutdown(context.Background())
}
50 changes: 43 additions & 7 deletions src/xrpc/rpcserver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"github.com/grpc-ecosystem/grpc-gateway/v2/runtime"
pb "github.com/mix-go/xrpc/testdata"
"google.golang.org/grpc"
"log"
"os"
"testing"
)

Expand All @@ -14,16 +16,50 @@ type service struct {

func TestRPCServer_Serve(t *testing.T) {
s := &RpcServer{
GrpcAddr: "0.0.0.0:50000",
GatewayAddr: "0.0.0.0:50001",
Logger: nil,
GrpcRegistrar: func(s *grpc.Server) {
pb.RegisterOrderServer(s, &service{})
Grpc: &Grpc{
Addr: "0.0.0.0:50000",
Registrar: func(s *grpc.Server) {
pb.RegisterOrderServer(s, &service{})
},
},
GatewayRegistrar: func(mux *runtime.ServeMux, conn *grpc.ClientConn) {
pb.RegisterOrderHandler(context.Background(), mux, conn)
Gateway: &Gateway{ // Optional
Addr: "0.0.0.0:50001",
Registrar: func(mux *runtime.ServeMux, conn *grpc.ClientConn) {
pb.RegisterOrderHandler(context.Background(), mux, conn)
},
},
Logger: nil,
}
s.Serve()
// s.Shutdown()
}

func TestRpcServerTLS_Serve(t *testing.T) {
dir, _ := os.Getwd()
tlsConf, err := LoadTLSConfig(dir+"/certificates/ca.pem", dir+"/certificates/server.pem", dir+"/certificates/server.key")
if err != nil {
log.Fatal(err)
}
tlsCliConf, err := LoadTLSClientConfig(dir+"/certificates/ca.pem", dir+"/certificates/client.pem", dir+"/certificates/client.key")
if err != nil {
log.Fatal(err)
}
s := &RpcServer{
Grpc: &Grpc{
Addr: "0.0.0.0:50000",
Registrar: func(s *grpc.Server) {
pb.RegisterOrderServer(s, &service{})
},
},
Gateway: &Gateway{ // Optional
Addr: "0.0.0.0:50001",
Registrar: func(mux *runtime.ServeMux, conn *grpc.ClientConn) {
pb.RegisterOrderHandler(context.Background(), mux, conn)
},
},
Logger: nil,
TLSConfig: tlsConf,
TLSClientConfig: tlsCliConf,
}
s.Serve()
}

0 comments on commit bfa5e7e

Please sign in to comment.