Skip to content

Commit dec5893

Browse files
committed
eth/downloader: nitpick fixes
1 parent f9650b0 commit dec5893

File tree

4 files changed

+54
-55
lines changed

4 files changed

+54
-55
lines changed

eth/downloader/downloader.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -601,6 +601,7 @@ func (d *Downloader) Terminate() {
601601
d.stateBloom.Close()
602602
}
603603
d.quitLock.Unlock()
604+
604605
// Cancel any pending download requests
605606
d.Cancel()
606607
}
@@ -1211,6 +1212,7 @@ func (d *Downloader) fetchParts(deliveryCh chan dataPack, deliver func(dataPack)
12111212
// Create a ticker to detect expired retrieval tasks
12121213
ticker := time.NewTicker(100 * time.Millisecond)
12131214
defer ticker.Stop()
1215+
12141216
update := make(chan struct{}, 1)
12151217

12161218
// Prepare the queue and fetch block parts until the block header fetcher's done

eth/downloader/queue.go

Lines changed: 14 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -335,7 +335,7 @@ func (q *queue) Schedule(headers []*types.Header, from uint64) []*types.Header {
335335
// Results can be called concurrently with Deliver and Schedule,
336336
// but assumes that there are not two simultaneous callers to Results
337337
func (q *queue) Results(block bool) []*fetchResult {
338-
// abort early if there are no items and non-blocking requested
338+
// Abort early if there are no items and non-blocking requested
339339
if !block && !q.resultCache.HasCompletedItems() {
340340
return nil
341341
}
@@ -379,7 +379,8 @@ func (q *queue) Results(block bool) []*fetchResult {
379379
// on the result cache
380380
throttleThreshold := uint64((common.StorageSize(blockCacheMemory) + q.resultSize - 1) / q.resultSize)
381381
q.resultCache.SetThrottleThreshold(throttleThreshold)
382-
// log some info at certain times
382+
383+
// Log some info at certain times
383384
if time.Since(q.lastStatLog) > 10*time.Second {
384385
q.lastStatLog = time.Now()
385386
info := q.Stats()
@@ -392,13 +393,15 @@ func (q *queue) Results(block bool) []*fetchResult {
392393
func (q *queue) Stats() []interface{} {
393394
q.lock.RLock()
394395
defer q.lock.RUnlock()
396+
395397
return q.stats()
396398
}
399+
397400
func (q *queue) stats() []interface{} {
398401
return []interface{}{
399-
"receiptTaskQueue", q.receiptTaskQueue.Size(),
400-
"blockTaskQueue", q.blockTaskQueue.Size(),
401-
"est resultSize", q.resultSize,
402+
"receiptTasks", q.receiptTaskQueue.Size(),
403+
"blockTasks", q.blockTaskQueue.Size(),
404+
"itemSize", q.resultSize,
402405
}
403406
}
404407

@@ -469,11 +472,11 @@ func (q *queue) ReserveReceipts(p *peerConnection, count int) (*fetchRequest, bo
469472
// Note, this method expects the queue lock to be already held for writing. The
470473
// reason the lock is not obtained in here is because the parameters already need
471474
// to access the queue, so they already need a lock anyway.
472-
// returns:
473-
// item - the fetchRequest
474-
// progress, bool - whether any progress was made
475-
// throttle, bool - if the caller should throttle for a while
476-
// error - any error that occcurred
475+
//
476+
// Returns:
477+
// item - the fetchRequest
478+
// progress - whether any progress was made
479+
// throttle - if the caller should throttle for a while
477480
func (q *queue) reserveHeaders(p *peerConnection, count int, taskPool map[common.Hash]*types.Header, taskQueue *prque.Prque,
478481
pendPool map[string]*fetchRequest, kind uint) (*fetchRequest, bool, bool) {
479482
// Short circuit if the pool has been depleted, or if the peer's already
@@ -518,7 +521,7 @@ func (q *queue) reserveHeaders(p *peerConnection, count int, taskPool map[common
518521
}
519522
if err != nil {
520523
// this most definitely should _not_ happen
521-
log.Warn("reserve headers error", "error", err)
524+
log.Warn("Failed to reserve headers", "err", err)
522525
// There are no resultslots available. Leave it in the task queue
523526
break
524527
}

eth/downloader/queue_test.go

Lines changed: 12 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -75,13 +75,15 @@ func init() {
7575
blocks, _ = makeChain(targetBlocks, 0, genesis, true)
7676
emptyChain = &chainData{blocks, 0}
7777
}
78+
7879
func (chain *chainData) headers() []*types.Header {
7980
hdrs := make([]*types.Header, len(chain.blocks))
8081
for i, b := range chain.blocks {
8182
hdrs[i] = b.Header()
8283
}
8384
return hdrs
8485
}
86+
8587
func (chain *chainData) Len() int {
8688
return len(chain.blocks)
8789
}
@@ -95,7 +97,6 @@ func dummyPeer(id string) *peerConnection {
9597
}
9698

9799
func TestBasics(t *testing.T) {
98-
99100
q := newQueue(10)
100101
if !q.Idle() {
101102
t.Errorf("new queue should be idle")
@@ -121,10 +122,7 @@ func TestBasics(t *testing.T) {
121122
// queue that a certain peer will deliver them for us
122123
{
123124
peer := dummyPeer("peer-1")
124-
fetchReq, _, throttle, err := q.ReserveBodies(peer, 50)
125-
if err != nil {
126-
t.Fatal(err)
127-
}
125+
fetchReq, _, throttle := q.ReserveBodies(peer, 50)
128126
if !throttle {
129127
// queue size is only 10, so throttling should occur
130128
t.Fatal("should throttle")
@@ -139,10 +137,8 @@ func TestBasics(t *testing.T) {
139137
}
140138
{
141139
peer := dummyPeer("peer-2")
142-
fetchReq, _, throttle, err := q.ReserveBodies(peer, 50)
143-
if err != nil {
144-
t.Fatal(err)
145-
}
140+
fetchReq, _, throttle := q.ReserveBodies(peer, 50)
141+
146142
// The second peer should hit throttling
147143
if !throttle {
148144
t.Fatalf("should not throttle")
@@ -158,10 +154,7 @@ func TestBasics(t *testing.T) {
158154
// The receipt delivering peer should not be affected
159155
// by the throttling of body deliveries
160156
peer := dummyPeer("peer-3")
161-
fetchReq, _, throttle, err := q.ReserveReceipts(peer, 50)
162-
if err != nil {
163-
t.Fatal(err)
164-
}
157+
fetchReq, _, throttle := q.ReserveReceipts(peer, 50)
165158
if !throttle {
166159
// queue size is only 10, so throttling should occur
167160
t.Fatal("should throttle")
@@ -181,7 +174,6 @@ func TestBasics(t *testing.T) {
181174
}
182175

183176
func TestEmptyBlocks(t *testing.T) {
184-
185177
q := newQueue(10)
186178

187179
q.Prepare(1, FastSync)
@@ -208,10 +200,8 @@ func TestEmptyBlocks(t *testing.T) {
208200
{
209201
// Reserve blocks
210202
peer := dummyPeer("peer-1")
211-
fetchReq, _, _, err := q.ReserveBodies(peer, 50)
212-
if err != nil {
213-
t.Fatal(err)
214-
}
203+
fetchReq, _, _ := q.ReserveBodies(peer, 50)
204+
215205
// there should be nothing to fetch, blocks are empty
216206
if fetchReq != nil {
217207
t.Fatal("there should be no body fetch tasks remaining")
@@ -227,10 +217,8 @@ func TestEmptyBlocks(t *testing.T) {
227217
//fmt.Printf("receiptTaskQueue len: %d\n", q.receiptTaskQueue.Size())
228218
{
229219
peer := dummyPeer("peer-3")
230-
fetchReq, _, _, err := q.ReserveReceipts(peer, 50)
231-
if err != nil {
232-
t.Fatal(err)
233-
}
220+
fetchReq, _, _ := q.ReserveReceipts(peer, 50)
221+
234222
// there should be nothing to fetch, blocks are empty
235223
if fetchReq != nil {
236224
t.Fatal("there should be no body fetch tasks remaining")
@@ -296,7 +284,7 @@ func XTestDelivery(t *testing.T) {
296284
i := 4
297285
for {
298286
peer := dummyPeer(fmt.Sprintf("peer-%d", i))
299-
f, _, _, _ := q.ReserveBodies(peer, rand.Intn(30))
287+
f, _, _ := q.ReserveBodies(peer, rand.Intn(30))
300288
if f != nil {
301289
var emptyList []*types.Header
302290
var txs [][]*types.Transaction
@@ -322,7 +310,7 @@ func XTestDelivery(t *testing.T) {
322310
// reserve receiptfetch
323311
peer := dummyPeer("peer-3")
324312
for {
325-
f, _, _, _ := q.ReserveReceipts(peer, rand.Intn(50))
313+
f, _, _ := q.ReserveReceipts(peer, rand.Intn(50))
326314
if f != nil {
327315
var rcs [][]*types.Receipt
328316
for _, hdr := range f.Headers {

eth/downloader/resultcache.go renamed to eth/downloader/resultstore.go

Lines changed: 26 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,6 @@
1414
// You should have received a copy of the GNU Lesser General Public License
1515
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
1616

17-
// resultcache implements a structure for maintaining fetchResults, tracking their
18-
// download-progress and delivering (finished) results
19-
2017
package downloader
2118

2219
import (
@@ -27,9 +24,10 @@ import (
2724
"github.com/ethereum/go-ethereum/core/types"
2825
)
2926

27+
// resultStore implements a structure for maintaining fetchResults, tracking their
28+
// download-progress and delivering (finished) results.
3029
type resultStore struct {
3130
items []*fetchResult // Downloaded but not yet delivered fetch results
32-
lock *sync.RWMutex // lock protect internals
3331
resultOffset uint64 // Offset of the first cached fetch result in the block chain
3432

3533
// Internal index of first non-completed entry, updated atomically when needed.
@@ -43,20 +41,24 @@ type resultStore struct {
4341
// 8192 possible places. The queue will, at certain times, recalibrate
4442
// this index.
4543
throttleThreshold uint64
44+
45+
lock sync.RWMutex
4646
}
4747

4848
func newResultStore(size int) *resultStore {
4949
return &resultStore{
5050
resultOffset: 0,
5151
items: make([]*fetchResult, size),
52-
lock: new(sync.RWMutex),
5352
throttleThreshold: 3 * uint64(size) / 4, // 75%
5453
}
5554
}
5655

56+
// SetThrottleThreshold updates the throttling threshold based on the requested
57+
// limit and the total queue capacity.
5758
func (r *resultStore) SetThrottleThreshold(threshold uint64) {
5859
r.lock.Lock()
5960
defer r.lock.Unlock()
61+
6062
limit := uint64(len(r.items)) * 3 / 4
6163
if threshold >= limit {
6264
threshold = limit
@@ -66,15 +68,16 @@ func (r *resultStore) SetThrottleThreshold(threshold uint64) {
6668

6769
// AddFetch adds a header for body/receipt fetching. This is used when the queue
6870
// wants to reserve headers for fetching.
71+
//
6972
// It returns the following:
70-
// stale -- if true, this item is already passed, and should not be requested again.
71-
// throttled -- if true, the resultcache is at capacity, and this particular header is not
72-
// prio right now
73-
// fetchResult -- the result to store data into
74-
// err -- any error that occurred
73+
// stale - if true, this item is already passed, and should not be requested again
74+
// throttled - if true, the store is at capacity, this particular header is not prio now
75+
// item - the result to store data into
76+
// err - any error that occurred
7577
func (r *resultStore) AddFetch(header *types.Header, fastSync bool) (stale, throttled bool, item *fetchResult, err error) {
7678
r.lock.Lock()
7779
defer r.lock.Unlock()
80+
7881
var index int
7982
item, index, stale, throttled, err = r.getFetchResult(header.Number.Uint64())
8083
if err != nil || stale || throttled {
@@ -88,20 +91,20 @@ func (r *resultStore) AddFetch(header *types.Header, fastSync bool) (stale, thro
8891
}
8992

9093
// GetDeliverySlot returns the fetchResult for the given header. If the 'stale' flag
91-
// is true, that means the header has already been delivered 'upstream'.
92-
// This method does not bubble up the 'throttle' flag, since it's moot at the
93-
// point in time when the item is downloaded and ready for delivery
94+
// is true, that means the header has already been delivered 'upstream'. This method
95+
// does not bubble up the 'throttle' flag, since it's moot at the point in time when
96+
// the item is downloaded and ready for delivery
9497
func (r *resultStore) GetDeliverySlot(headerNumber uint64) (*fetchResult, bool, error) {
9598
r.lock.RLock()
9699
defer r.lock.RUnlock()
100+
97101
res, _, stale, _, err := r.getFetchResult(headerNumber)
98102
return res, stale, err
99103
}
100104

101-
// getFetchResult returns the fetchResult corresponding to the given item, and the index where
102-
// the result is stored.
105+
// getFetchResult returns the fetchResult corresponding to the given item, and
106+
// the index where the result is stored.
103107
func (r *resultStore) getFetchResult(headerNumber uint64) (item *fetchResult, index int, stale, throttle bool, err error) {
104-
105108
index = int(int64(headerNumber) - int64(r.resultOffset))
106109
throttle = index >= int(r.throttleThreshold)
107110
stale = index < 0
@@ -124,6 +127,7 @@ func (r *resultStore) getFetchResult(headerNumber uint64) (item *fetchResult, in
124127
func (r *resultStore) HasCompletedItems() bool {
125128
r.lock.RLock()
126129
defer r.lock.RUnlock()
130+
127131
if len(r.items) == 0 {
128132
return false
129133
}
@@ -135,7 +139,8 @@ func (r *resultStore) HasCompletedItems() bool {
135139

136140
// countCompleted returns the number of items ready for delivery, stopping at
137141
// the first non-complete item.
138-
// It assumes (at least) rlock is held
142+
//
143+
// The mthod assumes (at least) rlock is held.
139144
func (r *resultStore) countCompleted() int {
140145
// We iterate from the already known complete point, and see
141146
// if any more has completed since last count
@@ -170,18 +175,19 @@ func (r *resultStore) GetCompleted(limit int) []*fetchResult {
170175
for i := len(r.items) - limit; i < len(r.items); i++ {
171176
r.items[i] = nil
172177
}
173-
// Advance the expected block number of the first cache entry.
178+
// Advance the expected block number of the first cache entry
174179
r.resultOffset += uint64(limit)
175-
// And subtract the number of items from our index
176180
atomic.AddInt32(&r.indexIncomplete, int32(-limit))
181+
177182
return results
178183
}
179184

180185
// Prepare initialises the offset with the given block number
181186
func (r *resultStore) Prepare(offset uint64) {
182187
r.lock.Lock()
188+
defer r.lock.Unlock()
189+
183190
if r.resultOffset < offset {
184191
r.resultOffset = offset
185192
}
186-
r.lock.Unlock()
187193
}

0 commit comments

Comments
 (0)