Skip to content
6 changes: 6 additions & 0 deletions catchup/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -345,6 +345,12 @@ func (s *Service) fetchAndWrite(r basics.Round, prevFetchCompleteChan chan bool,
// if the context expired, just exit.
return false
}
if errNSBE, ok := err.(ledgercore.ErrNonSequentialBlockEval); ok && errNSBE.EvaluatorRound <= errNSBE.LatestRound {
// the block was added to the ledger from elsewhere after fetching it here
// only the agreement could have added this block into the ledger, catchup is complete
s.log.Infof("fetchAndWrite(%d): after fetching the block, it is already in the ledger. The catchup is complete", r)
return false
}
s.log.Warnf("fetchAndWrite(%d): failed to validate block : %v", r, err)
return false
}
Expand Down
20 changes: 12 additions & 8 deletions data/ledger.go
Original file line number Diff line number Diff line change
Expand Up @@ -325,14 +325,16 @@ func (l *Ledger) EnsureValidatedBlock(vb *ledgercore.ValidatedBlock, c agreement
break
}

logfn := logging.Base().Errorf
logfn := l.log.Errorf

switch err.(type) {
case ledgercore.BlockInLedgerError:
logfn = logging.Base().Debugf
// If the block is already in the ledger (catchup and agreement might be competing),
// reporting this as a debug message is sufficient.
logfn = l.log.Debugf
// Otherwise, the error is because the block is in the future. Error is logged.
}

logfn("could not write block %d to the ledger: %v", round, err)
logfn("data.EnsureValidatedBlock: could not write block %d to the ledger: %v", round, err)
}
}

Expand All @@ -353,14 +355,16 @@ func (l *Ledger) EnsureBlock(block *bookkeeping.Block, c agreement.Certificate)
switch err.(type) {
case protocol.Error:
if !protocolErrorLogged {
logging.Base().Errorf("unrecoverable protocol error detected at block %d: %v", round, err)
l.log.Errorf("data.EnsureBlock: unrecoverable protocol error detected at block %d: %v", round, err)
protocolErrorLogged = true
}
case ledgercore.BlockInLedgerError:
logging.Base().Debugf("could not write block %d to the ledger: %v", round, err)
return // this error implies that l.LastRound() >= round
// The block is already in the ledger. Catchup and agreement could be competing
// It is sufficient to report this as a Debug message
l.log.Debugf("data.EnsureBlock: could not write block %d to the ledger: %v", round, err)
return
default:
logging.Base().Errorf("could not write block %d to the ledger: %v", round, err)
l.log.Errorf("data.EnsureBlock: could not write block %d to the ledger: %v", round, err)
}

// If there was an error add a short delay before the next attempt.
Expand Down
241 changes: 241 additions & 0 deletions data/ledger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@
package data

import (
"context"
"fmt"
"sync"
"testing"

"github.com/stretchr/testify/require"
Expand All @@ -32,6 +35,7 @@ import (
"github.com/algorand/go-algorand/logging"
"github.com/algorand/go-algorand/protocol"
"github.com/algorand/go-algorand/test/partitiontest"
"github.com/algorand/go-algorand/util/execpool"
)

var testPoolAddr = basics.Address{0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff}
Expand Down Expand Up @@ -420,3 +424,240 @@ func TestConsensusVersion(t *testing.T) {
require.Equal(t, protocol.ConsensusVersion(""), ver)
require.Equal(t, ledgercore.ErrNoEntry{Round: basics.Round(blk.BlockHeader.NextProtocolSwitchOn + 1), Latest: basics.Round(blk.BlockHeader.Round), Committed: basics.Round(blk.BlockHeader.Round)}, err)
}

type loggedMessages struct {
logging.Logger
expectedMessages chan string
unexpectedMessages chan string
}

func (lm loggedMessages) Debug(args ...interface{}) {
m := fmt.Sprint(args...)
lm.unexpectedMessages <- m
}
func (lm loggedMessages) Debugf(s string, args ...interface{}) {
m := fmt.Sprintf(s, args...)
lm.expectedMessages <- m
}
func (lm loggedMessages) Info(args ...interface{}) {
m := fmt.Sprint(args...)
lm.unexpectedMessages <- m
}
func (lm loggedMessages) Infof(s string, args ...interface{}) {
m := fmt.Sprintf(s, args...)
lm.unexpectedMessages <- m
}
func (lm loggedMessages) Warn(args ...interface{}) {
m := fmt.Sprint(args...)
lm.unexpectedMessages <- m
}
func (lm loggedMessages) Warnf(s string, args ...interface{}) {
m := fmt.Sprintf(s, args...)
lm.unexpectedMessages <- m
}
func (lm loggedMessages) Error(args ...interface{}) {
m := fmt.Sprint(args...)
lm.unexpectedMessages <- m
}
func (lm loggedMessages) Errorf(s string, args ...interface{}) {
m := fmt.Sprintf(s, args...)
lm.unexpectedMessages <- m
}

// TestLedgerErrorValidate creates 3 parallel routines adding blocks to the ledger through different interfaces.
// The purpose here is to simulate the scenario where the catchup and the agreement compete to add blocks to the ledger.
// The error messages reported can be excessive or unnecessary. This test evaluates what messages are generate and at what frequency.
func TestLedgerErrorValidate(t *testing.T) {
partitiontest.PartitionTest(t)

var testPoolAddr = basics.Address{0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff}
var testSinkAddr = basics.Address{0x2c, 0x2a, 0x6c, 0xe9, 0xa9, 0xa7, 0xc2, 0x8c, 0x22, 0x95, 0xfd, 0x32, 0x4f, 0x77, 0xa5, 0x4, 0x8b, 0x42, 0xc2, 0xb7, 0xa8, 0x54, 0x84, 0xb6, 0x80, 0xb1, 0xe1, 0x3d, 0x59, 0x9b, 0xeb, 0x36}

proto, _ := config.Consensus[protocol.ConsensusCurrentVersion]
origProto := proto
defer func() {
config.Consensus[protocol.ConsensusCurrentVersion] = origProto
}()
proto.MinBalance = 0
config.Consensus[protocol.ConsensusCurrentVersion] = proto

blk := bookkeeping.Block{}
blk.CurrentProtocol = protocol.ConsensusCurrentVersion
blk.RewardsPool = testPoolAddr
blk.FeeSink = testSinkAddr
blk.BlockHeader.GenesisHash = crypto.Hash([]byte(t.Name()))

accts := make(map[basics.Address]basics.AccountData)
accts[testPoolAddr] = basics.MakeAccountData(basics.NotParticipating, basics.MicroAlgos{Raw: 0})
accts[testSinkAddr] = basics.MakeAccountData(basics.NotParticipating, basics.MicroAlgos{Raw: 0})

genesisInitState := ledgercore.InitState{
Accounts: accts,
Block: blk,
GenesisHash: crypto.Hash([]byte(t.Name())),
}

expectedMessages := make(chan string, 100)
unexpectedMessages := make(chan string, 100)

const inMem = true
cfg := config.GetDefaultLocal()
cfg.Archival = true
log := loggedMessages{Logger: logging.TestingLog(t), expectedMessages: expectedMessages, unexpectedMessages: unexpectedMessages}
log.SetLevel(logging.Debug)
realLedger, err := ledger.OpenLedger(log, t.Name(), inMem, genesisInitState, cfg)
require.NoError(t, err, "could not open ledger")
defer realLedger.Close()

l := Ledger{Ledger: realLedger, log: log}
l.log.SetLevel(logging.Debug)
require.NotNil(t, &l)

totalsRound, _, err := realLedger.LatestTotals()
require.NoError(t, err)
require.Equal(t, basics.Round(0), totalsRound)

errChan := make(chan error, 1)
defer close(errChan)

wg := sync.WaitGroup{}
defer wg.Wait()

blkChan1 := make(chan bookkeeping.Block, 10)
blkChan2 := make(chan bookkeeping.Block, 10)
blkChan3 := make(chan bookkeeping.Block, 10)
defer close(blkChan1)
defer close(blkChan2)
defer close(blkChan3)

// Add blocks to the ledger via EnsureValidatedBlock. This calls AddValidatedBlock, which simply
// passes the block to blockQueue. The returned error is handled by EnsureValidatedBlock, which reports
// in the form of logged error message.
go func() {
wg.Add(1)
i := 0
for blk := range blkChan1 {
i++
vb, err := validatedBlock(l.Ledger, blk)
if err != nil {
// AddBlock already added the block
// This is okay to ignore.
// This error is generated from ledger.Ledger Validate function, used from:
// - node blockValidatorImpl Validate
// - catchup service s.ledger.Validate (Catchup service returns after the first error)
continue
}
l.EnsureValidatedBlock(vb, agreement.Certificate{})
}
wg.Done()
}()

// Add blocks to the ledger via EnsureBlock. This basically calls AddBlock, but handles
// the errors by logging them. Checking the logged messages to verify its behavior.
go func() {
wg.Add(1)
i := 0
for blk := range blkChan2 {
i++
l.EnsureBlock(&blk, agreement.Certificate{})
}
wg.Done()
}()

// Add blocks directly to the ledger
go func() {
wg.Add(1)
i := 0
for blk := range blkChan3 {
i++
err := l.AddBlock(blk, agreement.Certificate{})
// AddBlock is used in 2 places:
// - data.ledger.EnsureBlock which reports a log message as Error or Debug
// - catchup.service.fetchAndWrite which leads to interrupting catchup or skiping the round
if err != nil {
switch err.(type) {
// The following two cases are okay to ignore, since these are expected and handled
case ledgercore.BlockInLedgerError:
case ledgercore.ErrNonSequentialBlockEval:
continue
default:
// Make sure unexpected error is not obtained here
errChan <- err
}
}
l.WaitForCommit(blk.BlockHeader.Round)
}
wg.Done()
}()

// flush the messages output during the setup
more := true
for more {
select {
case <-expectedMessages:
case <-unexpectedMessages:
default:
more = false
}
}

for rnd := basics.Round(1); rnd <= basics.Round(2000); rnd++ {
blk, err := getEmptyBlock(rnd-1, l.Ledger, t.Name(), genesisInitState.Accounts)
require.NoError(t, err)
blkChan3 <- blk
blkChan2 <- blk
blkChan1 <- blk

more = true
for more {
select {
case err := <-errChan:
require.NoError(t, err)
case <-expectedMessages:
// only debug messages should be reported
case um := <-unexpectedMessages:
require.Empty(t, um, um)
default:
more = false
}
}
}
}

func validatedBlock(l *ledger.Ledger, blk bookkeeping.Block) (vb *ledgercore.ValidatedBlock, err error) {
backlogPool := execpool.MakeBacklog(nil, 0, execpool.LowPriority, nil)
defer backlogPool.Shutdown()
vb, err = l.Validate(context.Background(), blk, backlogPool)
return
}

func getEmptyBlock(afterRound basics.Round, l *ledger.Ledger, genesisID string, initAccounts map[basics.Address]basics.AccountData) (blk bookkeeping.Block, err error) {
l.WaitForCommit(afterRound)

lastBlock, err := l.Block(l.Latest())
if err != nil {
return
}

proto := config.Consensus[lastBlock.CurrentProtocol]
blk.BlockHeader = bookkeeping.BlockHeader{
GenesisID: genesisID,
Round: l.Latest() + 1,
Branch: lastBlock.Hash(),
TimeStamp: 0,
}

if proto.SupportGenesisHash {
blk.BlockHeader.GenesisHash = crypto.Hash([]byte(genesisID))
}

blk.RewardsPool = testPoolAddr
blk.FeeSink = testSinkAddr
blk.CurrentProtocol = lastBlock.CurrentProtocol

blk.TxnRoot, err = blk.PaysetCommit()
if err != nil {
return
}
return
}
7 changes: 6 additions & 1 deletion ledger/ledger.go
Original file line number Diff line number Diff line change
Expand Up @@ -578,6 +578,11 @@ func (l *Ledger) AddBlock(blk bookkeeping.Block, cert agreement.Certificate) err

updates, err := internal.Eval(context.Background(), l, blk, false, l.verifiedTxnCache, nil)
if err != nil {
if errNSBE, ok := err.(ledgercore.ErrNonSequentialBlockEval); ok && errNSBE.EvaluatorRound <= errNSBE.LatestRound {
return ledgercore.BlockInLedgerError{
LastRound: errNSBE.EvaluatorRound,
NextRound: errNSBE.LatestRound + 1}
}
return err
}
vb := ledgercore.MakeValidatedBlock(blk, updates)
Expand All @@ -602,7 +607,7 @@ func (l *Ledger) AddValidatedBlock(vb ledgercore.ValidatedBlock, cert agreement.
}
l.headerCache.Put(blk.Round(), blk.BlockHeader)
l.trackers.newBlock(blk, vb.Delta())
l.log.Debugf("added blk %d", blk.Round())
l.log.Debugf("ledger.AddValidatedBlock: added blk %d", blk.Round())
return nil
}

Expand Down