diff --git a/consensus/common_test.go b/consensus/common_test.go
index c7bd9ede3..81d71753f 100644
--- a/consensus/common_test.go
+++ b/consensus/common_test.go
@@ -404,6 +404,17 @@ func newStateWithConfigAndBlockStore(
 	pv types.PrivValidator,
 	app abci.Application,
 	blockDB dbm.DB,
+) *State {
+	return newStateWithConfigAndBlockStoreWithLoggers(thisConfig, state, pv, app, blockDB, DefaultTestLoggers())
+}
+
+func newStateWithConfigAndBlockStoreWithLoggers(
+	thisConfig *cfg.Config,
+	state sm.State,
+	pv types.PrivValidator,
+	app abci.Application,
+	blockDB dbm.DB,
+	loggers TestLoggers,
 ) *State {
 	// Get BlockStore
 	blockStore := store.NewBlockStore(blockDB)
@@ -415,7 +426,7 @@ func newStateWithConfigAndBlockStore(
 
 	// Make Mempool
 	mempool := mempl.NewCListMempool(thisConfig.Mempool, proxyAppConnMem, 0)
-	mempool.SetLogger(log.TestingLogger().With("module", "mempool"))
+	mempool.SetLogger(loggers.memLogger.With("module", "mempool"))
 	if thisConfig.Consensus.WaitForTxs() {
 		mempool.EnableTxsAvailable()
 	}
@@ -429,13 +440,13 @@ func newStateWithConfigAndBlockStore(
 		panic(err)
 	}
 
-	blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyAppConnCon, mempool, evpool)
+	blockExec := sm.NewBlockExecutor(stateStore, loggers.execLogger, proxyAppConnCon, mempool, evpool)
 	cs := NewState(thisConfig.Consensus, state, blockExec, blockStore, mempool, evpool)
-	cs.SetLogger(log.TestingLogger().With("module", "consensus"))
+	cs.SetLogger(loggers.csLogger.With("module", "consensus"))
 	cs.SetPrivValidator(pv)
 
 	eventBus := types.NewEventBus()
-	eventBus.SetLogger(log.TestingLogger().With("module", "events"))
+	eventBus.SetLogger(loggers.eventLogger.With("module", "events"))
 	err := eventBus.Start()
 	if err != nil {
 		panic(err)
@@ -761,6 +772,34 @@ func ensureNewEventOnChannel(ch <-chan tmpubsub.Message) {
 //-------------------------------------------------------------------------------
 // consensus nets
 
+type TestLoggers struct {
+	memLogger   log.Logger
+	evLogger    log.Logger
+	execLogger  log.Logger
+	csLogger    log.Logger
+	eventLogger log.Logger
+}
+
+func NewTestLoggers(memLogger, evLogger, execLogger, csLogger, eventLogger log.Logger) TestLoggers {
+	return TestLoggers{
+		memLogger:   memLogger,
+		evLogger:    evLogger,
+		execLogger:  execLogger,
+		csLogger:    csLogger,
+		eventLogger: eventLogger,
+	}
+}
+
+func DefaultTestLoggers() TestLoggers {
+	return NewTestLoggers(
+		log.TestingLogger(), log.TestingLogger(), log.TestingLogger(), log.TestingLogger(), log.TestingLogger())
+}
+
+func NopTestLoggers() TestLoggers {
+	return NewTestLoggers(
+		log.NewNopLogger(), log.NewNopLogger(), log.NewNopLogger(), log.NewNopLogger(), log.NewNopLogger())
+}
+
 // consensusLogger is a TestingLogger which uses a different
 // color for each validator ("validator" key must exist).
 func consensusLogger() log.Logger {
diff --git a/consensus/replay_test.go b/consensus/replay_test.go
index 1e3c1872d..2c6b169bd 100644
--- a/consensus/replay_test.go
+++ b/consensus/replay_test.go
@@ -43,11 +43,14 @@ func TestMain(m *testing.M) {
 	configMempoolTest := ResetConfig("consensus_mempool_test")
 	configByzantineTest := ResetConfig("consensus_byzantine_test")
 	code := m.Run()
-	os.RemoveAll(config.RootDir)
-	os.RemoveAll(consensusReplayConfig.RootDir)
-	os.RemoveAll(configStateTest.RootDir)
-	os.RemoveAll(configMempoolTest.RootDir)
-	os.RemoveAll(configByzantineTest.RootDir)
+	if code != 0 {
+		fmt.Println("err:", "code:", code)
+	}
+	fmt.Println("config:", "err:", os.RemoveAll(config.RootDir))
+	fmt.Println("consensusReplayConfig:", "err:", os.RemoveAll(consensusReplayConfig.RootDir))
+	fmt.Println("configStateTest:", "err:", os.RemoveAll(configStateTest.RootDir))
+	fmt.Println("configMempoolTest:", "err:", os.RemoveAll(configMempoolTest.RootDir))
+	fmt.Println("configByzantineTest:", "err:", os.RemoveAll(configByzantineTest.RootDir))
 	os.Exit(code)
 }
 
@@ -66,22 +69,24 @@ func TestMain(m *testing.M) {
 // and which ones we need the wal for - then we'd also be able to only flush the
 // wal writer when we need to, instead of with every message.
 
-func startNewStateAndWaitForBlock(t *testing.T, consensusReplayConfig *cfg.Config,
-	lastBlockHeight int64, blockDB dbm.DB, stateStore sm.Store) {
-	logger := log.TestingLogger()
+func startNewStateAndWaitForBlock(t *testing.T, i int, consensusReplayConfig *cfg.Config,
+	blockDB dbm.DB, stateStore sm.Store) {
+	logger := log.TestingLogger().With("attr", "make block", "i", i)
 	state, _ := stateStore.LoadFromDBOrGenesisFile(consensusReplayConfig.GenesisFile())
 	privValidator := loadPrivValidator(consensusReplayConfig)
-	cs := newStateWithConfigAndBlockStore(
+	cs := newStateWithConfigAndBlockStoreWithLoggers(
 		consensusReplayConfig,
 		state,
 		privValidator,
 		kvstore.NewApplication(),
 		blockDB,
+		NewTestLoggers(
+			log.NewNopLogger().With("module", "mempool"),
+			log.NewNopLogger().With("module", "evidence"),
+			logger.With("module", "executor"),
+			logger.With("module", "consensus"),
+			log.NewNopLogger().With("module", "event")),
 	)
-	cs.SetLogger(logger)
-
-	bytes, _ := ioutil.ReadFile(cs.config.WalFile())
-	t.Logf("====== WAL: \n\r%X\n", bytes)
 
 	err := cs.Start()
 	require.NoError(t, err)
@@ -98,7 +103,9 @@ func startNewStateAndWaitForBlock(t *testing.T, consensusReplayConfi
 	newBlockSub, err := cs.eventBus.Subscribe(context.Background(), testSubscriber, types.EventQueryNewBlock)
 	require.NoError(t, err)
 	select {
-	case <-newBlockSub.Out():
+	case msg := <-newBlockSub.Out():
+		height := msg.Data().(types.EventDataNewBlock).Block.Height
+		t.Logf("Make Block.Height[%d]", height)
 	case <-newBlockSub.Cancelled():
 		t.Fatal("newBlockSub was cancelled")
 	case <-time.After(10 * time.Second): // XXX 120 second is too much time, so we changed to 10 second
@@ -155,7 +162,7 @@ func TestWALCrash(t *testing.T) {
 func crashWALandCheckLiveness(t *testing.T, consensusReplayConfig *cfg.Config,
 	initFn func(dbm.DB, *State, context.Context), heightToStop int64) {
 	walPanicked := make(chan error)
-	crashingWal := &crashingWAL{panicCh: walPanicked, heightToStop: heightToStop}
+	crashingWal := &crashingWAL{t: t, panicCh: walPanicked, heightToStop: heightToStop}
 
 	i := 1
 LOOP:
@@ -163,21 +170,26 @@ LOOP:
 		t.Logf("====== LOOP %d\n", i)
 
 		// create consensus state from a clean slate
-		logger := log.NewNopLogger()
 		blockDB := memdb.NewDB()
 		stateDB := blockDB
 		stateStore := sm.NewStore(stateDB)
 		state, err := sm.MakeGenesisStateFromFile(consensusReplayConfig.GenesisFile())
 		require.NoError(t, err)
 		privValidator := loadPrivValidator(consensusReplayConfig)
-		cs := newStateWithConfigAndBlockStore(
+		logger := log.TestingLogger().With("attr", "crash wal", "i", i)
+		cs := newStateWithConfigAndBlockStoreWithLoggers(
 			consensusReplayConfig,
 			state,
 			privValidator,
 			kvstore.NewApplication(),
 			blockDB,
+			NewTestLoggers(
+				log.NewNopLogger().With("module", "mempool"),
+				log.NewNopLogger().With("module", "evidence"),
+				logger.With("module", "executor"),
+				logger.With("module", "consensus"),
+				log.NewNopLogger().With("module", "event")),
 		)
-		cs.SetLogger(logger)
 
 		// start sending transactions
 		ctx, cancel := context.WithCancel(context.Background())
@@ -200,14 +212,12 @@ LOOP:
 		err = cs.Start()
 		require.NoError(t, err)
 
-		i++
-
 		select {
 		case err := <-walPanicked:
 			t.Logf("WAL panicked: %v", err)
 
 			// make sure we can make blocks after a crash
-			startNewStateAndWaitForBlock(t, consensusReplayConfig, cs.Height, blockDB, stateStore)
+			startNewStateAndWaitForBlock(t, i, consensusReplayConfig, blockDB, stateStore)
 
 			// stop consensus state and transactions sender (initFn)
 			cs.Stop() //nolint:errcheck // Logging this error causes failure
@@ -220,6 +230,8 @@ LOOP:
 		case <-time.After(10 * time.Second):
 			t.Fatal("WAL did not panic for 10 seconds (check the log)")
 		}
+
+		i++
 	}
 }
 
@@ -227,6 +239,7 @@ LOOP:
 // (before and after). It remembers a message for which we last panicked
 // (lastPanickedForMsgIndex), so we don't panic for it in subsequent iterations.
 type crashingWAL struct {
+	t            *testing.T
 	next         WAL
 	panicCh      chan error
 	heightToStop int64
@@ -260,23 +273,36 @@ func (e ReachedHeightToStopError) Error() string {
 // exiting the cs.receiveRoutine.
 func (w *crashingWAL) Write(m WALMessage) error {
 	if endMsg, ok := m.(EndHeightMessage); ok {
-		if endMsg.Height == w.heightToStop {
+		if endMsg.Height >= w.heightToStop {
+			w.t.Logf("Reached[%d] WAL message[%T], Height[%d]", w.msgIndex, m, endMsg.Height)
 			w.panicCh <- ReachedHeightToStopError{endMsg.Height}
 			runtime.Goexit()
 			return nil
 		}
-
+		w.t.Logf("Not-Reached[%d] WAL message[%T], Height[%d]", w.msgIndex, m, endMsg.Height)
+		w.msgIndex++
 		return w.next.Write(m)
 	}
 
 	if w.msgIndex > w.lastPanickedForMsgIndex {
 		w.lastPanickedForMsgIndex = w.msgIndex
 		_, file, line, _ := runtime.Caller(1)
-		w.panicCh <- WALWriteError{fmt.Sprintf("failed to write %T to WAL (fileline: %s:%d)", m, file, line)}
+		w.panicCh <- WALWriteError{
+			fmt.Sprintf("Failed[%d] to write %T to WAL (fileline: %s:%d)", w.msgIndex, m, file, line)}
 		runtime.Goexit()
 		return nil
 	}
-
+	if mi, ok := m.(msgInfo); ok {
+		if pm, ok := mi.Msg.(*ProposalMessage); ok {
+			w.t.Logf("Skipped[%d] WAL message[%T]:[%T]:[%v]", w.msgIndex, m, mi.Msg, pm.Proposal.Type)
+		} else if vm, ok := mi.Msg.(*VoteMessage); ok {
+			w.t.Logf("Skipped[%d] WAL message[%T]:[%T]:[%v]", w.msgIndex, m, mi.Msg, vm.Vote.Type)
+		} else {
+			w.t.Logf("Skipped[%d] WAL message[%T]:[%T]", w.msgIndex, m, mi.Msg)
+		}
+	} else {
+		w.t.Logf("Skipped[%d] WAL message[%T]", w.msgIndex, m)
+	}
 	w.msgIndex++
 	return w.next.Write(m)
 }
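
Usage sketch (not part of the diff): how a test could wire the logger plumbing introduced above. The two wrapper function names below are hypothetical; everything else (cfg.Config, sm.State, types.PrivValidator, kvstore.NewApplication, the log package, and the new helpers NewTestLoggers / NopTestLoggers / newStateWithConfigAndBlockStoreWithLoggers) is taken from this change and is assumed to be in scope in the consensus test package.

// quietConsensusState is a hypothetical helper: every component logger
// (mempool, evidence, executor, consensus, event bus) is a nop logger,
// so the consensus state runs silently.
func quietConsensusState(config *cfg.Config, state sm.State,
	pv types.PrivValidator, blockDB dbm.DB) *State {
	return newStateWithConfigAndBlockStoreWithLoggers(
		config, state, pv, kvstore.NewApplication(), blockDB, NopTestLoggers())
}

// consensusOnlyLoggingState is a hypothetical helper: keep executor and
// consensus output, silence the noisier components, mirroring the pattern
// used by crashWALandCheckLiveness in this change.
func consensusOnlyLoggingState(config *cfg.Config, state sm.State,
	pv types.PrivValidator, blockDB dbm.DB, logger log.Logger) *State {
	return newStateWithConfigAndBlockStoreWithLoggers(
		config, state, pv, kvstore.NewApplication(), blockDB,
		NewTestLoggers(
			log.NewNopLogger(), // mempool
			log.NewNopLogger(), // evidence
			logger.With("module", "executor"),
			logger.With("module", "consensus"),
			log.NewNopLogger())) // event bus
}

Existing call sites are unaffected: newStateWithConfigAndBlockStore now simply delegates with DefaultTestLoggers(), which preserves the previous log.TestingLogger() behavior for every component.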