Skip to content

Commit

Permalink
eth/fetcher, trie: unit test reliability fixes (ethereum#23020)
Browse files Browse the repository at this point in the history
Some tests take quite some time during exit, which I think causes
some appveyor fails like this:

    https://ci.appveyor.com/project/ethereum/go-ethereum/builds/39511210/job/xhom84eg2e4uulq3

One of the things that seem to take time during exit is waiting
(up to 100ms) for the syncbloom to close. This PR changes it to use
a channel, instead of looping with a 100ms wait.

This also includes some unrelated changes improving the reliability of
eth/fetcher tests, which fail a lot because they are time-dependent.
  • Loading branch information
holiman authored and atif-konasl committed Oct 15, 2021
1 parent 4bb76a8 commit da8e322
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 25 deletions.
18 changes: 10 additions & 8 deletions eth/fetcher/block_fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -833,15 +833,17 @@ func (f *BlockFetcher) importBlocks(peer string, block *types.Block) {
// internal state.
func (f *BlockFetcher) forgetHash(hash common.Hash) {
// Remove all pending announces and decrement DOS counters
for _, announce := range f.announced[hash] {
f.announces[announce.origin]--
if f.announces[announce.origin] <= 0 {
delete(f.announces, announce.origin)
if announceMap, ok := f.announced[hash]; ok {
for _, announce := range announceMap {
f.announces[announce.origin]--
if f.announces[announce.origin] <= 0 {
delete(f.announces, announce.origin)
}
}
delete(f.announced, hash)
if f.announceChangeHook != nil {
f.announceChangeHook(hash, false)
}
}
delete(f.announced, hash)
if f.announceChangeHook != nil {
f.announceChangeHook(hash, false)
}
// Remove any pending fetches and decrement the DOS counters
if announce := f.fetching[hash]; announce != nil {
Expand Down
19 changes: 17 additions & 2 deletions eth/fetcher/block_fetcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -698,6 +698,7 @@ func testInvalidNumberAnnouncement(t *testing.T, light bool) {
badBodyFetcher := tester.makeBodyFetcher("bad", blocks, 0)

imported := make(chan interface{})
announced := make(chan interface{})
tester.fetcher.importedHook = func(header *types.Header, block *types.Block) {
if light {
if header == nil {
Expand All @@ -712,21 +713,35 @@ func testInvalidNumberAnnouncement(t *testing.T, light bool) {
}
}
// Announce a block with a bad number, check for immediate drop
tester.fetcher.announceChangeHook = func(hash common.Hash, b bool) {
announced <- nil
}
tester.fetcher.Notify("bad", hashes[0], 2, time.Now().Add(-arriveTimeout), badHeaderFetcher, badBodyFetcher)
verifyAnnounce := func() {
for i := 0; i < 2; i++ {
select {
case <-announced:
continue
case <-time.After(1 * time.Second):
t.Fatal("announce timeout")
return
}
}
}
verifyAnnounce()
verifyImportEvent(t, imported, false)

tester.lock.RLock()
dropped := tester.drops["bad"]
tester.lock.RUnlock()

if !dropped {
t.Fatalf("peer with invalid numbered announcement not dropped")
}

goodHeaderFetcher := tester.makeHeaderFetcher("good", blocks, -gatherSlack)
goodBodyFetcher := tester.makeBodyFetcher("good", blocks, 0)
// Make sure a good announcement passes without a drop
tester.fetcher.Notify("good", hashes[0], 1, time.Now().Add(-arriveTimeout), goodHeaderFetcher, goodBodyFetcher)
verifyAnnounce()
verifyImportEvent(t, imported, true)

tester.lock.RLock()
Expand Down
32 changes: 17 additions & 15 deletions trie/sync_bloom.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,12 @@ var (
// provided disk database on creation in a background thread and will only start
// returning live results once that's finished.
type SyncBloom struct {
bloom *bloomfilter.Filter
inited uint32
closer sync.Once
closed uint32
pend sync.WaitGroup
bloom *bloomfilter.Filter
inited uint32
closer sync.Once
closed uint32
pend sync.WaitGroup
closeCh chan struct{}
}

// NewSyncBloom creates a new bloom filter of the given size (in megabytes) and
Expand All @@ -64,7 +65,8 @@ func NewSyncBloom(memory uint64, database ethdb.Iteratee) *SyncBloom {

// Assemble the fast sync bloom and init it from previous sessions
b := &SyncBloom{
bloom: bloom,
bloom: bloom,
closeCh: make(chan struct{}),
}
b.pend.Add(2)
go func() {
Expand Down Expand Up @@ -125,16 +127,15 @@ func (b *SyncBloom) init(database ethdb.Iteratee) {
// meter periodically recalculates the false positive error rate of the bloom
// filter and reports it in a metric.
func (b *SyncBloom) meter() {
// check every second
tick := time.NewTicker(1 * time.Second)
for {
// Report the current error ration. No floats, lame, scale it up.
bloomErrorGauge.Update(int64(b.bloom.FalsePosititveProbability() * 100000))

// Wait one second, but check termination more frequently
for i := 0; i < 10; i++ {
if atomic.LoadUint32(&b.closed) == 1 {
return
}
time.Sleep(100 * time.Millisecond)
select {
case <-tick.C:
// Report the current error ration. No floats, lame, scale it up.
bloomErrorGauge.Update(int64(b.bloom.FalsePosititveProbability() * 100000))
case <-b.closeCh:
return
}
}
}
Expand All @@ -145,6 +146,7 @@ func (b *SyncBloom) Close() error {
b.closer.Do(func() {
// Ensure the initializer is stopped
atomic.StoreUint32(&b.closed, 1)
close(b.closeCh)
b.pend.Wait()

// Wipe the bloom, but mark it "uninited" just in case someone attempts an access
Expand Down

0 comments on commit da8e322

Please sign in to comment.