Skip to content

Commit 7cf56d6

Browse files
miner: use channels instead of atomics in update loop (#21536)
This PR changes several different things: - Adds test cases for the miner loop - Stops the worker if it wasn't already stopped in worker.Close() - Uses channels instead of atomics in the miner.update() loop Co-authored-by: Felix Lange <fjl@twurst.com>
1 parent d7f02b4 commit 7cf56d6

File tree

3 files changed

+196
-32
lines changed

3 files changed

+196
-32
lines changed

miner/miner.go

Lines changed: 25 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@ package miner
2020
import (
2121
"fmt"
2222
"math/big"
23-
"sync/atomic"
2423
"time"
2524

2625
"github.com/ethereum/go-ethereum/common"
@@ -61,19 +60,19 @@ type Miner struct {
6160
eth Backend
6261
engine consensus.Engine
6362
exitCh chan struct{}
64-
65-
canStart int32 // can start indicates whether we can start the mining operation
66-
shouldStart int32 // should start indicates whether we should start after sync
63+
startCh chan common.Address
64+
stopCh chan struct{}
6765
}
6866

6967
func New(eth Backend, config *Config, chainConfig *params.ChainConfig, mux *event.TypeMux, engine consensus.Engine, isLocalBlock func(block *types.Block) bool) *Miner {
7068
miner := &Miner{
71-
eth: eth,
72-
mux: mux,
73-
engine: engine,
74-
exitCh: make(chan struct{}),
75-
worker: newWorker(config, chainConfig, engine, eth, mux, isLocalBlock, true),
76-
canStart: 1,
69+
eth: eth,
70+
mux: mux,
71+
engine: engine,
72+
exitCh: make(chan struct{}),
73+
startCh: make(chan common.Address),
74+
stopCh: make(chan struct{}),
75+
worker: newWorker(config, chainConfig, engine, eth, mux, isLocalBlock, true),
7776
}
7877
go miner.update()
7978

@@ -88,6 +87,7 @@ func (miner *Miner) update() {
8887
events := miner.mux.Subscribe(downloader.StartEvent{}, downloader.DoneEvent{}, downloader.FailedEvent{})
8988
defer events.Unsubscribe()
9089

90+
shouldStart := false
9191
for {
9292
select {
9393
case ev := <-events.Chan():
@@ -96,47 +96,40 @@ func (miner *Miner) update() {
9696
}
9797
switch ev.Data.(type) {
9898
case downloader.StartEvent:
99-
atomic.StoreInt32(&miner.canStart, 0)
100-
if miner.Mining() {
101-
miner.Stop()
102-
atomic.StoreInt32(&miner.shouldStart, 1)
99+
wasMining := miner.Mining()
100+
miner.worker.stop()
101+
if wasMining {
102+
// Resume mining after sync was finished
103+
shouldStart = true
103104
log.Info("Mining aborted due to sync")
104105
}
105106
case downloader.DoneEvent, downloader.FailedEvent:
106-
shouldStart := atomic.LoadInt32(&miner.shouldStart) == 1
107-
108-
atomic.StoreInt32(&miner.canStart, 1)
109-
atomic.StoreInt32(&miner.shouldStart, 0)
110107
if shouldStart {
111-
miner.Start(miner.coinbase)
108+
miner.SetEtherbase(miner.coinbase)
109+
miner.worker.start()
112110
}
113-
// stop immediately and ignore all further pending events
114-
return
115111
}
112+
case addr := <-miner.startCh:
113+
miner.SetEtherbase(addr)
114+
miner.worker.start()
115+
case <-miner.stopCh:
116+
miner.worker.stop()
116117
case <-miner.exitCh:
118+
miner.worker.close()
117119
return
118120
}
119121
}
120122
}
121123

122124
func (miner *Miner) Start(coinbase common.Address) {
123-
atomic.StoreInt32(&miner.shouldStart, 1)
124-
miner.SetEtherbase(coinbase)
125-
126-
if atomic.LoadInt32(&miner.canStart) == 0 {
127-
log.Info("Network syncing, will start miner afterwards")
128-
return
129-
}
130-
miner.worker.start()
125+
miner.startCh <- coinbase
131126
}
132127

133128
func (miner *Miner) Stop() {
134-
miner.worker.stop()
135-
atomic.StoreInt32(&miner.shouldStart, 0)
129+
miner.stopCh <- struct{}{}
136130
}
137131

138132
func (miner *Miner) Close() {
139-
miner.worker.close()
140133
close(miner.exitCh)
141134
}
142135

miner/miner_test.go

Lines changed: 170 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,170 @@
1+
// Copyright 2020 The go-ethereum Authors
2+
// This file is part of the go-ethereum library.
3+
//
4+
// The go-ethereum library is free software: you can redistribute it and/or modify
5+
// it under the terms of the GNU Lesser General Public License as published by
6+
// the Free Software Foundation, either version 3 of the License, or
7+
// (at your option) any later version.
8+
//
9+
// The go-ethereum library is distributed in the hope that it will be useful,
10+
// but WITHOUT ANY WARRANTY; without even the implied warranty of
11+
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12+
// GNU Lesser General Public License for more details.
13+
//
14+
// You should have received a copy of the GNU Lesser General Public License
15+
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
16+
17+
// Package miner implements Ethereum block creation and mining.
18+
package miner
19+
20+
import (
21+
"testing"
22+
"time"
23+
24+
"github.com/ethereum/go-ethereum/common"
25+
"github.com/ethereum/go-ethereum/consensus/ethash"
26+
"github.com/ethereum/go-ethereum/core"
27+
"github.com/ethereum/go-ethereum/core/rawdb"
28+
"github.com/ethereum/go-ethereum/core/state"
29+
"github.com/ethereum/go-ethereum/core/types"
30+
"github.com/ethereum/go-ethereum/core/vm"
31+
"github.com/ethereum/go-ethereum/eth/downloader"
32+
"github.com/ethereum/go-ethereum/ethdb/memorydb"
33+
"github.com/ethereum/go-ethereum/event"
34+
"github.com/ethereum/go-ethereum/params"
35+
"github.com/ethereum/go-ethereum/trie"
36+
)
37+
38+
type mockBackend struct {
39+
bc *core.BlockChain
40+
txPool *core.TxPool
41+
}
42+
43+
func NewMockBackend(bc *core.BlockChain, txPool *core.TxPool) *mockBackend {
44+
return &mockBackend{
45+
bc: bc,
46+
txPool: txPool,
47+
}
48+
}
49+
50+
func (m *mockBackend) BlockChain() *core.BlockChain {
51+
return m.bc
52+
}
53+
54+
func (m *mockBackend) TxPool() *core.TxPool {
55+
return m.txPool
56+
}
57+
58+
type testBlockChain struct {
59+
statedb *state.StateDB
60+
gasLimit uint64
61+
chainHeadFeed *event.Feed
62+
}
63+
64+
func (bc *testBlockChain) CurrentBlock() *types.Block {
65+
return types.NewBlock(&types.Header{
66+
GasLimit: bc.gasLimit,
67+
}, nil, nil, nil, new(trie.Trie))
68+
}
69+
70+
func (bc *testBlockChain) GetBlock(hash common.Hash, number uint64) *types.Block {
71+
return bc.CurrentBlock()
72+
}
73+
74+
func (bc *testBlockChain) StateAt(common.Hash) (*state.StateDB, error) {
75+
return bc.statedb, nil
76+
}
77+
78+
func (bc *testBlockChain) SubscribeChainHeadEvent(ch chan<- core.ChainHeadEvent) event.Subscription {
79+
return bc.chainHeadFeed.Subscribe(ch)
80+
}
81+
82+
func TestMiner(t *testing.T) {
83+
miner, mux := createMiner(t)
84+
miner.Start(common.HexToAddress("0x12345"))
85+
waitForMiningState(t, miner, true)
86+
// Start the downloader
87+
mux.Post(downloader.StartEvent{})
88+
waitForMiningState(t, miner, false)
89+
// Stop the downloader and wait for the update loop to run
90+
mux.Post(downloader.DoneEvent{})
91+
waitForMiningState(t, miner, true)
92+
// Start the downloader and wait for the update loop to run
93+
mux.Post(downloader.StartEvent{})
94+
waitForMiningState(t, miner, false)
95+
// Stop the downloader and wait for the update loop to run
96+
mux.Post(downloader.FailedEvent{})
97+
waitForMiningState(t, miner, true)
98+
}
99+
100+
func TestStartStopMiner(t *testing.T) {
101+
miner, _ := createMiner(t)
102+
waitForMiningState(t, miner, false)
103+
miner.Start(common.HexToAddress("0x12345"))
104+
waitForMiningState(t, miner, true)
105+
miner.Stop()
106+
waitForMiningState(t, miner, false)
107+
}
108+
109+
func TestCloseMiner(t *testing.T) {
110+
miner, _ := createMiner(t)
111+
waitForMiningState(t, miner, false)
112+
miner.Start(common.HexToAddress("0x12345"))
113+
waitForMiningState(t, miner, true)
114+
// Terminate the miner and wait for the update loop to run
115+
miner.Close()
116+
waitForMiningState(t, miner, false)
117+
}
118+
119+
// waitForMiningState waits until either
120+
// * the desired mining state was reached
121+
// * a timeout was reached which fails the test
122+
func waitForMiningState(t *testing.T, m *Miner, mining bool) {
123+
t.Helper()
124+
125+
var state bool
126+
for i := 0; i < 100; i++ {
127+
if state = m.Mining(); state == mining {
128+
return
129+
}
130+
time.Sleep(10 * time.Millisecond)
131+
}
132+
t.Fatalf("Mining() == %t, want %t", state, mining)
133+
}
134+
135+
func createMiner(t *testing.T) (*Miner, *event.TypeMux) {
136+
// Create Ethash config
137+
config := Config{
138+
Etherbase: common.HexToAddress("123456789"),
139+
}
140+
// Create chainConfig
141+
memdb := memorydb.New()
142+
chainDB := rawdb.NewDatabase(memdb)
143+
genesis := core.DeveloperGenesisBlock(15, common.HexToAddress("12345"))
144+
chainConfig, _, err := core.SetupGenesisBlock(chainDB, genesis)
145+
if err != nil {
146+
t.Fatalf("can't create new chain config: %v", err)
147+
}
148+
// Create event Mux
149+
mux := new(event.TypeMux)
150+
// Create consensus engine
151+
engine := ethash.New(ethash.Config{}, []string{}, false)
152+
engine.SetThreads(-1)
153+
// Create isLocalBlock
154+
isLocalBlock := func(block *types.Block) bool {
155+
return true
156+
}
157+
// Create Ethereum backend
158+
limit := uint64(1000)
159+
bc, err := core.NewBlockChain(chainDB, new(core.CacheConfig), chainConfig, engine, vm.Config{}, isLocalBlock, &limit)
160+
if err != nil {
161+
t.Fatalf("can't create new chain %v", err)
162+
}
163+
statedb, _ := state.New(common.Hash{}, state.NewDatabase(rawdb.NewMemoryDatabase()), nil)
164+
blockchain := &testBlockChain{statedb, 10000000, new(event.Feed)}
165+
166+
pool := core.NewTxPool(testTxPoolConfig, params.TestChainConfig, blockchain)
167+
backend := NewMockBackend(bc, pool)
168+
// Create Miner
169+
return New(backend, &config, chainConfig, mux, engine, isLocalBlock), mux
170+
}

miner/worker.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -303,6 +303,7 @@ func (w *worker) isRunning() bool {
303303
// close terminates all background threads maintained by the worker.
304304
// Note the worker does not support being closed multiple times.
305305
func (w *worker) close() {
306+
atomic.StoreInt32(&w.running, 0)
306307
close(w.exitCh)
307308
}
308309

0 commit comments

Comments
 (0)