(fix) TestWALCrash: Don't start crash state when reaching the stop height and improve logging
tnasu committed Sep 18, 2021
1 parent 796e9f0 commit ec72aa6
Showing 2 changed files with 94 additions and 29 deletions.
47 changes: 43 additions & 4 deletions consensus/common_test.go
@@ -404,6 +404,17 @@ func newStateWithConfigAndBlockStore(
pv types.PrivValidator,
app abci.Application,
blockDB dbm.DB,
) *State {
return newStateWithConfigAndBlockStoreWithLoggers(thisConfig, state, pv, app, blockDB, DefaultTestLoggers())
}

func newStateWithConfigAndBlockStoreWithLoggers(
thisConfig *cfg.Config,
state sm.State,
pv types.PrivValidator,
app abci.Application,
blockDB dbm.DB,
loggers TestLoggers,
) *State {
// Get BlockStore
blockStore := store.NewBlockStore(blockDB)
@@ -415,7 +426,7 @@ func newStateWithConfigAndBlockStore(

// Make Mempool
mempool := mempl.NewCListMempool(thisConfig.Mempool, proxyAppConnMem, 0)
mempool.SetLogger(log.TestingLogger().With("module", "mempool"))
mempool.SetLogger(loggers.memLogger.With("module", "mempool"))
if thisConfig.Consensus.WaitForTxs() {
mempool.EnableTxsAvailable()
}
@@ -429,13 +440,13 @@ func newStateWithConfigAndBlockStore(
panic(err)
}

blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyAppConnCon, mempool, evpool)
blockExec := sm.NewBlockExecutor(stateStore, loggers.execLogger, proxyAppConnCon, mempool, evpool)
cs := NewState(thisConfig.Consensus, state, blockExec, blockStore, mempool, evpool)
cs.SetLogger(log.TestingLogger().With("module", "consensus"))
cs.SetLogger(loggers.csLogger.With("module", "consensus"))
cs.SetPrivValidator(pv)

eventBus := types.NewEventBus()
eventBus.SetLogger(log.TestingLogger().With("module", "events"))
eventBus.SetLogger(loggers.eventLogger.With("module", "events"))
err := eventBus.Start()
if err != nil {
panic(err)
@@ -761,6 +772,34 @@ func ensureNewEventOnChannel(ch <-chan tmpubsub.Message) {
//-------------------------------------------------------------------------------
// consensus nets

type TestLoggers struct {
memLogger log.Logger
evLogger log.Logger
execLogger log.Logger
csLogger log.Logger
eventLogger log.Logger
}

func NewTestLoggers(memLogger, evLogger, execLogger, csLogger, eventLogger log.Logger) TestLoggers {
return TestLoggers{
memLogger: memLogger,
evLogger: evLogger,
execLogger: execLogger,
csLogger: csLogger,
eventLogger: eventLogger,
}
}

func DefaultTestLoggers() TestLoggers {
return NewTestLoggers(
log.TestingLogger(), log.TestingLogger(), log.TestingLogger(), log.TestingLogger(), log.TestingLogger())
}

func NopTestLoggers() TestLoggers {
return NewTestLoggers(
log.NewNopLogger(), log.NewNopLogger(), log.NewNopLogger(), log.NewNopLogger(), log.NewNopLogger())
}
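
These helpers make per-module log routing explicit. As a minimal usage sketch (an illustration under assumed usage, not code from this commit), a test that wants only consensus output could mix the two constructors:

```go
// Hypothetical helper (not in this commit): keep consensus logs visible,
// silence mempool, evidence, executor, and event-bus output.
func newQuietConsensusState(
	config *cfg.Config, state sm.State, pv types.PrivValidator,
	app abci.Application, blockDB dbm.DB,
) *State {
	loggers := NewTestLoggers(
		log.NewNopLogger(),  // mempool
		log.NewNopLogger(),  // evidence
		log.NewNopLogger(),  // executor
		log.TestingLogger(), // consensus: the one stream we want to see
		log.NewNopLogger(),  // event bus
	)
	return newStateWithConfigAndBlockStoreWithLoggers(config, state, pv, app, blockDB, loggers)
}
```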

// consensusLogger is a TestingLogger which uses a different
// color for each validator ("validator" key must exist).
func consensusLogger() log.Logger {
76 changes: 51 additions & 25 deletions consensus/replay_test.go
@@ -43,11 +43,14 @@ func TestMain(m *testing.M) {
configMempoolTest := ResetConfig("consensus_mempool_test")
configByzantineTest := ResetConfig("consensus_byzantine_test")
code := m.Run()
os.RemoveAll(config.RootDir)
os.RemoveAll(consensusReplayConfig.RootDir)
os.RemoveAll(configStateTest.RootDir)
os.RemoveAll(configMempoolTest.RootDir)
os.RemoveAll(configByzantineTest.RootDir)
if code != 0 {
fmt.Println("err:", "code:", code)
}
fmt.Println("config:", "err:", os.RemoveAll(config.RootDir))
fmt.Println("consensusReplayConfig:", "err:", os.RemoveAll(consensusReplayConfig.RootDir))
fmt.Println("configStateTest:", "err:", os.RemoveAll(configStateTest.RootDir))
fmt.Println("configMempoolTest:", "err:", os.RemoveAll(configMempoolTest.RootDir))
fmt.Println("configByzantineTest:", "err:", os.RemoveAll(configByzantineTest.RootDir))
os.Exit(code)
}
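
The five near-identical Println/RemoveAll pairs could be collapsed into one helper; a possible refactoring sketch (hypothetical helper name, not part of the commit) that reports only actual failures:

```go
// Hypothetical helper: remove a test directory and report only real errors,
// so clean runs stay silent.
func removeAllAndReport(name, dir string) {
	if err := os.RemoveAll(dir); err != nil {
		fmt.Println(name+":", "err:", err)
	}
}
```

TestMain would then call, e.g., `removeAllAndReport("configStateTest", configStateTest.RootDir)` for each config.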

@@ -66,22 +69,24 @@ func TestMain(m *testing.M) {
// and which ones we need the wal for - then we'd also be able to only flush the
// wal writer when we need to, instead of with every message.

func startNewStateAndWaitForBlock(t *testing.T, consensusReplayConfig *cfg.Config,
lastBlockHeight int64, blockDB dbm.DB, stateStore sm.Store) {
logger := log.TestingLogger()
func startNewStateAndWaitForBlock(t *testing.T, i int, consensusReplayConfig *cfg.Config,
blockDB dbm.DB, stateStore sm.Store) {
logger := log.TestingLogger().With("attr", "make block", "i", i)
state, _ := stateStore.LoadFromDBOrGenesisFile(consensusReplayConfig.GenesisFile())
privValidator := loadPrivValidator(consensusReplayConfig)
cs := newStateWithConfigAndBlockStore(
cs := newStateWithConfigAndBlockStoreWithLoggers(
consensusReplayConfig,
state,
privValidator,
kvstore.NewApplication(),
blockDB,
NewTestLoggers(
log.NewNopLogger().With("module", "mempool"),
log.NewNopLogger().With("module", "evidence"),
logger.With("module", "executor"),
logger.With("module", "consensus"),
log.NewNopLogger().With("module", "event")),
)
cs.SetLogger(logger)

bytes, _ := ioutil.ReadFile(cs.config.WalFile())
t.Logf("====== WAL: \n\r%X\n", bytes)

err := cs.Start()
require.NoError(t, err)
@@ -98,7 +103,9 @@ func startNewStateAndWaitForBlock(t *testing.T, consensusReplayConfig *cfg.Confi
newBlockSub, err := cs.eventBus.Subscribe(context.Background(), testSubscriber, types.EventQueryNewBlock)
require.NoError(t, err)
select {
case <-newBlockSub.Out():
case msg := <-newBlockSub.Out():
height := msg.Data().(types.EventDataNewBlock).Block.Height
t.Logf("Make Block.Height[%d]", height)
case <-newBlockSub.Cancelled():
t.Fatal("newBlockSub was cancelled")
case <-time.After(10 * time.Second): // XXX 120 seconds is too long, so we changed it to 10 seconds
@@ -155,29 +162,34 @@ func TestWALCrash(t *testing.T) {
func crashWALandCheckLiveness(t *testing.T, consensusReplayConfig *cfg.Config,
initFn func(dbm.DB, *State, context.Context), heightToStop int64) {
walPanicked := make(chan error)
crashingWal := &crashingWAL{panicCh: walPanicked, heightToStop: heightToStop}
crashingWal := &crashingWAL{t: t, panicCh: walPanicked, heightToStop: heightToStop}

i := 1
LOOP:
for {
t.Logf("====== LOOP %d\n", i)

// create consensus state from a clean slate
logger := log.NewNopLogger()
blockDB := memdb.NewDB()
stateDB := blockDB
stateStore := sm.NewStore(stateDB)
state, err := sm.MakeGenesisStateFromFile(consensusReplayConfig.GenesisFile())
require.NoError(t, err)
privValidator := loadPrivValidator(consensusReplayConfig)
cs := newStateWithConfigAndBlockStore(
logger := log.TestingLogger().With("attr", "crash wal", "i", i)
cs := newStateWithConfigAndBlockStoreWithLoggers(
consensusReplayConfig,
state,
privValidator,
kvstore.NewApplication(),
blockDB,
NewTestLoggers(
log.NewNopLogger().With("module", "mempool"),
log.NewNopLogger().With("module", "evidence"),
logger.With("module", "executor"),
logger.With("module", "consensus"),
log.NewNopLogger().With("module", "event")),
)
cs.SetLogger(logger)

// start sending transactions
ctx, cancel := context.WithCancel(context.Background())
@@ -200,14 +212,12 @@ LOOP:
err = cs.Start()
require.NoError(t, err)

i++

select {
case err := <-walPanicked:
t.Logf("WAL panicked: %v", err)

// make sure we can make blocks after a crash
startNewStateAndWaitForBlock(t, consensusReplayConfig, cs.Height, blockDB, stateStore)
startNewStateAndWaitForBlock(t, i, consensusReplayConfig, blockDB, stateStore)

// stop consensus state and transactions sender (initFn)
cs.Stop() //nolint:errcheck // Logging this error causes failure
@@ -220,13 +230,16 @@
case <-time.After(10 * time.Second):
t.Fatal("WAL did not panic for 10 seconds (check the log)")
}

i++
}
}
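
crashingWAL, defined next, is essentially a failure-injecting decorator around the real WAL. Stripped of the height bookkeeping, the pattern looks like this (simplified sketch under a reduced one-method interface; the real WAL interface has more methods than shown):

```go
// Simplified sketch of the decorator pattern behind crashingWAL
// (assumption: one-method interface, for illustration only).
type writer interface {
	Write(m WALMessage) error
}

type faultyWAL struct {
	next    writer // the real WAL being wrapped
	failAt  int    // index of the message on which to fail
	written int
}

func (w *faultyWAL) Write(m WALMessage) error {
	if w.written == w.failAt {
		return fmt.Errorf("injected WAL failure at message %d", w.written)
	}
	w.written++
	return w.next.Write(m) // delegate to the wrapped WAL
}
```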

// crashingWAL is a WAL that simulates a crash during Save (before and after).
// It remembers the message for which it last panicked (lastPanickedForMsgIndex),
// so it does not panic for the same message again in subsequent iterations.
type crashingWAL struct {
t *testing.T
next WAL
panicCh chan error
heightToStop int64
@@ -260,23 +273,36 @@ func (e ReachedHeightToStopError) Error() string {
// exiting the cs.receiveRoutine.
func (w *crashingWAL) Write(m WALMessage) error {
if endMsg, ok := m.(EndHeightMessage); ok {
if endMsg.Height == w.heightToStop {
if endMsg.Height >= w.heightToStop {
w.t.Logf("Rearched[%d] WAL messasge[%T], Height[%d]", w.msgIndex, m, endMsg.Height)
w.panicCh <- ReachedHeightToStopError{endMsg.Height}
runtime.Goexit()
return nil
}

w.t.Logf("Not-Rearched[%d] WAL messasge[%T], Height[%d]", w.msgIndex, m, endMsg.Height)
w.msgIndex++
return w.next.Write(m)
}

if w.msgIndex > w.lastPanickedForMsgIndex {
w.lastPanickedForMsgIndex = w.msgIndex
_, file, line, _ := runtime.Caller(1)
w.panicCh <- WALWriteError{fmt.Sprintf("failed to write %T to WAL (fileline: %s:%d)", m, file, line)}
w.panicCh <- WALWriteError{
fmt.Sprintf("Failed[%d] to write %T to WAL (fileline: %s:%d)", w.msgIndex, m, file, line)}
runtime.Goexit()
return nil
}

if mi, ok := m.(msgInfo); ok {
if pm, ok := mi.Msg.(*ProposalMessage); ok {
w.t.Logf("Skipped[%d] WAL message[%T]:[%T]:[%v]", w.msgIndex, m, mi.Msg, pm.Proposal.Type)
} else if vm, ok := mi.Msg.(*VoteMessage); ok {
w.t.Logf("Skipped[%d] WAL message[%T]:[%T]:[%v]", w.msgIndex, m, mi.Msg, vm.Vote.Type)
} else {
w.t.Logf("Skipped[%d] WAL message[%T]:[%T]", w.msgIndex, m, mi.Msg)
}
} else {
w.t.Logf("Skipped[%d] WAL message[%T]", w.msgIndex, m)
}
w.msgIndex++
return w.next.Write(m)
}
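
A note on the runtime.Goexit calls above: Goexit unwinds only the calling goroutine (here, the consensus receiveRoutine that invoked Write), running its deferred functions without taking down the whole test process; the `return nil` after it is unreachable but required by the compiler. A standalone illustration (not from the commit):

```go
// Goexit terminates just the calling goroutine and still runs its defers.
func demoGoexit() {
	done := make(chan struct{})
	go func() {
		defer close(done) // runs even though the goroutine exits via Goexit
		runtime.Goexit()
		// unreachable, like the `return nil` after Goexit in Write above
	}()
	<-done // the parent goroutine is unaffected
}
```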
