Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

support non-clean shutdown #155

Merged
merged 17 commits into from
Sep 1, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions arbitrum/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/internal/shutdowncheck"
"github.com/ethereum/go-ethereum/node"
)

Expand All @@ -24,6 +25,8 @@ type Backend struct {
bloomRequests chan chan *bloombits.Retrieval // Channel receiving bloom data retrieval requests
bloomIndexer *core.ChainIndexer // Bloom indexer operating during block imports

shutdownTracker *shutdowncheck.ShutdownTracker

chanTxs chan *types.Transaction
chanClose chan struct{} //close coroutine
chanNewBlock chan struct{} //create new L2 block unless empty
Expand All @@ -39,6 +42,8 @@ func NewBackend(stack *node.Node, config *Config, chainDb ethdb.Database, publis
bloomRequests: make(chan chan *bloombits.Retrieval),
bloomIndexer: core.NewBloomIndexer(chainDb, config.BloomBitsBlocks, config.BloomConfirms),

shutdownTracker: shutdowncheck.NewShutdownTracker(chainDb),

chanTxs: make(chan *types.Transaction, 100),
chanClose: make(chan struct{}),
chanNewBlock: make(chan struct{}, 1),
Expand All @@ -49,6 +54,7 @@ func NewBackend(stack *node.Node, config *Config, chainDb ethdb.Database, publis
if err != nil {
return nil, err
}
backend.shutdownTracker.MarkStartup()
return backend, nil
}

Expand Down Expand Up @@ -79,13 +85,16 @@ func (b *Backend) ArbInterface() ArbInterface {
// TODO: this is used when registering backend as lifecycle in stack
func (b *Backend) Start() error {
b.startBloomHandlers(b.config.BloomBitsBlocks)
b.shutdownTracker.Start()

return nil
}

func (b *Backend) Stop() error {
b.scope.Close()
b.bloomIndexer.Close()
b.shutdownTracker.Stop()
b.chainDb.Close()
close(b.chanClose)
return nil
}
76 changes: 50 additions & 26 deletions core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"errors"
"fmt"
"io"
"math"
"math/big"
"sort"
"sync"
Expand Down Expand Up @@ -91,7 +92,6 @@ const (
txLookupCacheLimit = 1024
maxFutureBlocks = 256
maxTimeFutureBlocks = 30
TriesInMemory = 128

// BlockChainVersion ensures that an incompatible database forces a resync from scratch.
//
Expand Down Expand Up @@ -132,12 +132,21 @@ type CacheConfig struct {
SnapshotLimit int // Memory allowance (MB) to use for caching snapshot entries in memory
Preimages bool // Whether to store preimage of trie key to the disk

// Arbitrum: configure GC window
TriesInMemory uint64 // Height difference before which a trie may not be garbage-collected
TrieRetention time.Duration // Time limit before which a trie may not be garbage-collected

SnapshotWait bool // Wait for snapshot construction on startup. TODO(karalabe): This is a dirty hack for testing, nuke it
}

// defaultCacheConfig are the default caching values if none are specified by the
// user (also used during testing).
var defaultCacheConfig = &CacheConfig{

// Arbitrum Config Options
TriesInMemory: 128,
TrieRetention: 30 * time.Minute,

TrieCleanLimit: 256,
TrieDirtyLimit: 256,
TrieTimeLimit: 5 * time.Minute,
Expand Down Expand Up @@ -215,6 +224,11 @@ type BlockChain struct {
vmConfig vm.Config
}

type trieGcEntry struct {
Root common.Hash
Timestamp uint64
}

// NewBlockChain returns a fully initialised block chain using information
// available in the database. It initialises the default Ethereum Validator
// and Processor.
Expand Down Expand Up @@ -870,9 +884,15 @@ func (bc *BlockChain) Stop() {
if !bc.cacheConfig.TrieDirtyDisabled {
triedb := bc.stateCache.TrieDB()

for _, offset := range []uint64{0, 1, TriesInMemory - 1} {
for _, offset := range []uint64{0, 1, bc.cacheConfig.TriesInMemory - 1, math.MaxUint64} {
if number := bc.CurrentBlock().NumberU64(); number > offset {
recent := bc.GetBlockByNumber(number - offset)
var recent *types.Block
if offset == math.MaxUint {
_, latest := bc.triegc.Peek()
recent = bc.GetBlockByNumber(uint64(-latest))
} else {
recent = bc.GetBlockByNumber(number - offset)
}
if recent.Root() == (common.Hash{}) {
continue
}
Expand All @@ -890,7 +910,7 @@ func (bc *BlockChain) Stop() {
}
}
for !bc.triegc.Empty() {
triedb.Dereference(bc.triegc.PopItem().(common.Hash))
triedb.Dereference(bc.triegc.PopItem().(trieGcEntry).Root)
}
if size, _ := triedb.Size(); size != 0 {
log.Error("Dangling trie nodes after full cleanup")
Expand Down Expand Up @@ -1213,8 +1233,6 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [
return 0, nil
}

var lastWrite uint64

// writeBlockWithoutState writes only the block and its metadata to the database,
// but does not write any state. This is used to construct competing side forks
// up to the point where they exceed the canonical total difficulty.
Expand Down Expand Up @@ -1281,9 +1299,12 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types.
} else {
// Full but not archive node, do proper garbage collection
triedb.Reference(root, common.Hash{}) // metadata reference to keep trie alive
bc.triegc.Push(root, -int64(block.NumberU64()))
bc.triegc.Push(trieGcEntry{root, block.Header().Time}, -int64(block.NumberU64()))

blockLimit := int64(block.NumberU64()) - int64(bc.cacheConfig.TriesInMemory) // only cleared if below that
timeLimit := time.Now().Unix() - int64(bc.cacheConfig.TrieRetention.Seconds()) // only cleared if less than that

if current := block.NumberU64(); current > TriesInMemory {
if blockLimit > 0 && timeLimit > 0 {
// If we exceeded our memory allowance, flush matured singleton nodes to disk
var (
nodes, imgs = triedb.Size()
Expand All @@ -1292,36 +1313,39 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types.
if nodes > limit || imgs > 4*1024*1024 {
triedb.Cap(limit - ethdb.IdealBatchSize)
}
// Find the next state trie we need to commit
chosen := current - TriesInMemory

var prevEntry *trieGcEntry
var prevNum uint64
// Garbage collect anything below our required write retention
for !bc.triegc.Empty() {
tmp, number := bc.triegc.Pop()
triegcEntry := tmp.(trieGcEntry)
if uint64(-number) > uint64(blockLimit) || triegcEntry.Timestamp > uint64(timeLimit) {
bc.triegc.Push(triegcEntry, number)
break
}
if prevEntry != nil {
triedb.Dereference(prevEntry.Root)
}
prevEntry = &triegcEntry
prevNum = uint64(-number)
}
// If we exceeded out time allowance, flush an entire trie to disk
if bc.gcproc > bc.cacheConfig.TrieTimeLimit {
if bc.gcproc > bc.cacheConfig.TrieTimeLimit && prevEntry != nil {
// If the header is missing (canonical chain behind), we're reorging a low
// diff sidechain. Suspend committing until this operation is completed.
header := bc.GetHeaderByNumber(chosen)
header := bc.GetHeaderByNumber(prevNum)
if header == nil {
log.Warn("Reorg in progress, trie commit postponed", "number", chosen)
log.Warn("Reorg in progress, trie commit postponed")
} else {
// If we're exceeding limits but haven't reached a large enough memory gap,
// warn the user that the system is becoming unstable.
if chosen < lastWrite+TriesInMemory && bc.gcproc >= 2*bc.cacheConfig.TrieTimeLimit {
log.Info("State in memory for too long, committing", "time", bc.gcproc, "allowance", bc.cacheConfig.TrieTimeLimit, "optimum", float64(chosen-lastWrite)/TriesInMemory)
}
// Flush an entire trie and restart the counters
triedb.Commit(header.Root, true, nil)
lastWrite = chosen
bc.gcproc = 0
}
}
// Garbage collect anything below our required write retention
for !bc.triegc.Empty() {
root, number := bc.triegc.Pop()
if uint64(-number) > chosen {
bc.triegc.Push(root, number)
break
}
triedb.Dereference(root.(common.Hash))
if prevEntry != nil {
triedb.Dereference(prevEntry.Root)
}
}
}
Expand Down
13 changes: 13 additions & 0 deletions core/blockchain_arbitrum.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,23 @@
package core

import (
"time"

"github.com/ethereum/go-ethereum/core/state"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/rpc"
)

// WriteBlockAndSetHeadWithTime also counts processTime, which will cause intermittent TrieDirty cache writes
func (bc *BlockChain) WriteBlockAndSetHeadWithTime(block *types.Block, receipts []*types.Receipt, logs []*types.Log, state *state.StateDB, emitHeadEvent bool, processTime time.Duration) (status WriteStatus, err error) {
if !bc.chainmu.TryLock() {
return NonStatTy, errChainStopped
}
defer bc.chainmu.Unlock()
bc.gcproc += processTime
return bc.writeBlockAndSetHead(block, receipts, logs, state, emitHeadEvent)
}

func (bc *BlockChain) ReorgToOldBlock(newHead *types.Block) error {
bc.wg.Add(1)
defer bc.wg.Done()
Expand Down
10 changes: 10 additions & 0 deletions core/blockchain_repair_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1767,6 +1767,11 @@ func testRepair(t *testing.T, tt *rewindTest, snapshots bool) {
genesis = (&Genesis{BaseFee: big.NewInt(params.InitialBaseFee)}).MustCommit(db)
engine = ethash.NewFullFaker()
config = &CacheConfig{

// Arbitrum
TriesInMemory: 128,
TrieRetention: 30 * time.Minute,

TrieCleanLimit: 256,
TrieDirtyLimit: 256,
TrieTimeLimit: 5 * time.Minute,
Expand Down Expand Up @@ -1891,6 +1896,11 @@ func TestIssue23496(t *testing.T) {
genesis = (&Genesis{BaseFee: big.NewInt(params.InitialBaseFee)}).MustCommit(db)
engine = ethash.NewFullFaker()
config = &CacheConfig{

// Arbitrum
TriesInMemory: 128,
TrieRetention: 30 * time.Minute,

TrieCleanLimit: 256,
TrieDirtyLimit: 256,
TrieTimeLimit: 5 * time.Minute,
Expand Down
5 changes: 5 additions & 0 deletions core/blockchain_sethead_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1967,6 +1967,11 @@ func testSetHead(t *testing.T, tt *rewindTest, snapshots bool) {
genesis = (&Genesis{BaseFee: big.NewInt(params.InitialBaseFee)}).MustCommit(db)
engine = ethash.NewFullFaker()
config = &CacheConfig{

// Arbitrum
TriesInMemory: 128,
TrieRetention: 30 * time.Minute,

TrieCleanLimit: 256,
TrieDirtyLimit: 256,
TrieTimeLimit: 5 * time.Minute,
Expand Down
15 changes: 15 additions & 0 deletions core/blockchain_snapshot_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,11 @@ func (snaptest *gappedSnapshotTest) test(t *testing.T) {

// Insert a few more blocks without enabling snapshot
var cacheConfig = &CacheConfig{

// Arbitrum
TriesInMemory: 128,
TrieRetention: 30 * time.Minute,

TrieCleanLimit: 256,
TrieDirtyLimit: 256,
TrieTimeLimit: 5 * time.Minute,
Expand Down Expand Up @@ -363,6 +368,11 @@ func (snaptest *wipeCrashSnapshotTest) test(t *testing.T) {
chain.Stop()

config := &CacheConfig{

// Arbitrum
TriesInMemory: 128,
TrieRetention: 30 * time.Minute,

TrieCleanLimit: 256,
TrieDirtyLimit: 256,
TrieTimeLimit: 5 * time.Minute,
Expand All @@ -378,6 +388,11 @@ func (snaptest *wipeCrashSnapshotTest) test(t *testing.T) {

// Restart the chain, the wiper should starts working
config = &CacheConfig{

// Arbitrum
TriesInMemory: 128,
TrieRetention: 30 * time.Minute,

TrieCleanLimit: 256,
TrieDirtyLimit: 256,
TrieTimeLimit: 5 * time.Minute,
Expand Down
5 changes: 5 additions & 0 deletions eth/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,11 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) {
EnablePreimageRecording: config.EnablePreimageRecording,
}
cacheConfig = &core.CacheConfig{

// Arbitrum
TriesInMemory: 128,
TrieRetention: 30 * time.Minute,

TrieCleanLimit: config.TrieCleanCache,
TrieCleanJournal: stack.ResolvePath(config.TrieCleanCacheJournal),
TrieCleanRejournal: config.TrieCleanCacheRejournal,
Expand Down
2 changes: 1 addition & 1 deletion eth/gasprice/gasprice_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ func newTestBackend(t *testing.T, londonBlock *big.Int, pending bool) *testBacke
// Construct testing chain
diskdb := rawdb.NewMemoryDatabase()
gspec.Commit(diskdb)
chain, err := core.NewBlockChain(diskdb, &core.CacheConfig{TrieCleanNoPrefetch: true}, gspec.Config, engine, vm.Config{}, nil, nil)
chain, err := core.NewBlockChain(diskdb, &core.CacheConfig{TrieCleanNoPrefetch: true, TriesInMemory: 128}, gspec.Config, engine, vm.Config{}, nil, nil)
if err != nil {
t.Fatalf("Failed to create local chain, %v", err)
}
Expand Down
5 changes: 5 additions & 0 deletions eth/tracers/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,11 @@ func newTestBackend(t *testing.T, n int, gspec *core.Genesis, generator func(i i
// Import the canonical chain
gspec.MustCommit(backend.chaindb)
cacheConfig := &core.CacheConfig{

// Arbitrum
TriesInMemory: 128,
TrieRetention: 30 * time.Minute,

TrieCleanLimit: 256,
TrieDirtyLimit: 256,
TrieTimeLimit: 5 * time.Minute,
Expand Down
2 changes: 1 addition & 1 deletion miner/worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ func newTestWorkerBackend(t *testing.T, chainConfig *params.ChainConfig, engine
}
genesis := gspec.MustCommit(db)

chain, _ := core.NewBlockChain(db, &core.CacheConfig{TrieDirtyDisabled: true}, gspec.Config, engine, vm.Config{}, nil, nil)
chain, _ := core.NewBlockChain(db, &core.CacheConfig{TrieDirtyDisabled: true, TriesInMemory: 128}, gspec.Config, engine, vm.Config{}, nil, nil)
txpool := core.NewTxPool(testTxPoolConfig, chainConfig, chain)

// Generate a small n-block chain and an uncle block for it
Expand Down
2 changes: 1 addition & 1 deletion tests/block_test_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ func (t *BlockTest) Run(snapshotter bool) error {
} else {
engine = ethash.NewShared()
}
cache := &core.CacheConfig{TrieCleanLimit: 0}
cache := &core.CacheConfig{TrieCleanLimit: 0, TriesInMemory: 128}
if snapshotter {
cache.SnapshotLimit = 1
cache.SnapshotWait = true
Expand Down