Skip to content

Commit 04f10e4

Browse files
julienrbrtmergify[bot]
authored andcommitted
refactor(server/v2/cometbft): add codec.Codec and clean-up APIs (#22566)
(cherry picked from commit efc05e8)
1 parent 8f7fbf6 commit 04f10e4

File tree

11 files changed

+174
-147
lines changed

11 files changed

+174
-147
lines changed

server/v2/cometbft/abci.go

Lines changed: 24 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -6,14 +6,10 @@ import (
66
"errors"
77
"fmt"
88
"strings"
9-
"sync"
109
"sync/atomic"
1110

1211
abci "github.com/cometbft/cometbft/abci/types"
1312
abciproto "github.com/cometbft/cometbft/api/cometbft/abci/v1"
14-
codectypes "github.com/cosmos/cosmos-sdk/codec/types"
15-
sdk "github.com/cosmos/cosmos-sdk/types"
16-
txtypes "github.com/cosmos/cosmos-sdk/types/tx"
1713
gogoproto "github.com/cosmos/gogoproto/proto"
1814
protoreflect "google.golang.org/protobuf/reflect/protoreflect"
1915
"google.golang.org/protobuf/reflect/protoregistry"
@@ -37,6 +33,11 @@ import (
3733
"cosmossdk.io/server/v2/streaming"
3834
"cosmossdk.io/store/v2/snapshots"
3935
consensustypes "cosmossdk.io/x/consensus/types"
36+
37+
"github.com/cosmos/cosmos-sdk/codec"
38+
codectypes "github.com/cosmos/cosmos-sdk/codec/types"
39+
sdk "github.com/cosmos/cosmos-sdk/types"
40+
txtypes "github.com/cosmos/cosmos-sdk/types/tx"
4041
)
4142

4243
const (
@@ -45,22 +46,24 @@ const (
4546
QueryPathStore = "store"
4647
)
4748

48-
var _ abci.Application = (*Consensus[transaction.Tx])(nil)
49+
var _ abci.Application = (*consensus[transaction.Tx])(nil)
4950

50-
type Consensus[T transaction.Tx] struct {
51+
// consensus contains the implementation of the ABCI interface for CometBFT.
52+
type consensus[T transaction.Tx] struct {
5153
logger log.Logger
5254
appName, version string
5355
app appmanager.AppManager[T]
56+
appCodec codec.Codec
5457
txCodec transaction.Codec[T]
5558
store types.Store
56-
streaming streaming.Manager
5759
listener *appdata.Listener
5860
snapshotManager *snapshots.Manager
61+
streamingManager streaming.Manager
5962
mempool mempool.Mempool[T]
6063

6164
cfg Config
62-
indexedEvents map[string]struct{}
6365
chainID string
66+
indexedEvents map[string]struct{}
6467

6568
initialHeight uint64
6669
// this is only available after this node has committed a block (in FinalizeBlock),
@@ -81,60 +84,9 @@ type Consensus[T transaction.Tx] struct {
8184
getProtoRegistry func() (*protoregistry.Files, error)
8285
}
8386

84-
func NewConsensus[T transaction.Tx](
85-
logger log.Logger,
86-
appName string,
87-
app appmanager.AppManager[T],
88-
mp mempool.Mempool[T],
89-
indexedEvents map[string]struct{},
90-
queryHandlersMap map[string]appmodulev2.Handler,
91-
store types.Store,
92-
cfg Config,
93-
txCodec transaction.Codec[T],
94-
chainId string,
95-
) *Consensus[T] {
96-
return &Consensus[T]{
97-
appName: appName,
98-
version: getCometBFTServerVersion(),
99-
app: app,
100-
cfg: cfg,
101-
store: store,
102-
logger: logger,
103-
txCodec: txCodec,
104-
streaming: streaming.Manager{},
105-
snapshotManager: nil,
106-
mempool: mp,
107-
lastCommittedHeight: atomic.Int64{},
108-
prepareProposalHandler: nil,
109-
processProposalHandler: nil,
110-
verifyVoteExt: nil,
111-
extendVote: nil,
112-
chainID: chainId,
113-
indexedEvents: indexedEvents,
114-
initialHeight: 0,
115-
queryHandlersMap: queryHandlersMap,
116-
getProtoRegistry: sync.OnceValues(gogoproto.MergedRegistry),
117-
}
118-
}
119-
120-
// SetStreamingManager sets the streaming manager for the consensus module.
121-
func (c *Consensus[T]) SetStreamingManager(sm streaming.Manager) {
122-
c.streaming = sm
123-
}
124-
125-
// RegisterSnapshotExtensions registers the given extensions with the consensus module's snapshot manager.
126-
// It allows additional snapshotter implementations to be used for creating and restoring snapshots.
127-
func (c *Consensus[T]) RegisterSnapshotExtensions(extensions ...snapshots.ExtensionSnapshotter) error {
128-
if err := c.snapshotManager.RegisterExtensions(extensions...); err != nil {
129-
return fmt.Errorf("failed to register snapshot extensions: %w", err)
130-
}
131-
132-
return nil
133-
}
134-
13587
// CheckTx implements types.Application.
13688
// It is called by cometbft to verify transaction validity
137-
func (c *Consensus[T]) CheckTx(ctx context.Context, req *abciproto.CheckTxRequest) (*abciproto.CheckTxResponse, error) {
89+
func (c *consensus[T]) CheckTx(ctx context.Context, req *abciproto.CheckTxRequest) (*abciproto.CheckTxResponse, error) {
13890
decodedTx, err := c.txCodec.Decode(req.Tx)
13991
if err != nil {
14092
return nil, err
@@ -172,7 +124,7 @@ func (c *Consensus[T]) CheckTx(ctx context.Context, req *abciproto.CheckTxReques
172124
}
173125

174126
// Info implements types.Application.
175-
func (c *Consensus[T]) Info(ctx context.Context, _ *abciproto.InfoRequest) (*abciproto.InfoResponse, error) {
127+
func (c *consensus[T]) Info(ctx context.Context, _ *abciproto.InfoRequest) (*abciproto.InfoResponse, error) {
176128
version, _, err := c.store.StateLatest()
177129
if err != nil {
178130
return nil, err
@@ -212,7 +164,7 @@ func (c *Consensus[T]) Info(ctx context.Context, _ *abciproto.InfoRequest) (*abc
212164

213165
// Query implements types.Application.
214166
// It is called by cometbft to query application state.
215-
func (c *Consensus[T]) Query(ctx context.Context, req *abciproto.QueryRequest) (resp *abciproto.QueryResponse, err error) {
167+
func (c *consensus[T]) Query(ctx context.Context, req *abciproto.QueryRequest) (resp *abciproto.QueryResponse, err error) {
216168
resp, isGRPC, err := c.maybeRunGRPCQuery(ctx, req)
217169
if isGRPC {
218170
return resp, err
@@ -227,7 +179,7 @@ func (c *Consensus[T]) Query(ctx context.Context, req *abciproto.QueryRequest) (
227179

228180
switch path[0] {
229181
case QueryPathApp:
230-
resp, err = c.handlerQueryApp(ctx, path, req)
182+
resp, err = c.handleQueryApp(ctx, path, req)
231183

232184
case QueryPathStore:
233185
resp, err = c.handleQueryStore(path, req)
@@ -246,7 +198,7 @@ func (c *Consensus[T]) Query(ctx context.Context, req *abciproto.QueryRequest) (
246198
return resp, nil
247199
}
248200

249-
func (c *Consensus[T]) maybeRunGRPCQuery(ctx context.Context, req *abci.QueryRequest) (resp *abciproto.QueryResponse, isGRPC bool, err error) {
201+
func (c *consensus[T]) maybeRunGRPCQuery(ctx context.Context, req *abci.QueryRequest) (resp *abciproto.QueryResponse, isGRPC bool, err error) {
250202
// if this fails then we cannot serve queries anymore
251203
registry, err := c.getProtoRegistry()
252204
if err != nil {
@@ -288,7 +240,7 @@ func (c *Consensus[T]) maybeRunGRPCQuery(ctx context.Context, req *abci.QueryReq
288240

289241
txResult, _, err := c.app.Simulate(ctx, tx)
290242
if err != nil {
291-
return nil, true, fmt.Errorf("%v with gas used: '%d'", err, txResult.GasUsed)
243+
return nil, true, fmt.Errorf("failed with gas used: '%d': %w", txResult.GasUsed, err)
292244
}
293245

294246
msgResponses := make([]*codectypes.Any, 0, len(txResult.Resp))
@@ -337,7 +289,7 @@ func (c *Consensus[T]) maybeRunGRPCQuery(ctx context.Context, req *abci.QueryReq
337289
}
338290

339291
// InitChain implements types.Application.
340-
func (c *Consensus[T]) InitChain(ctx context.Context, req *abciproto.InitChainRequest) (*abciproto.InitChainResponse, error) {
292+
func (c *consensus[T]) InitChain(ctx context.Context, req *abciproto.InitChainRequest) (*abciproto.InitChainResponse, error) {
341293
c.logger.Info("InitChain", "initialHeight", req.InitialHeight, "chainID", req.ChainId)
342294

343295
// store chainID to be used later on in execution
@@ -421,7 +373,7 @@ func (c *Consensus[T]) InitChain(ctx context.Context, req *abciproto.InitChainRe
421373

422374
// PrepareProposal implements types.Application.
423375
// It is called by cometbft to prepare a proposal block.
424-
func (c *Consensus[T]) PrepareProposal(
376+
func (c *consensus[T]) PrepareProposal(
425377
ctx context.Context,
426378
req *abciproto.PrepareProposalRequest,
427379
) (resp *abciproto.PrepareProposalResponse, err error) {
@@ -457,7 +409,7 @@ func (c *Consensus[T]) PrepareProposal(
457409

458410
// ProcessProposal implements types.Application.
459411
// It is called by cometbft to process/verify a proposal block.
460-
func (c *Consensus[T]) ProcessProposal(
412+
func (c *consensus[T]) ProcessProposal(
461413
ctx context.Context,
462414
req *abciproto.ProcessProposalRequest,
463415
) (*abciproto.ProcessProposalResponse, error) {
@@ -491,7 +443,7 @@ func (c *Consensus[T]) ProcessProposal(
491443

492444
// FinalizeBlock implements types.Application.
493445
// It is called by cometbft to finalize a block.
494-
func (c *Consensus[T]) FinalizeBlock(
446+
func (c *consensus[T]) FinalizeBlock(
495447
ctx context.Context,
496448
req *abciproto.FinalizeBlockRequest,
497449
) (*abciproto.FinalizeBlockResponse, error) {
@@ -581,7 +533,7 @@ func (c *Consensus[T]) FinalizeBlock(
581533

582534
// Commit implements types.Application.
583535
// It is called by cometbft to notify the application that a block was committed.
584-
func (c *Consensus[T]) Commit(ctx context.Context, _ *abciproto.CommitRequest) (*abciproto.CommitResponse, error) {
536+
func (c *consensus[T]) Commit(ctx context.Context, _ *abciproto.CommitRequest) (*abciproto.CommitResponse, error) {
585537
lastCommittedHeight := c.lastCommittedHeight.Load()
586538

587539
c.snapshotManager.SnapshotIfApplicable(lastCommittedHeight)
@@ -599,7 +551,7 @@ func (c *Consensus[T]) Commit(ctx context.Context, _ *abciproto.CommitRequest) (
599551
// Vote extensions
600552

601553
// VerifyVoteExtension implements types.Application.
602-
func (c *Consensus[T]) VerifyVoteExtension(
554+
func (c *consensus[T]) VerifyVoteExtension(
603555
ctx context.Context,
604556
req *abciproto.VerifyVoteExtensionRequest,
605557
) (*abciproto.VerifyVoteExtensionResponse, error) {
@@ -641,7 +593,7 @@ func (c *Consensus[T]) VerifyVoteExtension(
641593
}
642594

643595
// ExtendVote implements types.Application.
644-
func (c *Consensus[T]) ExtendVote(ctx context.Context, req *abciproto.ExtendVoteRequest) (*abciproto.ExtendVoteResponse, error) {
596+
func (c *consensus[T]) ExtendVote(ctx context.Context, req *abciproto.ExtendVoteRequest) (*abciproto.ExtendVoteResponse, error) {
645597
// If vote extensions are not enabled, as a safety precaution, we return an
646598
// error.
647599
cp, err := c.GetConsensusParams(ctx)

server/v2/cometbft/abci_test.go

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"encoding/json"
77
"io"
88
"strings"
9+
"sync"
910
"testing"
1011
"time"
1112

@@ -637,7 +638,7 @@ func TestConsensus_Query(t *testing.T) {
637638
require.Equal(t, res.Value, []byte(nil))
638639
}
639640

640-
func setUpConsensus(t *testing.T, gasLimit uint64, mempool mempool.Mempool[mock.Tx]) *Consensus[mock.Tx] {
641+
func setUpConsensus(t *testing.T, gasLimit uint64, mempool mempool.Mempool[mock.Tx]) *consensus[mock.Tx] {
641642
t.Helper()
642643

643644
msgRouterBuilder := getMsgRouterBuilder(t, func(ctx context.Context, msg *gogotypes.BoolValue) (*gogotypes.BoolValue, error) {
@@ -699,9 +700,17 @@ func setUpConsensus(t *testing.T, gasLimit uint64, mempool mempool.Mempool[mock.
699700
nil,
700701
)
701702

702-
return NewConsensus[mock.Tx](log.NewNopLogger(), "testing-app", am,
703-
mempool, map[string]struct{}{}, nil, mockStore,
704-
Config{AppTomlConfig: DefaultAppTomlConfig()}, mock.TxCodec{}, "test")
703+
return &consensus[mock.Tx]{
704+
logger: log.NewNopLogger(),
705+
appName: "testing-app",
706+
app: am,
707+
mempool: mempool,
708+
store: mockStore,
709+
cfg: Config{AppTomlConfig: DefaultAppTomlConfig()},
710+
txCodec: mock.TxCodec{},
711+
chainID: "test",
712+
getProtoRegistry: sync.OnceValues(proto.MergedRegistry),
713+
}
705714
}
706715

707716
// Check target version same with store's latest version

server/v2/cometbft/commands.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -388,7 +388,7 @@ func (s *CometBFTServer[T]) BootstrapStateCmd() *cobra.Command {
388388
return err
389389
}
390390
if height == 0 {
391-
height, err = s.Consensus.store.GetLatestVersion()
391+
height, err = s.store.GetLatestVersion()
392392
if err != nil {
393393
return err
394394
}

server/v2/cometbft/grpc.go

Lines changed: 26 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,8 @@ package cometbft
33
import (
44
"context"
55

6-
v1 "github.com/cometbft/cometbft/api/cometbft/abci/v1"
6+
abci "github.com/cometbft/cometbft/abci/types"
7+
abciproto "github.com/cometbft/cometbft/api/cometbft/abci/v1"
78
"github.com/cosmos/gogoproto/proto"
89
"google.golang.org/grpc"
910
"google.golang.org/grpc/codes"
@@ -12,8 +13,8 @@ import (
1213
autocliv1 "cosmossdk.io/api/cosmos/autocli/v1"
1314
cmtv1beta1 "cosmossdk.io/api/cosmos/base/tendermint/v1beta1"
1415
"cosmossdk.io/core/server"
16+
corestore "cosmossdk.io/core/store"
1517
"cosmossdk.io/core/transaction"
16-
errorsmod "cosmossdk.io/errors/v2"
1718

1819
"github.com/cosmos/cosmos-sdk/client"
1920
"github.com/cosmos/cosmos-sdk/client/grpc/cmtservice"
@@ -23,17 +24,25 @@ import (
2324
txtypes "github.com/cosmos/cosmos-sdk/types/tx"
2425
)
2526

26-
// GRPCServiceRegistrar returns a function that registers the CometBFT gRPC service
27+
type appSimulator[T transaction.Tx] interface {
28+
Simulate(ctx context.Context, tx T) (server.TxResult, corestore.WriterMap, error)
29+
}
30+
31+
// gRPCServiceRegistrar returns a function that registers the CometBFT gRPC service
2732
// Those services are defined for backward compatibility.
2833
// Eventually, they will be removed in favor of the new gRPC services.
29-
func (c *Consensus[T]) GRPCServiceRegistrar(
34+
func gRPCServiceRegistrar[T transaction.Tx](
3035
clientCtx client.Context,
3136
cfg server.ConfigMap,
37+
cometBFTAppConfig *AppTomlConfig,
38+
txCodec transaction.Codec[T],
39+
consensus abci.Application,
40+
app appSimulator[T],
3241
) func(srv *grpc.Server) error {
3342
return func(srv *grpc.Server) error {
34-
cmtservice.RegisterServiceServer(srv, cmtservice.NewQueryServer(clientCtx.Client, c.Query, clientCtx.ConsensusAddressCodec))
35-
txtypes.RegisterServiceServer(srv, txServer[T]{clientCtx, c})
36-
nodeservice.RegisterServiceServer(srv, nodeServer[T]{cfg, c})
43+
cmtservice.RegisterServiceServer(srv, cmtservice.NewQueryServer(clientCtx.Client, consensus.Query, clientCtx.ConsensusAddressCodec))
44+
txtypes.RegisterServiceServer(srv, txServer[T]{clientCtx, txCodec, app})
45+
nodeservice.RegisterServiceServer(srv, nodeServer[T]{cfg, cometBFTAppConfig, consensus})
3746

3847
return nil
3948
}
@@ -86,7 +95,8 @@ var CometBFTAutoCLIDescriptor = &autocliv1.ServiceCommandDescriptor{
8695

8796
type txServer[T transaction.Tx] struct {
8897
clientCtx client.Context
89-
consensus *Consensus[T]
98+
txCodec transaction.Codec[T]
99+
app appSimulator[T]
90100
}
91101

92102
// BroadcastTx implements tx.ServiceServer.
@@ -132,12 +142,12 @@ func (t txServer[T]) Simulate(ctx context.Context, req *txtypes.SimulateRequest)
132142
return nil, status.Errorf(codes.InvalidArgument, "empty txBytes is not allowed")
133143
}
134144

135-
tx, err := t.consensus.txCodec.Decode(txBytes)
145+
tx, err := t.txCodec.Decode(txBytes)
136146
if err != nil {
137-
return nil, errorsmod.Wrap(err, "failed to decode tx")
147+
return nil, status.Errorf(codes.InvalidArgument, "failed to decode tx: %v", err)
138148
}
139149

140-
txResult, _, err := t.consensus.app.Simulate(ctx, tx)
150+
txResult, _, err := t.app.Simulate(ctx, tx)
141151
if err != nil {
142152
return nil, status.Errorf(codes.Unknown, "%v with gas used: '%d'", err, txResult.GasUsed)
143153
}
@@ -186,8 +196,9 @@ func (t txServer[T]) TxEncodeAmino(context.Context, *txtypes.TxEncodeAminoReques
186196
var _ txtypes.ServiceServer = txServer[transaction.Tx]{}
187197

188198
type nodeServer[T transaction.Tx] struct {
189-
cfg server.ConfigMap
190-
consensus *Consensus[T]
199+
cfg server.ConfigMap
200+
cometBFTAppConfig *AppTomlConfig
201+
consensus abci.Application
191202
}
192203

193204
func (s nodeServer[T]) Config(ctx context.Context, _ *nodeservice.ConfigRequest) (*nodeservice.ConfigResponse, error) {
@@ -201,12 +212,12 @@ func (s nodeServer[T]) Config(ctx context.Context, _ *nodeservice.ConfigRequest)
201212
MinimumGasPrice: minGasPricesStr,
202213
PruningKeepRecent: "ambiguous in v2",
203214
PruningInterval: "ambiguous in v2",
204-
HaltHeight: s.consensus.cfg.AppTomlConfig.HaltHeight,
215+
HaltHeight: s.cometBFTAppConfig.HaltHeight,
205216
}, nil
206217
}
207218

208219
func (s nodeServer[T]) Status(ctx context.Context, _ *nodeservice.StatusRequest) (*nodeservice.StatusResponse, error) {
209-
nodeInfo, err := s.consensus.Info(ctx, &v1.InfoRequest{})
220+
nodeInfo, err := s.consensus.Info(ctx, &abciproto.InfoRequest{})
210221
if err != nil {
211222
return nil, err
212223
}

0 commit comments

Comments
 (0)