@@ -34,14 +34,19 @@ var ErrNotRequested = errors.New("not requested")
34
34
// node it already processed previously.
35
35
var ErrAlreadyProcessed = errors .New ("already processed" )
36
36
37
+ // maxFetchesPerDepth is the maximum number of pending trie nodes per depth. The
38
+ // role of this value is to limit the number of trie nodes that get expanded in
39
+ // memory if the node was configured with a significant number of peers.
40
+ const maxFetchesPerDepth = 16384
41
+
37
42
// request represents a scheduled or already in-flight state retrieval request.
38
43
type request struct {
44
+ path []byte // Merkle path leading to this node for prioritization
39
45
hash common.Hash // Hash of the node data content to retrieve
40
46
data []byte // Data content of the node, cached until all subtrees complete
41
47
code bool // Whether this is a code entry
42
48
43
49
parents []* request // Parent state nodes referencing this entry (notify all upon completion)
44
- depth int // Depth level within the trie the node is located to prioritise DFS
45
50
deps int // Number of dependencies before allowed to commit this node
46
51
47
52
callback LeafCallback // Callback to invoke if a leaf node it reached on this branch
@@ -89,6 +94,7 @@ type Sync struct {
89
94
nodeReqs map [common.Hash ]* request // Pending requests pertaining to a trie node hash
90
95
codeReqs map [common.Hash ]* request // Pending requests pertaining to a code hash
91
96
queue * prque.Prque // Priority queue with the pending requests
97
+ fetches map [int ]int // Number of active fetches per trie node depth
92
98
bloom * SyncBloom // Bloom filter for fast state existence checks
93
99
}
94
100
@@ -100,14 +106,15 @@ func NewSync(root common.Hash, database ethdb.KeyValueReader, callback LeafCallb
100
106
nodeReqs : make (map [common.Hash ]* request ),
101
107
codeReqs : make (map [common.Hash ]* request ),
102
108
queue : prque .New (nil ),
109
+ fetches : make (map [int ]int ),
103
110
bloom : bloom ,
104
111
}
105
- ts .AddSubTrie (root , 0 , common.Hash {}, callback )
112
+ ts .AddSubTrie (root , nil , common.Hash {}, callback )
106
113
return ts
107
114
}
108
115
109
116
// AddSubTrie registers a new trie to the sync code, rooted at the designated parent.
110
- func (s * Sync ) AddSubTrie (root common.Hash , depth int , parent common.Hash , callback LeafCallback ) {
117
+ func (s * Sync ) AddSubTrie (root common.Hash , path [] byte , parent common.Hash , callback LeafCallback ) {
111
118
// Short circuit if the trie is empty or already known
112
119
if root == emptyRoot {
113
120
return
@@ -128,8 +135,8 @@ func (s *Sync) AddSubTrie(root common.Hash, depth int, parent common.Hash, callb
128
135
}
129
136
// Assemble the new sub-trie sync request
130
137
req := & request {
138
+ path : path ,
131
139
hash : root ,
132
- depth : depth ,
133
140
callback : callback ,
134
141
}
135
142
// If this sub-trie has a designated parent, link them together
@@ -147,7 +154,7 @@ func (s *Sync) AddSubTrie(root common.Hash, depth int, parent common.Hash, callb
147
154
// AddCodeEntry schedules the direct retrieval of a contract code that should not
148
155
// be interpreted as a trie node, but rather accepted and stored into the database
149
156
// as is.
150
- func (s * Sync ) AddCodeEntry (hash common.Hash , depth int , parent common.Hash ) {
157
+ func (s * Sync ) AddCodeEntry (hash common.Hash , path [] byte , parent common.Hash ) {
151
158
// Short circuit if the entry is empty or already known
152
159
if hash == emptyState {
153
160
return
@@ -170,9 +177,9 @@ func (s *Sync) AddCodeEntry(hash common.Hash, depth int, parent common.Hash) {
170
177
}
171
178
// Assemble the new sub-trie sync request
172
179
req := & request {
173
- hash : hash ,
174
- code : true ,
175
- depth : depth ,
180
+ path : path ,
181
+ hash : hash ,
182
+ code : true ,
176
183
}
177
184
// If this sub-trie has a designated parent, link them together
178
185
if parent != (common.Hash {}) {
@@ -190,7 +197,18 @@ func (s *Sync) AddCodeEntry(hash common.Hash, depth int, parent common.Hash) {
190
197
func (s * Sync ) Missing (max int ) []common.Hash {
191
198
var requests []common.Hash
192
199
for ! s .queue .Empty () && (max == 0 || len (requests ) < max ) {
193
- requests = append (requests , s .queue .PopItem ().(common.Hash ))
200
+ // Retrieve th enext item in line
201
+ item , prio := s .queue .Peek ()
202
+
203
+ // If we have too many already-pending tasks for this depth, throttle
204
+ depth := int (prio >> 56 )
205
+ if s .fetches [depth ] > maxFetchesPerDepth {
206
+ break
207
+ }
208
+ // Item is allowed to be scheduled, add it to the task list
209
+ s .queue .Pop ()
210
+ s .fetches [depth ]++
211
+ requests = append (requests , item .(common.Hash ))
194
212
}
195
213
return requests
196
214
}
@@ -285,31 +303,35 @@ func (s *Sync) schedule(req *request) {
285
303
// is a trie node and code has same hash. In this case two elements
286
304
// with same hash and same or different depth will be pushed. But it's
287
305
// ok the worst case is the second response will be treated as duplicated.
288
- s .queue .Push (req .hash , int64 (req .depth ))
306
+ prio := int64 (len (req .path )) << 56 // depth >= 128 will never happen, storage leaves will be included in their parents
307
+ for i := 0 ; i < 14 && i < len (req .path ); i ++ {
308
+ prio |= int64 (15 - req .path [i ]) << (52 - i * 4 ) // 15-nibble => lexicographic order
309
+ }
310
+ s .queue .Push (req .hash , prio )
289
311
}
290
312
291
313
// children retrieves all the missing children of a state trie entry for future
292
314
// retrieval scheduling.
293
315
func (s * Sync ) children (req * request , object node ) ([]* request , error ) {
294
316
// Gather all the children of the node, irrelevant whether known or not
295
317
type child struct {
296
- node node
297
- depth int
318
+ path [] byte
319
+ node node
298
320
}
299
321
var children []child
300
322
301
323
switch node := (object ).(type ) {
302
324
case * shortNode :
303
325
children = []child {{
304
- node : node .Val ,
305
- depth : req .depth + len ( node .Key ),
326
+ node : node .Val ,
327
+ path : append ( append ([] byte ( nil ), req .path ... ), node .Key ... ),
306
328
}}
307
329
case * fullNode :
308
330
for i := 0 ; i < 17 ; i ++ {
309
331
if node .Children [i ] != nil {
310
332
children = append (children , child {
311
- node : node .Children [i ],
312
- depth : req .depth + 1 ,
333
+ node : node .Children [i ],
334
+ path : append ( append ([] byte ( nil ), req .path ... ), byte ( i )) ,
313
335
})
314
336
}
315
337
}
@@ -322,7 +344,7 @@ func (s *Sync) children(req *request, object node) ([]*request, error) {
322
344
// Notify any external watcher of a new key/value node
323
345
if req .callback != nil {
324
346
if node , ok := (child .node ).(valueNode ); ok {
325
- if err := req .callback (node , req .hash ); err != nil {
347
+ if err := req .callback (req . path , node , req .hash ); err != nil {
326
348
return nil , err
327
349
}
328
350
}
@@ -346,9 +368,9 @@ func (s *Sync) children(req *request, object node) ([]*request, error) {
346
368
}
347
369
// Locally unknown node, schedule for retrieval
348
370
requests = append (requests , & request {
371
+ path : child .path ,
349
372
hash : hash ,
350
373
parents : []* request {req },
351
- depth : child .depth ,
352
374
callback : req .callback ,
353
375
})
354
376
}
@@ -364,9 +386,11 @@ func (s *Sync) commit(req *request) (err error) {
364
386
if req .code {
365
387
s .membatch .codes [req .hash ] = req .data
366
388
delete (s .codeReqs , req .hash )
389
+ s .fetches [len (req .path )]--
367
390
} else {
368
391
s .membatch .nodes [req .hash ] = req .data
369
392
delete (s .nodeReqs , req .hash )
393
+ s .fetches [len (req .path )]--
370
394
}
371
395
// Check all parents for completion
372
396
for _ , parent := range req .parents {
0 commit comments