Skip to content

Commit

Permalink
eth/downloader: use atomic types (ethereum#27030)
Browse files Browse the repository at this point in the history
* eth/downloader: use atomic type

* Update eth/downloader/downloader_test.go

Co-authored-by: Martin Holst Swende <martin@swende.se>

* Update eth/downloader/downloader_test.go

Co-authored-by: Martin Holst Swende <martin@swende.se>

---------

Co-authored-by: Martin Holst Swende <martin@swende.se>
  • Loading branch information
2 people authored and mmsqe committed Dec 7, 2023
1 parent 67a557d commit 5692e25
Show file tree
Hide file tree
Showing 6 changed files with 54 additions and 54 deletions.
3 changes: 1 addition & 2 deletions eth/downloader/beaconsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package downloader
import (
"fmt"
"sync"
"sync/atomic"
"time"

"github.com/ethereum/go-ethereum/common"
Expand Down Expand Up @@ -371,7 +370,7 @@ func (d *Downloader) fetchBeaconHeaders(from uint64) error {
continue
}
// If the pivot block is committed, signal header sync termination
if atomic.LoadInt32(&d.committed) == 1 {
if d.committed.Load() {
select {
case d.headerProcCh <- nil:
return nil
Expand Down
30 changes: 15 additions & 15 deletions eth/downloader/downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ type headerTask struct {
}

type Downloader struct {
mode uint32 // Synchronisation mode defining the strategy used (per sync cycle), use d.getMode() to get the SyncMode
mode atomic.Uint32 // Synchronisation mode defining the strategy used (per sync cycle), use d.getMode() to get the SyncMode
mux *event.TypeMux // Event multiplexer to announce sync operation events

checkpoint uint64 // Checkpoint block number to enforce head against (e.g. snap sync)
Expand All @@ -122,9 +122,9 @@ type Downloader struct {

// Status
synchroniseMock func(id string, hash common.Hash) error // Replacement for synchronise during testing
synchronising int32
notified int32
committed int32
synchronising atomic.Bool
notified atomic.Bool
committed atomic.Bool
ancientLimit uint64 // The maximum block number which can be regarded as ancient data.

// Channels
Expand Down Expand Up @@ -292,7 +292,7 @@ func (d *Downloader) Progress() ethereum.SyncProgress {

// Synchronising returns whether the downloader is currently retrieving blocks.
func (d *Downloader) Synchronising() bool {
return atomic.LoadInt32(&d.synchronising) > 0
return d.synchronising.Load()
}

// RegisterPeer injects a new download peer into the set of block source to be
Expand Down Expand Up @@ -392,13 +392,13 @@ func (d *Downloader) synchronise(id string, hash common.Hash, td, ttd *big.Int,
return d.synchroniseMock(id, hash)
}
// Make sure only one goroutine is ever allowed past this point at once
if !atomic.CompareAndSwapInt32(&d.synchronising, 0, 1) {
if !d.synchronising.CompareAndSwap(false, true) {
return errBusy
}
defer atomic.StoreInt32(&d.synchronising, 0)
defer d.synchronising.Store(false)

// Post a user notification of the sync (only once per session)
if atomic.CompareAndSwapInt32(&d.notified, 0, 1) {
if d.notified.CompareAndSwap(false, true) {
log.Info("Block synchronisation started")
}
if mode == SnapSync {
Expand Down Expand Up @@ -435,7 +435,7 @@ func (d *Downloader) synchronise(id string, hash common.Hash, td, ttd *big.Int,
defer d.Cancel() // No matter what, we can't leave the cancel channel open

// Atomically set the requested sync mode
atomic.StoreUint32(&d.mode, uint32(mode))
d.mode.Store(uint32(mode))

// Retrieve the origin peer and initiate the downloading process
var p *peerConnection
Expand All @@ -452,7 +452,7 @@ func (d *Downloader) synchronise(id string, hash common.Hash, td, ttd *big.Int,
}

func (d *Downloader) getMode() SyncMode {
return SyncMode(atomic.LoadUint32(&d.mode))
return SyncMode(d.mode.Load())
}

// syncWithPeer starts a block synchronization based on the hash chain from the
Expand Down Expand Up @@ -562,9 +562,9 @@ func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td, ttd *
rawdb.WriteLastPivotNumber(d.stateDB, pivotNumber)
}
}
d.committed = 1
d.committed.Store(true)
if mode == SnapSync && pivot.Number.Uint64() != 0 {
d.committed = 0
d.committed.Store(false)
}
if mode == SnapSync {
// Set the ancient data limitation. If we are running snap sync, all block
Expand Down Expand Up @@ -1128,7 +1128,7 @@ func (d *Downloader) fetchHeaders(p *peerConnection, from uint64, head uint64) e
// If no more headers are inbound, notify the content fetchers and return
if len(headers) == 0 {
// Don't abort header fetches while the pivot is downloading
if atomic.LoadInt32(&d.committed) == 0 && pivot <= from {
if !d.committed.Load() && pivot <= from {
p.log.Debug("No headers, waiting for pivot commit")
select {
case <-time.After(fsHeaderContCheck):
Expand Down Expand Up @@ -1669,7 +1669,7 @@ func (d *Downloader) processSnapSyncContent() error {
results = append(append([]*fetchResult{oldPivot}, oldTail...), results...)
}
// Split around the pivot block and process the two sides via snap/full sync
if atomic.LoadInt32(&d.committed) == 0 {
if !d.committed.Load() {
latest := results[len(results)-1].Header
// If the height is above the pivot block by 2 sets, it means the pivot
// become stale in the network and it was garbage collected, move to a
Expand Down Expand Up @@ -1794,7 +1794,7 @@ func (d *Downloader) commitPivotBlock(result *fetchResult) error {
if err := d.blockchain.SnapSyncCommitHead(block.Hash()); err != nil {
return err
}
atomic.StoreInt32(&d.committed, 1)
d.committed.Store(true)
return nil
}

Expand Down
25 changes: 13 additions & 12 deletions eth/downloader/downloader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -476,9 +476,10 @@ func testThrottling(t *testing.T, protocol uint, mode SyncMode) {
tester.newPeer("peer", protocol, testChainBase.blocks[1:])

// Wrap the importer to allow stepping
blocked, proceed := uint32(0), make(chan struct{})
var blocked atomic.Uint32
proceed := make(chan struct{})
tester.downloader.chainInsertHook = func(results []*fetchResult) {
atomic.StoreUint32(&blocked, uint32(len(results)))
blocked.Store(uint32(len(results)))
<-proceed
}
// Start a synchronisation concurrently
Expand All @@ -505,7 +506,7 @@ func testThrottling(t *testing.T, protocol uint, mode SyncMode) {
tester.downloader.queue.resultCache.lock.Lock()
{
cached = tester.downloader.queue.resultCache.countCompleted()
frozen = int(atomic.LoadUint32(&blocked))
frozen = int(blocked.Load())
retrieved = int(tester.chain.CurrentSnapBlock().Number.Uint64()) + 1
}
tester.downloader.queue.resultCache.lock.Unlock()
Expand All @@ -528,8 +529,8 @@ func testThrottling(t *testing.T, protocol uint, mode SyncMode) {
t.Fatalf("block count mismatch: have %v, want %v (owned %v, blocked %v, target %v)", cached, blockCacheMaxItems, retrieved, frozen, targetBlocks+1)
}
// Permit the blocked blocks to import
if atomic.LoadUint32(&blocked) > 0 {
atomic.StoreUint32(&blocked, uint32(0))
if blocked.Load() > 0 {
blocked.Store(uint32(0))
proceed <- struct{}{}
}
}
Expand Down Expand Up @@ -786,12 +787,12 @@ func testEmptyShortCircuit(t *testing.T, protocol uint, mode SyncMode) {
tester.newPeer("peer", protocol, chain.blocks[1:])

// Instrument the downloader to signal body requests
bodiesHave, receiptsHave := int32(0), int32(0)
var bodiesHave, receiptsHave atomic.Int32
tester.downloader.bodyFetchHook = func(headers []*types.Header) {
atomic.AddInt32(&bodiesHave, int32(len(headers)))
bodiesHave.Add(int32(len(headers)))
}
tester.downloader.receiptFetchHook = func(headers []*types.Header) {
atomic.AddInt32(&receiptsHave, int32(len(headers)))
receiptsHave.Add(int32(len(headers)))
}
// Synchronise with the peer and make sure all blocks were retrieved
if err := tester.sync("peer", nil, mode); err != nil {
Expand All @@ -811,11 +812,11 @@ func testEmptyShortCircuit(t *testing.T, protocol uint, mode SyncMode) {
receiptsNeeded++
}
}
if int(bodiesHave) != bodiesNeeded {
t.Errorf("body retrieval count mismatch: have %v, want %v", bodiesHave, bodiesNeeded)
if int(bodiesHave.Load()) != bodiesNeeded {
t.Errorf("body retrieval count mismatch: have %v, want %v", bodiesHave.Load(), bodiesNeeded)
}
if int(receiptsHave) != receiptsNeeded {
t.Errorf("receipt retrieval count mismatch: have %v, want %v", receiptsHave, receiptsNeeded)
if int(receiptsHave.Load()) != receiptsNeeded {
t.Errorf("receipt retrieval count mismatch: have %v, want %v", receiptsHave.Load(), receiptsNeeded)
}
}

Expand Down
18 changes: 9 additions & 9 deletions eth/downloader/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ type fetchRequest struct {
// fetchResult is a struct collecting partial results from data fetchers until
// all outstanding pieces complete and the result as a whole can be processed.
type fetchResult struct {
pending int32 // Flag telling what deliveries are outstanding
pending atomic.Int32 // Flag telling what deliveries are outstanding

Header *types.Header
Uncles []*types.Header
Expand All @@ -75,38 +75,38 @@ func newFetchResult(header *types.Header, fastSync bool) *fetchResult {
Header: header,
}
if !header.EmptyBody() {
item.pending |= (1 << bodyType)
item.pending.Store(item.pending.Load() | (1 << bodyType))
} else if header.WithdrawalsHash != nil {
item.Withdrawals = make(types.Withdrawals, 0)
}
if fastSync && !header.EmptyReceipts() {
item.pending |= (1 << receiptType)
item.pending.Store(item.pending.Load() | (1 << receiptType))
}
return item
}

// SetBodyDone flags the body as finished.
func (f *fetchResult) SetBodyDone() {
if v := atomic.LoadInt32(&f.pending); (v & (1 << bodyType)) != 0 {
atomic.AddInt32(&f.pending, -1)
if v := f.pending.Load(); (v & (1 << bodyType)) != 0 {
f.pending.Add(-1)
}
}

// AllDone checks if item is done.
func (f *fetchResult) AllDone() bool {
return atomic.LoadInt32(&f.pending) == 0
return f.pending.Load() == 0
}

// SetReceiptsDone flags the receipts as finished.
func (f *fetchResult) SetReceiptsDone() {
if v := atomic.LoadInt32(&f.pending); (v & (1 << receiptType)) != 0 {
atomic.AddInt32(&f.pending, -2)
if v := f.pending.Load(); (v & (1 << receiptType)) != 0 {
f.pending.Add(-2)
}
}

// Done checks if the given type is done already
func (f *fetchResult) Done(kind uint) bool {
v := atomic.LoadInt32(&f.pending)
v := f.pending.Load()
return v&(1<<kind) == 0
}

Expand Down
8 changes: 4 additions & 4 deletions eth/downloader/resultstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ type resultStore struct {
// Internal index of first non-completed entry, updated atomically when needed.
// If all items are complete, this will equal length(items), so
// *important* : is not safe to use for indexing without checking against length
indexIncomplete int32 // atomic access
indexIncomplete atomic.Int32

// throttleThreshold is the limit up to which we _want_ to fill the
// results. If blocks are large, we want to limit the results to less
Expand Down Expand Up @@ -146,7 +146,7 @@ func (r *resultStore) HasCompletedItems() bool {
func (r *resultStore) countCompleted() int {
// We iterate from the already known complete point, and see
// if any more has completed since last count
index := atomic.LoadInt32(&r.indexIncomplete)
index := r.indexIncomplete.Load()
for ; ; index++ {
if index >= int32(len(r.items)) {
break
Expand All @@ -156,7 +156,7 @@ func (r *resultStore) countCompleted() int {
break
}
}
atomic.StoreInt32(&r.indexIncomplete, index)
r.indexIncomplete.Store(index)
return int(index)
}

Expand All @@ -179,7 +179,7 @@ func (r *resultStore) GetCompleted(limit int) []*fetchResult {
}
// Advance the expected block number of the first cache entry
r.resultOffset += uint64(limit)
atomic.AddInt32(&r.indexIncomplete, int32(-limit))
r.indexIncomplete.Add(int32(-limit))

return results
}
Expand Down
24 changes: 12 additions & 12 deletions eth/downloader/skeleton_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,8 @@ type skeletonTestPeer struct {

serve func(origin uint64) []*types.Header // Hook to allow custom responses

served uint64 // Number of headers served by this peer
dropped uint64 // Flag whether the peer was dropped (stop responding)
served atomic.Uint64 // Number of headers served by this peer
dropped atomic.Uint64 // Flag whether the peer was dropped (stop responding)
}

// newSkeletonTestPeer creates a new mock peer to test the skeleton sync with.
Expand Down Expand Up @@ -113,7 +113,7 @@ func (p *skeletonTestPeer) RequestHeadersByNumber(origin uint64, amount int, ski
// Since skeleton test peer are in-memory mocks, dropping the does not make
// them inaccessible. As such, check a local `dropped` field to see if the
// peer has been dropped and should not respond any more.
if atomic.LoadUint64(&p.dropped) != 0 {
if p.dropped.Load() != 0 {
return nil, errors.New("peer already dropped")
}
// Skeleton sync retrieves batches of headers going backward without gaps.
Expand Down Expand Up @@ -161,7 +161,7 @@ func (p *skeletonTestPeer) RequestHeadersByNumber(origin uint64, amount int, ski
}
}
}
atomic.AddUint64(&p.served, uint64(len(headers)))
p.served.Add(uint64(len(headers)))

hashes := make([]common.Hash, len(headers))
for i, header := range headers {
Expand All @@ -182,7 +182,7 @@ func (p *skeletonTestPeer) RequestHeadersByNumber(origin uint64, amount int, ski
sink <- res
if err := <-res.Done; err != nil {
log.Warn("Skeleton test peer response rejected", "err", err)
atomic.AddUint64(&p.dropped, 1)
p.dropped.Add(1)
}
}()
return req, nil
Expand Down Expand Up @@ -817,7 +817,7 @@ func TestSkeletonSyncRetrievals(t *testing.T) {
dropped := make(map[string]int)
drop := func(peer string) {
if p := peerset.Peer(peer); p != nil {
atomic.AddUint64(&p.peer.(*skeletonTestPeer).dropped, 1)
p.peer.(*skeletonTestPeer).dropped.Add(1)
}
peerset.Unregister(peer)
dropped[peer]++
Expand Down Expand Up @@ -895,14 +895,14 @@ func TestSkeletonSyncRetrievals(t *testing.T) {
if !tt.unpredictable {
var served uint64
for _, peer := range tt.peers {
served += atomic.LoadUint64(&peer.served)
served += peer.served.Load()
}
if served != tt.midserve {
t.Errorf("test %d, mid state: served headers mismatch: have %d, want %d", i, served, tt.midserve)
}
var drops uint64
for _, peer := range tt.peers {
drops += atomic.LoadUint64(&peer.dropped)
drops += peer.dropped.Load()
}
if drops != tt.middrop {
t.Errorf("test %d, mid state: dropped peers mismatch: have %d, want %d", i, drops, tt.middrop)
Expand Down Expand Up @@ -950,20 +950,20 @@ func TestSkeletonSyncRetrievals(t *testing.T) {
if !tt.unpredictable {
served := uint64(0)
for _, peer := range tt.peers {
served += atomic.LoadUint64(&peer.served)
served += peer.served.Load()
}
if tt.newPeer != nil {
served += atomic.LoadUint64(&tt.newPeer.served)
served += tt.newPeer.served.Load()
}
if served != tt.endserve {
t.Errorf("test %d, end state: served headers mismatch: have %d, want %d", i, served, tt.endserve)
}
drops := uint64(0)
for _, peer := range tt.peers {
drops += atomic.LoadUint64(&peer.dropped)
drops += peer.dropped.Load()
}
if tt.newPeer != nil {
drops += atomic.LoadUint64(&tt.newPeer.dropped)
drops += tt.newPeer.dropped.Load()
}
if drops != tt.enddrop {
t.Errorf("test %d, end state: dropped peers mismatch: have %d, want %d", i, drops, tt.middrop)
Expand Down

0 comments on commit 5692e25

Please sign in to comment.