From bfa5e7e2ba29acc520c4f285ca1b481d034ddf04 Mon Sep 17 00:00:00 2001 From: liujian Date: Fri, 30 Jun 2023 18:27:18 +0800 Subject: [PATCH] Update xrpc RpcServer --- src/xrpc/README.md | 42 ++++++++++++-------- src/xrpc/rpcclient.go | 2 + src/xrpc/rpcclient_test.go | 16 ++++++-- src/xrpc/rpcserver.go | 81 +++++++++++++++++++++++++------------- src/xrpc/rpcserver_test.go | 50 +++++++++++++++++++---- 5 files changed, 138 insertions(+), 53 deletions(-) diff --git a/src/xrpc/README.md b/src/xrpc/README.md index d1071f9..164d8dc 100644 --- a/src/xrpc/README.md +++ b/src/xrpc/README.md @@ -92,15 +92,19 @@ The necessary functions are encapsulated internally for unified management ```go s := &xrpc.RpcServer{ - GrpcAddr: "0.0.0.0:50000", - GatewayAddr: "0.0.0.0:50001", - Logger: &RpcLogger{SugaredLogger: zapLogger}, - GrpcRegistrar: func(s *grpc.Server) { - pb.RegisterOrderServer(s, &service{}) + Grpc: &xrpc.Grpc{ + Addr: "0.0.0.0:50000", + Registrar: func(s *grpc.Server) { + pb.RegisterOrderServer(s, &service{}) + }, }, - GatewayRegistrar: func(mux *runtime.ServeMux, conn *grpc.ClientConn) { - pb.RegisterOrderHandler(context.Background(), mux, conn) + Gateway: &xrpc.Gateway{ // Optional + Addr: "0.0.0.0:50001", + Registrar: func(mux *runtime.ServeMux, conn *grpc.ClientConn) { + pb.RegisterOrderHandler(context.Background(), mux, conn) + }, }, + Logger: &RpcLogger{SugaredLogger: zapLogger}, } s.Serve() ``` @@ -125,17 +129,21 @@ if err != nil { log.Fatal(err) } s := &xrpc.RpcServer{ - GrpcAddr: "0.0.0.0:50000", - GatewayAddr: "0.0.0.0:50001", - Logger: &RpcLogger{SugaredLogger: zapLogger}, - GrpcRegistrar: func(s *grpc.Server) { - pb.RegisterOrderServer(s, &service{}) + Grpc: &xrpc.Grpc{ + Addr: "0.0.0.0:50000", + Registrar: func(s *grpc.Server) { + pb.RegisterOrderServer(s, &service{}) + }, }, - GatewayRegistrar: func(mux *runtime.ServeMux, conn *grpc.ClientConn) { - pb.RegisterOrderHandler(context.Background(), mux, conn) + Gateway: &xrpc.Gateway{ // Optional + Addr: "0.0.0.0:50001", + Registrar: func(mux *runtime.ServeMux, conn *grpc.ClientConn) { + pb.RegisterOrderHandler(context.Background(), mux, conn) + }, }, + Logger: &RpcLogger{SugaredLogger: zapLogger}, TLSConfig: tlsConf, - TLSClientConfig: tlsCliConf, // not empty, gateway requires + TLSClientConfig: tlsCliConf, } s.Serve() ``` @@ -157,7 +165,9 @@ Loggable Events ```go s := &xrpc.RpcServer{ - LoggableEvents: []logging.LoggableEvent{logging.StartCall, logging.FinishCall}, + Grpc: &xrpc.Grpc{ + LoggableEvents: []logging.LoggableEvent{logging.StartCall, logging.FinishCall}, + } } ``` diff --git a/src/xrpc/rpcclient.go b/src/xrpc/rpcclient.go index 55c8c3c..3346a94 100644 --- a/src/xrpc/rpcclient.go +++ b/src/xrpc/rpcclient.go @@ -3,6 +3,7 @@ package xrpc import ( "context" "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/keepalive" "time" ) @@ -18,6 +19,7 @@ func NewGrpcClient(addr string, opts ...grpc.DialOption) (*grpc.ClientConn, erro defer cancel() dialOpts := []grpc.DialOption{ grpc.WithBlock(), + grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithKeepaliveParams(keepalive.ClientParameters{ Time: 10 * time.Second, Timeout: time.Second, diff --git a/src/xrpc/rpcclient_test.go b/src/xrpc/rpcclient_test.go index 701bd6a..0ddd7d2 100644 --- a/src/xrpc/rpcclient_test.go +++ b/src/xrpc/rpcclient_test.go @@ -4,8 +4,10 @@ import ( "context" "fmt" pb "github.com/mix-go/xrpc/testdata" + "io" "log" "net/http" + "os" "strings" "testing" ) @@ -21,11 +23,18 @@ func TestNewGrpcClient(t *testing.T) { resp, err := client.RequestForRelease(ctx, &pb.ReleaseRequest{ OrderNumber: "123456789", }) - fmt.Println(resp) + fmt.Println(resp, err) } func TestNewGatewayClient(t *testing.T) { - tlsConf, err := LoadTLSClientConfig("/certificates/ca.pem", "/certificates/client.pem", "/certificates/client.key") + client := &http.Client{} + resp, err := client.Post("http://127.0.0.1:50001/v1/request_for_release", "application/json", strings.NewReader(`{"order_number":"123456789"}`)) + fmt.Println(resp.Body, err) +} + +func TestNewGatewayTLSClient(t *testing.T) { + dir, _ := os.Getwd() + tlsConf, err := LoadTLSClientConfig(dir+"/certificates/ca.pem", dir+"/certificates/client.pem", dir+"/certificates/client.key") if err != nil { log.Fatal(err) } @@ -35,5 +44,6 @@ func TestNewGatewayClient(t *testing.T) { }, } resp, err := client.Post("http://127.0.0.1:50001/v1/request_for_release", "application/json", strings.NewReader(`{"order_number":"123456789"}`)) - fmt.Println(resp.Body) + b, _ := io.ReadAll(resp.Body) + fmt.Println(string(b), err) } diff --git a/src/xrpc/rpcserver.go b/src/xrpc/rpcserver.go index 8b1d29b..4f80cd8 100644 --- a/src/xrpc/rpcserver.go +++ b/src/xrpc/rpcserver.go @@ -16,37 +16,58 @@ import ( ) type RpcServer struct { - GrpcAddr string - GatewayAddr string + // Required + *Grpc + + // Optional + *Gateway // Optional Logger logging.Logger + + // Optional, Use xrpc.NewTLSConfig or xrpc.LoadTLSConfig to create + TLSConfig *tls.Config + + // Optional, Use xrpc.NewTLSClientConfig or xrpc.LoadTLSClientConfig to create + TLSClientConfig *tls.Config +} + +type Gateway struct { + // Required + Addr string + + // Required + Registrar func(mux *runtime.ServeMux, conn *grpc.ClientConn) + + Server *http.Server +} + +type Grpc struct { + // Required + Addr string + // No content: logging.StartCall, logging.FinishCall // With content: logging.PayloadReceived, logging.PayloadSent LoggableEvents []logging.LoggableEvent - GrpcRegistrar func(server *grpc.Server) - GatewayRegistrar func(mux *runtime.ServeMux, conn *grpc.ClientConn) + // Required + Registrar func(server *grpc.Server) - GrpcServer *grpc.Server - GatewayServer *http.Server + Listener net.Listener - // Use xrpc.NewTLSConfig or xrpc.LoadTLSConfig to create - TLSConfig *tls.Config - // Use xrpc.NewTLSClientConfig or xrpc.LoadTLSClientConfig to create - // Not empty, gateway require - TLSClientConfig *tls.Config + Server *grpc.Server - // Additional server config + // Optional, Additional server config ServerOptions []grpc.ServerOption } func (t *RpcServer) Serve() error { // listen - listen, err := net.Listen("tcp", t.GrpcAddr) + listen, err := net.Listen("tcp", t.Grpc.Addr) if err != nil { return err } + t.Grpc.Listener = listen // server srvOpts := []grpc.ServerOption{ @@ -57,7 +78,7 @@ func (t *RpcServer) Serve() error { } if t.Logger != nil { logOpts := []logging.Option{ - logging.WithLogOnEvents(t.LoggableEvents...), + logging.WithLogOnEvents(t.Grpc.LoggableEvents...), } srvOpts = append(srvOpts, grpc.ChainUnaryInterceptor( @@ -70,16 +91,21 @@ func (t *RpcServer) Serve() error { if t.TLSConfig != nil { srvOpts = append(srvOpts, grpc.Creds(credentials.NewTLS(t.TLSConfig))) } - if len(t.ServerOptions) > 0 { - srvOpts = append(srvOpts, t.ServerOptions...) + if len(t.Grpc.ServerOptions) > 0 { + srvOpts = append(srvOpts, t.Grpc.ServerOptions...) } s := grpc.NewServer(srvOpts...) - t.GrpcRegistrar(s) - go func() { + t.Grpc.Registrar(s) + serve := func() { if err := s.Serve(listen); err != nil && !strings.Contains(err.Error(), "use of closed network connection") { panic(err) } - }() + } + if t.Gateway == nil { + serve() + return nil + } + go serve() // gRPC-Gateway 就是通过它来代理请求(将HTTP请求转为RPC请求) dialOpts := []grpc.DialOption{ @@ -94,25 +120,26 @@ func (t *RpcServer) Serve() error { if t.TLSClientConfig != nil { dialOpts = append(dialOpts, grpc.WithTransportCredentials(credentials.NewTLS(t.TLSClientConfig))) } - addr := strings.ReplaceAll(t.GrpcAddr, "0.0.0.0", "127.0.0.1") + addr := strings.ReplaceAll(t.Grpc.Addr, "0.0.0.0", "127.0.0.1") conn, err := grpc.Dial(addr, dialOpts...) if err != nil { return err } mux := runtime.NewServeMux() - t.GatewayRegistrar(mux, conn) - gwServer := &http.Server{ - Addr: t.GatewayAddr, + t.Gateway.Registrar(mux, conn) + gateway := &http.Server{ + Addr: t.Gateway.Addr, Handler: mux, } if t.TLSConfig != nil { - gwServer.TLSConfig = t.TLSConfig + gateway.TLSConfig = t.TLSConfig } - return gwServer.ListenAndServe() + return gateway.ListenAndServe() } func (t *RpcServer) Shutdown() error { - t.GrpcServer.Stop() - return t.GatewayServer.Shutdown(context.Background()) + t.Grpc.Server.Stop() + _ = t.Grpc.Listener.Close() + return t.Gateway.Server.Shutdown(context.Background()) } diff --git a/src/xrpc/rpcserver_test.go b/src/xrpc/rpcserver_test.go index e85c3fa..8316764 100644 --- a/src/xrpc/rpcserver_test.go +++ b/src/xrpc/rpcserver_test.go @@ -5,6 +5,8 @@ import ( "github.com/grpc-ecosystem/grpc-gateway/v2/runtime" pb "github.com/mix-go/xrpc/testdata" "google.golang.org/grpc" + "log" + "os" "testing" ) @@ -14,16 +16,50 @@ type service struct { func TestRPCServer_Serve(t *testing.T) { s := &RpcServer{ - GrpcAddr: "0.0.0.0:50000", - GatewayAddr: "0.0.0.0:50001", - Logger: nil, - GrpcRegistrar: func(s *grpc.Server) { - pb.RegisterOrderServer(s, &service{}) + Grpc: &Grpc{ + Addr: "0.0.0.0:50000", + Registrar: func(s *grpc.Server) { + pb.RegisterOrderServer(s, &service{}) + }, }, - GatewayRegistrar: func(mux *runtime.ServeMux, conn *grpc.ClientConn) { - pb.RegisterOrderHandler(context.Background(), mux, conn) + Gateway: &Gateway{ // Optional + Addr: "0.0.0.0:50001", + Registrar: func(mux *runtime.ServeMux, conn *grpc.ClientConn) { + pb.RegisterOrderHandler(context.Background(), mux, conn) + }, }, + Logger: nil, } s.Serve() // s.Shutdown() } + +func TestRpcServerTLS_Serve(t *testing.T) { + dir, _ := os.Getwd() + tlsConf, err := LoadTLSConfig(dir+"/certificates/ca.pem", dir+"/certificates/server.pem", dir+"/certificates/server.key") + if err != nil { + log.Fatal(err) + } + tlsCliConf, err := LoadTLSClientConfig(dir+"/certificates/ca.pem", dir+"/certificates/client.pem", dir+"/certificates/client.key") + if err != nil { + log.Fatal(err) + } + s := &RpcServer{ + Grpc: &Grpc{ + Addr: "0.0.0.0:50000", + Registrar: func(s *grpc.Server) { + pb.RegisterOrderServer(s, &service{}) + }, + }, + Gateway: &Gateway{ // Optional + Addr: "0.0.0.0:50001", + Registrar: func(mux *runtime.ServeMux, conn *grpc.ClientConn) { + pb.RegisterOrderHandler(context.Background(), mux, conn) + }, + }, + Logger: nil, + TLSConfig: tlsConf, + TLSClientConfig: tlsCliConf, + } + s.Serve() +}