Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

support non-clean shutdown #155

Merged
merged 17 commits into from
Sep 1, 2022
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions arbitrum/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/internal/shutdowncheck"
"github.com/ethereum/go-ethereum/node"
)

Expand All @@ -24,6 +25,8 @@ type Backend struct {
bloomRequests chan chan *bloombits.Retrieval // Channel receiving bloom data retrieval requests
bloomIndexer *core.ChainIndexer // Bloom indexer operating during block imports

shutdownTracker *shutdowncheck.ShutdownTracker

chanTxs chan *types.Transaction
chanClose chan struct{} //close coroutine
chanNewBlock chan struct{} //create new L2 block unless empty
Expand All @@ -39,6 +42,8 @@ func NewBackend(stack *node.Node, config *Config, chainDb ethdb.Database, publis
bloomRequests: make(chan chan *bloombits.Retrieval),
bloomIndexer: core.NewBloomIndexer(chainDb, config.BloomBitsBlocks, config.BloomConfirms),

shutdownTracker: shutdowncheck.NewShutdownTracker(chainDb),

chanTxs: make(chan *types.Transaction, 100),
chanClose: make(chan struct{}),
chanNewBlock: make(chan struct{}, 1),
Expand All @@ -49,6 +54,7 @@ func NewBackend(stack *node.Node, config *Config, chainDb ethdb.Database, publis
if err != nil {
return nil, err
}
backend.shutdownTracker.MarkStartup()
return backend, nil
}

Expand Down Expand Up @@ -79,13 +85,16 @@ func (b *Backend) ArbInterface() ArbInterface {
// TODO: this is used when registering backend as lifecycle in stack
func (b *Backend) Start() error {
b.startBloomHandlers(b.config.BloomBitsBlocks)
b.shutdownTracker.Start()

return nil
}

func (b *Backend) Stop() error {
b.scope.Close()
b.bloomIndexer.Close()
b.shutdownTracker.Stop()
b.chainDb.Close()
close(b.chanClose)
return nil
}
26 changes: 20 additions & 6 deletions core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,6 @@ const (
txLookupCacheLimit = 1024
maxFutureBlocks = 256
maxTimeFutureBlocks = 30
TriesInMemory = 128

// BlockChainVersion ensures that an incompatible database forces a resync from scratch.
//
Expand Down Expand Up @@ -132,12 +131,21 @@ type CacheConfig struct {
SnapshotLimit int // Memory allowance (MB) to use for caching snapshot entries in memory
Preimages bool // Whether to store preimage of trie key to the disk

// Arbitrum: configure GC window
TriesInMemory uint64 // Height difference before which a trie may not be garbage-collected
TrieRetention time.Duration // Time limit before which a trie may not be garbage-collected

SnapshotWait bool // Wait for snapshot construction on startup. TODO(karalabe): This is a dirty hack for testing, nuke it
}

// defaultCacheConfig are the default caching values if none are specified by the
// user (also used during testing).
var defaultCacheConfig = &CacheConfig{

// Arbitrum Config Options
TriesInMemory: 128,
TrieRetention: 30 * time.Minute,

TrieCleanLimit: 256,
TrieDirtyLimit: 256,
TrieTimeLimit: 5 * time.Minute,
Expand Down Expand Up @@ -862,6 +870,9 @@ func (bc *BlockChain) Stop() {
}
}

// Arbitrum: only discard tries sufficiently old in both time and height
retain := bc.FindRetentionBound()

// Ensure the state of a recent block is also stored to disk before exiting.
// We're writing three different states to catch different restart scenarios:
// - HEAD: So we don't need to reprocess any blocks in the general case
Expand All @@ -870,7 +881,7 @@ func (bc *BlockChain) Stop() {
if !bc.cacheConfig.TrieDirtyDisabled {
triedb := bc.stateCache.TrieDB()

for _, offset := range []uint64{0, 1, TriesInMemory - 1} {
for _, offset := range []uint64{0, 1, retain - 1} {
if number := bc.CurrentBlock().NumberU64(); number > offset {
recent := bc.GetBlockByNumber(number - offset)
if recent.Root() == (common.Hash{}) {
Expand Down Expand Up @@ -1275,6 +1286,9 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types.
}
triedb := bc.stateCache.TrieDB()

// Arbitrum: only discard tries sufficiently old in both time and height
retain := bc.FindRetentionBound()

// If we're running an archive node, always flush
if bc.cacheConfig.TrieDirtyDisabled {
return triedb.Commit(root, false, nil)
Expand All @@ -1283,7 +1297,7 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types.
triedb.Reference(root, common.Hash{}) // metadata reference to keep trie alive
bc.triegc.Push(root, -int64(block.NumberU64()))

if current := block.NumberU64(); current > TriesInMemory {
if current := block.NumberU64(); current > retain {
// If we exceeded our memory allowance, flush matured singleton nodes to disk
var (
nodes, imgs = triedb.Size()
Expand All @@ -1293,7 +1307,7 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types.
triedb.Cap(limit - ethdb.IdealBatchSize)
}
// Find the next state trie we need to commit
chosen := current - TriesInMemory
chosen := current - retain

// If we exceeded out time allowance, flush an entire trie to disk
if bc.gcproc > bc.cacheConfig.TrieTimeLimit {
Expand All @@ -1305,8 +1319,8 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types.
} else {
// If we're exceeding limits but haven't reached a large enough memory gap,
// warn the user that the system is becoming unstable.
if chosen < lastWrite+TriesInMemory && bc.gcproc >= 2*bc.cacheConfig.TrieTimeLimit {
log.Info("State in memory for too long, committing", "time", bc.gcproc, "allowance", bc.cacheConfig.TrieTimeLimit, "optimum", float64(chosen-lastWrite)/TriesInMemory)
if chosen < lastWrite+retain && bc.gcproc >= 2*bc.cacheConfig.TrieTimeLimit {
log.Info("State in memory for too long, committing", "time", bc.gcproc, "allowance", bc.cacheConfig.TrieTimeLimit, "optimum", float64(chosen-lastWrite)/float64(retain))
}
// Flush an entire trie and restart the counters
triedb.Commit(header.Root, true, nil)
Expand Down
62 changes: 62 additions & 0 deletions core/blockchain_arbitrum.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,23 @@
package core

import (
"time"

"github.com/ethereum/go-ethereum/core/state"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/rpc"
)

// WriteBlockAndSetHeadWithTime also counts processTime, which will cause intermittent TrieDirty cache writes
func (bc *BlockChain) WriteBlockAndSetHeadWithTime(block *types.Block, receipts []*types.Receipt, logs []*types.Log, state *state.StateDB, emitHeadEvent bool, processTime time.Duration) (status WriteStatus, err error) {
if !bc.chainmu.TryLock() {
return NonStatTy, errChainStopped
}
defer bc.chainmu.Unlock()
bc.gcproc += processTime
return bc.writeBlockAndSetHead(block, receipts, logs, state, emitHeadEvent)
}

func (bc *BlockChain) ReorgToOldBlock(newHead *types.Block) error {
bc.wg.Add(1)
defer bc.wg.Done()
Expand Down Expand Up @@ -54,3 +67,52 @@ func (bc *BlockChain) ClipToPostNitroGenesis(blockNum rpc.BlockNumber) (rpc.Bloc
}
return blockNum, currentBlock
}

// finds the number of blocks that aren't prunable
func (bc *BlockChain) FindRetentionBound() uint64 {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably not needed, but we might be able to improve performance here by remembering the previous retention bound between calls as a hint for the search

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this would work better with a little-different search algo.. leaving it to later

tsahee marked this conversation as resolved.
Show resolved Hide resolved
minimumSpan := bc.cacheConfig.TriesInMemory
minimumAge := uint64(bc.cacheConfig.TrieRetention.Seconds())

saturatingCast := func(value int64) uint64 {
if value < 0 {
return 0
}
return uint64(value)
}

// enforce that the block be sufficiently deep
current := bc.CurrentBlock()
heightBound := saturatingCast(int64(current.NumberU64()) - int64(minimumSpan) + 1)

// find the left bound to our subsequent binary search
timeBound := heightBound
leap := int64(1)
for timeBound > 0 {
age := current.Time() - bc.GetBlockByNumber(uint64(timeBound)).Time()
if age > minimumAge {
break
}
timeBound = saturatingCast(int64(timeBound) - leap)
leap *= 2
}
if timeBound == heightBound {
return current.NumberU64() - timeBound + 1
}

// Algo: binary search on the interval [a, b] for the first prunable block
// Timebound is a prunable block, if one exists.
// We want to find the first block that's not prunable.
//
a := timeBound // a prunable block, if possible
b := heightBound // not prunable
for a+2 < b {
mid := a/2 + b/2 // a < mid < b
age := current.Time() - bc.GetBlockByNumber(mid).Time()
if age <= minimumAge {
b = mid // mid is not prunable and less than b
} else {
a = mid // mid is prunable, but might equal a
}
}
return current.NumberU64() - a
}
10 changes: 10 additions & 0 deletions core/blockchain_repair_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1767,6 +1767,11 @@ func testRepair(t *testing.T, tt *rewindTest, snapshots bool) {
genesis = (&Genesis{BaseFee: big.NewInt(params.InitialBaseFee)}).MustCommit(db)
engine = ethash.NewFullFaker()
config = &CacheConfig{

// Arbitrum
TriesInMemory: 128,
TrieRetention: 30 * time.Minute,

TrieCleanLimit: 256,
TrieDirtyLimit: 256,
TrieTimeLimit: 5 * time.Minute,
Expand Down Expand Up @@ -1891,6 +1896,11 @@ func TestIssue23496(t *testing.T) {
genesis = (&Genesis{BaseFee: big.NewInt(params.InitialBaseFee)}).MustCommit(db)
engine = ethash.NewFullFaker()
config = &CacheConfig{

// Arbitrum
TriesInMemory: 128,
TrieRetention: 30 * time.Minute,

TrieCleanLimit: 256,
TrieDirtyLimit: 256,
TrieTimeLimit: 5 * time.Minute,
Expand Down
5 changes: 5 additions & 0 deletions core/blockchain_sethead_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1967,6 +1967,11 @@ func testSetHead(t *testing.T, tt *rewindTest, snapshots bool) {
genesis = (&Genesis{BaseFee: big.NewInt(params.InitialBaseFee)}).MustCommit(db)
engine = ethash.NewFullFaker()
config = &CacheConfig{

// Arbitrum
TriesInMemory: 128,
TrieRetention: 30 * time.Minute,

TrieCleanLimit: 256,
TrieDirtyLimit: 256,
TrieTimeLimit: 5 * time.Minute,
Expand Down
15 changes: 15 additions & 0 deletions core/blockchain_snapshot_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,11 @@ func (snaptest *gappedSnapshotTest) test(t *testing.T) {

// Insert a few more blocks without enabling snapshot
var cacheConfig = &CacheConfig{

// Arbitrum
TriesInMemory: 128,
TrieRetention: 30 * time.Minute,

TrieCleanLimit: 256,
TrieDirtyLimit: 256,
TrieTimeLimit: 5 * time.Minute,
Expand Down Expand Up @@ -363,6 +368,11 @@ func (snaptest *wipeCrashSnapshotTest) test(t *testing.T) {
chain.Stop()

config := &CacheConfig{

// Arbitrum
TriesInMemory: 128,
TrieRetention: 30 * time.Minute,

TrieCleanLimit: 256,
TrieDirtyLimit: 256,
TrieTimeLimit: 5 * time.Minute,
Expand All @@ -378,6 +388,11 @@ func (snaptest *wipeCrashSnapshotTest) test(t *testing.T) {

// Restart the chain, the wiper should starts working
config = &CacheConfig{

// Arbitrum
TriesInMemory: 128,
TrieRetention: 30 * time.Minute,

TrieCleanLimit: 256,
TrieDirtyLimit: 256,
TrieTimeLimit: 5 * time.Minute,
Expand Down
5 changes: 5 additions & 0 deletions eth/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,11 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) {
EnablePreimageRecording: config.EnablePreimageRecording,
}
cacheConfig = &core.CacheConfig{

// Arbitrum
TriesInMemory: 128,
TrieRetention: 30 * time.Minute,

TrieCleanLimit: config.TrieCleanCache,
TrieCleanJournal: stack.ResolvePath(config.TrieCleanCacheJournal),
TrieCleanRejournal: config.TrieCleanCacheRejournal,
Expand Down
2 changes: 1 addition & 1 deletion eth/gasprice/gasprice_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ func newTestBackend(t *testing.T, londonBlock *big.Int, pending bool) *testBacke
// Construct testing chain
diskdb := rawdb.NewMemoryDatabase()
gspec.Commit(diskdb)
chain, err := core.NewBlockChain(diskdb, &core.CacheConfig{TrieCleanNoPrefetch: true}, gspec.Config, engine, vm.Config{}, nil, nil)
chain, err := core.NewBlockChain(diskdb, &core.CacheConfig{TrieCleanNoPrefetch: true, TriesInMemory: 128}, gspec.Config, engine, vm.Config{}, nil, nil)
if err != nil {
t.Fatalf("Failed to create local chain, %v", err)
}
Expand Down
5 changes: 5 additions & 0 deletions eth/tracers/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,11 @@ func newTestBackend(t *testing.T, n int, gspec *core.Genesis, generator func(i i
// Import the canonical chain
gspec.MustCommit(backend.chaindb)
cacheConfig := &core.CacheConfig{

// Arbitrum
TriesInMemory: 128,
TrieRetention: 30 * time.Minute,

TrieCleanLimit: 256,
TrieDirtyLimit: 256,
TrieTimeLimit: 5 * time.Minute,
Expand Down
2 changes: 1 addition & 1 deletion miner/worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ func newTestWorkerBackend(t *testing.T, chainConfig *params.ChainConfig, engine
}
genesis := gspec.MustCommit(db)

chain, _ := core.NewBlockChain(db, &core.CacheConfig{TrieDirtyDisabled: true}, gspec.Config, engine, vm.Config{}, nil, nil)
chain, _ := core.NewBlockChain(db, &core.CacheConfig{TrieDirtyDisabled: true, TriesInMemory: 128}, gspec.Config, engine, vm.Config{}, nil, nil)
txpool := core.NewTxPool(testTxPoolConfig, chainConfig, chain)

// Generate a small n-block chain and an uncle block for it
Expand Down
2 changes: 1 addition & 1 deletion tests/block_test_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ func (t *BlockTest) Run(snapshotter bool) error {
} else {
engine = ethash.NewShared()
}
cache := &core.CacheConfig{TrieCleanLimit: 0}
cache := &core.CacheConfig{TrieCleanLimit: 0, TriesInMemory: 128}
if snapshotter {
cache.SnapshotLimit = 1
cache.SnapshotWait = true
Expand Down