Skip to content

Commit

Permalink
Optimize the processing speed of output_grpc
Browse files Browse the repository at this point in the history
  • Loading branch information
vearne committed Sep 25, 2024
1 parent 5a1a0d5 commit 7089bbe
Show file tree
Hide file tree
Showing 7 changed files with 138 additions and 22 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
VERSION := v0.2.1
VERSION := v0.2.2

BIN_NAME = grpcr
CONTAINER = grpcr
Expand Down
6 changes: 5 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,10 @@ and printed in the console
```
./grpcr --input-file-directory="/tmp/mycapture" --output-stdout --output-grpc="grpc://127.0.0.1:35002"
```
Hint: You can use `input-file-replay-speed` to speed up the replay
```
--input-file-replay-speed=10
```

Capture gRPC requests on "127.0.0.1:35001",
keep only requests whose method suffix is Time, and print them in the console
Expand Down Expand Up @@ -159,7 +163,7 @@ and [buger/goreplay](https://github.com/buger/goreplay)
* [x] 11)Support for reading GRPC requests from RocketMQ
* [x] 12)Support custom filter
* [ ] 13)support TLS
* [ ] 14)Optimize the processing speed of output_grpc
* [x] 14)Optimize the processing speed of output_grpc

## donate
![donate](https://github.com/vearne/grpcreplay/raw/main/img/donate.jpg)
6 changes: 5 additions & 1 deletion README_zh.md
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,10 @@ sudo -s
```
./grpcr --input-file-directory="/tmp/mycapture" --output-stdout --output-grpc="grpc://127.0.0.1:35002"
```
提示: 你可以使用 `input-file-replay-speed` 加快重放的速度
```
--input-file-replay-speed=10
```

捕获"127.0.0.1:35001"上的gRPC请求,只保留method后缀为Time的请求,并打印在控制台中
```
Expand Down Expand Up @@ -154,7 +158,7 @@ export SIMPLE_LOG_LEVEL=debug
* [x] 11)支持从RocketMQ中读取GRPC请求
* [x] 12)支持自定义filter
* [ ] 13)支持TLS
* [ ] 14)优化output_grpc的处理速度
* [x] 14)优化output_grpc的处理速度

## 捐赠
![donate](https://github.com/vearne/grpcreplay/raw/main/img/donate.jpg)
2 changes: 1 addition & 1 deletion biz/plugins.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func NewPlugins(settings *config.AppSettings) *InOutPlugins {
if err != nil {
slog.Fatal("OutputGRPC addr error:%v", err)
}
plugins.registerPlugin(plugin.NewGRPCOutput, addr)
plugins.registerPlugin(plugin.NewGRPCOutput, addr, settings.OutputGRPCWorkerNumber)
}

for _, path := range settings.OutputFileDir {
Expand Down
2 changes: 2 additions & 0 deletions config/settings.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@ type AppSettings struct {
// ######################## output ########################
OutputStdout bool `json:"output-stdout"`
OutputGRPC []string `json:"output-grpc"`
// multiple workers call services concurrently
OutputGRPCWorkerNumber int `json:"output-grpc-worker-number"`

// --- outputfile ---
OutputFileDir []string `json:"output-file-directory"`
Expand Down
3 changes: 3 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,9 @@ func init() {
# Redirect all incoming requests to xxx.com address
grpcr --input-raw="0.0.0.0:80" --output-grpc="grpc://xx.xx.xx.xx:35001")`)

flag.IntVar(&settings.OutputGRPCWorkerNumber, "output-grpc-worker-number", 5,
"multiple workers call services concurrently")

flag.Var(&config.MultiStringOption{Params: &settings.OutputFileDir},
"output-file-directory",
`Write incoming requests to file:
Expand Down
139 changes: 121 additions & 18 deletions plugin/output_grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ import (
"context"
"fmt"
"github.com/fullstorydev/grpcurl"
"github.com/jhump/protoreflect/desc"
"github.com/jhump/protoreflect/grpcreflect"
"github.com/patrickmn/go-cache"
"github.com/vearne/grpcreplay/protocol"
slog "github.com/vearne/simplelog"
"google.golang.org/grpc"
Expand All @@ -13,12 +15,71 @@ import (
"strings"
)

type DescSrcWrapper struct {
descSource grpcurl.DescriptorSource
innerCache *cache.Cache
}

func NewDescSrcWrapper(descSource grpcurl.DescriptorSource) *DescSrcWrapper {
var s DescSrcWrapper
s.descSource = descSource
s.innerCache = cache.New(cache.NoExpiration, cache.NoExpiration)
return &s
}

func (s *DescSrcWrapper) ListServices() ([]string, error) {
if value, exist := s.innerCache.Get("ListServices"); exist {
return (value).([]string), nil
}

itemList, err := s.descSource.ListServices()
if err != nil {
return nil, err
}

s.innerCache.Set("ListServices", itemList, cache.NoExpiration)
return itemList, nil
}

func (s *DescSrcWrapper) FindSymbol(fullyQualifiedName string) (desc.Descriptor, error) {
key := "fullyQualifiedName:" + fullyQualifiedName

if value, exist := s.innerCache.Get(key); exist {
return (value).(desc.Descriptor), nil
}

descriptor, err := s.descSource.FindSymbol(fullyQualifiedName)
if err != nil {
return nil, err
}

s.innerCache.Set(key, descriptor, cache.NoExpiration)
return descriptor, nil
}

func (s *DescSrcWrapper) AllExtensionsForType(typeName string) ([]*desc.FieldDescriptor, error) {
key := "AllExtensionsForType:" + typeName

if value, exist := s.innerCache.Get(key); exist {
return (value).([]*desc.FieldDescriptor), nil
}

descriptors, err := s.descSource.AllExtensionsForType(typeName)
if err != nil {
return nil, err
}

s.innerCache.Set(key, descriptors, cache.NoExpiration)
return descriptors, nil
}

type GRPCOutput struct {
descSource grpcurl.DescriptorSource
cc *grpc.ClientConn
msgChannel chan *protocol.Message
}

func NewGRPCOutput(addr string) *GRPCOutput {
func NewGRPCOutput(addr string, workerNum int) *GRPCOutput {
var err error
var o GRPCOutput

Expand All @@ -31,17 +92,73 @@ func NewGRPCOutput(addr string) *GRPCOutput {
// 通过反射获取接口定义
// *grpcreflect.Client
var refClient = grpcreflect.NewClientV1Alpha(ctx, reflectpb.NewServerReflectionClient(o.cc))
o.descSource = grpcurl.DescriptorSourceFromServer(ctx, refClient)
o.descSource = NewDescSrcWrapper(grpcurl.DescriptorSourceFromServer(ctx, refClient))

o.msgChannel = make(chan *protocol.Message, 100)

for i := 0; i < workerNum; i++ {
worker := NewGrpcWorker(addr, o.msgChannel, o.descSource)
go worker.execute()
}

slog.Info("create grpc output, addr:%v", addr)
return &o
}

func (o *GRPCOutput) Close() error {
close(o.msgChannel)
return o.cc.Close()
}

func (o *GRPCOutput) Write(msg *protocol.Message) (err error) {
o.msgChannel <- msg
return nil
}

func convertHeader(msg *protocol.Message) (headers []string) {
headers = make([]string, 0, len(msg.Data.Headers))
for key, value := range msg.Data.Headers {
if !IsPseudo(key) {
headers = append(headers, key+":"+value)
}
}
return headers
}

func IsPseudo(key string) bool {
return strings.HasPrefix(key, ":")
}

type GrpcWorker struct {
msgChannel chan *protocol.Message
descSource grpcurl.DescriptorSource
cc *grpc.ClientConn
}

func NewGrpcWorker(addr string, msgChannel chan *protocol.Message, descSource grpcurl.DescriptorSource) *GrpcWorker {
var err error
var w GrpcWorker
w.msgChannel = msgChannel
w.descSource = descSource

w.cc, err = grpcurl.BlockingDial(context.Background(), "tcp", addr, nil)
if err != nil {
slog.Fatal("grpcurl.BlockingDial :%v", err)
}

return &w
}

func (w *GrpcWorker) execute() {
for msg := range w.msgChannel {
err := w.Call(msg)
if err != nil {
slog.Error("Call, message:%v, error:%v", msg.Data.Method, err)
}
}
}

func (w *GrpcWorker) Call(msg *protocol.Message) (err error) {
if len(msg.Data.Method) <= 0 {
slog.Error("invalid msg:%v", msg)
return fmt.Errorf("invalid msg:%v", msg)
Expand All @@ -58,7 +175,7 @@ func (o *GRPCOutput) Write(msg *protocol.Message) (err error) {
IncludeTextSeparator: true,
AllowUnknownFields: false,
}
rf, formatter, err := grpcurl.RequestParserAndFormatter(grpcurl.FormatJSON, o.descSource, in, options)
rf, formatter, err := grpcurl.RequestParserAndFormatter(grpcurl.FormatJSON, w.descSource, in, options)
if err != nil {
slog.Fatal("grpcurl.RequestParserAndFormatter :%v", err)
}
Expand All @@ -76,20 +193,6 @@ func (o *GRPCOutput) Write(msg *protocol.Message) (err error) {
}

headers := convertHeader(msg)
err = grpcurl.InvokeRPC(context.Background(), o.descSource, o.cc, symbol, headers, h, rf.Next)
err = grpcurl.InvokeRPC(context.Background(), w.descSource, w.cc, symbol, headers, h, rf.Next)
return err
}

func convertHeader(msg *protocol.Message) (headers []string) {
headers = make([]string, 0, len(msg.Data.Headers))
for key, value := range msg.Data.Headers {
if !IsPseudo(key) {
headers = append(headers, key+":"+value)
}
}
return headers
}

func IsPseudo(key string) bool {
return strings.HasPrefix(key, ":")
}

0 comments on commit 7089bbe

Please sign in to comment.