From 7089bbe9e8d4b5777b088e98abf107db4e7f92f6 Mon Sep 17 00:00:00 2001 From: vearne Date: Wed, 25 Sep 2024 15:00:38 +0800 Subject: [PATCH] Optimize the processing speed of output_grpc --- Makefile | 2 +- README.md | 6 +- README_zh.md | 6 +- biz/plugins.go | 2 +- config/settings.go | 2 + main.go | 3 + plugin/output_grpc.go | 139 ++++++++++++++++++++++++++++++++++++------ 7 files changed, 138 insertions(+), 22 deletions(-) diff --git a/Makefile b/Makefile index 5788b71..df5759f 100644 --- a/Makefile +++ b/Makefile @@ -1,4 +1,4 @@ -VERSION := v0.2.1 +VERSION := v0.2.2 BIN_NAME = grpcr CONTAINER = grpcr diff --git a/README.md b/README.md index 605787e..371d440 100644 --- a/README.md +++ b/README.md @@ -92,6 +92,10 @@ and printed in the console ``` ./grpcr --input-file-directory="/tmp/mycapture" --output-stdout --output-grpc="grpc://127.0.0.1:35002" ``` +Hint: You can use `input-file-replay-speed` to speed up the replay +``` +--input-file-replay-speed=10 +``` Capture gRPC requests on "127.0.0.1:35001", keep only requests whose method suffix is Time, and print them in the console @@ -159,7 +163,7 @@ and [buger/goreplay](https://github.com/buger/goreplay) * [x] 11)Support for reading GRPC requests from RocketMQ * [x] 12)Support custom filter * [ ] 13)support TLS -* [ ] 14)Optimize the processing speed of output_grpc +* [x] 14)Optimize the processing speed of output_grpc ## donate ![donate](https://github.com/vearne/grpcreplay/raw/main/img/donate.jpg) \ No newline at end of file diff --git a/README_zh.md b/README_zh.md index 1d8f059..e991bdf 100644 --- a/README_zh.md +++ b/README_zh.md @@ -89,6 +89,10 @@ sudo -s ``` ./grpcr --input-file-directory="/tmp/mycapture" --output-stdout --output-grpc="grpc://127.0.0.1:35002" ``` +提示: 你可以使用 `input-file-replay-speed` 加快重放的速度 +``` +--input-file-replay-speed=10 +``` 捕获"127.0.0.1:35001"上的gRPC请求,只保留method后缀为Time的请求,并打印在控制台中 ``` @@ -154,7 +158,7 @@ export SIMPLE_LOG_LEVEL=debug * [x] 11)支持从RocketMQ中读取GRPC请求 * [x] 12)支持自定义filter * [ ] 13)支持TLS -* [ ] 14)优化output_grpc的处理速度 +* [x] 14)优化output_grpc的处理速度 ## 捐赠 ![donate](https://github.com/vearne/grpcreplay/raw/main/img/donate.jpg) diff --git a/biz/plugins.go b/biz/plugins.go index 2cca630..c22ec90 100644 --- a/biz/plugins.go +++ b/biz/plugins.go @@ -57,7 +57,7 @@ func NewPlugins(settings *config.AppSettings) *InOutPlugins { if err != nil { slog.Fatal("OutputGRPC addr error:%v", err) } - plugins.registerPlugin(plugin.NewGRPCOutput, addr) + plugins.registerPlugin(plugin.NewGRPCOutput, addr, settings.OutputGRPCWorkerNumber) } for _, path := range settings.OutputFileDir { diff --git a/config/settings.go b/config/settings.go index eb335e7..1551384 100644 --- a/config/settings.go +++ b/config/settings.go @@ -73,6 +73,8 @@ type AppSettings struct { // ######################## output ######################## OutputStdout bool `json:"output-stdout"` OutputGRPC []string `json:"output-grpc"` + // multiple workers call services concurrently + OutputGRPCWorkerNumber int `json:"output-grpc-worker-number"` // --- outputfile --- OutputFileDir []string `json:"output-file-directory"` diff --git a/main.go b/main.go index cf154c6..39f68fa 100644 --- a/main.go +++ b/main.go @@ -75,6 +75,9 @@ func init() { # Redirect all incoming requests to xxx.com address grpcr --input-raw="0.0.0.0:80" --output-grpc="grpc://xx.xx.xx.xx:35001")`) + flag.IntVar(&settings.OutputGRPCWorkerNumber, "output-grpc-worker-number", 5, + "multiple workers call services concurrently") + flag.Var(&config.MultiStringOption{Params: &settings.OutputFileDir}, "output-file-directory", `Write incoming requests to file: diff --git a/plugin/output_grpc.go b/plugin/output_grpc.go index b84360c..e424829 100644 --- a/plugin/output_grpc.go +++ b/plugin/output_grpc.go @@ -4,7 +4,9 @@ import ( "context" "fmt" "github.com/fullstorydev/grpcurl" + "github.com/jhump/protoreflect/desc" "github.com/jhump/protoreflect/grpcreflect" + "github.com/patrickmn/go-cache" "github.com/vearne/grpcreplay/protocol" slog "github.com/vearne/simplelog" "google.golang.org/grpc" @@ -13,12 +15,71 @@ import ( "strings" ) +type DescSrcWrapper struct { + descSource grpcurl.DescriptorSource + innerCache *cache.Cache +} + +func NewDescSrcWrapper(descSource grpcurl.DescriptorSource) *DescSrcWrapper { + var s DescSrcWrapper + s.descSource = descSource + s.innerCache = cache.New(cache.NoExpiration, cache.NoExpiration) + return &s +} + +func (s *DescSrcWrapper) ListServices() ([]string, error) { + if value, exist := s.innerCache.Get("ListServices"); exist { + return (value).([]string), nil + } + + itemList, err := s.descSource.ListServices() + if err != nil { + return nil, err + } + + s.innerCache.Set("ListServices", itemList, cache.NoExpiration) + return itemList, nil +} + +func (s *DescSrcWrapper) FindSymbol(fullyQualifiedName string) (desc.Descriptor, error) { + key := "fullyQualifiedName:" + fullyQualifiedName + + if value, exist := s.innerCache.Get(key); exist { + return (value).(desc.Descriptor), nil + } + + descriptor, err := s.descSource.FindSymbol(fullyQualifiedName) + if err != nil { + return nil, err + } + + s.innerCache.Set(key, descriptor, cache.NoExpiration) + return descriptor, nil +} + +func (s *DescSrcWrapper) AllExtensionsForType(typeName string) ([]*desc.FieldDescriptor, error) { + key := "AllExtensionsForType:" + typeName + + if value, exist := s.innerCache.Get(key); exist { + return (value).([]*desc.FieldDescriptor), nil + } + + descriptors, err := s.descSource.AllExtensionsForType(typeName) + if err != nil { + return nil, err + } + + s.innerCache.Set(key, descriptors, cache.NoExpiration) + return descriptors, nil +} + type GRPCOutput struct { descSource grpcurl.DescriptorSource cc *grpc.ClientConn + msgChannel chan *protocol.Message } -func NewGRPCOutput(addr string) *GRPCOutput { +func NewGRPCOutput(addr string, workerNum int) *GRPCOutput { var err error var o GRPCOutput @@ -31,17 +92,73 @@ func NewGRPCOutput(addr string) *GRPCOutput { // 通过反射获取接口定义 // *grpcreflect.Client var refClient = grpcreflect.NewClientV1Alpha(ctx, reflectpb.NewServerReflectionClient(o.cc)) - o.descSource = grpcurl.DescriptorSourceFromServer(ctx, refClient) + o.descSource = NewDescSrcWrapper(grpcurl.DescriptorSourceFromServer(ctx, refClient)) + + o.msgChannel = make(chan *protocol.Message, 100) + + for i := 0; i < workerNum; i++ { + worker := NewGrpcWorker(addr, o.msgChannel, o.descSource) + go worker.execute() + } slog.Info("create grpc output, addr:%v", addr) return &o } func (o *GRPCOutput) Close() error { + close(o.msgChannel) return o.cc.Close() } func (o *GRPCOutput) Write(msg *protocol.Message) (err error) { + o.msgChannel <- msg + return nil +} + +func convertHeader(msg *protocol.Message) (headers []string) { + headers = make([]string, 0, len(msg.Data.Headers)) + for key, value := range msg.Data.Headers { + if !IsPseudo(key) { + headers = append(headers, key+":"+value) + } + } + return headers +} + +func IsPseudo(key string) bool { + return strings.HasPrefix(key, ":") +} + +type GrpcWorker struct { + msgChannel chan *protocol.Message + descSource grpcurl.DescriptorSource + cc *grpc.ClientConn +} + +func NewGrpcWorker(addr string, msgChannel chan *protocol.Message, descSource grpcurl.DescriptorSource) *GrpcWorker { + var err error + var w GrpcWorker + w.msgChannel = msgChannel + w.descSource = descSource + + w.cc, err = grpcurl.BlockingDial(context.Background(), "tcp", addr, nil) + if err != nil { + slog.Fatal("grpcurl.BlockingDial :%v", err) + } + + return &w +} + +func (w *GrpcWorker) execute() { + for msg := range w.msgChannel { + err := w.Call(msg) + if err != nil { + slog.Error("Call, message:%v, error:%v", msg.Data.Method, err) + } + } +} + +func (w *GrpcWorker) Call(msg *protocol.Message) (err error) { if len(msg.Data.Method) <= 0 { slog.Error("invalid msg:%v", msg) return fmt.Errorf("invalid msg:%v", msg) @@ -58,7 +175,7 @@ func (o *GRPCOutput) Write(msg *protocol.Message) (err error) { IncludeTextSeparator: true, AllowUnknownFields: false, } - rf, formatter, err := grpcurl.RequestParserAndFormatter(grpcurl.FormatJSON, o.descSource, in, options) + rf, formatter, err := grpcurl.RequestParserAndFormatter(grpcurl.FormatJSON, w.descSource, in, options) if err != nil { slog.Fatal("grpcurl.RequestParserAndFormatter :%v", err) } @@ -76,20 +193,6 @@ func (o *GRPCOutput) Write(msg *protocol.Message) (err error) { } headers := convertHeader(msg) - err = grpcurl.InvokeRPC(context.Background(), o.descSource, o.cc, symbol, headers, h, rf.Next) + err = grpcurl.InvokeRPC(context.Background(), w.descSource, w.cc, symbol, headers, h, rf.Next) return err } - -func convertHeader(msg *protocol.Message) (headers []string) { - headers = make([]string, 0, len(msg.Data.Headers)) - for key, value := range msg.Data.Headers { - if !IsPseudo(key) { - headers = append(headers, key+":"+value) - } - } - return headers -} - -func IsPseudo(key string) bool { - return strings.HasPrefix(key, ":") -}