diff --git a/CHANGELOG_PENDING.md b/CHANGELOG_PENDING.md index 25c97aff750..945edbd599d 100644 --- a/CHANGELOG_PENDING.md +++ b/CHANGELOG_PENDING.md @@ -12,6 +12,8 @@ - Go API - [p2p] \#9625 Remove unused p2p/trust package (@cmwaters) + - [rpc] \#9655 Remove global environment and replace with constructor. (@williambanfield,@tychoish) + - [node] \#9655 Move DBContext and DBProvider from the node package to the config package. (@williambanfield,@tychoish) - Blockchain Protocol @@ -22,6 +24,7 @@ - [tools/tm-signer-harness] \#6498 Set OS home dir to instead of the hardcoded PATH. (@JayT106) - [metrics] \#9682 move state-syncing and block-syncing metrics to their respective packages (@cmwaters) labels have moved from block_syncing -> blocksync_syncing and state_syncing -> statesync_syncing + - [inspect] \#9655 Add a new `inspect` command for introspecting the state and block store of a crashed tendermint node. (@williambanfield) ### FEATURES diff --git a/cmd/tendermint/commands/inspect.go b/cmd/tendermint/commands/inspect.go new file mode 100644 index 00000000000..9e473ec0084 --- /dev/null +++ b/cmd/tendermint/commands/inspect.go @@ -0,0 +1,87 @@ +package commands + +import ( + "context" + "os" + "os/signal" + "syscall" + + "github.com/spf13/cobra" + + cfg "github.com/tendermint/tendermint/config" + "github.com/tendermint/tendermint/inspect" + "github.com/tendermint/tendermint/state" + "github.com/tendermint/tendermint/state/indexer/block" + "github.com/tendermint/tendermint/store" + "github.com/tendermint/tendermint/types" +) + +// InspectCmd is the command for starting an inspect server. +var InspectCmd = &cobra.Command{ + Use: "inspect", + Short: "Run an inspect server for investigating Tendermint state", + Long: ` + inspect runs a subset of Tendermint's RPC endpoints that are useful for debugging + issues with Tendermint. + + When the Tendermint consensus engine detects inconsistent state, it will crash the + Tendermint process. Tendermint will not start up while in this inconsistent state. + The inspect command can be used to query the block and state store using Tendermint + RPC calls to debug issues of inconsistent state. + `, + + RunE: runInspect, +} + +func init() { + InspectCmd.Flags(). + String("rpc.laddr", + config.RPC.ListenAddress, "RPC listenener address. Port required") + InspectCmd.Flags(). + String("db-backend", + config.DBBackend, "database backend: goleveldb | cleveldb | boltdb | rocksdb | badgerdb") + InspectCmd.Flags(). + String("db-dir", config.DBPath, "database directory") +} + +func runInspect(cmd *cobra.Command, args []string) error { + ctx, cancel := context.WithCancel(cmd.Context()) + defer cancel() + + c := make(chan os.Signal, 1) + signal.Notify(c, syscall.SIGTERM, syscall.SIGINT) + go func() { + <-c + cancel() + }() + + blockStoreDB, err := cfg.DefaultDBProvider(&cfg.DBContext{ID: "blockstore", Config: config}) + if err != nil { + return err + } + blockStore := store.NewBlockStore(blockStoreDB) + defer blockStore.Close() + + stateDB, err := cfg.DefaultDBProvider(&cfg.DBContext{ID: "state", Config: config}) + if err != nil { + return err + } + stateStore := state.NewStore(stateDB, state.StoreOptions{DiscardABCIResponses: false}) + defer stateStore.Close() + + genDoc, err := types.GenesisDocFromFile(config.GenesisFile()) + if err != nil { + return err + } + txIndexer, blockIndexer, err := block.IndexerFromConfig(config, cfg.DefaultDBProvider, genDoc.ChainID) + if err != nil { + return err + } + ins := inspect.New(config.RPC, blockStore, stateStore, txIndexer, blockIndexer, logger) + + logger.Info("starting inspect server") + if err := ins.Run(ctx); err != nil { + return err + } + return nil +} diff --git a/cmd/tendermint/main.go b/cmd/tendermint/main.go index 59e7a1b1288..47cad15ed2a 100644 --- a/cmd/tendermint/main.go +++ b/cmd/tendermint/main.go @@ -30,6 +30,7 @@ func main() { cmd.VersionCmd, cmd.RollbackStateCmd, cmd.CompactGoLevelDBCmd, + cmd.InspectCmd, debug.DebugCmd, cli.NewCompletionCmd(rootCmd, true), ) diff --git a/config/db.go b/config/db.go new file mode 100644 index 00000000000..bbc2869446a --- /dev/null +++ b/config/db.go @@ -0,0 +1,30 @@ +package config + +import ( + "context" + + dbm "github.com/tendermint/tm-db" + + "github.com/tendermint/tendermint/libs/log" + "github.com/tendermint/tendermint/libs/service" +) + +// ServiceProvider takes a config and a logger and returns a ready to go Node. +type ServiceProvider func(context.Context, *Config, log.Logger) (service.Service, error) + +// DBContext specifies config information for loading a new DB. +type DBContext struct { + ID string + Config *Config +} + +// DBProvider takes a DBContext and returns an instantiated DB. +type DBProvider func(*DBContext) (dbm.DB, error) + +// DefaultDBProvider returns a database using the DBBackend and DBDir +// specified in the Config. +func DefaultDBProvider(ctx *DBContext) (dbm.DB, error) { + dbType := dbm.BackendType(ctx.Config.DBBackend) + + return dbm.NewDB(ctx.ID, dbType, ctx.Config.DBDir()) +} diff --git a/consensus/replay.go b/consensus/replay.go index 83569aff204..edac91425ec 100644 --- a/consensus/replay.go +++ b/consensus/replay.go @@ -54,7 +54,7 @@ func (cs *State) readReplayMessage(msg *TimedWALMessage, newStepSub types.Subscr if m.Height != m2.Height || m.Round != m2.Round || m.Step != m2.Step { return fmt.Errorf("roundState mismatch. Got %v; Expected %v", m2, m) } - case <-newStepSub.Cancelled(): + case <-newStepSub.Canceled(): return fmt.Errorf("failed to read off newStepSub.Out(). newStepSub was canceled") case <-ticker: return fmt.Errorf("failed to read off newStepSub.Out()") diff --git a/consensus/replay_test.go b/consensus/replay_test.go index 44bbe09bf0d..f4c4e14e8cf 100644 --- a/consensus/replay_test.go +++ b/consensus/replay_test.go @@ -97,7 +97,7 @@ func startNewStateAndWaitForBlock(t *testing.T, consensusReplayConfig *cfg.Confi require.NoError(t, err) select { case <-newBlockSub.Out(): - case <-newBlockSub.Cancelled(): + case <-newBlockSub.Canceled(): t.Fatal("newBlockSub was canceled") case <-time.After(120 * time.Second): t.Fatal("Timed out waiting for new block (see trace above)") @@ -1198,6 +1198,7 @@ func (bs *mockBlockStore) PruneBlocks(height int64, state sm.State) (uint64, int } func (bs *mockBlockStore) DeleteLatestBlock() error { return nil } +func (bs *mockBlockStore) Close() error { return nil } //--------------------------------------- // Test handshake/init chain diff --git a/docs/tools/debugging.md b/docs/tools/debugging.md index 17fa0ec1174..2c9998fbea6 100644 --- a/docs/tools/debugging.md +++ b/docs/tools/debugging.md @@ -55,3 +55,47 @@ given destination directory. Each archive will contain: Note: goroutine.out and heap.out will only be written if a profile address is provided and is operational. This command is blocking and will log any error. + +## Tendermint Inspect + +Tendermint includes an `inspect` command for querying Tendermint's state store and block +store over Tendermint RPC. + +When the Tendermint consensus engine detects inconsistent state, it will crash the +entire Tendermint process. +While in this inconsistent state, a node running Tendermint's consensus engine will not start up. +The `inspect` command runs only a subset of Tendermint's RPC endpoints for querying the block store +and state store. +`inspect` allows operators to query a read-only view of the stage. +`inspect` does not run the consensus engine at all and can therefore be used to debug +processes that have crashed due to inconsistent state. + +### Running inspect + +Start up the `inspect` tool on the machine where Tendermint crashed using: +```bash +tendermint inspect --home= +``` + +`inspect` will use the data directory specified in your Tendermint configuration file. +`inspect` will also run the RPC server at the address specified in your Tendermint configuration file. + +### Using inspect + +With the `inspect` server running, you can access RPC endpoints that are critically important +for debugging. +Calling the `/status`, `/consensus_state` and `/dump_consensus_state` RPC endpoint +will return useful information about the Tendermint consensus state. + +To start the `inspect` process, run +```bash +tendermint inspect +``` + +### RPC endpoints + +The list of available RPC endpoints can be found by making a request to the RPC port. +For an `inspect` process running on `127.0.0.1:26657`, navigate your browser to +`http://127.0.0.1:26657/` to retrieve the list of enabled RPC endpoints. + +Additional information on the Tendermint RPC endpoints can be found in the [rpc documentation](https://docs.tendermint.com/master/rpc). diff --git a/go.sum b/go.sum index 142fa8d978e..d7973207e88 100644 --- a/go.sum +++ b/go.sum @@ -1361,6 +1361,7 @@ golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20210220032951-036812b2e83c h1:5KslGYwFpkhGh+Q16bwMP3cOontH8FOep7tGV86Y7SQ= golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o= diff --git a/inspect/doc.go b/inspect/doc.go new file mode 100644 index 00000000000..0d17771b586 --- /dev/null +++ b/inspect/doc.go @@ -0,0 +1,36 @@ +/* +Package inspect provides a tool for investigating the state of a +failed Tendermint node. + +This package provides the Inspector type. The Inspector type runs a subset of the Tendermint +RPC endpoints that are useful for debugging issues with Tendermint consensus. + +When a node running the Tendermint consensus engine detects an inconsistent consensus state, +the entire node will crash. The Tendermint consensus engine cannot run in this +inconsistent state so the node will not be able to start up again. + +The RPC endpoints provided by the Inspector type allow for a node operator to inspect +the block store and state store to better understand what may have caused the inconsistent state. + +The Inspector type's lifecycle is controlled by a context.Context + + ins := inspect.NewFromConfig(rpcConfig) + ctx, cancelFunc:= context.WithCancel(context.Background()) + + // Run blocks until the Inspector server is shut down. + go ins.Run(ctx) + ... + + // calling the cancel function will stop the running inspect server + cancelFunc() + +Inspector serves its RPC endpoints on the address configured in the RPC configuration + + rpcConfig.ListenAddress = "tcp://127.0.0.1:26657" + ins := inspect.NewFromConfig(rpcConfig) + go ins.Run(ctx) + +The list of available RPC endpoints can then be viewed by navigating to +http://127.0.0.1:26657/ in the web browser. +*/ +package inspect diff --git a/inspect/inspect.go b/inspect/inspect.go new file mode 100644 index 00000000000..71c44f7bbd3 --- /dev/null +++ b/inspect/inspect.go @@ -0,0 +1,138 @@ +package inspect + +import ( + "context" + "errors" + "net" + "os" + + "github.com/tendermint/tendermint/config" + "github.com/tendermint/tendermint/inspect/rpc" + "github.com/tendermint/tendermint/libs/log" + tmstrings "github.com/tendermint/tendermint/libs/strings" + rpccore "github.com/tendermint/tendermint/rpc/core" + "github.com/tendermint/tendermint/state" + "github.com/tendermint/tendermint/state/indexer" + "github.com/tendermint/tendermint/state/indexer/block" + "github.com/tendermint/tendermint/state/txindex" + "github.com/tendermint/tendermint/store" + "github.com/tendermint/tendermint/types" + + "golang.org/x/sync/errgroup" +) + +var ( + logger = log.NewTMLogger(log.NewSyncWriter(os.Stdout)) +) + +// Inspector manages an RPC service that exports methods to debug a failed node. +// After a node shuts down due to a consensus failure, it will no longer start +// up its state cannot easily be inspected. An Inspector value provides a similar interface +// to the node, using the underlying Tendermint data stores, without bringing up +// any other components. A caller can query the Inspector service to inspect the +// persisted state and debug the failure. +type Inspector struct { + routes rpccore.RoutesMap + + config *config.RPCConfig + + logger log.Logger + + // References to the state store and block store are maintained to enable + // the Inspector to safely close them on shutdown. + ss state.Store + bs state.BlockStore +} + +// New returns an Inspector that serves RPC on the specified BlockStore and StateStore. +// The Inspector type does not modify the state or block stores. +// The sinks are used to enable block and transaction querying via the RPC server. +// The caller is responsible for starting and stopping the Inspector service. +// +//nolint:lll +func New(cfg *config.RPCConfig, bs state.BlockStore, ss state.Store, txidx txindex.TxIndexer, blkidx indexer.BlockIndexer, lg log.Logger) *Inspector { + routes := rpc.Routes(*cfg, ss, bs, txidx, blkidx, logger) + eb := types.NewEventBus() + eb.SetLogger(logger.With("module", "events")) + return &Inspector{ + routes: routes, + config: cfg, + logger: logger, + ss: ss, + bs: bs, + } +} + +// NewFromConfig constructs an Inspector using the values defined in the passed in config. +func NewFromConfig(cfg *config.Config) (*Inspector, error) { + bsDB, err := config.DefaultDBProvider(&config.DBContext{ID: "blockstore", Config: cfg}) + if err != nil { + return nil, err + } + bs := store.NewBlockStore(bsDB) + sDB, err := config.DefaultDBProvider(&config.DBContext{ID: "state", Config: cfg}) + if err != nil { + return nil, err + } + genDoc, err := types.GenesisDocFromFile(cfg.GenesisFile()) + if err != nil { + return nil, err + } + txidx, blkidx, err := block.IndexerFromConfig(cfg, config.DefaultDBProvider, genDoc.ChainID) + if err != nil { + return nil, err + } + lg := logger.With("module", "inspect") + ss := state.NewStore(sDB, state.StoreOptions{}) + return New(cfg.RPC, bs, ss, txidx, blkidx, lg), nil +} + +// Run starts the Inspector servers and blocks until the servers shut down. The passed +// in context is used to control the lifecycle of the servers. +func (ins *Inspector) Run(ctx context.Context) error { + defer ins.bs.Close() + defer ins.ss.Close() + + return startRPCServers(ctx, ins.config, ins.logger, ins.routes) +} + +func startRPCServers(ctx context.Context, cfg *config.RPCConfig, logger log.Logger, routes rpccore.RoutesMap) error { + g, tctx := errgroup.WithContext(ctx) + listenAddrs := tmstrings.SplitAndTrimEmpty(cfg.ListenAddress, ",", " ") + rh := rpc.Handler(cfg, routes, logger) + for _, listenerAddr := range listenAddrs { + server := rpc.Server{ + Logger: logger, + Config: cfg, + Handler: rh, + Addr: listenerAddr, + } + if cfg.IsTLSEnabled() { + keyFile := cfg.KeyFile() + certFile := cfg.CertFile() + listenerAddr := listenerAddr + g.Go(func() error { + logger.Info("RPC HTTPS server starting", "address", listenerAddr, + "certfile", certFile, "keyfile", keyFile) + err := server.ListenAndServeTLS(tctx, certFile, keyFile) + if !errors.Is(err, net.ErrClosed) { + return err + } + logger.Info("RPC HTTPS server stopped", "address", listenerAddr) + return nil + }) + } else { + listenerAddr := listenerAddr + g.Go(func() error { + logger.Info("RPC HTTP server starting", "address", listenerAddr) + err := server.ListenAndServe(tctx) + if !errors.Is(err, net.ErrClosed) { + return err + } + logger.Info("RPC HTTP server stopped", "address", listenerAddr) + return nil + }) + } + } + return g.Wait() +} diff --git a/inspect/inspect_test.go b/inspect/inspect_test.go new file mode 100644 index 00000000000..c4ad7a5e44b --- /dev/null +++ b/inspect/inspect_test.go @@ -0,0 +1,605 @@ +package inspect_test + +import ( + "context" + "fmt" + "net" + "os" + "strings" + "sync" + "testing" + "time" + + "github.com/fortytw2/leaktest" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" + abcitypes "github.com/tendermint/tendermint/abci/types" + "github.com/tendermint/tendermint/config" + "github.com/tendermint/tendermint/inspect" + "github.com/tendermint/tendermint/internal/test" + "github.com/tendermint/tendermint/libs/log" + "github.com/tendermint/tendermint/libs/pubsub/query" + "github.com/tendermint/tendermint/proto/tendermint/state" + httpclient "github.com/tendermint/tendermint/rpc/client/http" + indexermocks "github.com/tendermint/tendermint/state/indexer/mocks" + statemocks "github.com/tendermint/tendermint/state/mocks" + txindexmocks "github.com/tendermint/tendermint/state/txindex/mocks" + "github.com/tendermint/tendermint/types" +) + +func TestInspectConstructor(t *testing.T) { + cfg := test.ResetTestRoot("test") + t.Cleanup(leaktest.Check(t)) + defer func() { _ = os.RemoveAll(cfg.RootDir) }() + t.Run("from config", func(t *testing.T) { + d, err := inspect.NewFromConfig(cfg) + require.NoError(t, err) + require.NotNil(t, d) + }) + +} + +func TestInspectRun(t *testing.T) { + cfg := test.ResetTestRoot("test") + t.Cleanup(leaktest.Check(t)) + defer func() { _ = os.RemoveAll(cfg.RootDir) }() + t.Run("from config", func(t *testing.T) { + d, err := inspect.NewFromConfig(cfg) + require.NoError(t, err) + ctx, cancel := context.WithCancel(context.Background()) + stoppedWG := &sync.WaitGroup{} + stoppedWG.Add(1) + go func() { + require.NoError(t, d.Run(ctx)) + stoppedWG.Done() + }() + cancel() + stoppedWG.Wait() + }) + +} + +func TestBlock(t *testing.T) { + testHeight := int64(1) + testBlock := new(types.Block) + testBlock.Header.Height = testHeight + testBlock.Header.LastCommitHash = []byte("test hash") + stateStoreMock := &statemocks.Store{} + stateStoreMock.On("Close").Return(nil) + + blockStoreMock := &statemocks.BlockStore{} + blockStoreMock.On("Height").Return(testHeight) + blockStoreMock.On("Base").Return(int64(0)) + blockStoreMock.On("LoadBlockMeta", testHeight).Return(&types.BlockMeta{}) + blockStoreMock.On("LoadBlock", testHeight).Return(testBlock) + blockStoreMock.On("Close").Return(nil) + + txIndexerMock := &txindexmocks.TxIndexer{} + blkIdxMock := &indexermocks.BlockIndexer{} + + rpcConfig := config.TestRPCConfig() + l := log.TestingLogger() + d := inspect.New(rpcConfig, blockStoreMock, stateStoreMock, txIndexerMock, blkIdxMock, l) + ctx, cancel := context.WithCancel(context.Background()) + wg := &sync.WaitGroup{} + wg.Add(1) + + startedWG := &sync.WaitGroup{} + startedWG.Add(1) + go func() { + startedWG.Done() + defer wg.Done() + require.NoError(t, d.Run(ctx)) + }() + // FIXME: used to induce context switch. + // Determine more deterministic method for prompting a context switch + startedWG.Wait() + requireConnect(t, rpcConfig.ListenAddress, 20) + cli, err := httpclient.New(rpcConfig.ListenAddress, "/websocket") + require.NoError(t, err) + resultBlock, err := cli.Block(context.Background(), &testHeight) + require.NoError(t, err) + require.Equal(t, testBlock.Height, resultBlock.Block.Height) + require.Equal(t, testBlock.LastCommitHash, resultBlock.Block.LastCommitHash) + cancel() + wg.Wait() + + blockStoreMock.AssertExpectations(t) + stateStoreMock.AssertExpectations(t) +} + +func TestTxSearch(t *testing.T) { + testHash := []byte("test") + testTx := []byte("tx") + testQuery := fmt.Sprintf("tx.hash='%s'", string(testHash)) + testTxResult := &abcitypes.TxResult{ + Height: 1, + Index: 100, + Tx: testTx, + } + + stateStoreMock := &statemocks.Store{} + stateStoreMock.On("Close").Return(nil) + blockStoreMock := &statemocks.BlockStore{} + blockStoreMock.On("Close").Return(nil) + txIndexerMock := &txindexmocks.TxIndexer{} + blkIdxMock := &indexermocks.BlockIndexer{} + txIndexerMock.On("Search", mock.Anything, + mock.MatchedBy(func(q *query.Query) bool { + return testQuery == strings.ReplaceAll(q.String(), " ", "") + })). + Return([]*abcitypes.TxResult{testTxResult}, nil) + + rpcConfig := config.TestRPCConfig() + l := log.TestingLogger() + d := inspect.New(rpcConfig, blockStoreMock, stateStoreMock, txIndexerMock, blkIdxMock, l) + ctx, cancel := context.WithCancel(context.Background()) + wg := &sync.WaitGroup{} + wg.Add(1) + + startedWG := &sync.WaitGroup{} + startedWG.Add(1) + go func() { + startedWG.Done() + defer wg.Done() + require.NoError(t, d.Run(ctx)) + }() + // FIXME: used to induce context switch. + // Determine more deterministic method for prompting a context switch + startedWG.Wait() + requireConnect(t, rpcConfig.ListenAddress, 20) + cli, err := httpclient.New(rpcConfig.ListenAddress, "/websocket") + require.NoError(t, err) + + var page = 1 + resultTxSearch, err := cli.TxSearch(context.Background(), testQuery, false, &page, &page, "") + require.NoError(t, err) + require.Len(t, resultTxSearch.Txs, 1) + require.Equal(t, types.Tx(testTx), resultTxSearch.Txs[0].Tx) + + cancel() + wg.Wait() + + txIndexerMock.AssertExpectations(t) + stateStoreMock.AssertExpectations(t) + blockStoreMock.AssertExpectations(t) +} +func TestTx(t *testing.T) { + testHash := []byte("test") + testTx := []byte("tx") + + stateStoreMock := &statemocks.Store{} + stateStoreMock.On("Close").Return(nil) + blockStoreMock := &statemocks.BlockStore{} + blockStoreMock.On("Close").Return(nil) + blkIdxMock := &indexermocks.BlockIndexer{} + txIndexerMock := &txindexmocks.TxIndexer{} + txIndexerMock.On("Get", testHash).Return(&abcitypes.TxResult{ + Tx: testTx, + }, nil) + + rpcConfig := config.TestRPCConfig() + l := log.TestingLogger() + d := inspect.New(rpcConfig, blockStoreMock, stateStoreMock, txIndexerMock, blkIdxMock, l) + ctx, cancel := context.WithCancel(context.Background()) + wg := &sync.WaitGroup{} + wg.Add(1) + + startedWG := &sync.WaitGroup{} + startedWG.Add(1) + go func() { + startedWG.Done() + defer wg.Done() + require.NoError(t, d.Run(ctx)) + }() + // FIXME: used to induce context switch. + // Determine more deterministic method for prompting a context switch + startedWG.Wait() + requireConnect(t, rpcConfig.ListenAddress, 20) + cli, err := httpclient.New(rpcConfig.ListenAddress, "/websocket") + require.NoError(t, err) + + res, err := cli.Tx(context.Background(), testHash, false) + require.NoError(t, err) + require.Equal(t, types.Tx(testTx), res.Tx) + + cancel() + wg.Wait() + + txIndexerMock.AssertExpectations(t) + stateStoreMock.AssertExpectations(t) + blockStoreMock.AssertExpectations(t) +} +func TestConsensusParams(t *testing.T) { + testHeight := int64(1) + testMaxGas := int64(55) + stateStoreMock := &statemocks.Store{} + stateStoreMock.On("Close").Return(nil) + blockStoreMock := &statemocks.BlockStore{} + blockStoreMock.On("Close").Return(nil) + blockStoreMock.On("Height").Return(testHeight) + blockStoreMock.On("Base").Return(int64(0)) + stateStoreMock.On("LoadConsensusParams", testHeight).Return(types.ConsensusParams{ + Block: types.BlockParams{ + MaxGas: testMaxGas, + }, + }, nil) + txIndexerMock := &txindexmocks.TxIndexer{} + blkIdxMock := &indexermocks.BlockIndexer{} + rpcConfig := config.TestRPCConfig() + l := log.TestingLogger() + d := inspect.New(rpcConfig, blockStoreMock, stateStoreMock, txIndexerMock, blkIdxMock, l) + + ctx, cancel := context.WithCancel(context.Background()) + wg := &sync.WaitGroup{} + wg.Add(1) + + startedWG := &sync.WaitGroup{} + startedWG.Add(1) + go func() { + startedWG.Done() + defer wg.Done() + require.NoError(t, d.Run(ctx)) + }() + // FIXME: used to induce context switch. + // Determine more deterministic method for prompting a context switch + startedWG.Wait() + requireConnect(t, rpcConfig.ListenAddress, 20) + cli, err := httpclient.New(rpcConfig.ListenAddress, "/websocket") + require.NoError(t, err) + params, err := cli.ConsensusParams(context.Background(), &testHeight) + require.NoError(t, err) + require.Equal(t, params.ConsensusParams.Block.MaxGas, testMaxGas) + + cancel() + wg.Wait() + + blockStoreMock.AssertExpectations(t) + stateStoreMock.AssertExpectations(t) +} + +func TestBlockResults(t *testing.T) { + testHeight := int64(1) + testGasUsed := int64(100) + stateStoreMock := &statemocks.Store{} + stateStoreMock.On("Close").Return(nil) + // tmstate "github.com/tendermint/tendermint/proto/tendermint/state" + stateStoreMock.On("LoadABCIResponses", testHeight).Return(&state.ABCIResponses{ + DeliverTxs: []*abcitypes.ResponseDeliverTx{ + { + GasUsed: testGasUsed, + }, + }, + EndBlock: &abcitypes.ResponseEndBlock{}, + BeginBlock: &abcitypes.ResponseBeginBlock{}, + }, nil) + blockStoreMock := &statemocks.BlockStore{} + blockStoreMock.On("Close").Return(nil) + blockStoreMock.On("Base").Return(int64(0)) + blockStoreMock.On("Height").Return(testHeight) + txIndexerMock := &txindexmocks.TxIndexer{} + blkIdxMock := &indexermocks.BlockIndexer{} + rpcConfig := config.TestRPCConfig() + l := log.TestingLogger() + d := inspect.New(rpcConfig, blockStoreMock, stateStoreMock, txIndexerMock, blkIdxMock, l) + + ctx, cancel := context.WithCancel(context.Background()) + wg := &sync.WaitGroup{} + wg.Add(1) + + startedWG := &sync.WaitGroup{} + startedWG.Add(1) + go func() { + startedWG.Done() + defer wg.Done() + require.NoError(t, d.Run(ctx)) + }() + // FIXME: used to induce context switch. + // Determine more deterministic method for prompting a context switch + startedWG.Wait() + requireConnect(t, rpcConfig.ListenAddress, 20) + cli, err := httpclient.New(rpcConfig.ListenAddress, "/websocket") + require.NoError(t, err) + res, err := cli.BlockResults(context.Background(), &testHeight) + require.NoError(t, err) + require.Equal(t, res.TxsResults[0].GasUsed, testGasUsed) + + cancel() + wg.Wait() + + blockStoreMock.AssertExpectations(t) + stateStoreMock.AssertExpectations(t) +} + +func TestCommit(t *testing.T) { + testHeight := int64(1) + testRound := int32(101) + stateStoreMock := &statemocks.Store{} + stateStoreMock.On("Close").Return(nil) + blockStoreMock := &statemocks.BlockStore{} + blockStoreMock.On("Close").Return(nil) + blockStoreMock.On("Base").Return(int64(0)) + blockStoreMock.On("Height").Return(testHeight) + blockStoreMock.On("LoadBlockMeta", testHeight).Return(&types.BlockMeta{}, nil) + blockStoreMock.On("LoadSeenCommit", testHeight).Return(&types.Commit{ + Height: testHeight, + Round: testRound, + }, nil) + txIndexerMock := &txindexmocks.TxIndexer{} + blkIdxMock := &indexermocks.BlockIndexer{} + rpcConfig := config.TestRPCConfig() + l := log.TestingLogger() + d := inspect.New(rpcConfig, blockStoreMock, stateStoreMock, txIndexerMock, blkIdxMock, l) + + ctx, cancel := context.WithCancel(context.Background()) + wg := &sync.WaitGroup{} + wg.Add(1) + + startedWG := &sync.WaitGroup{} + startedWG.Add(1) + go func() { + startedWG.Done() + defer wg.Done() + require.NoError(t, d.Run(ctx)) + }() + // FIXME: used to induce context switch. + // Determine more deterministic method for prompting a context switch + startedWG.Wait() + requireConnect(t, rpcConfig.ListenAddress, 20) + cli, err := httpclient.New(rpcConfig.ListenAddress, "/websocket") + require.NoError(t, err) + res, err := cli.Commit(context.Background(), &testHeight) + require.NoError(t, err) + require.NotNil(t, res) + require.Equal(t, res.SignedHeader.Commit.Round, testRound) + + cancel() + wg.Wait() + + blockStoreMock.AssertExpectations(t) + stateStoreMock.AssertExpectations(t) +} + +func TestBlockByHash(t *testing.T) { + testHeight := int64(1) + testHash := []byte("test hash") + testBlock := new(types.Block) + testBlock.Header.Height = testHeight + testBlock.Header.LastCommitHash = testHash + stateStoreMock := &statemocks.Store{} + stateStoreMock.On("Close").Return(nil) + blockStoreMock := &statemocks.BlockStore{} + blockStoreMock.On("Close").Return(nil) + blockStoreMock.On("LoadBlockMeta", testHeight).Return(&types.BlockMeta{ + BlockID: types.BlockID{ + Hash: testHash, + }, + Header: types.Header{ + Height: testHeight, + }, + }, nil) + blockStoreMock.On("LoadBlockByHash", testHash).Return(testBlock, nil) + txIndexerMock := &txindexmocks.TxIndexer{} + blkIdxMock := &indexermocks.BlockIndexer{} + rpcConfig := config.TestRPCConfig() + l := log.TestingLogger() + d := inspect.New(rpcConfig, blockStoreMock, stateStoreMock, txIndexerMock, blkIdxMock, l) + + ctx, cancel := context.WithCancel(context.Background()) + wg := &sync.WaitGroup{} + wg.Add(1) + + startedWG := &sync.WaitGroup{} + startedWG.Add(1) + go func() { + startedWG.Done() + defer wg.Done() + require.NoError(t, d.Run(ctx)) + }() + // FIXME: used to induce context switch. + // Determine more deterministic method for prompting a context switch + startedWG.Wait() + requireConnect(t, rpcConfig.ListenAddress, 20) + cli, err := httpclient.New(rpcConfig.ListenAddress, "/websocket") + require.NoError(t, err) + res, err := cli.BlockByHash(context.Background(), testHash) + require.NoError(t, err) + require.NotNil(t, res) + require.Equal(t, []byte(res.BlockID.Hash), testHash) + + cancel() + wg.Wait() + + blockStoreMock.AssertExpectations(t) + stateStoreMock.AssertExpectations(t) +} + +func TestBlockchain(t *testing.T) { + testHeight := int64(1) + testBlock := new(types.Block) + testBlockHash := []byte("test hash") + testBlock.Header.Height = testHeight + testBlock.Header.LastCommitHash = testBlockHash + stateStoreMock := &statemocks.Store{} + stateStoreMock.On("Close").Return(nil) + + blockStoreMock := &statemocks.BlockStore{} + blockStoreMock.On("Close").Return(nil) + blockStoreMock.On("Height").Return(testHeight) + blockStoreMock.On("Base").Return(int64(0)) + blockStoreMock.On("LoadBlockMeta", testHeight).Return(&types.BlockMeta{ + BlockID: types.BlockID{ + Hash: testBlockHash, + }, + }) + txIndexerMock := &txindexmocks.TxIndexer{} + blkIdxMock := &indexermocks.BlockIndexer{} + rpcConfig := config.TestRPCConfig() + l := log.TestingLogger() + d := inspect.New(rpcConfig, blockStoreMock, stateStoreMock, txIndexerMock, blkIdxMock, l) + + ctx, cancel := context.WithCancel(context.Background()) + wg := &sync.WaitGroup{} + wg.Add(1) + + startedWG := &sync.WaitGroup{} + startedWG.Add(1) + go func() { + startedWG.Done() + defer wg.Done() + require.NoError(t, d.Run(ctx)) + }() + // FIXME: used to induce context switch. + // Determine more deterministic method for prompting a context switch + startedWG.Wait() + requireConnect(t, rpcConfig.ListenAddress, 20) + cli, err := httpclient.New(rpcConfig.ListenAddress, "/websocket") + require.NoError(t, err) + res, err := cli.BlockchainInfo(context.Background(), 0, 100) + require.NoError(t, err) + require.NotNil(t, res) + require.Equal(t, testBlockHash, []byte(res.BlockMetas[0].BlockID.Hash)) + + cancel() + wg.Wait() + + blockStoreMock.AssertExpectations(t) + stateStoreMock.AssertExpectations(t) +} + +func TestValidators(t *testing.T) { + testHeight := int64(1) + testVotingPower := int64(100) + testValidators := types.ValidatorSet{ + Validators: []*types.Validator{ + { + VotingPower: testVotingPower, + }, + }, + } + stateStoreMock := &statemocks.Store{} + stateStoreMock.On("Close").Return(nil) + stateStoreMock.On("LoadValidators", testHeight).Return(&testValidators, nil) + + blockStoreMock := &statemocks.BlockStore{} + blockStoreMock.On("Close").Return(nil) + blockStoreMock.On("Height").Return(testHeight) + blockStoreMock.On("Base").Return(int64(0)) + txIndexerMock := &txindexmocks.TxIndexer{} + blkIdxMock := &indexermocks.BlockIndexer{} + rpcConfig := config.TestRPCConfig() + l := log.TestingLogger() + d := inspect.New(rpcConfig, blockStoreMock, stateStoreMock, txIndexerMock, blkIdxMock, l) + + ctx, cancel := context.WithCancel(context.Background()) + wg := &sync.WaitGroup{} + wg.Add(1) + + startedWG := &sync.WaitGroup{} + startedWG.Add(1) + go func() { + startedWG.Done() + defer wg.Done() + require.NoError(t, d.Run(ctx)) + }() + // FIXME: used to induce context switch. + // Determine more deterministic method for prompting a context switch + startedWG.Wait() + requireConnect(t, rpcConfig.ListenAddress, 20) + cli, err := httpclient.New(rpcConfig.ListenAddress, "/websocket") + require.NoError(t, err) + + testPage := 1 + testPerPage := 100 + res, err := cli.Validators(context.Background(), &testHeight, &testPage, &testPerPage) + require.NoError(t, err) + require.NotNil(t, res) + require.Equal(t, testVotingPower, res.Validators[0].VotingPower) + + cancel() + wg.Wait() + + blockStoreMock.AssertExpectations(t) + stateStoreMock.AssertExpectations(t) +} + +func TestBlockSearch(t *testing.T) { + testHeight := int64(1) + testBlockHash := []byte("test hash") + testQuery := "block.height = 1" + stateStoreMock := &statemocks.Store{} + stateStoreMock.On("Close").Return(nil) + + blockStoreMock := &statemocks.BlockStore{} + blockStoreMock.On("Close").Return(nil) + + txIndexerMock := &txindexmocks.TxIndexer{} + blkIdxMock := &indexermocks.BlockIndexer{} + blockStoreMock.On("LoadBlock", testHeight).Return(&types.Block{ + Header: types.Header{ + Height: testHeight, + }, + }, nil) + blockStoreMock.On("LoadBlockMeta", testHeight).Return(&types.BlockMeta{ + BlockID: types.BlockID{ + Hash: testBlockHash, + }, + }) + blkIdxMock.On("Search", mock.Anything, + mock.MatchedBy(func(q *query.Query) bool { return testQuery == q.String() })). + Return([]int64{testHeight}, nil) + rpcConfig := config.TestRPCConfig() + l := log.TestingLogger() + d := inspect.New(rpcConfig, blockStoreMock, stateStoreMock, txIndexerMock, blkIdxMock, l) + + ctx, cancel := context.WithCancel(context.Background()) + wg := &sync.WaitGroup{} + wg.Add(1) + + startedWG := &sync.WaitGroup{} + startedWG.Add(1) + go func() { + startedWG.Done() + defer wg.Done() + require.NoError(t, d.Run(ctx)) + }() + // FIXME: used to induce context switch. + // Determine more deterministic method for prompting a context switch + startedWG.Wait() + requireConnect(t, rpcConfig.ListenAddress, 20) + cli, err := httpclient.New(rpcConfig.ListenAddress, "/websocket") + require.NoError(t, err) + + testPage := 1 + testPerPage := 100 + testOrderBy := "desc" + res, err := cli.BlockSearch(context.Background(), testQuery, &testPage, &testPerPage, testOrderBy) + require.NoError(t, err) + require.NotNil(t, res) + require.Equal(t, testBlockHash, []byte(res.Blocks[0].BlockID.Hash)) + + cancel() + wg.Wait() + + blockStoreMock.AssertExpectations(t) + stateStoreMock.AssertExpectations(t) +} + +func requireConnect(t testing.TB, addr string, retries int) { + parts := strings.SplitN(addr, "://", 2) + if len(parts) != 2 { + t.Fatalf("malformed address to dial: %s", addr) + } + var err error + for i := 0; i < retries; i++ { + var conn net.Conn + conn, err = net.Dial(parts[0], parts[1]) + if err == nil { + conn.Close() + return + } + // FIXME attempt to yield and let the other goroutine continue execution. + time.Sleep(time.Microsecond * 100) + } + t.Fatalf("unable to connect to server %s after %d tries: %s", addr, retries, err) +} diff --git a/inspect/rpc/rpc.go b/inspect/rpc/rpc.go new file mode 100644 index 00000000000..0aa287511d6 --- /dev/null +++ b/inspect/rpc/rpc.go @@ -0,0 +1,128 @@ +package rpc + +import ( + "context" + "net/http" + "time" + + "github.com/rs/cors" + + "github.com/tendermint/tendermint/config" + "github.com/tendermint/tendermint/libs/log" + "github.com/tendermint/tendermint/rpc/core" + "github.com/tendermint/tendermint/rpc/jsonrpc/server" + "github.com/tendermint/tendermint/state" + "github.com/tendermint/tendermint/state/indexer" + "github.com/tendermint/tendermint/state/txindex" +) + +// Server defines parameters for running an Inspector rpc server. +type Server struct { + Addr string // TCP address to listen on, ":http" if empty + Handler http.Handler + Logger log.Logger + Config *config.RPCConfig +} + +// Routes returns the set of routes used by the Inspector server. +func Routes(cfg config.RPCConfig, s state.Store, bs state.BlockStore, txidx txindex.TxIndexer, blkidx indexer.BlockIndexer, logger log.Logger) core.RoutesMap { //nolint: lll + env := &core.Environment{ + Config: cfg, + BlockIndexer: blkidx, + TxIndexer: txidx, + StateStore: s, + BlockStore: bs, + ConsensusReactor: waitSyncCheckerImpl{}, + Logger: logger, + } + return core.RoutesMap{ + "blockchain": server.NewRPCFunc(env.BlockchainInfo, "minHeight,maxHeight"), + "consensus_params": server.NewRPCFunc(env.ConsensusParams, "height"), + "block": server.NewRPCFunc(env.Block, "height"), + "block_by_hash": server.NewRPCFunc(env.BlockByHash, "hash"), + "block_results": server.NewRPCFunc(env.BlockResults, "height"), + "commit": server.NewRPCFunc(env.Commit, "height"), + "header": server.NewRPCFunc(env.Header, "height"), + "header_by_hash": server.NewRPCFunc(env.HeaderByHash, "hash"), + "validators": server.NewRPCFunc(env.Validators, "height,page,per_page"), + "tx": server.NewRPCFunc(env.Tx, "hash,prove"), + "tx_search": server.NewRPCFunc(env.TxSearch, "query,prove,page,per_page,order_by"), + "block_search": server.NewRPCFunc(env.BlockSearch, "query,page,per_page,order_by"), + } +} + +// Handler returns the http.Handler configured for use with an Inspector server. Handler +// registers the routes on the http.Handler and also registers the websocket handler +// and the CORS handler if specified by the configuration options. +func Handler(rpcConfig *config.RPCConfig, routes core.RoutesMap, logger log.Logger) http.Handler { + mux := http.NewServeMux() + wmLogger := logger.With("protocol", "websocket") + wm := server.NewWebsocketManager(routes, + server.ReadLimit(rpcConfig.MaxBodyBytes)) + wm.SetLogger(wmLogger) + mux.HandleFunc("/websocket", wm.WebsocketHandler) + + server.RegisterRPCFuncs(mux, routes, logger) + var rootHandler http.Handler = mux + if rpcConfig.IsCorsEnabled() { + rootHandler = addCORSHandler(rpcConfig, mux) + } + return rootHandler +} + +func addCORSHandler(rpcConfig *config.RPCConfig, h http.Handler) http.Handler { + corsMiddleware := cors.New(cors.Options{ + AllowedOrigins: rpcConfig.CORSAllowedOrigins, + AllowedMethods: rpcConfig.CORSAllowedMethods, + AllowedHeaders: rpcConfig.CORSAllowedHeaders, + }) + h = corsMiddleware.Handler(h) + return h +} + +type waitSyncCheckerImpl struct{} + +func (waitSyncCheckerImpl) WaitSync() bool { + return false +} + +// ListenAndServe listens on the address specified in srv.Addr and handles any +// incoming requests over HTTP using the Inspector rpc handler specified on the server. +func (srv *Server) ListenAndServe(ctx context.Context) error { + listener, err := server.Listen(srv.Addr, srv.Config.MaxOpenConnections) + if err != nil { + return err + } + go func() { + <-ctx.Done() + listener.Close() + }() + return server.Serve(listener, srv.Handler, srv.Logger, serverRPCConfig(srv.Config)) +} + +// ListenAndServeTLS listens on the address specified in srv.Addr. ListenAndServeTLS handles +// incoming requests over HTTPS using the Inspector rpc handler specified on the server. +func (srv *Server) ListenAndServeTLS(ctx context.Context, certFile, keyFile string) error { + listener, err := server.Listen(srv.Addr, srv.Config.MaxOpenConnections) + if err != nil { + return err + } + go func() { + <-ctx.Done() + listener.Close() + }() + return server.ServeTLS(listener, srv.Handler, certFile, keyFile, srv.Logger, serverRPCConfig(srv.Config)) +} + +func serverRPCConfig(r *config.RPCConfig) *server.Config { + cfg := server.DefaultConfig() + cfg.MaxBodyBytes = r.MaxBodyBytes + cfg.MaxHeaderBytes = r.MaxHeaderBytes + // If necessary adjust global WriteTimeout to ensure it's greater than + // TimeoutBroadcastTxCommit. + // See https://github.com/tendermint/tendermint/issues/3435 + if cfg.WriteTimeout <= r.TimeoutBroadcastTxCommit { + cfg.WriteTimeout = r.TimeoutBroadcastTxCommit + 1*time.Second + } + return cfg +} diff --git a/libs/pubsub/pubsub.go b/libs/pubsub/pubsub.go index 321e775c882..3a1d66b3749 100644 --- a/libs/pubsub/pubsub.go +++ b/libs/pubsub/pubsub.go @@ -27,7 +27,7 @@ // select { // case msg <- subscription.Out(): // // handle msg.Data() and msg.Events() -// case <-subscription.Cancelled(): +// case <-subscription.Canceled(): // return subscription.Err() // } // } diff --git a/libs/pubsub/pubsub_test.go b/libs/pubsub/pubsub_test.go index 8edf1250847..324928b2903 100644 --- a/libs/pubsub/pubsub_test.go +++ b/libs/pubsub/pubsub_test.go @@ -431,7 +431,7 @@ func benchmarkNClients(n int, b *testing.B) { select { case <-subscription.Out(): continue - case <-subscription.Cancelled(): + case <-subscription.Canceled(): return } } @@ -472,7 +472,7 @@ func benchmarkNClientsOneQuery(n int, b *testing.B) { select { case <-subscription.Out(): continue - case <-subscription.Cancelled(): + case <-subscription.Canceled(): return } } @@ -501,7 +501,7 @@ func assertReceive(t *testing.T, expected interface{}, ch <-chan pubsub.Message, } func assertCancelled(t *testing.T, subscription *pubsub.Subscription, err error) { - _, ok := <-subscription.Cancelled() + _, ok := <-subscription.Canceled() assert.False(t, ok) assert.Equal(t, err, subscription.Err()) } diff --git a/libs/pubsub/subscription.go b/libs/pubsub/subscription.go index 141569310a4..d07abd86da7 100644 --- a/libs/pubsub/subscription.go +++ b/libs/pubsub/subscription.go @@ -43,11 +43,9 @@ func (s *Subscription) Out() <-chan Message { return s.out } -// Cancelled returns a channel that's closed when the subscription is +// Canceled returns a channel that's closed when the subscription is // terminated and supposed to be used in a select statement. -// -//nolint:misspell -func (s *Subscription) Cancelled() <-chan struct{} { +func (s *Subscription) Canceled() <-chan struct{} { return s.canceled } diff --git a/libs/strings/string.go b/libs/strings/string.go index 46675923319..37026dcc208 100644 --- a/libs/strings/string.go +++ b/libs/strings/string.go @@ -32,6 +32,27 @@ func SplitAndTrim(s, sep, cutset string) []string { return spl } +// SplitAndTrimEmpty slices s into all subslices separated by sep and returns a +// slice of the string s with all leading and trailing Unicode code points +// contained in cutset removed. If sep is empty, SplitAndTrim splits after each +// UTF-8 sequence. First part is equivalent to strings.SplitN with a count of +// -1. also filter out empty strings, only return non-empty strings. +func SplitAndTrimEmpty(s, sep, cutset string) []string { + if s == "" { + return []string{} + } + + spl := strings.Split(s, sep) + nonEmptyStrings := make([]string, 0, len(spl)) + for i := 0; i < len(spl); i++ { + element := strings.Trim(spl[i], cutset) + if element != "" { + nonEmptyStrings = append(nonEmptyStrings, element) + } + } + return nonEmptyStrings +} + // Returns true if s is a non-empty printable non-tab ascii character. func IsASCIIText(s string) bool { if len(s) == 0 { diff --git a/light/proxy/proxy.go b/light/proxy/proxy.go index 359f6a8fe4a..70dca1c1077 100644 --- a/light/proxy/proxy.go +++ b/light/proxy/proxy.go @@ -113,7 +113,7 @@ func (p *Proxy) listen() (net.Listener, *http.ServeMux, error) { } // 4) Start listening for new connections. - listener, err := rpcserver.Listen(p.Addr, p.Config) + listener, err := rpcserver.Listen(p.Addr, p.Config.MaxOpenConnections) if err != nil { return nil, mux, err } diff --git a/node/node.go b/node/node.go index ddf86e0dcff..0829db7af89 100644 --- a/node/node.go +++ b/node/node.go @@ -141,7 +141,7 @@ func NewNode(config *cfg.Config, nodeKey *p2p.NodeKey, clientCreator proxy.ClientCreator, genesisDocProvider GenesisDocProvider, - dbProvider DBProvider, + dbProvider cfg.DBProvider, metricsProvider MetricsProvider, logger log.Logger, options ...Option, @@ -503,12 +503,12 @@ func (n *Node) OnStop() { } // ConfigureRPC makes sure RPC has all the objects it needs to operate. -func (n *Node) ConfigureRPC() error { +func (n *Node) ConfigureRPC() (*rpccore.Environment, error) { pubKey, err := n.privValidator.GetPubKey() - if err != nil { - return fmt.Errorf("can't get pubkey: %w", err) + if pubKey == nil || err != nil { + return nil, fmt.Errorf("can't get pubkey: %w", err) } - rpccore.SetEnvironment(&rpccore.Environment{ + rpcCoreEnv := rpccore.Environment{ ProxyAppQuery: n.proxyApp.Query(), ProxyAppMempool: n.proxyApp.Mempool(), @@ -518,8 +518,8 @@ func (n *Node) ConfigureRPC() error { ConsensusState: n.consensusState, P2PPeers: n.sw, P2PTransport: n, + PubKey: pubKey, - PubKey: pubKey, GenDoc: n.genesisDoc, TxIndexer: n.txIndexer, BlockIndexer: n.blockIndexer, @@ -530,24 +530,24 @@ func (n *Node) ConfigureRPC() error { Logger: n.Logger.With("module", "rpc"), Config: *n.config.RPC, - }) - if err := rpccore.InitGenesisChunks(); err != nil { - return err } - - return nil + if err := rpcCoreEnv.InitGenesisChunks(); err != nil { + return nil, err + } + return &rpcCoreEnv, nil } func (n *Node) startRPC() ([]net.Listener, error) { - err := n.ConfigureRPC() + env, err := n.ConfigureRPC() if err != nil { return nil, err } listenAddrs := splitAndTrimEmpty(n.config.RPC.ListenAddress, ",", " ") + routes := env.GetRoutes() if n.config.RPC.Unsafe { - rpccore.AddUnsafeRoutes() + env.AddUnsafeRoutes(routes) } config := rpcserver.DefaultConfig() @@ -567,7 +567,7 @@ func (n *Node) startRPC() ([]net.Listener, error) { mux := http.NewServeMux() rpcLogger := n.Logger.With("module", "rpc-server") wmLogger := rpcLogger.With("protocol", "websocket") - wm := rpcserver.NewWebsocketManager(rpccore.Routes, + wm := rpcserver.NewWebsocketManager(routes, rpcserver.OnDisconnect(func(remoteAddr string) { err := n.eventBus.UnsubscribeAll(context.Background(), remoteAddr) if err != nil && err != tmpubsub.ErrSubscriptionNotFound { @@ -579,10 +579,10 @@ func (n *Node) startRPC() ([]net.Listener, error) { ) wm.SetLogger(wmLogger) mux.HandleFunc("/websocket", wm.WebsocketHandler) - rpcserver.RegisterRPCFuncs(mux, rpccore.Routes, rpcLogger) + rpcserver.RegisterRPCFuncs(mux, routes, rpcLogger) listener, err := rpcserver.Listen( listenAddr, - config, + config.MaxOpenConnections, ) if err != nil { return nil, err @@ -640,12 +640,12 @@ func (n *Node) startRPC() ([]net.Listener, error) { if config.WriteTimeout <= n.config.RPC.TimeoutBroadcastTxCommit { config.WriteTimeout = n.config.RPC.TimeoutBroadcastTxCommit + 1*time.Second } - listener, err := rpcserver.Listen(grpcListenAddr, config) + listener, err := rpcserver.Listen(grpcListenAddr, config.MaxOpenConnections) if err != nil { return nil, err } go func() { - if err := grpccore.StartGRPCServer(listener); err != nil { + if err := grpccore.StartGRPCServer(env, listener); err != nil { n.Logger.Error("Error starting gRPC server", "err", err) } }() diff --git a/node/node_test.go b/node/node_test.go index 4fdf63f3f4e..01a8e01ab95 100644 --- a/node/node_test.go +++ b/node/node_test.go @@ -53,7 +53,7 @@ func TestNodeStartStop(t *testing.T) { require.NoError(t, err) select { case <-blocksSub.Out(): - case <-blocksSub.Cancelled(): + case <-blocksSub.Canceled(): t.Fatal("blocksSub was canceled") case <-time.After(10 * time.Second): t.Fatal("timed out waiting for the node to produce a block") @@ -461,7 +461,7 @@ func TestNodeNewNodeCustomReactors(t *testing.T) { nodeKey, proxy.DefaultClientCreator(config.ProxyApp, config.ABCI, config.DBDir()), DefaultGenesisDocProviderFunc(config), - DefaultDBProvider, + cfg.DefaultDBProvider, DefaultMetricsProvider(config.Instrumentation), log.TestingLogger(), CustomReactors(map[string]p2p.Reactor{"FOO": cr, "BLOCKSYNC": customBlocksyncReactor}), diff --git a/node/setup.go b/node/setup.go index 5bada8b6f2c..d710869c7ff 100644 --- a/node/setup.go +++ b/node/setup.go @@ -7,6 +7,8 @@ import ( "fmt" "net" "strings" + + _ "net/http/pprof" //nolint: gosec // securely exposed on separate, optional port "time" dbm "github.com/tendermint/tm-db" @@ -17,6 +19,7 @@ import ( cs "github.com/tendermint/tendermint/consensus" "github.com/tendermint/tendermint/crypto" "github.com/tendermint/tendermint/evidence" + "github.com/tendermint/tendermint/statesync" tmjson "github.com/tendermint/tendermint/libs/json" "github.com/tendermint/tendermint/libs/log" @@ -30,13 +33,8 @@ import ( "github.com/tendermint/tendermint/proxy" sm "github.com/tendermint/tendermint/state" "github.com/tendermint/tendermint/state/indexer" - blockidxkv "github.com/tendermint/tendermint/state/indexer/block/kv" - blockidxnull "github.com/tendermint/tendermint/state/indexer/block/null" - "github.com/tendermint/tendermint/state/indexer/sink/psql" + "github.com/tendermint/tendermint/state/indexer/block" "github.com/tendermint/tendermint/state/txindex" - "github.com/tendermint/tendermint/state/txindex/kv" - "github.com/tendermint/tendermint/state/txindex/null" - "github.com/tendermint/tendermint/statesync" "github.com/tendermint/tendermint/store" "github.com/tendermint/tendermint/types" "github.com/tendermint/tendermint/version" @@ -44,24 +42,8 @@ import ( _ "github.com/lib/pq" // provide the psql db driver ) -// DBContext specifies config information for loading a new DB. -type DBContext struct { - ID string - Config *cfg.Config -} - -// DBProvider takes a DBContext and returns an instantiated DB. -type DBProvider func(*DBContext) (dbm.DB, error) - const readHeaderTimeout = 10 * time.Second -// DefaultDBProvider returns a database using the DBBackend and DBDir -// specified in the ctx.Config. -func DefaultDBProvider(ctx *DBContext) (dbm.DB, error) { - dbType := dbm.BackendType(ctx.Config.DBBackend) - return dbm.NewDB(ctx.ID, dbType, ctx.Config.DBDir()) -} - // GenesisDocProvider returns a GenesisDoc. // It allows the GenesisDoc to be pulled from sources other than the // filesystem, for instance from a distributed key-value store cluster. @@ -92,7 +74,7 @@ func DefaultNewNode(config *cfg.Config, logger log.Logger) (*Node, error) { nodeKey, proxy.DefaultClientCreator(config.ProxyApp, config.ABCI, config.DBDir()), DefaultGenesisDocProviderFunc(config), - DefaultDBProvider, + cfg.DefaultDBProvider, DefaultMetricsProvider(config.Instrumentation), logger, ) @@ -124,15 +106,15 @@ type blockSyncReactor interface { //------------------------------------------------------------------------------ -func initDBs(config *cfg.Config, dbProvider DBProvider) (blockStore *store.BlockStore, stateDB dbm.DB, err error) { +func initDBs(config *cfg.Config, dbProvider cfg.DBProvider) (blockStore *store.BlockStore, stateDB dbm.DB, err error) { var blockStoreDB dbm.DB - blockStoreDB, err = dbProvider(&DBContext{"blockstore", config}) + blockStoreDB, err = dbProvider(&cfg.DBContext{ID: "blockstore", Config: config}) if err != nil { return } blockStore = store.NewBlockStore(blockStoreDB) - stateDB, err = dbProvider(&DBContext{"state", config}) + stateDB, err = dbProvider(&cfg.DBContext{ID: "state", Config: config}) if err != nil { return } @@ -161,7 +143,7 @@ func createAndStartEventBus(logger log.Logger) (*types.EventBus, error) { func createAndStartIndexerService( config *cfg.Config, chainID string, - dbProvider DBProvider, + dbProvider cfg.DBProvider, eventBus *types.EventBus, logger log.Logger, ) (*txindex.IndexerService, txindex.TxIndexer, indexer.BlockIndexer, error) { @@ -169,31 +151,9 @@ func createAndStartIndexerService( txIndexer txindex.TxIndexer blockIndexer indexer.BlockIndexer ) - - switch config.TxIndex.Indexer { - case "kv": - store, err := dbProvider(&DBContext{"tx_index", config}) - if err != nil { - return nil, nil, nil, err - } - - txIndexer = kv.NewTxIndex(store) - blockIndexer = blockidxkv.New(dbm.NewPrefixDB(store, []byte("block_events"))) - - case "psql": - if config.TxIndex.PsqlConn == "" { - return nil, nil, nil, errors.New(`no psql-conn is set for the "psql" indexer`) - } - es, err := psql.NewEventSink(config.TxIndex.PsqlConn, chainID) - if err != nil { - return nil, nil, nil, fmt.Errorf("creating psql indexer: %w", err) - } - txIndexer = es.TxIndexer() - blockIndexer = es.BlockIndexer() - - default: - txIndexer = &null.TxIndex{} - blockIndexer = &blockidxnull.BlockerIndexer{} + txIndexer, blockIndexer, err := block.IndexerFromConfig(config, dbProvider, chainID) + if err != nil { + return nil, nil, nil, err } indexerService := txindex.NewIndexerService(txIndexer, blockIndexer, eventBus, false) @@ -315,10 +275,10 @@ func createMempoolAndMempoolReactor( } } -func createEvidenceReactor(config *cfg.Config, dbProvider DBProvider, +func createEvidenceReactor(config *cfg.Config, dbProvider cfg.DBProvider, stateStore sm.Store, blockStore *store.BlockStore, logger log.Logger, ) (*evidence.Reactor, *evidence.Pool, error) { - evidenceDB, err := dbProvider(&DBContext{"evidence", config}) + evidenceDB, err := dbProvider(&cfg.DBContext{ID: "evidence", Config: config}) if err != nil { return nil, nil, err } diff --git a/rpc/client/local/local.go b/rpc/client/local/local.go index ca324dee02b..31f78f6b5a7 100644 --- a/rpc/client/local/local.go +++ b/rpc/client/local/local.go @@ -41,22 +41,20 @@ type Local struct { *types.EventBus Logger log.Logger ctx *rpctypes.Context + env *core.Environment } // NewLocal configures a client that calls the Node directly. -// -// Note that given how rpc/core works with package singletons, that -// you can only have one node per process. So make sure test cases -// don't run in parallel, or try to simulate an entire network in -// one process... func New(node *nm.Node) *Local { - if err := node.ConfigureRPC(); err != nil { + env, err := node.ConfigureRPC() + if err != nil { node.Logger.Error("Error configuring RPC", "err", err) } return &Local{ EventBus: node.EventBus(), Logger: log.NewNopLogger(), ctx: &rpctypes.Context{}, + env: env, } } @@ -68,11 +66,11 @@ func (c *Local) SetLogger(l log.Logger) { } func (c *Local) Status(ctx context.Context) (*ctypes.ResultStatus, error) { - return core.Status(c.ctx) + return c.env.Status(c.ctx) } func (c *Local) ABCIInfo(ctx context.Context) (*ctypes.ResultABCIInfo, error) { - return core.ABCIInfo(c.ctx) + return c.env.ABCIInfo(c.ctx) } func (c *Local) ABCIQuery(ctx context.Context, path string, data bytes.HexBytes) (*ctypes.ResultABCIQuery, error) { @@ -84,55 +82,55 @@ func (c *Local) ABCIQueryWithOptions( path string, data bytes.HexBytes, opts rpcclient.ABCIQueryOptions) (*ctypes.ResultABCIQuery, error) { - return core.ABCIQuery(c.ctx, path, data, opts.Height, opts.Prove) + return c.env.ABCIQuery(c.ctx, path, data, opts.Height, opts.Prove) } func (c *Local) BroadcastTxCommit(ctx context.Context, tx types.Tx) (*ctypes.ResultBroadcastTxCommit, error) { - return core.BroadcastTxCommit(c.ctx, tx) + return c.env.BroadcastTxCommit(c.ctx, tx) } func (c *Local) BroadcastTxAsync(ctx context.Context, tx types.Tx) (*ctypes.ResultBroadcastTx, error) { - return core.BroadcastTxAsync(c.ctx, tx) + return c.env.BroadcastTxAsync(c.ctx, tx) } func (c *Local) BroadcastTxSync(ctx context.Context, tx types.Tx) (*ctypes.ResultBroadcastTx, error) { - return core.BroadcastTxSync(c.ctx, tx) + return c.env.BroadcastTxSync(c.ctx, tx) } func (c *Local) UnconfirmedTxs(ctx context.Context, limit *int) (*ctypes.ResultUnconfirmedTxs, error) { - return core.UnconfirmedTxs(c.ctx, limit) + return c.env.UnconfirmedTxs(c.ctx, limit) } func (c *Local) NumUnconfirmedTxs(ctx context.Context) (*ctypes.ResultUnconfirmedTxs, error) { - return core.NumUnconfirmedTxs(c.ctx) + return c.env.NumUnconfirmedTxs(c.ctx) } func (c *Local) CheckTx(ctx context.Context, tx types.Tx) (*ctypes.ResultCheckTx, error) { - return core.CheckTx(c.ctx, tx) + return c.env.CheckTx(c.ctx, tx) } func (c *Local) NetInfo(ctx context.Context) (*ctypes.ResultNetInfo, error) { - return core.NetInfo(c.ctx) + return c.env.NetInfo(c.ctx) } func (c *Local) DumpConsensusState(ctx context.Context) (*ctypes.ResultDumpConsensusState, error) { - return core.DumpConsensusState(c.ctx) + return c.env.DumpConsensusState(c.ctx) } func (c *Local) ConsensusState(ctx context.Context) (*ctypes.ResultConsensusState, error) { - return core.ConsensusState(c.ctx) + return c.env.GetConsensusState(c.ctx) } func (c *Local) ConsensusParams(ctx context.Context, height *int64) (*ctypes.ResultConsensusParams, error) { - return core.ConsensusParams(c.ctx, height) + return c.env.ConsensusParams(c.ctx, height) } func (c *Local) Health(ctx context.Context) (*ctypes.ResultHealth, error) { - return core.Health(c.ctx) + return c.env.Health(c.ctx) } func (c *Local) DialSeeds(ctx context.Context, seeds []string) (*ctypes.ResultDialSeeds, error) { - return core.UnsafeDialSeeds(c.ctx, seeds) + return c.env.UnsafeDialSeeds(c.ctx, seeds) } func (c *Local) DialPeers( @@ -142,51 +140,51 @@ func (c *Local) DialPeers( unconditional, private bool, ) (*ctypes.ResultDialPeers, error) { - return core.UnsafeDialPeers(c.ctx, peers, persistent, unconditional, private) + return c.env.UnsafeDialPeers(c.ctx, peers, persistent, unconditional, private) } func (c *Local) BlockchainInfo(ctx context.Context, minHeight, maxHeight int64) (*ctypes.ResultBlockchainInfo, error) { - return core.BlockchainInfo(c.ctx, minHeight, maxHeight) + return c.env.BlockchainInfo(c.ctx, minHeight, maxHeight) } func (c *Local) Genesis(ctx context.Context) (*ctypes.ResultGenesis, error) { - return core.Genesis(c.ctx) + return c.env.Genesis(c.ctx) } func (c *Local) GenesisChunked(ctx context.Context, id uint) (*ctypes.ResultGenesisChunk, error) { - return core.GenesisChunked(c.ctx, id) + return c.env.GenesisChunked(c.ctx, id) } func (c *Local) Block(ctx context.Context, height *int64) (*ctypes.ResultBlock, error) { - return core.Block(c.ctx, height) + return c.env.Block(c.ctx, height) } func (c *Local) BlockByHash(ctx context.Context, hash []byte) (*ctypes.ResultBlock, error) { - return core.BlockByHash(c.ctx, hash) + return c.env.BlockByHash(c.ctx, hash) } func (c *Local) BlockResults(ctx context.Context, height *int64) (*ctypes.ResultBlockResults, error) { - return core.BlockResults(c.ctx, height) + return c.env.BlockResults(c.ctx, height) } func (c *Local) Header(ctx context.Context, height *int64) (*ctypes.ResultHeader, error) { - return core.Header(c.ctx, height) + return c.env.Header(c.ctx, height) } func (c *Local) HeaderByHash(ctx context.Context, hash bytes.HexBytes) (*ctypes.ResultHeader, error) { - return core.HeaderByHash(c.ctx, hash) + return c.env.HeaderByHash(c.ctx, hash) } func (c *Local) Commit(ctx context.Context, height *int64) (*ctypes.ResultCommit, error) { - return core.Commit(c.ctx, height) + return c.env.Commit(c.ctx, height) } func (c *Local) Validators(ctx context.Context, height *int64, page, perPage *int) (*ctypes.ResultValidators, error) { - return core.Validators(c.ctx, height, page, perPage) + return c.env.Validators(c.ctx, height, page, perPage) } func (c *Local) Tx(ctx context.Context, hash []byte, prove bool) (*ctypes.ResultTx, error) { - return core.Tx(c.ctx, hash, prove) + return c.env.Tx(c.ctx, hash, prove) } func (c *Local) TxSearch( @@ -197,7 +195,7 @@ func (c *Local) TxSearch( perPage *int, orderBy string, ) (*ctypes.ResultTxSearch, error) { - return core.TxSearch(c.ctx, query, prove, page, perPage, orderBy) + return c.env.TxSearch(c.ctx, query, prove, page, perPage, orderBy) } func (c *Local) BlockSearch( @@ -206,11 +204,11 @@ func (c *Local) BlockSearch( page, perPage *int, orderBy string, ) (*ctypes.ResultBlockSearch, error) { - return core.BlockSearch(c.ctx, query, page, perPage, orderBy) + return c.env.BlockSearch(c.ctx, query, page, perPage, orderBy) } func (c *Local) BroadcastEvidence(ctx context.Context, ev types.Evidence) (*ctypes.ResultBroadcastEvidence, error) { - return core.BroadcastEvidence(c.ctx, ev) + return c.env.BroadcastEvidence(c.ctx, ev) } func (c *Local) Subscribe( @@ -262,7 +260,7 @@ func (c *Local) eventsRoutine( c.Logger.Error("wanted to publish ResultEvent, but out channel is full", "result", result, "query", result.Query) } } - case <-sub.Cancelled(): + case <-sub.Canceled(): if sub.Err() == tmpubsub.ErrUnsubscribed { return } diff --git a/rpc/client/mock/client.go b/rpc/client/mock/client.go index ec3a358cdd6..4be251ce5b9 100644 --- a/rpc/client/mock/client.go +++ b/rpc/client/mock/client.go @@ -28,10 +28,6 @@ import ( ) // Client wraps arbitrary implementations of the various interfaces. -// -// We provide a few choices to mock out each one in this package. -// Nothing hidden here, so no New function, just construct it from -// some parts, and swap them out them during the tests. type Client struct { client.ABCIClient client.SignClient @@ -41,6 +37,14 @@ type Client struct { client.EvidenceClient client.MempoolClient service.Service + + env *core.Environment +} + +func New() Client { + return Client{ + env: &core.Environment{}, + } } var _ client.Client = Client{} @@ -80,11 +84,11 @@ func (c Call) GetResponse(args interface{}) (interface{}, error) { } func (c Client) Status(ctx context.Context) (*ctypes.ResultStatus, error) { - return core.Status(&rpctypes.Context{}) + return c.env.Status(&rpctypes.Context{}) } func (c Client) ABCIInfo(ctx context.Context) (*ctypes.ResultABCIInfo, error) { - return core.ABCIInfo(&rpctypes.Context{}) + return c.env.ABCIInfo(&rpctypes.Context{}) } func (c Client) ABCIQuery(ctx context.Context, path string, data bytes.HexBytes) (*ctypes.ResultABCIQuery, error) { @@ -96,47 +100,47 @@ func (c Client) ABCIQueryWithOptions( path string, data bytes.HexBytes, opts client.ABCIQueryOptions) (*ctypes.ResultABCIQuery, error) { - return core.ABCIQuery(&rpctypes.Context{}, path, data, opts.Height, opts.Prove) + return c.env.ABCIQuery(&rpctypes.Context{}, path, data, opts.Height, opts.Prove) } func (c Client) BroadcastTxCommit(ctx context.Context, tx types.Tx) (*ctypes.ResultBroadcastTxCommit, error) { - return core.BroadcastTxCommit(&rpctypes.Context{}, tx) + return c.env.BroadcastTxCommit(&rpctypes.Context{}, tx) } func (c Client) BroadcastTxAsync(ctx context.Context, tx types.Tx) (*ctypes.ResultBroadcastTx, error) { - return core.BroadcastTxAsync(&rpctypes.Context{}, tx) + return c.env.BroadcastTxAsync(&rpctypes.Context{}, tx) } func (c Client) BroadcastTxSync(ctx context.Context, tx types.Tx) (*ctypes.ResultBroadcastTx, error) { - return core.BroadcastTxSync(&rpctypes.Context{}, tx) + return c.env.BroadcastTxSync(&rpctypes.Context{}, tx) } func (c Client) CheckTx(ctx context.Context, tx types.Tx) (*ctypes.ResultCheckTx, error) { - return core.CheckTx(&rpctypes.Context{}, tx) + return c.env.CheckTx(&rpctypes.Context{}, tx) } func (c Client) NetInfo(ctx context.Context) (*ctypes.ResultNetInfo, error) { - return core.NetInfo(&rpctypes.Context{}) + return c.env.NetInfo(&rpctypes.Context{}) } func (c Client) ConsensusState(ctx context.Context) (*ctypes.ResultConsensusState, error) { - return core.ConsensusState(&rpctypes.Context{}) + return c.env.GetConsensusState(&rpctypes.Context{}) } func (c Client) DumpConsensusState(ctx context.Context) (*ctypes.ResultDumpConsensusState, error) { - return core.DumpConsensusState(&rpctypes.Context{}) + return c.env.DumpConsensusState(&rpctypes.Context{}) } func (c Client) ConsensusParams(ctx context.Context, height *int64) (*ctypes.ResultConsensusParams, error) { - return core.ConsensusParams(&rpctypes.Context{}, height) + return c.env.ConsensusParams(&rpctypes.Context{}, height) } func (c Client) Health(ctx context.Context) (*ctypes.ResultHealth, error) { - return core.Health(&rpctypes.Context{}) + return c.env.Health(&rpctypes.Context{}) } func (c Client) DialSeeds(ctx context.Context, seeds []string) (*ctypes.ResultDialSeeds, error) { - return core.UnsafeDialSeeds(&rpctypes.Context{}, seeds) + return c.env.UnsafeDialSeeds(&rpctypes.Context{}, seeds) } func (c Client) DialPeers( @@ -146,33 +150,33 @@ func (c Client) DialPeers( unconditional, private bool, ) (*ctypes.ResultDialPeers, error) { - return core.UnsafeDialPeers(&rpctypes.Context{}, peers, persistent, unconditional, private) + return c.env.UnsafeDialPeers(&rpctypes.Context{}, peers, persistent, unconditional, private) } func (c Client) BlockchainInfo(ctx context.Context, minHeight, maxHeight int64) (*ctypes.ResultBlockchainInfo, error) { - return core.BlockchainInfo(&rpctypes.Context{}, minHeight, maxHeight) + return c.env.BlockchainInfo(&rpctypes.Context{}, minHeight, maxHeight) } func (c Client) Genesis(ctx context.Context) (*ctypes.ResultGenesis, error) { - return core.Genesis(&rpctypes.Context{}) + return c.env.Genesis(&rpctypes.Context{}) } func (c Client) Block(ctx context.Context, height *int64) (*ctypes.ResultBlock, error) { - return core.Block(&rpctypes.Context{}, height) + return c.env.Block(&rpctypes.Context{}, height) } func (c Client) BlockByHash(ctx context.Context, hash []byte) (*ctypes.ResultBlock, error) { - return core.BlockByHash(&rpctypes.Context{}, hash) + return c.env.BlockByHash(&rpctypes.Context{}, hash) } func (c Client) Commit(ctx context.Context, height *int64) (*ctypes.ResultCommit, error) { - return core.Commit(&rpctypes.Context{}, height) + return c.env.Commit(&rpctypes.Context{}, height) } func (c Client) Validators(ctx context.Context, height *int64, page, perPage *int) (*ctypes.ResultValidators, error) { - return core.Validators(&rpctypes.Context{}, height, page, perPage) + return c.env.Validators(&rpctypes.Context{}, height, page, perPage) } func (c Client) BroadcastEvidence(ctx context.Context, ev types.Evidence) (*ctypes.ResultBroadcastEvidence, error) { - return core.BroadcastEvidence(&rpctypes.Context{}, ev) + return c.env.BroadcastEvidence(&rpctypes.Context{}, ev) } diff --git a/rpc/core/abci.go b/rpc/core/abci.go index d1f7193be96..4df8cba3f96 100644 --- a/rpc/core/abci.go +++ b/rpc/core/abci.go @@ -9,8 +9,8 @@ import ( ) // ABCIQuery queries the application for some information. -// More: https://docs.tendermint.com/main/rpc/#/ABCI/abci_query -func ABCIQuery( +// More: https://docs.tendermint.com/master/rpc/#/ABCI/abci_query +func (env *Environment) ABCIQuery( ctx *rpctypes.Context, path string, data bytes.HexBytes, @@ -31,8 +31,8 @@ func ABCIQuery( } // ABCIInfo gets some info about the application. -// More: https://docs.tendermint.com/main/rpc/#/ABCI/abci_info -func ABCIInfo(ctx *rpctypes.Context) (*ctypes.ResultABCIInfo, error) { +// More: https://docs.tendermint.com/master/rpc/#/ABCI/abci_info +func (env *Environment) ABCIInfo(ctx *rpctypes.Context) (*ctypes.ResultABCIInfo, error) { resInfo, err := env.ProxyAppQuery.InfoSync(proxy.RequestInfo) if err != nil { return nil, err diff --git a/rpc/core/blocks.go b/rpc/core/blocks.go index 5d277218f6d..360e59e1222 100644 --- a/rpc/core/blocks.go +++ b/rpc/core/blocks.go @@ -15,10 +15,19 @@ import ( ) // BlockchainInfo gets block headers for minHeight <= height <= maxHeight. -// Block headers are returned in descending order (highest first). -// More: https://docs.tendermint.com/main/rpc/#/Info/blockchain -func BlockchainInfo(ctx *rpctypes.Context, minHeight, maxHeight int64) (*ctypes.ResultBlockchainInfo, error) { - // maximum 20 block metas +// +// If maxHeight does not yet exist, blocks up to the current height will be +// returned. If minHeight does not exist (due to pruning), earliest existing +// height will be used. +// +// At most 20 items will be returned. Block headers are returned in descending +// order (highest first). +// +// More: https://docs.tendermint.com/master/rpc/#/Info/blockchain +func (env *Environment) BlockchainInfo( + ctx *rpctypes.Context, + minHeight, maxHeight int64) (*ctypes.ResultBlockchainInfo, error) { + const limit int64 = 20 var err error minHeight, maxHeight, err = filterMinMax( @@ -79,8 +88,8 @@ func filterMinMax(base, height, min, max, limit int64) (int64, int64, error) { // Header gets block header at a given height. // If no height is provided, it will fetch the latest header. // More: https://docs.tendermint.com/master/rpc/#/Info/header -func Header(ctx *rpctypes.Context, heightPtr *int64) (*ctypes.ResultHeader, error) { - height, err := getHeight(env.BlockStore.Height(), heightPtr) +func (env *Environment) Header(ctx *rpctypes.Context, heightPtr *int64) (*ctypes.ResultHeader, error) { + height, err := env.getHeight(env.BlockStore.Height(), heightPtr) if err != nil { return nil, err } @@ -95,7 +104,7 @@ func Header(ctx *rpctypes.Context, heightPtr *int64) (*ctypes.ResultHeader, erro // HeaderByHash gets header by hash. // More: https://docs.tendermint.com/master/rpc/#/Info/header_by_hash -func HeaderByHash(ctx *rpctypes.Context, hash bytes.HexBytes) (*ctypes.ResultHeader, error) { +func (env *Environment) HeaderByHash(ctx *rpctypes.Context, hash bytes.HexBytes) (*ctypes.ResultHeader, error) { // N.B. The hash parameter is HexBytes so that the reflective parameter // decoding logic in the HTTP service will correctly translate from JSON. // See https://github.com/tendermint/tendermint/issues/6802 for context. @@ -110,9 +119,9 @@ func HeaderByHash(ctx *rpctypes.Context, hash bytes.HexBytes) (*ctypes.ResultHea // Block gets block at a given height. // If no height is provided, it will fetch the latest block. -// More: https://docs.tendermint.com/main/rpc/#/Info/block -func Block(ctx *rpctypes.Context, heightPtr *int64) (*ctypes.ResultBlock, error) { - height, err := getHeight(env.BlockStore.Height(), heightPtr) +// More: https://docs.tendermint.com/master/rpc/#/Info/block +func (env *Environment) Block(ctx *rpctypes.Context, heightPtr *int64) (*ctypes.ResultBlock, error) { + height, err := env.getHeight(env.BlockStore.Height(), heightPtr) if err != nil { return nil, err } @@ -126,8 +135,8 @@ func Block(ctx *rpctypes.Context, heightPtr *int64) (*ctypes.ResultBlock, error) } // BlockByHash gets block by hash. -// More: https://docs.tendermint.com/main/rpc/#/Info/block_by_hash -func BlockByHash(ctx *rpctypes.Context, hash []byte) (*ctypes.ResultBlock, error) { +// More: https://docs.tendermint.com/master/rpc/#/Info/block_by_hash +func (env *Environment) BlockByHash(ctx *rpctypes.Context, hash []byte) (*ctypes.ResultBlock, error) { block := env.BlockStore.LoadBlockByHash(hash) if block == nil { return &ctypes.ResultBlock{BlockID: types.BlockID{}, Block: nil}, nil @@ -139,9 +148,9 @@ func BlockByHash(ctx *rpctypes.Context, hash []byte) (*ctypes.ResultBlock, error // Commit gets block commit at a given height. // If no height is provided, it will fetch the commit for the latest block. -// More: https://docs.tendermint.com/main/rpc/#/Info/commit -func Commit(ctx *rpctypes.Context, heightPtr *int64) (*ctypes.ResultCommit, error) { - height, err := getHeight(env.BlockStore.Height(), heightPtr) +// More: https://docs.tendermint.com/master/rpc/#/Info/commit +func (env *Environment) Commit(ctx *rpctypes.Context, heightPtr *int64) (*ctypes.ResultCommit, error) { + height, err := env.getHeight(env.BlockStore.Height(), heightPtr) if err != nil { return nil, err } @@ -170,9 +179,9 @@ func Commit(ctx *rpctypes.Context, heightPtr *int64) (*ctypes.ResultCommit, erro // Results are for the height of the block containing the txs. // Thus response.results.deliver_tx[5] is the results of executing // getBlock(h).Txs[5] -// More: https://docs.tendermint.com/main/rpc/#/Info/block_results -func BlockResults(ctx *rpctypes.Context, heightPtr *int64) (*ctypes.ResultBlockResults, error) { - height, err := getHeight(env.BlockStore.Height(), heightPtr) +// More: https://docs.tendermint.com/master/rpc/#/Info/block_results +func (env *Environment) BlockResults(ctx *rpctypes.Context, heightPtr *int64) (*ctypes.ResultBlockResults, error) { + height, err := env.getHeight(env.BlockStore.Height(), heightPtr) if err != nil { return nil, err } @@ -194,7 +203,7 @@ func BlockResults(ctx *rpctypes.Context, heightPtr *int64) (*ctypes.ResultBlockR // BlockSearch searches for a paginated set of blocks matching BeginBlock and // EndBlock event search criteria. -func BlockSearch( +func (env *Environment) BlockSearch( ctx *rpctypes.Context, query string, pagePtr, perPagePtr *int, @@ -230,7 +239,7 @@ func BlockSearch( // paginate results totalCount := len(results) - perPage := validatePerPage(perPagePtr) + perPage := env.validatePerPage(perPagePtr) page, err := validatePage(pagePtr, perPage, totalCount) if err != nil { diff --git a/rpc/core/blocks_test.go b/rpc/core/blocks_test.go index 71311076ce0..5ea778fbda3 100644 --- a/rpc/core/blocks_test.go +++ b/rpc/core/blocks_test.go @@ -80,7 +80,7 @@ func TestBlockResults(t *testing.T) { BeginBlock: &abci.ResponseBeginBlock{}, } - env = &Environment{} + env := &Environment{} env.StateStore = sm.NewStore(dbm.NewMemDB(), sm.StoreOptions{ DiscardABCIResponses: false, }) @@ -110,7 +110,7 @@ func TestBlockResults(t *testing.T) { } for _, tc := range testCases { - res, err := BlockResults(&rpctypes.Context{}, &tc.height) + res, err := env.BlockResults(&rpctypes.Context{}, &tc.height) if tc.wantErr { assert.Error(t, err) } else { diff --git a/rpc/core/consensus.go b/rpc/core/consensus.go index 5237ddfc6e3..f51b44f22aa 100644 --- a/rpc/core/consensus.go +++ b/rpc/core/consensus.go @@ -14,10 +14,14 @@ import ( // validators are sorted by their voting power - this is the canonical order // for the validators in the set as used in computing their Merkle root. // -// More: https://docs.tendermint.com/main/rpc/#/Info/validators -func Validators(ctx *rpctypes.Context, heightPtr *int64, pagePtr, perPagePtr *int) (*ctypes.ResultValidators, error) { +// More: https://docs.tendermint.com/master/rpc/#/Info/validators +func (env *Environment) Validators( + ctx *rpctypes.Context, + heightPtr *int64, + pagePtr, perPagePtr *int) (*ctypes.ResultValidators, error) { + // The latest validator that we know is the NextValidator of the last block. - height, err := getHeight(latestUncommittedHeight(), heightPtr) + height, err := env.getHeight(env.latestUncommittedHeight(), heightPtr) if err != nil { return nil, err } @@ -28,7 +32,7 @@ func Validators(ctx *rpctypes.Context, heightPtr *int64, pagePtr, perPagePtr *in } totalCount := len(validators.Validators) - perPage := validatePerPage(perPagePtr) + perPage := env.validatePerPage(perPagePtr) page, err := validatePage(pagePtr, perPage, totalCount) if err != nil { return nil, err @@ -47,8 +51,8 @@ func Validators(ctx *rpctypes.Context, heightPtr *int64, pagePtr, perPagePtr *in // DumpConsensusState dumps consensus state. // UNSTABLE -// More: https://docs.tendermint.com/main/rpc/#/Info/dump_consensus_state -func DumpConsensusState(ctx *rpctypes.Context) (*ctypes.ResultDumpConsensusState, error) { +// More: https://docs.tendermint.com/master/rpc/#/Info/dump_consensus_state +func (env *Environment) DumpConsensusState(ctx *rpctypes.Context) (*ctypes.ResultDumpConsensusState, error) { // Get Peer consensus states. peers := env.P2PPeers.Peers().List() peerStates := make([]ctypes.PeerStateInfo, len(peers)) @@ -80,8 +84,8 @@ func DumpConsensusState(ctx *rpctypes.Context) (*ctypes.ResultDumpConsensusState // ConsensusState returns a concise summary of the consensus state. // UNSTABLE -// More: https://docs.tendermint.com/main/rpc/#/Info/consensus_state -func ConsensusState(ctx *rpctypes.Context) (*ctypes.ResultConsensusState, error) { +// More: https://docs.tendermint.com/master/rpc/#/Info/consensus_state +func (env *Environment) GetConsensusState(ctx *rpctypes.Context) (*ctypes.ResultConsensusState, error) { // Get self round state. bz, err := env.ConsensusState.GetRoundStateSimpleJSON() return &ctypes.ResultConsensusState{RoundState: bz}, err @@ -89,11 +93,14 @@ func ConsensusState(ctx *rpctypes.Context) (*ctypes.ResultConsensusState, error) // ConsensusParams gets the consensus parameters at the given block height. // If no height is provided, it will fetch the latest consensus params. -// More: https://docs.tendermint.com/main/rpc/#/Info/consensus_params -func ConsensusParams(ctx *rpctypes.Context, heightPtr *int64) (*ctypes.ResultConsensusParams, error) { +// More: https://docs.tendermint.com/master/rpc/#/Info/consensus_params +func (env *Environment) ConsensusParams( + ctx *rpctypes.Context, + heightPtr *int64) (*ctypes.ResultConsensusParams, error) { + // The latest consensus params that we know is the consensus params after the // last block. - height, err := getHeight(latestUncommittedHeight(), heightPtr) + height, err := env.getHeight(env.latestUncommittedHeight(), heightPtr) if err != nil { return nil, err } diff --git a/rpc/core/dev.go b/rpc/core/dev.go index b70f5f1e12d..0e365cdcc16 100644 --- a/rpc/core/dev.go +++ b/rpc/core/dev.go @@ -6,7 +6,7 @@ import ( ) // UnsafeFlushMempool removes all transactions from the mempool. -func UnsafeFlushMempool(ctx *rpctypes.Context) (*ctypes.ResultUnsafeFlushMempool, error) { +func (env *Environment) UnsafeFlushMempool(ctx *rpctypes.Context) (*ctypes.ResultUnsafeFlushMempool, error) { env.Mempool.Flush() return &ctypes.ResultUnsafeFlushMempool{}, nil } diff --git a/rpc/core/env.go b/rpc/core/env.go index e9231993769..17cf45ff285 100644 --- a/rpc/core/env.go +++ b/rpc/core/env.go @@ -6,7 +6,6 @@ import ( "time" cfg "github.com/tendermint/tendermint/config" - "github.com/tendermint/tendermint/consensus" "github.com/tendermint/tendermint/crypto" tmjson "github.com/tendermint/tendermint/libs/json" "github.com/tendermint/tendermint/libs/log" @@ -33,17 +32,6 @@ const ( genesisChunkSize = 16 * 1024 * 1024 // 16 ) -var ( - // set by Node - env *Environment -) - -// SetEnvironment sets up the given Environment. -// It will race if multiple Node call SetEnvironment. -func SetEnvironment(e *Environment) { - env = e -} - //---------------------------------------------- // These interfaces are used by RPC and must be thread safe @@ -69,6 +57,10 @@ type peers interface { Peers() p2p.IPeerSet } +type consensusReactor interface { + WaitSync() bool +} + // ---------------------------------------------- // Environment contains objects and interfaces used by the RPC. It is expected // to be setup once during startup. @@ -78,21 +70,21 @@ type Environment struct { ProxyAppMempool proxy.AppConnMempool // interfaces defined in types and above - StateStore sm.Store - BlockStore sm.BlockStore - EvidencePool sm.EvidencePool - ConsensusState Consensus - P2PPeers peers - P2PTransport transport + StateStore sm.Store + BlockStore sm.BlockStore + EvidencePool sm.EvidencePool + ConsensusState Consensus + ConsensusReactor consensusReactor + P2PPeers peers + P2PTransport transport // objects - PubKey crypto.PubKey - GenDoc *types.GenesisDoc // cache the genesis structure - TxIndexer txindex.TxIndexer - BlockIndexer indexer.BlockIndexer - ConsensusReactor *consensus.Reactor - EventBus *types.EventBus // thread safe - Mempool mempl.Mempool + PubKey crypto.PubKey + GenDoc *types.GenesisDoc // cache the genesis structure + TxIndexer txindex.TxIndexer + BlockIndexer indexer.BlockIndexer + EventBus *types.EventBus // thread safe + Mempool mempl.Mempool Logger log.Logger @@ -125,7 +117,7 @@ func validatePage(pagePtr *int, perPage, totalCount int) (int, error) { return page, nil } -func validatePerPage(perPagePtr *int) int { +func (env *Environment) validatePerPage(perPagePtr *int) int { if perPagePtr == nil { // no per_page parameter return defaultPerPage } @@ -141,7 +133,7 @@ func validatePerPage(perPagePtr *int) int { // InitGenesisChunks configures the environment and should be called on service // startup. -func InitGenesisChunks() error { +func (env *Environment) InitGenesisChunks() error { if env.genChunks != nil { return nil } @@ -178,7 +170,7 @@ func validateSkipCount(page, perPage int) int { } // latestHeight can be either latest committed or uncommitted (+1) height. -func getHeight(latestHeight int64, heightPtr *int64) (int64, error) { +func (env *Environment) getHeight(latestHeight int64, heightPtr *int64) (int64, error) { if heightPtr != nil { height := *heightPtr if height <= 0 { @@ -198,7 +190,7 @@ func getHeight(latestHeight int64, heightPtr *int64) (int64, error) { return latestHeight, nil } -func latestUncommittedHeight() int64 { +func (env *Environment) latestUncommittedHeight() int64 { nodeIsSyncing := env.ConsensusReactor.WaitSync() if nodeIsSyncing { return env.BlockStore.Height() diff --git a/rpc/core/env_test.go b/rpc/core/env_test.go index b44c21a4cb0..dc64db1d6ae 100644 --- a/rpc/core/env_test.go +++ b/rpc/core/env_test.go @@ -70,13 +70,13 @@ func TestPaginationPerPage(t *testing.T) { {5, maxPerPage, maxPerPage}, {5, maxPerPage + 1, maxPerPage}, } - + env := &Environment{} for _, c := range cases { - p := validatePerPage(&c.perPage) + p := env.validatePerPage(&c.perPage) assert.Equal(t, c.newPerPage, p, fmt.Sprintf("%v", c)) } // nil case - p := validatePerPage(nil) + p := env.validatePerPage(nil) assert.Equal(t, defaultPerPage, p) } diff --git a/rpc/core/events.go b/rpc/core/events.go index e8d9773631d..63ed90fa772 100644 --- a/rpc/core/events.go +++ b/rpc/core/events.go @@ -19,8 +19,8 @@ const ( ) // Subscribe for events via WebSocket. -// More: https://docs.tendermint.com/main/rpc/#/Websocket/subscribe -func Subscribe(ctx *rpctypes.Context, query string) (*ctypes.ResultSubscribe, error) { +// More: https://docs.tendermint.com/master/rpc/#/Websocket/subscribe +func (env *Environment) Subscribe(ctx *rpctypes.Context, query string) (*ctypes.ResultSubscribe, error) { addr := ctx.RemoteAddr() if env.EventBus.NumClients() >= env.Config.MaxSubscriptionClients { @@ -76,7 +76,7 @@ func Subscribe(ctx *rpctypes.Context, query string) (*ctypes.ResultSubscribe, er return } } - case <-sub.Cancelled(): + case <-sub.Canceled(): if sub.Err() != tmpubsub.ErrUnsubscribed { var reason string if sub.Err() == nil { @@ -102,8 +102,8 @@ func Subscribe(ctx *rpctypes.Context, query string) (*ctypes.ResultSubscribe, er } // Unsubscribe from events via WebSocket. -// More: https://docs.tendermint.com/main/rpc/#/Websocket/unsubscribe -func Unsubscribe(ctx *rpctypes.Context, query string) (*ctypes.ResultUnsubscribe, error) { +// More: https://docs.tendermint.com/master/rpc/#/Websocket/unsubscribe +func (env *Environment) Unsubscribe(ctx *rpctypes.Context, query string) (*ctypes.ResultUnsubscribe, error) { addr := ctx.RemoteAddr() env.Logger.Info("Unsubscribe from query", "remote", addr, "query", query) q, err := tmquery.New(query) @@ -118,8 +118,8 @@ func Unsubscribe(ctx *rpctypes.Context, query string) (*ctypes.ResultUnsubscribe } // UnsubscribeAll from all events via WebSocket. -// More: https://docs.tendermint.com/main/rpc/#/Websocket/unsubscribe_all -func UnsubscribeAll(ctx *rpctypes.Context) (*ctypes.ResultUnsubscribe, error) { +// More: https://docs.tendermint.com/master/rpc/#/Websocket/unsubscribe_all +func (env *Environment) UnsubscribeAll(ctx *rpctypes.Context) (*ctypes.ResultUnsubscribe, error) { addr := ctx.RemoteAddr() env.Logger.Info("Unsubscribe from all", "remote", addr) err := env.EventBus.UnsubscribeAll(context.Background(), addr) diff --git a/rpc/core/evidence.go b/rpc/core/evidence.go index d1b3753e7ee..0c5b5b725de 100644 --- a/rpc/core/evidence.go +++ b/rpc/core/evidence.go @@ -10,8 +10,11 @@ import ( ) // BroadcastEvidence broadcasts evidence of the misbehavior. -// More: https://docs.tendermint.com/main/rpc/#/Info/broadcast_evidence -func BroadcastEvidence(ctx *rpctypes.Context, ev types.Evidence) (*ctypes.ResultBroadcastEvidence, error) { +// More: https://docs.tendermint.com/master/rpc/#/Evidence/broadcast_evidence +func (env *Environment) BroadcastEvidence( + ctx *rpctypes.Context, + ev types.Evidence) (*ctypes.ResultBroadcastEvidence, error) { + if ev == nil { return nil, errors.New("no evidence was provided") } diff --git a/rpc/core/health.go b/rpc/core/health.go index aaf1ceecf8f..855911d83de 100644 --- a/rpc/core/health.go +++ b/rpc/core/health.go @@ -7,7 +7,7 @@ import ( // Health gets node health. Returns empty result (200 OK) on success, no // response - in case of an error. -// More: https://docs.tendermint.com/main/rpc/#/Info/health -func Health(ctx *rpctypes.Context) (*ctypes.ResultHealth, error) { +// More: https://docs.tendermint.com/master/rpc/#/Info/health +func (env *Environment) Health(ctx *rpctypes.Context) (*ctypes.ResultHealth, error) { return &ctypes.ResultHealth{}, nil } diff --git a/rpc/core/mempool.go b/rpc/core/mempool.go index 69b87dd5cbe..c0f7cd8dc06 100644 --- a/rpc/core/mempool.go +++ b/rpc/core/mempool.go @@ -18,8 +18,8 @@ import ( // BroadcastTxAsync returns right away, with no response. Does not wait for // CheckTx nor DeliverTx results. -// More: https://docs.tendermint.com/main/rpc/#/Tx/broadcast_tx_async -func BroadcastTxAsync(ctx *rpctypes.Context, tx types.Tx) (*ctypes.ResultBroadcastTx, error) { +// More: https://docs.tendermint.com/master/rpc/#/Tx/broadcast_tx_async +func (env *Environment) BroadcastTxAsync(ctx *rpctypes.Context, tx types.Tx) (*ctypes.ResultBroadcastTx, error) { err := env.Mempool.CheckTx(tx, nil, mempl.TxInfo{}) if err != nil { @@ -30,8 +30,8 @@ func BroadcastTxAsync(ctx *rpctypes.Context, tx types.Tx) (*ctypes.ResultBroadca // BroadcastTxSync returns with the response from CheckTx. Does not wait for // DeliverTx result. -// More: https://docs.tendermint.com/main/rpc/#/Tx/broadcast_tx_sync -func BroadcastTxSync(ctx *rpctypes.Context, tx types.Tx) (*ctypes.ResultBroadcastTx, error) { +// More: https://docs.tendermint.com/master/rpc/#/Tx/broadcast_tx_sync +func (env *Environment) BroadcastTxSync(ctx *rpctypes.Context, tx types.Tx) (*ctypes.ResultBroadcastTx, error) { resCh := make(chan *abci.Response, 1) err := env.Mempool.CheckTx(tx, func(res *abci.Response) { select { @@ -60,8 +60,8 @@ func BroadcastTxSync(ctx *rpctypes.Context, tx types.Tx) (*ctypes.ResultBroadcas } // BroadcastTxCommit returns with the responses from CheckTx and DeliverTx. -// More: https://docs.tendermint.com/main/rpc/#/Tx/broadcast_tx_commit -func BroadcastTxCommit(ctx *rpctypes.Context, tx types.Tx) (*ctypes.ResultBroadcastTxCommit, error) { +// More: https://docs.tendermint.com/master/rpc/#/Tx/broadcast_tx_commit +func (env *Environment) BroadcastTxCommit(ctx *rpctypes.Context, tx types.Tx) (*ctypes.ResultBroadcastTxCommit, error) { subscriber := ctx.RemoteAddr() if env.EventBus.NumClients() >= env.Config.MaxSubscriptionClients { @@ -121,7 +121,7 @@ func BroadcastTxCommit(ctx *rpctypes.Context, tx types.Tx) (*ctypes.ResultBroadc Hash: tx.Hash(), Height: deliverTxRes.Height, }, nil - case <-deliverTxSub.Cancelled(): + case <-deliverTxSub.Canceled(): var reason string if deliverTxSub.Err() == nil { reason = "Tendermint exited" @@ -149,10 +149,10 @@ func BroadcastTxCommit(ctx *rpctypes.Context, tx types.Tx) (*ctypes.ResultBroadc // UnconfirmedTxs gets unconfirmed transactions (maximum ?limit entries) // including their number. -// More: https://docs.tendermint.com/main/rpc/#/Info/unconfirmed_txs -func UnconfirmedTxs(ctx *rpctypes.Context, limitPtr *int) (*ctypes.ResultUnconfirmedTxs, error) { +// More: https://docs.tendermint.com/master/rpc/#/Info/unconfirmed_txs +func (env *Environment) UnconfirmedTxs(ctx *rpctypes.Context, limitPtr *int) (*ctypes.ResultUnconfirmedTxs, error) { // reuse per_page validator - limit := validatePerPage(limitPtr) + limit := env.validatePerPage(limitPtr) txs := env.Mempool.ReapMaxTxs(limit) return &ctypes.ResultUnconfirmedTxs{ @@ -163,8 +163,8 @@ func UnconfirmedTxs(ctx *rpctypes.Context, limitPtr *int) (*ctypes.ResultUnconfi } // NumUnconfirmedTxs gets number of unconfirmed transactions. -// More: https://docs.tendermint.com/main/rpc/#/Info/num_unconfirmed_txs -func NumUnconfirmedTxs(ctx *rpctypes.Context) (*ctypes.ResultUnconfirmedTxs, error) { +// More: https://docs.tendermint.com/master/rpc/#/Info/num_unconfirmed_txs +func (env *Environment) NumUnconfirmedTxs(ctx *rpctypes.Context) (*ctypes.ResultUnconfirmedTxs, error) { return &ctypes.ResultUnconfirmedTxs{ Count: env.Mempool.Size(), Total: env.Mempool.Size(), @@ -173,8 +173,8 @@ func NumUnconfirmedTxs(ctx *rpctypes.Context) (*ctypes.ResultUnconfirmedTxs, err // CheckTx checks the transaction without executing it. The transaction won't // be added to the mempool either. -// More: https://docs.tendermint.com/main/rpc/#/Tx/check_tx -func CheckTx(ctx *rpctypes.Context, tx types.Tx) (*ctypes.ResultCheckTx, error) { +// More: https://docs.tendermint.com/master/rpc/#/Tx/check_tx +func (env *Environment) CheckTx(ctx *rpctypes.Context, tx types.Tx) (*ctypes.ResultCheckTx, error) { res, err := env.ProxyAppMempool.CheckTxSync(abci.RequestCheckTx{Tx: tx}) if err != nil { return nil, err diff --git a/rpc/core/net.go b/rpc/core/net.go index 27378bfed61..688667203f4 100644 --- a/rpc/core/net.go +++ b/rpc/core/net.go @@ -11,8 +11,8 @@ import ( ) // NetInfo returns network info. -// More: https://docs.tendermint.com/main/rpc/#/Info/net_info -func NetInfo(ctx *rpctypes.Context) (*ctypes.ResultNetInfo, error) { +// More: https://docs.tendermint.com/master/rpc/#/Info/net_info +func (env *Environment) NetInfo(ctx *rpctypes.Context) (*ctypes.ResultNetInfo, error) { peersList := env.P2PPeers.Peers().List() peers := make([]ctypes.Peer, 0, len(peersList)) for _, peer := range peersList { @@ -39,7 +39,7 @@ func NetInfo(ctx *rpctypes.Context) (*ctypes.ResultNetInfo, error) { } // UnsafeDialSeeds dials the given seeds (comma-separated id@IP:PORT). -func UnsafeDialSeeds(ctx *rpctypes.Context, seeds []string) (*ctypes.ResultDialSeeds, error) { +func (env *Environment) UnsafeDialSeeds(ctx *rpctypes.Context, seeds []string) (*ctypes.ResultDialSeeds, error) { if len(seeds) == 0 { return &ctypes.ResultDialSeeds{}, errors.New("no seeds provided") } @@ -52,8 +52,11 @@ func UnsafeDialSeeds(ctx *rpctypes.Context, seeds []string) (*ctypes.ResultDialS // UnsafeDialPeers dials the given peers (comma-separated id@IP:PORT), // optionally making them persistent. -func UnsafeDialPeers(ctx *rpctypes.Context, peers []string, persistent, unconditional, private bool) ( - *ctypes.ResultDialPeers, error) { +func (env *Environment) UnsafeDialPeers( + ctx *rpctypes.Context, + peers []string, + persistent, unconditional, private bool) (*ctypes.ResultDialPeers, error) { + if len(peers) == 0 { return &ctypes.ResultDialPeers{}, errors.New("no peers provided") } @@ -92,8 +95,8 @@ func UnsafeDialPeers(ctx *rpctypes.Context, peers []string, persistent, uncondit } // Genesis returns genesis file. -// More: https://docs.tendermint.com/main/rpc/#/Info/genesis -func Genesis(ctx *rpctypes.Context) (*ctypes.ResultGenesis, error) { +// More: https://docs.tendermint.com/master/rpc/#/Info/genesis +func (env *Environment) Genesis(ctx *rpctypes.Context) (*ctypes.ResultGenesis, error) { if len(env.genChunks) > 1 { return nil, errors.New("genesis response is large, please use the genesis_chunked API instead") } @@ -101,7 +104,7 @@ func Genesis(ctx *rpctypes.Context) (*ctypes.ResultGenesis, error) { return &ctypes.ResultGenesis{Genesis: env.GenDoc}, nil } -func GenesisChunked(ctx *rpctypes.Context, chunk uint) (*ctypes.ResultGenesisChunk, error) { +func (env *Environment) GenesisChunked(ctx *rpctypes.Context, chunk uint) (*ctypes.ResultGenesisChunk, error) { if env.genChunks == nil { return nil, fmt.Errorf("service configuration error, genesis chunks are not initialized") } diff --git a/rpc/core/net_test.go b/rpc/core/net_test.go index c971776f3e8..8612e0c500b 100644 --- a/rpc/core/net_test.go +++ b/rpc/core/net_test.go @@ -23,6 +23,7 @@ func TestUnsafeDialSeeds(t *testing.T) { } }) + env := &Environment{} env.Logger = log.TestingLogger() env.P2PPeers = sw @@ -36,7 +37,7 @@ func TestUnsafeDialSeeds(t *testing.T) { } for _, tc := range testCases { - res, err := UnsafeDialSeeds(&rpctypes.Context{}, tc.seeds) + res, err := env.UnsafeDialSeeds(&rpctypes.Context{}, tc.seeds) if tc.isErr { assert.Error(t, err) } else { @@ -62,6 +63,7 @@ func TestUnsafeDialPeers(t *testing.T) { } }) + env := &Environment{} env.Logger = log.TestingLogger() env.P2PPeers = sw @@ -76,7 +78,7 @@ func TestUnsafeDialPeers(t *testing.T) { } for _, tc := range testCases { - res, err := UnsafeDialPeers(&rpctypes.Context{}, tc.peers, tc.persistence, tc.unconditional, tc.private) + res, err := env.UnsafeDialPeers(&rpctypes.Context{}, tc.peers, tc.persistence, tc.unconditional, tc.private) if tc.isErr { assert.Error(t, err) } else { diff --git a/rpc/core/routes.go b/rpc/core/routes.go index cd658889f3b..508bfa016af 100644 --- a/rpc/core/routes.go +++ b/rpc/core/routes.go @@ -6,54 +6,58 @@ import ( // TODO: better system than "unsafe" prefix +type RoutesMap map[string]*rpc.RPCFunc + // Routes is a map of available routes. -var Routes = map[string]*rpc.RPCFunc{ - // subscribe/unsubscribe are reserved for websocket events. - "subscribe": rpc.NewWSRPCFunc(Subscribe, "query"), - "unsubscribe": rpc.NewWSRPCFunc(Unsubscribe, "query"), - "unsubscribe_all": rpc.NewWSRPCFunc(UnsubscribeAll, ""), - - // info API - "health": rpc.NewRPCFunc(Health, ""), - "status": rpc.NewRPCFunc(Status, ""), - "net_info": rpc.NewRPCFunc(NetInfo, ""), - "blockchain": rpc.NewRPCFunc(BlockchainInfo, "minHeight,maxHeight", rpc.Cacheable()), - "genesis": rpc.NewRPCFunc(Genesis, "", rpc.Cacheable()), - "genesis_chunked": rpc.NewRPCFunc(GenesisChunked, "chunk", rpc.Cacheable()), - "block": rpc.NewRPCFunc(Block, "height", rpc.Cacheable("height")), - "block_by_hash": rpc.NewRPCFunc(BlockByHash, "hash", rpc.Cacheable()), - "block_results": rpc.NewRPCFunc(BlockResults, "height", rpc.Cacheable("height")), - "commit": rpc.NewRPCFunc(Commit, "height", rpc.Cacheable("height")), - "header": rpc.NewRPCFunc(Header, "height", rpc.Cacheable("height")), - "header_by_hash": rpc.NewRPCFunc(HeaderByHash, "hash", rpc.Cacheable()), - "check_tx": rpc.NewRPCFunc(CheckTx, "tx"), - "tx": rpc.NewRPCFunc(Tx, "hash,prove", rpc.Cacheable()), - "tx_search": rpc.NewRPCFunc(TxSearch, "query,prove,page,per_page,order_by"), - "block_search": rpc.NewRPCFunc(BlockSearch, "query,page,per_page,order_by"), - "validators": rpc.NewRPCFunc(Validators, "height,page,per_page", rpc.Cacheable("height")), - "dump_consensus_state": rpc.NewRPCFunc(DumpConsensusState, ""), - "consensus_state": rpc.NewRPCFunc(ConsensusState, ""), - "consensus_params": rpc.NewRPCFunc(ConsensusParams, "height", rpc.Cacheable("height")), - "unconfirmed_txs": rpc.NewRPCFunc(UnconfirmedTxs, "limit"), - "num_unconfirmed_txs": rpc.NewRPCFunc(NumUnconfirmedTxs, ""), - - // tx broadcast API - "broadcast_tx_commit": rpc.NewRPCFunc(BroadcastTxCommit, "tx"), - "broadcast_tx_sync": rpc.NewRPCFunc(BroadcastTxSync, "tx"), - "broadcast_tx_async": rpc.NewRPCFunc(BroadcastTxAsync, "tx"), - - // abci API - "abci_query": rpc.NewRPCFunc(ABCIQuery, "path,data,height,prove"), - "abci_info": rpc.NewRPCFunc(ABCIInfo, "", rpc.Cacheable()), - - // evidence API - "broadcast_evidence": rpc.NewRPCFunc(BroadcastEvidence, "evidence"), +func (env *Environment) GetRoutes() RoutesMap { + return RoutesMap{ + // subscribe/unsubscribe are reserved for websocket events. + "subscribe": rpc.NewWSRPCFunc(env.Subscribe, "query"), + "unsubscribe": rpc.NewWSRPCFunc(env.Unsubscribe, "query"), + "unsubscribe_all": rpc.NewWSRPCFunc(env.UnsubscribeAll, ""), + + // info AP + "health": rpc.NewRPCFunc(env.Health, ""), + "status": rpc.NewRPCFunc(env.Status, ""), + "net_info": rpc.NewRPCFunc(env.NetInfo, ""), + "blockchain": rpc.NewRPCFunc(env.BlockchainInfo, "minHeight,maxHeight", rpc.Cacheable()), + "genesis": rpc.NewRPCFunc(env.Genesis, "", rpc.Cacheable()), + "genesis_chunked": rpc.NewRPCFunc(env.GenesisChunked, "chunk", rpc.Cacheable()), + "block": rpc.NewRPCFunc(env.Block, "height", rpc.Cacheable("height")), + "block_by_hash": rpc.NewRPCFunc(env.BlockByHash, "hash", rpc.Cacheable()), + "block_results": rpc.NewRPCFunc(env.BlockResults, "height", rpc.Cacheable("height")), + "commit": rpc.NewRPCFunc(env.Commit, "height", rpc.Cacheable("height")), + "header": rpc.NewRPCFunc(env.Header, "height", rpc.Cacheable("height")), + "header_by_hash": rpc.NewRPCFunc(env.HeaderByHash, "hash", rpc.Cacheable()), + "check_tx": rpc.NewRPCFunc(env.CheckTx, "tx"), + "tx": rpc.NewRPCFunc(env.Tx, "hash,prove", rpc.Cacheable()), + "tx_search": rpc.NewRPCFunc(env.TxSearch, "query,prove,page,per_page,order_by"), + "block_search": rpc.NewRPCFunc(env.BlockSearch, "query,page,per_page,order_by"), + "validators": rpc.NewRPCFunc(env.Validators, "height,page,per_page", rpc.Cacheable("height")), + "dump_consensus_state": rpc.NewRPCFunc(env.DumpConsensusState, ""), + "consensus_state": rpc.NewRPCFunc(env.GetConsensusState, ""), + "consensus_params": rpc.NewRPCFunc(env.ConsensusParams, "height", rpc.Cacheable("height")), + "unconfirmed_txs": rpc.NewRPCFunc(env.UnconfirmedTxs, "limit"), + "num_unconfirmed_txs": rpc.NewRPCFunc(env.NumUnconfirmedTxs, ""), + + // tx broadcast API + "broadcast_tx_commit": rpc.NewRPCFunc(env.BroadcastTxCommit, "tx"), + "broadcast_tx_sync": rpc.NewRPCFunc(env.BroadcastTxSync, "tx"), + "broadcast_tx_async": rpc.NewRPCFunc(env.BroadcastTxAsync, "tx"), + + // abci API + "abci_query": rpc.NewRPCFunc(env.ABCIQuery, "path,data,height,prove"), + "abci_info": rpc.NewRPCFunc(env.ABCIInfo, "", rpc.Cacheable()), + + // evidence API + "broadcast_evidence": rpc.NewRPCFunc(env.BroadcastEvidence, "evidence"), + } } // AddUnsafeRoutes adds unsafe routes. -func AddUnsafeRoutes() { +func (env *Environment) AddUnsafeRoutes(routes RoutesMap) { // control API - Routes["dial_seeds"] = rpc.NewRPCFunc(UnsafeDialSeeds, "seeds") - Routes["dial_peers"] = rpc.NewRPCFunc(UnsafeDialPeers, "peers,persistent,unconditional,private") - Routes["unsafe_flush_mempool"] = rpc.NewRPCFunc(UnsafeFlushMempool, "") + routes["dial_seeds"] = rpc.NewRPCFunc(env.UnsafeDialSeeds, "seeds") + routes["dial_peers"] = rpc.NewRPCFunc(env.UnsafeDialPeers, "peers,persistent,unconditional,private") + routes["unsafe_flush_mempool"] = rpc.NewRPCFunc(env.UnsafeFlushMempool, "") } diff --git a/rpc/core/status.go b/rpc/core/status.go index a2a70d95de4..2fd9bfe49bf 100644 --- a/rpc/core/status.go +++ b/rpc/core/status.go @@ -12,8 +12,8 @@ import ( // Status returns Tendermint status including node info, pubkey, latest block // hash, app hash, block height and time. -// More: https://docs.tendermint.com/main/rpc/#/Info/status -func Status(ctx *rpctypes.Context) (*ctypes.ResultStatus, error) { +// More: https://docs.tendermint.com/master/rpc/#/Info/status +func (env *Environment) Status(ctx *rpctypes.Context) (*ctypes.ResultStatus, error) { var ( earliestBlockHeight int64 earliestBlockHash tmbytes.HexBytes @@ -47,7 +47,7 @@ func Status(ctx *rpctypes.Context) (*ctypes.ResultStatus, error) { // Return the very last voting power, not the voting power of this validator // during the last block. var votingPower int64 - if val := validatorAtHeight(latestUncommittedHeight()); val != nil { + if val := env.validatorAtHeight(env.latestUncommittedHeight()); val != nil { votingPower = val.VotingPower } @@ -74,12 +74,12 @@ func Status(ctx *rpctypes.Context) (*ctypes.ResultStatus, error) { return result, nil } -func validatorAtHeight(h int64) *types.Validator { - vals, err := env.StateStore.LoadValidators(h) +func (env *Environment) validatorAtHeight(h int64) *types.Validator { + valsWithH, err := env.StateStore.LoadValidators(h) if err != nil { return nil } privValAddress := env.PubKey.Address() - _, val := vals.GetByAddress(privValAddress) + _, val := valsWithH.GetByAddress(privValAddress) return val } diff --git a/rpc/core/tx.go b/rpc/core/tx.go index aa14cab1325..d7518752fad 100644 --- a/rpc/core/tx.go +++ b/rpc/core/tx.go @@ -17,7 +17,7 @@ import ( // transaction is in the mempool, invalidated, or was not sent in the first // place. // More: https://docs.tendermint.com/main/rpc/#/Info/tx -func Tx(ctx *rpctypes.Context, hash []byte, prove bool) (*ctypes.ResultTx, error) { +func (env *Environment) Tx(ctx *rpctypes.Context, hash []byte, prove bool) (*ctypes.ResultTx, error) { // if index is disabled, return error if _, ok := env.TxIndexer.(*null.TxIndex); ok { return nil, fmt.Errorf("transaction indexing is disabled") @@ -51,7 +51,7 @@ func Tx(ctx *rpctypes.Context, hash []byte, prove bool) (*ctypes.ResultTx, error // TxSearch allows you to query for multiple transactions results. It returns a // list of transactions (maximum ?per_page entries) and the total count. // More: https://docs.tendermint.com/main/rpc/#/Info/tx_search -func TxSearch( +func (env *Environment) TxSearch( ctx *rpctypes.Context, query string, prove bool, @@ -98,7 +98,7 @@ func TxSearch( // paginate results totalCount := len(results) - perPage := validatePerPage(perPagePtr) + perPage := env.validatePerPage(perPagePtr) page, err := validatePage(pagePtr, perPage, totalCount) if err != nil { diff --git a/rpc/grpc/api.go b/rpc/grpc/api.go index 62c6b66c172..41597dfd97c 100644 --- a/rpc/grpc/api.go +++ b/rpc/grpc/api.go @@ -9,6 +9,7 @@ import ( ) type broadcastAPI struct { + env *core.Environment } func (bapi *broadcastAPI) Ping(ctx context.Context, req *RequestPing) (*ResponsePing, error) { @@ -19,7 +20,7 @@ func (bapi *broadcastAPI) Ping(ctx context.Context, req *RequestPing) (*Response func (bapi *broadcastAPI) BroadcastTx(ctx context.Context, req *RequestBroadcastTx) (*ResponseBroadcastTx, error) { // NOTE: there's no way to get client's remote address // see https://stackoverflow.com/questions/33684570/session-and-remote-ip-address-in-grpc-go - res, err := core.BroadcastTxCommit(&rpctypes.Context{}, req.Tx) + res, err := bapi.env.BroadcastTxCommit(&rpctypes.Context{}, req.Tx) if err != nil { return nil, err } diff --git a/rpc/grpc/client_server.go b/rpc/grpc/client_server.go index 387a662134f..57380c2c5b4 100644 --- a/rpc/grpc/client_server.go +++ b/rpc/grpc/client_server.go @@ -7,6 +7,7 @@ import ( "google.golang.org/grpc" tmnet "github.com/tendermint/tendermint/libs/net" + "github.com/tendermint/tendermint/rpc/core" ) // Config is an gRPC server configuration. @@ -17,16 +18,16 @@ type Config struct { // StartGRPCServer starts a new gRPC BroadcastAPIServer using the given // net.Listener. // NOTE: This function blocks - you may want to call it in a go-routine. -func StartGRPCServer(ln net.Listener) error { +func StartGRPCServer(env *core.Environment, ln net.Listener) error { grpcServer := grpc.NewServer() - RegisterBroadcastAPIServer(grpcServer, &broadcastAPI{}) + RegisterBroadcastAPIServer(grpcServer, &broadcastAPI{env: env}) return grpcServer.Serve(ln) } // StartGRPCClient dials the gRPC server using protoAddr and returns a new // BroadcastAPIClient. func StartGRPCClient(protoAddr string) BroadcastAPIClient { - //nolint:staticcheck // SA1019 Existing use of deprecated but supported dial option. + //nolint: staticcheck // SA1019 Existing use of deprecated but supported dial option. conn, err := grpc.Dial(protoAddr, grpc.WithInsecure(), grpc.WithContextDialer(dialerFunc)) if err != nil { panic(err) diff --git a/rpc/jsonrpc/jsonrpc_test.go b/rpc/jsonrpc/jsonrpc_test.go index c322dfcea93..d6d98e5fc0c 100644 --- a/rpc/jsonrpc/jsonrpc_test.go +++ b/rpc/jsonrpc/jsonrpc_test.go @@ -138,7 +138,7 @@ func setup() { wm.SetLogger(tcpLogger) mux.HandleFunc(websocketEndpoint, wm.WebsocketHandler) config := server.DefaultConfig() - listener1, err := server.Listen(tcpAddr, config) + listener1, err := server.Listen(tcpAddr, config.MaxOpenConnections) if err != nil { panic(err) } @@ -154,7 +154,7 @@ func setup() { wm = server.NewWebsocketManager(Routes) wm.SetLogger(unixLogger) mux2.HandleFunc(websocketEndpoint, wm.WebsocketHandler) - listener2, err := server.Listen(unixAddr, config) + listener2, err := server.Listen(unixAddr, config.MaxOpenConnections) if err != nil { panic(err) } diff --git a/rpc/jsonrpc/server/http_server.go b/rpc/jsonrpc/server/http_server.go index 6dd772e3d97..526692674a2 100644 --- a/rpc/jsonrpc/server/http_server.go +++ b/rpc/jsonrpc/server/http_server.go @@ -258,7 +258,7 @@ func (h maxBytesHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { // Listen starts a new net.Listener on the given address. // It returns an error if the address is invalid or the call to Listen() fails. -func Listen(addr string, config *Config) (listener net.Listener, err error) { +func Listen(addr string, maxOpenConnections int) (listener net.Listener, err error) { parts := strings.SplitN(addr, "://", 2) if len(parts) != 2 { return nil, fmt.Errorf( @@ -271,8 +271,8 @@ func Listen(addr string, config *Config) (listener net.Listener, err error) { if err != nil { return nil, fmt.Errorf("failed to listen on %v: %v", addr, err) } - if config.MaxOpenConnections > 0 { - listener = netutil.LimitListener(listener, config.MaxOpenConnections) + if maxOpenConnections > 0 { + listener = netutil.LimitListener(listener, maxOpenConnections) } return listener, nil diff --git a/rpc/jsonrpc/server/http_server_test.go b/rpc/jsonrpc/server/http_server_test.go index 72e87320737..6d4ab385ad4 100644 --- a/rpc/jsonrpc/server/http_server_test.go +++ b/rpc/jsonrpc/server/http_server_test.go @@ -39,8 +39,7 @@ func TestMaxOpenConnections(t *testing.T) { fmt.Fprint(w, "some body") }) config := DefaultConfig() - config.MaxOpenConnections = max - l, err := Listen("tcp://127.0.0.1:0", config) + l, err := Listen("tcp://127.0.0.1:0", max) require.NoError(t, err) defer l.Close() go Serve(l, mux, log.TestingLogger(), config) //nolint:errcheck // ignore for tests diff --git a/rpc/jsonrpc/test/main.go b/rpc/jsonrpc/test/main.go index fe3ffb76983..72583a43eb4 100644 --- a/rpc/jsonrpc/test/main.go +++ b/rpc/jsonrpc/test/main.go @@ -34,7 +34,7 @@ func main() { rpcserver.RegisterRPCFuncs(mux, routes, logger) config := rpcserver.DefaultConfig() - listener, err := rpcserver.Listen("tcp://127.0.0.1:8008", config) + listener, err := rpcserver.Listen("tcp://127.0.0.1:8008", config.MaxOpenConnections) if err != nil { tmos.Exit(err.Error()) } diff --git a/rpc/test/helpers.go b/rpc/test/helpers.go index 1b88dec5111..14be9e22ff1 100644 --- a/rpc/test/helpers.go +++ b/rpc/test/helpers.go @@ -170,7 +170,7 @@ func NewTendermint(app abci.Application, opts *Options) *nm.Node { } node, err := nm.NewNode(config, pv, nodeKey, papp, nm.DefaultGenesisDocProviderFunc(config), - nm.DefaultDBProvider, + cfg.DefaultDBProvider, nm.DefaultMetricsProvider(config.Instrumentation), logger) if err != nil { diff --git a/state/execution_test.go b/state/execution_test.go index 8e1c69d638a..53a7c3f3171 100644 --- a/state/execution_test.go +++ b/state/execution_test.go @@ -549,7 +549,7 @@ func TestEndBlockValidatorUpdates(t *testing.T) { assert.Equal(t, pubkey, event.ValidatorUpdates[0].PubKey) assert.EqualValues(t, 10, event.ValidatorUpdates[0].VotingPower) } - case <-updatesSub.Cancelled(): + case <-updatesSub.Canceled(): t.Fatalf("updatesSub was canceled (reason: %v)", updatesSub.Err()) case <-time.After(1 * time.Second): t.Fatal("Did not receive EventValidatorSetUpdates within 1 sec.") diff --git a/state/indexer/block/indexer.go b/state/indexer/block/indexer.go new file mode 100644 index 00000000000..1e3b7e923f7 --- /dev/null +++ b/state/indexer/block/indexer.go @@ -0,0 +1,47 @@ +package block + +import ( + "errors" + "fmt" + + dbm "github.com/tendermint/tm-db" + + "github.com/tendermint/tendermint/config" + "github.com/tendermint/tendermint/state/indexer" + blockidxkv "github.com/tendermint/tendermint/state/indexer/block/kv" + blockidxnull "github.com/tendermint/tendermint/state/indexer/block/null" + "github.com/tendermint/tendermint/state/indexer/sink/psql" + "github.com/tendermint/tendermint/state/txindex" + "github.com/tendermint/tendermint/state/txindex/kv" + "github.com/tendermint/tendermint/state/txindex/null" +) + +// EventSinksFromConfig constructs a slice of indexer.EventSink using the provided +// configuration. +// +//nolint:lll +func IndexerFromConfig(cfg *config.Config, dbProvider config.DBProvider, chainID string) (txindex.TxIndexer, indexer.BlockIndexer, error) { + switch cfg.TxIndex.Indexer { + case "kv": + store, err := dbProvider(&config.DBContext{ID: "tx_index", Config: cfg}) + if err != nil { + return nil, nil, err + } + + return kv.NewTxIndex(store), blockidxkv.New(dbm.NewPrefixDB(store, []byte("block_events"))), nil + + case "psql": + conn := cfg.TxIndex.PsqlConn + if conn == "" { + return nil, nil, errors.New("the psql connection settings cannot be empty") + } + es, err := psql.NewEventSink(cfg.TxIndex.PsqlConn, chainID) + if err != nil { + return nil, nil, fmt.Errorf("creating psql indexer: %w", err) + } + return es.TxIndexer(), es.BlockIndexer(), nil + + default: + return &null.TxIndex{}, &blockidxnull.BlockerIndexer{}, nil + } +} diff --git a/state/mocks/block_store.go b/state/mocks/block_store.go index d449f67115f..ed921f4dc27 100644 --- a/state/mocks/block_store.go +++ b/state/mocks/block_store.go @@ -28,6 +28,20 @@ func (_m *BlockStore) Base() int64 { return r0 } +// Close provides a mock function with given fields: +func (_m *BlockStore) Close() error { + ret := _m.Called() + + var r0 error + if rf, ok := ret.Get(0).(func() error); ok { + r0 = rf() + } else { + r0 = ret.Error(0) + } + + return r0 +} + // DeleteLatestBlock provides a mock function with given fields: func (_m *BlockStore) DeleteLatestBlock() error { ret := _m.Called() diff --git a/state/services.go b/state/services.go index 0473b43b2f7..569242cd412 100644 --- a/state/services.go +++ b/state/services.go @@ -36,6 +36,8 @@ type BlockStore interface { LoadSeenCommit(height int64) *types.Commit DeleteLatestBlock() error + + Close() error } //----------------------------------------------------------------------------- diff --git a/state/txindex/indexer_service.go b/state/txindex/indexer_service.go index 0e8fbb9c911..7c2738382c2 100644 --- a/state/txindex/indexer_service.go +++ b/state/txindex/indexer_service.go @@ -59,54 +59,59 @@ func (is *IndexerService) OnStart() error { go func() { for { - msg := <-blockHeadersSub.Out() - eventDataHeader := msg.Data().(types.EventDataNewBlockHeader) - height := eventDataHeader.Header.Height - batch := NewBatch(eventDataHeader.NumTxs) - - for i := int64(0); i < eventDataHeader.NumTxs; i++ { - msg2 := <-txsSub.Out() - txResult := msg2.Data().(types.EventDataTx).TxResult - - if err = batch.Add(&txResult); err != nil { - is.Logger.Error( - "failed to add tx to batch", - "height", height, - "index", txResult.Index, - "err", err, - ) + select { + case <-blockHeadersSub.Canceled(): + return + case msg := <-blockHeadersSub.Out(): + + eventDataHeader := msg.Data().(types.EventDataNewBlockHeader) + height := eventDataHeader.Header.Height + batch := NewBatch(eventDataHeader.NumTxs) + + for i := int64(0); i < eventDataHeader.NumTxs; i++ { + msg2 := <-txsSub.Out() + txResult := msg2.Data().(types.EventDataTx).TxResult + + if err = batch.Add(&txResult); err != nil { + is.Logger.Error( + "failed to add tx to batch", + "height", height, + "index", txResult.Index, + "err", err, + ) + + if is.terminateOnError { + if err := is.Stop(); err != nil { + is.Logger.Error("failed to stop", "err", err) + } + return + } + } + } + if err := is.blockIdxr.Index(eventDataHeader); err != nil { + is.Logger.Error("failed to index block", "height", height, "err", err) if is.terminateOnError { if err := is.Stop(); err != nil { is.Logger.Error("failed to stop", "err", err) } return } + } else { + is.Logger.Info("indexed block exents", "height", height) } - } - if err := is.blockIdxr.Index(eventDataHeader); err != nil { - is.Logger.Error("failed to index block", "height", height, "err", err) - if is.terminateOnError { - if err := is.Stop(); err != nil { - is.Logger.Error("failed to stop", "err", err) - } - return - } - } else { - is.Logger.Info("indexed block exents", "height", height) - } - - if err = is.txIdxr.AddBatch(batch); err != nil { - is.Logger.Error("failed to index block txs", "height", height, "err", err) - if is.terminateOnError { - if err := is.Stop(); err != nil { - is.Logger.Error("failed to stop", "err", err) + if err = is.txIdxr.AddBatch(batch); err != nil { + is.Logger.Error("failed to index block txs", "height", height, "err", err) + if is.terminateOnError { + if err := is.Stop(); err != nil { + is.Logger.Error("failed to stop", "err", err) + } + return } - return + } else { + is.Logger.Debug("indexed transactions", "height", height, "num_txs", eventDataHeader.NumTxs) } - } else { - is.Logger.Debug("indexed transactions", "height", height, "num_txs", eventDataHeader.NumTxs) } } }() diff --git a/test/e2e/node/main.go b/test/e2e/node/main.go index c18878aadd2..954b4543fa6 100644 --- a/test/e2e/node/main.go +++ b/test/e2e/node/main.go @@ -141,7 +141,7 @@ func startNode(cfg *Config) error { nodeKey, cc, node.DefaultGenesisDocProviderFunc(tmcfg), - node.DefaultDBProvider, + config.DefaultDBProvider, node.DefaultMetricsProvider(tmcfg.Instrumentation), nodeLogger, ) @@ -157,8 +157,8 @@ func startLightClient(cfg *Config) error { return err } - dbContext := &node.DBContext{ID: "light", Config: tmcfg} - lightDB, err := node.DefaultDBProvider(dbContext) + dbContext := &config.DBContext{ID: "light", Config: tmcfg} + lightDB, err := config.DefaultDBProvider(dbContext) if err != nil { return err } diff --git a/types/event_bus.go b/types/event_bus.go index d51ae8e6775..3efa008e2cd 100644 --- a/types/event_bus.go +++ b/types/event_bus.go @@ -23,7 +23,7 @@ type EventBusSubscriber interface { type Subscription interface { Out() <-chan tmpubsub.Message - Cancelled() <-chan struct{} //nolint: misspell + Canceled() <-chan struct{} Err() error } diff --git a/types/event_bus_test.go b/types/event_bus_test.go index 62f57fca6f3..09461d085ee 100644 --- a/types/event_bus_test.go +++ b/types/event_bus_test.go @@ -439,7 +439,7 @@ func benchmarkEventBus(numClients int, randQueries bool, randEvents bool, b *tes for { select { case <-sub.Out(): - case <-sub.Cancelled(): + case <-sub.Canceled(): return } }