-
Notifications
You must be signed in to change notification settings - Fork 4.1k
Make gRPC requests go through tendermint Query #8549
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
e0e2242
1f5cdd1
db1ccb9
af70bf7
66b8fb1
437ada0
7c44ee5
38775aa
e274572
40afd23
97178dc
2c5a017
2cc56a0
6f24859
31ac78a
60760d1
ba06cd4
de3c822
2405701
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -24,100 +24,109 @@ var _ gogogrpc.ClientConn = Context{} | |
| var protoCodec = encoding.GetCodec(proto.Name) | ||
|
|
||
| // Invoke implements the grpc ClientConn.Invoke method | ||
| func (ctx Context) Invoke(grpcCtx gocontext.Context, method string, args, reply interface{}, opts ...grpc.CallOption) (err error) { | ||
| func (ctx Context) Invoke(grpcCtx gocontext.Context, method string, req, reply interface{}, opts ...grpc.CallOption) (err error) { | ||
| // Two things can happen here: | ||
| // 1. either we're broadcasting a Tx, in which call we call Tendermint's broadcast endpoint directly, | ||
| // 2. or we are querying for state, in which case we call ABCI's Query. | ||
|
|
||
| // In both cases, we don't allow empty request args (it will panic unexpectedly). | ||
| if reflect.ValueOf(args).IsNil() { | ||
| // In both cases, we don't allow empty request req (it will panic unexpectedly). | ||
| if reflect.ValueOf(req).IsNil() { | ||
| return sdkerrors.Wrap(sdkerrors.ErrInvalidRequest, "request cannot be nil") | ||
| } | ||
|
|
||
| // Case 1. Broadcasting a Tx. | ||
| if isBroadcast(method) { | ||
| req, ok := args.(*tx.BroadcastTxRequest) | ||
| if reqProto, ok := req.(*tx.BroadcastTxRequest); ok { | ||
| if !ok { | ||
| return sdkerrors.Wrapf(sdkerrors.ErrInvalidRequest, "expected %T, got %T", (*tx.BroadcastTxRequest)(nil), args) | ||
| return sdkerrors.Wrapf(sdkerrors.ErrInvalidRequest, "expected %T, got %T", (*tx.BroadcastTxRequest)(nil), req) | ||
| } | ||
| res, ok := reply.(*tx.BroadcastTxResponse) | ||
| resProto, ok := reply.(*tx.BroadcastTxResponse) | ||
| if !ok { | ||
| return sdkerrors.Wrapf(sdkerrors.ErrInvalidRequest, "expected %T, got %T", (*tx.BroadcastTxResponse)(nil), args) | ||
| return sdkerrors.Wrapf(sdkerrors.ErrInvalidRequest, "expected %T, got %T", (*tx.BroadcastTxResponse)(nil), req) | ||
| } | ||
|
|
||
| broadcastRes, err := TxServiceBroadcast(grpcCtx, ctx, req) | ||
| broadcastRes, err := TxServiceBroadcast(grpcCtx, ctx, reqProto) | ||
| if err != nil { | ||
| return err | ||
| } | ||
| *res = *broadcastRes | ||
| *resProto = *broadcastRes | ||
|
|
||
| return err | ||
| } | ||
|
|
||
| // Case 2. Querying state. | ||
| reqBz, err := protoCodec.Marshal(args) | ||
| inMd, _ := metadata.FromOutgoingContext(grpcCtx) | ||
| abciRes, outMd, err := RunGRPCQuery(ctx, grpcCtx, method, req, inMd) | ||
| if err != nil { | ||
| return err | ||
| } | ||
|
|
||
| err = protoCodec.Unmarshal(abciRes.Value, reply) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do you think it would be possible to store an object in the
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. abciRes comes from tendermint, so not easily. We could create a wrapper around it, for optimization (not serializing/deserializing the same structs multiple times...) as you say. But that's a bigger refactor, and if we're talking about bigger refactors, a more sensible one would be to allow concurrent gRPC queries. |
||
| if err != nil { | ||
| return err | ||
| } | ||
|
|
||
| for _, callOpt := range opts { | ||
| header, ok := callOpt.(grpc.HeaderCallOption) | ||
| if !ok { | ||
| continue | ||
| } | ||
|
|
||
| *header.HeaderAddr = outMd | ||
| } | ||
|
|
||
| if ctx.InterfaceRegistry != nil { | ||
| return types.UnpackInterfaces(reply, ctx.InterfaceRegistry) | ||
| } | ||
|
|
||
| return nil | ||
| } | ||
|
|
||
| // NewStream implements the grpc ClientConn.NewStream method | ||
| func (Context) NewStream(gocontext.Context, *grpc.StreamDesc, string, ...grpc.CallOption) (grpc.ClientStream, error) { | ||
| return nil, fmt.Errorf("streaming rpc not supported") | ||
| } | ||
|
|
||
| // RunGRPCQuery runs a gRPC query from the clientCtx, given all necessary | ||
| // arguments for the gRPC method, and returns the ABCI response. It is used | ||
| // to factorize code between client (Invoke) and server (RegisterGRPCServer) | ||
| // gRPC handlers. | ||
| func RunGRPCQuery(ctx Context, grpcCtx gocontext.Context, method string, req interface{}, md metadata.MD) (abci.ResponseQuery, metadata.MD, error) { | ||
| reqBz, err := protoCodec.Marshal(req) | ||
| if err != nil { | ||
| return abci.ResponseQuery{}, nil, err | ||
| } | ||
|
|
||
| // parse height header | ||
| md, _ := metadata.FromOutgoingContext(grpcCtx) | ||
| if heights := md.Get(grpctypes.GRPCBlockHeightHeader); len(heights) > 0 { | ||
| height, err := strconv.ParseInt(heights[0], 10, 64) | ||
| if err != nil { | ||
| return err | ||
| return abci.ResponseQuery{}, nil, err | ||
| } | ||
| if height < 0 { | ||
| return sdkerrors.Wrapf( | ||
| return abci.ResponseQuery{}, nil, sdkerrors.Wrapf( | ||
| sdkerrors.ErrInvalidRequest, | ||
| "client.Context.Invoke: height (%d) from %q must be >= 0", height, grpctypes.GRPCBlockHeightHeader) | ||
| } | ||
|
|
||
| ctx = ctx.WithHeight(height) | ||
| } | ||
|
|
||
| req := abci.RequestQuery{ | ||
| abciReq := abci.RequestQuery{ | ||
| Path: method, | ||
| Data: reqBz, | ||
| } | ||
|
|
||
| res, err := ctx.QueryABCI(req) | ||
| abciRes, err := ctx.QueryABCI(abciReq) | ||
| if err != nil { | ||
| return err | ||
| } | ||
|
|
||
| err = protoCodec.Unmarshal(res.Value, reply) | ||
| if err != nil { | ||
| return err | ||
| return abci.ResponseQuery{}, nil, err | ||
| } | ||
|
|
||
| // Create header metadata. For now the headers contain: | ||
| // - block height | ||
| // We then parse all the call options, if the call option is a | ||
| // HeaderCallOption, then we manually set the value of that header to the | ||
| // metadata. | ||
| md = metadata.Pairs(grpctypes.GRPCBlockHeightHeader, strconv.FormatInt(res.Height, 10)) | ||
| for _, callOpt := range opts { | ||
| header, ok := callOpt.(grpc.HeaderCallOption) | ||
| if !ok { | ||
| continue | ||
| } | ||
|
|
||
| *header.HeaderAddr = md | ||
| } | ||
|
|
||
| if ctx.InterfaceRegistry != nil { | ||
| return types.UnpackInterfaces(reply, ctx.InterfaceRegistry) | ||
| } | ||
|
|
||
| return nil | ||
| } | ||
|
|
||
| // NewStream implements the grpc ClientConn.NewStream method | ||
| func (Context) NewStream(gocontext.Context, *grpc.StreamDesc, string, ...grpc.CallOption) (grpc.ClientStream, error) { | ||
| return nil, fmt.Errorf("streaming rpc not supported") | ||
| } | ||
| md = metadata.Pairs(grpctypes.GRPCBlockHeightHeader, strconv.FormatInt(abciRes.Height, 10)) | ||
|
|
||
| func isBroadcast(method string) bool { | ||
| return method == "/cosmos.tx.v1beta1.Service/BroadcastTx" | ||
| return abciRes, md, nil | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why do we use
SendHeaderinstead ofSetHeader?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Following the docs https://github.com/grpc/grpc-go/blob/master/Documentation/grpc-metadata.md#sending-metadata-1
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do you know why before
SetHeaderwas used?