Skip to content

Commit 5b2856f

Browse files
Make gRPC requests go through tendermint Query (#8549) (#8585)
* Make gRPC requests go through tendermint Query * Remove commented code * Dry run in InitChain? * Save type of first run * Add metadata in repsonse * Factorize some code * Fix lint * Update comments * Fix md test * Fix test expected * Don't put RunGRPCQuery as clientCtx method * Update baseapp/grpcserver.go Co-authored-by: Robert Zaremba <robert@zaremba.ch> * Address review comments Co-authored-by: Robert Zaremba <robert@zaremba.ch> Co-authored-by: Robert Zaremba <robert@zaremba.ch>
1 parent 35a243a commit 5b2856f

File tree

8 files changed

+142
-93
lines changed

8 files changed

+142
-93
lines changed

baseapp/grpcrouter.go

Lines changed: 32 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package baseapp
22

33
import (
44
"fmt"
5+
"reflect"
56

67
gogogrpc "github.com/gogo/protobuf/grpc"
78
abci "github.com/tendermint/tendermint/abci/types"
@@ -12,13 +13,19 @@ import (
1213
"github.com/cosmos/cosmos-sdk/client/grpc/reflection"
1314
codectypes "github.com/cosmos/cosmos-sdk/codec/types"
1415
sdk "github.com/cosmos/cosmos-sdk/types"
16+
sdkerrors "github.com/cosmos/cosmos-sdk/types/errors"
1517
)
1618

1719
var protoCodec = encoding.GetCodec(proto.Name)
1820

1921
// GRPCQueryRouter routes ABCI Query requests to GRPC handlers
2022
type GRPCQueryRouter struct {
21-
routes map[string]GRPCQueryHandler
23+
routes map[string]GRPCQueryHandler
24+
// returnTypes is a map of FQ method name => its return type. It is used
25+
// for cache purposes: the first time a method handler is run, we save its
26+
// return type in this map. Then, on subsequent method handler calls, we
27+
// decode the ABCI response bytes using the cached return type.
28+
returnTypes map[string]reflect.Type
2229
interfaceRegistry codectypes.InterfaceRegistry
2330
serviceData []serviceData
2431
}
@@ -34,7 +41,8 @@ var _ gogogrpc.Server = &GRPCQueryRouter{}
3441
// NewGRPCQueryRouter creates a new GRPCQueryRouter
3542
func NewGRPCQueryRouter() *GRPCQueryRouter {
3643
return &GRPCQueryRouter{
37-
routes: map[string]GRPCQueryHandler{},
44+
returnTypes: map[string]reflect.Type{},
45+
routes: map[string]GRPCQueryHandler{},
3846
}
3947
}
4048

@@ -89,8 +97,17 @@ func (qrt *GRPCQueryRouter) RegisterService(sd *grpc.ServiceDesc, handler interf
8997
if qrt.interfaceRegistry != nil {
9098
return codectypes.UnpackInterfaces(i, qrt.interfaceRegistry)
9199
}
100+
92101
return nil
93102
}, nil)
103+
104+
// If it's the first time we call this handler, then we save
105+
// the return type of the handler in the `returnTypes` map.
106+
// The return type will be used for decoding subsequent requests.
107+
if _, found := qrt.returnTypes[fqName]; !found {
108+
qrt.returnTypes[fqName] = reflect.TypeOf(res)
109+
}
110+
94111
if err != nil {
95112
return abci.ResponseQuery{}, err
96113
}
@@ -127,3 +144,16 @@ func (qrt *GRPCQueryRouter) SetInterfaceRegistry(interfaceRegistry codectypes.In
127144
reflection.NewReflectionServiceServer(interfaceRegistry),
128145
)
129146
}
147+
148+
// returnTypeOf returns the return type of a gRPC method handler. With the way the
149+
// `returnTypes` cache map is set up, the return type of a method handler is
150+
// guaranteed to be found if it's retrieved **after** the method handler ran at
151+
// least once. If not, then a logic error is return.
152+
func (qrt *GRPCQueryRouter) returnTypeOf(method string) (reflect.Type, error) {
153+
returnType, found := qrt.returnTypes[method]
154+
if !found {
155+
return nil, sdkerrors.Wrapf(sdkerrors.ErrLogic, "cannot find %s return type", method)
156+
}
157+
158+
return returnType, nil
159+
}

baseapp/grpcserver.go

Lines changed: 47 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -2,67 +2,78 @@ package baseapp
22

33
import (
44
"context"
5-
"strconv"
5+
"reflect"
66

77
gogogrpc "github.com/gogo/protobuf/grpc"
88
grpcmiddleware "github.com/grpc-ecosystem/go-grpc-middleware"
99
grpcrecovery "github.com/grpc-ecosystem/go-grpc-middleware/recovery"
1010
"google.golang.org/grpc"
11-
"google.golang.org/grpc/codes"
1211
"google.golang.org/grpc/metadata"
13-
"google.golang.org/grpc/status"
1412

15-
sdk "github.com/cosmos/cosmos-sdk/types"
13+
"github.com/cosmos/cosmos-sdk/client"
1614
sdkerrors "github.com/cosmos/cosmos-sdk/types/errors"
17-
grpctypes "github.com/cosmos/cosmos-sdk/types/grpc"
15+
"github.com/cosmos/cosmos-sdk/types/tx"
1816
)
1917

2018
// GRPCQueryRouter returns the GRPCQueryRouter of a BaseApp.
2119
func (app *BaseApp) GRPCQueryRouter() *GRPCQueryRouter { return app.grpcQueryRouter }
2220

2321
// RegisterGRPCServer registers gRPC services directly with the gRPC server.
24-
func (app *BaseApp) RegisterGRPCServer(server gogogrpc.Server) {
25-
// Define an interceptor for all gRPC queries: this interceptor will create
26-
// a new sdk.Context, and pass it into the query handler.
27-
interceptor := func(grpcCtx context.Context, req interface{}, _ *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
28-
// If there's some metadata in the context, retrieve it.
29-
md, ok := metadata.FromIncomingContext(grpcCtx)
30-
if !ok {
31-
return nil, status.Error(codes.Internal, "unable to retrieve metadata")
32-
}
22+
func (app *BaseApp) RegisterGRPCServer(clientCtx client.Context, server gogogrpc.Server) {
23+
// Define an interceptor for all gRPC queries: this interceptor will route
24+
// the query through the `clientCtx`, which itself queries Tendermint.
25+
interceptor := func(grpcCtx context.Context, req interface{}, info *grpc.UnaryServerInfo, _ grpc.UnaryHandler) (interface{}, error) {
26+
// Two things can happen here:
27+
// 1. either we're broadcasting a Tx, in which case we call Tendermint's broadcast endpoint directly,
28+
// 2. or we are querying for state, in which case we call ABCI's Query.
3329

34-
// Get height header from the request context, if present.
35-
var height int64
36-
if heightHeaders := md.Get(grpctypes.GRPCBlockHeightHeader); len(heightHeaders) > 0 {
37-
height, err = strconv.ParseInt(heightHeaders[0], 10, 64)
38-
if err != nil {
39-
return nil, sdkerrors.Wrapf(
40-
sdkerrors.ErrInvalidRequest,
41-
"Baseapp.RegisterGRPCServer: invalid height header %q: %v", grpctypes.GRPCBlockHeightHeader, err)
42-
}
43-
if err := checkNegativeHeight(height); err != nil {
44-
return nil, err
30+
// Case 1. Broadcasting a Tx.
31+
if reqProto, ok := req.(*tx.BroadcastTxRequest); ok {
32+
if !ok {
33+
return nil, sdkerrors.Wrapf(sdkerrors.ErrInvalidRequest, "expected %T, got %T", (*tx.BroadcastTxRequest)(nil), req)
4534
}
35+
36+
return client.TxServiceBroadcast(grpcCtx, clientCtx, reqProto)
37+
}
38+
39+
// Case 2. Querying state.
40+
inMd, _ := metadata.FromIncomingContext(grpcCtx)
41+
abciRes, outMd, err := client.RunGRPCQuery(clientCtx, grpcCtx, info.FullMethod, req, inMd)
42+
if err != nil {
43+
return nil, err
4644
}
4745

48-
// Create the sdk.Context. Passing false as 2nd arg, as we can't
49-
// actually support proofs with gRPC right now.
50-
sdkCtx, err := app.createQueryContext(height, false)
46+
// We need to know the return type of the grpc method for
47+
// unmarshalling abciRes.Value.
48+
//
49+
// When we call each method handler for the first time, we save its
50+
// return type in the `returnTypes` map (see the method handler in
51+
// `grpcrouter.go`). By this time, the method handler has already run
52+
// at least once (in the RunGRPCQuery call), so we're sure the
53+
// returnType maps is populated for this method. We're retrieving it
54+
// for decoding.
55+
returnType, err := app.GRPCQueryRouter().returnTypeOf(info.FullMethod)
5156
if err != nil {
5257
return nil, err
5358
}
5459

55-
// Attach the sdk.Context into the gRPC's context.Context.
56-
grpcCtx = context.WithValue(grpcCtx, sdk.SdkContextKey, sdkCtx)
60+
// returnType is a pointer to a struct. Here, we're creating res which
61+
// is a new pointer to the underlying struct.
62+
res := reflect.New(returnType.Elem()).Interface()
5763

58-
// Add relevant gRPC headers
59-
if height == 0 {
60-
height = sdkCtx.BlockHeight() // If height was not set in the request, set it to the latest
64+
err = protoCodec.Unmarshal(abciRes.Value, res)
65+
if err != nil {
66+
return nil, err
67+
}
68+
69+
// Send the metadata header back. The metadata currently includes:
70+
// - block height.
71+
err = grpc.SendHeader(grpcCtx, outMd)
72+
if err != nil {
73+
return nil, err
6174
}
62-
md = metadata.Pairs(grpctypes.GRPCBlockHeightHeader, strconv.FormatInt(height, 10))
63-
grpc.SetHeader(grpcCtx, md)
6475

65-
return handler(grpcCtx, req)
76+
return res, nil
6677
}
6778

6879
// Loop through all services and methods, add the interceptor, and register

client/grpc_query.go

Lines changed: 54 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -24,100 +24,109 @@ var _ gogogrpc.ClientConn = Context{}
2424
var protoCodec = encoding.GetCodec(proto.Name)
2525

2626
// Invoke implements the grpc ClientConn.Invoke method
27-
func (ctx Context) Invoke(grpcCtx gocontext.Context, method string, args, reply interface{}, opts ...grpc.CallOption) (err error) {
27+
func (ctx Context) Invoke(grpcCtx gocontext.Context, method string, req, reply interface{}, opts ...grpc.CallOption) (err error) {
2828
// Two things can happen here:
2929
// 1. either we're broadcasting a Tx, in which call we call Tendermint's broadcast endpoint directly,
3030
// 2. or we are querying for state, in which case we call ABCI's Query.
3131

32-
// In both cases, we don't allow empty request args (it will panic unexpectedly).
33-
if reflect.ValueOf(args).IsNil() {
32+
// In both cases, we don't allow empty request req (it will panic unexpectedly).
33+
if reflect.ValueOf(req).IsNil() {
3434
return sdkerrors.Wrap(sdkerrors.ErrInvalidRequest, "request cannot be nil")
3535
}
3636

3737
// Case 1. Broadcasting a Tx.
38-
if isBroadcast(method) {
39-
req, ok := args.(*tx.BroadcastTxRequest)
38+
if reqProto, ok := req.(*tx.BroadcastTxRequest); ok {
4039
if !ok {
41-
return sdkerrors.Wrapf(sdkerrors.ErrInvalidRequest, "expected %T, got %T", (*tx.BroadcastTxRequest)(nil), args)
40+
return sdkerrors.Wrapf(sdkerrors.ErrInvalidRequest, "expected %T, got %T", (*tx.BroadcastTxRequest)(nil), req)
4241
}
43-
res, ok := reply.(*tx.BroadcastTxResponse)
42+
resProto, ok := reply.(*tx.BroadcastTxResponse)
4443
if !ok {
45-
return sdkerrors.Wrapf(sdkerrors.ErrInvalidRequest, "expected %T, got %T", (*tx.BroadcastTxResponse)(nil), args)
44+
return sdkerrors.Wrapf(sdkerrors.ErrInvalidRequest, "expected %T, got %T", (*tx.BroadcastTxResponse)(nil), req)
4645
}
4746

48-
broadcastRes, err := TxServiceBroadcast(grpcCtx, ctx, req)
47+
broadcastRes, err := TxServiceBroadcast(grpcCtx, ctx, reqProto)
4948
if err != nil {
5049
return err
5150
}
52-
*res = *broadcastRes
51+
*resProto = *broadcastRes
5352

5453
return err
5554
}
5655

5756
// Case 2. Querying state.
58-
reqBz, err := protoCodec.Marshal(args)
57+
inMd, _ := metadata.FromOutgoingContext(grpcCtx)
58+
abciRes, outMd, err := RunGRPCQuery(ctx, grpcCtx, method, req, inMd)
5959
if err != nil {
6060
return err
6161
}
6262

63+
err = protoCodec.Unmarshal(abciRes.Value, reply)
64+
if err != nil {
65+
return err
66+
}
67+
68+
for _, callOpt := range opts {
69+
header, ok := callOpt.(grpc.HeaderCallOption)
70+
if !ok {
71+
continue
72+
}
73+
74+
*header.HeaderAddr = outMd
75+
}
76+
77+
if ctx.InterfaceRegistry != nil {
78+
return types.UnpackInterfaces(reply, ctx.InterfaceRegistry)
79+
}
80+
81+
return nil
82+
}
83+
84+
// NewStream implements the grpc ClientConn.NewStream method
85+
func (Context) NewStream(gocontext.Context, *grpc.StreamDesc, string, ...grpc.CallOption) (grpc.ClientStream, error) {
86+
return nil, fmt.Errorf("streaming rpc not supported")
87+
}
88+
89+
// RunGRPCQuery runs a gRPC query from the clientCtx, given all necessary
90+
// arguments for the gRPC method, and returns the ABCI response. It is used
91+
// to factorize code between client (Invoke) and server (RegisterGRPCServer)
92+
// gRPC handlers.
93+
func RunGRPCQuery(ctx Context, grpcCtx gocontext.Context, method string, req interface{}, md metadata.MD) (abci.ResponseQuery, metadata.MD, error) {
94+
reqBz, err := protoCodec.Marshal(req)
95+
if err != nil {
96+
return abci.ResponseQuery{}, nil, err
97+
}
98+
6399
// parse height header
64-
md, _ := metadata.FromOutgoingContext(grpcCtx)
65100
if heights := md.Get(grpctypes.GRPCBlockHeightHeader); len(heights) > 0 {
66101
height, err := strconv.ParseInt(heights[0], 10, 64)
67102
if err != nil {
68-
return err
103+
return abci.ResponseQuery{}, nil, err
69104
}
70105
if height < 0 {
71-
return sdkerrors.Wrapf(
106+
return abci.ResponseQuery{}, nil, sdkerrors.Wrapf(
72107
sdkerrors.ErrInvalidRequest,
73108
"client.Context.Invoke: height (%d) from %q must be >= 0", height, grpctypes.GRPCBlockHeightHeader)
74109
}
75110

76111
ctx = ctx.WithHeight(height)
77112
}
78113

79-
req := abci.RequestQuery{
114+
abciReq := abci.RequestQuery{
80115
Path: method,
81116
Data: reqBz,
82117
}
83118

84-
res, err := ctx.QueryABCI(req)
119+
abciRes, err := ctx.QueryABCI(abciReq)
85120
if err != nil {
86-
return err
87-
}
88-
89-
err = protoCodec.Unmarshal(res.Value, reply)
90-
if err != nil {
91-
return err
121+
return abci.ResponseQuery{}, nil, err
92122
}
93123

94124
// Create header metadata. For now the headers contain:
95125
// - block height
96126
// We then parse all the call options, if the call option is a
97127
// HeaderCallOption, then we manually set the value of that header to the
98128
// metadata.
99-
md = metadata.Pairs(grpctypes.GRPCBlockHeightHeader, strconv.FormatInt(res.Height, 10))
100-
for _, callOpt := range opts {
101-
header, ok := callOpt.(grpc.HeaderCallOption)
102-
if !ok {
103-
continue
104-
}
105-
106-
*header.HeaderAddr = md
107-
}
108-
109-
if ctx.InterfaceRegistry != nil {
110-
return types.UnpackInterfaces(reply, ctx.InterfaceRegistry)
111-
}
112-
113-
return nil
114-
}
115-
116-
// NewStream implements the grpc ClientConn.NewStream method
117-
func (Context) NewStream(gocontext.Context, *grpc.StreamDesc, string, ...grpc.CallOption) (grpc.ClientStream, error) {
118-
return nil, fmt.Errorf("streaming rpc not supported")
119-
}
129+
md = metadata.Pairs(grpctypes.GRPCBlockHeightHeader, strconv.FormatInt(abciRes.Height, 10))
120130

121-
func isBroadcast(method string) bool {
122-
return method == "/cosmos.tx.v1beta1.Service/BroadcastTx"
131+
return abciRes, md, nil
123132
}

server/grpc/server.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,13 +8,14 @@ import (
88
"google.golang.org/grpc"
99
"google.golang.org/grpc/reflection"
1010

11+
"github.com/cosmos/cosmos-sdk/client"
1112
"github.com/cosmos/cosmos-sdk/server/types"
1213
)
1314

1415
// StartGRPCServer starts a gRPC server on the given address.
15-
func StartGRPCServer(app types.Application, address string) (*grpc.Server, error) {
16+
func StartGRPCServer(clientCtx client.Context, app types.Application, address string) (*grpc.Server, error) {
1617
grpcSrv := grpc.NewServer()
17-
app.RegisterGRPCServer(grpcSrv)
18+
app.RegisterGRPCServer(clientCtx, grpcSrv)
1819

1920
// Reflection allows external clients to see what services and methods
2021
// the gRPC server exposes.

server/grpc/server_test.go

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@ func (s *IntegrationTestSuite) TestGRPCServer_BankBalance() {
9292
grpc.Header(&header),
9393
)
9494
blockHeight = header.Get(grpctypes.GRPCBlockHeightHeader)
95-
s.Require().Equal([]string{"1"}, blockHeight)
95+
s.Require().NotEmpty(blockHeight[0]) // blockHeight is []string, first element is block height.
9696
}
9797

9898
func (s *IntegrationTestSuite) TestGRPCServer_Reflection() {
@@ -124,8 +124,6 @@ func (s *IntegrationTestSuite) TestGRPCServer_GetTxsEvent() {
124124
Events: []string{"message.action=send"},
125125
},
126126
)
127-
// TODO Once https://github.com/cosmos/cosmos-sdk/pull/8029 is merged, this
128-
// should not error anymore.
129127
s.Require().NoError(err)
130128
}
131129

@@ -185,9 +183,9 @@ func (s *IntegrationTestSuite) TestGRPCServerInvalidHeaderHeights() {
185183
value string
186184
wantErr string
187185
}{
188-
{"-1", "height < 0"},
186+
{"-1", "\"x-cosmos-block-height\" must be >= 0"},
189187
{"9223372036854775808", "value out of range"}, // > max(int64) by 1
190-
{"-10", "height < 0"},
188+
{"-10", "\"x-cosmos-block-height\" must be >= 0"},
191189
{"18446744073709551615", "value out of range"}, // max uint64, which is > max(int64)
192190
{"-9223372036854775809", "value out of range"}, // Out of the range of for negative int64
193191
}

server/start.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -304,7 +304,7 @@ func startInProcess(ctx *Context, clientCtx client.Context, appCreator types.App
304304

305305
var grpcSrv *grpc.Server
306306
if config.GRPC.Enable {
307-
grpcSrv, err = servergrpc.StartGRPCServer(app, config.GRPC.Address)
307+
grpcSrv, err = servergrpc.StartGRPCServer(clientCtx, app, config.GRPC.Address)
308308
if err != nil {
309309
return err
310310
}

0 commit comments

Comments
 (0)