Skip to content

Commit 40b6ccf

Browse files
holimanfjl
andauthored
core,les: headerchain import in batches (ethereum#21471)
* core: add test for headerchain inserts * core, light: write headerchains in batches * core: change to one callback per batch of inserted headers + review concerns * core: error-check on batch write * core: unexport writeHeaders * core: remove callback parameter in InsertHeaderChain The semantics of InsertHeaderChain are now much simpler: it is now an all-or-nothing operation. The new WriteStatus return value allows callers to check for the canonicality of the insertion. This change simplifies use of HeaderChain in package les, where the callback was previously used to post chain events. * core: skip some hashing when writing headers * core: less hashing in header validation * core: fix headerchain flaw regarding blacklisted hashes Co-authored-by: Felix Lange <fjl@twurst.com>
1 parent bd848aa commit 40b6ccf

File tree

4 files changed

+309
-140
lines changed

4 files changed

+309
-140
lines changed

core/blockchain.go

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2438,12 +2438,8 @@ func (bc *BlockChain) InsertHeaderChain(chain []*types.Header, checkFreq int) (i
24382438

24392439
bc.wg.Add(1)
24402440
defer bc.wg.Done()
2441-
2442-
whFunc := func(header *types.Header) error {
2443-
_, err := bc.hc.WriteHeader(header)
2444-
return err
2445-
}
2446-
return bc.hc.InsertHeaderChain(chain, whFunc, start)
2441+
_, err := bc.hc.InsertHeaderChain(chain, start)
2442+
return 0, err
24472443
}
24482444

24492445
// CurrentHeader retrieves the current head header of the canonical chain. The

core/headerchain.go

Lines changed: 174 additions & 118 deletions
Original file line numberDiff line numberDiff line change
@@ -129,118 +129,192 @@ func (hc *HeaderChain) GetBlockNumber(hash common.Hash) *uint64 {
129129
return number
130130
}
131131

132-
// WriteHeader writes a header into the local chain, given that its parent is
133-
// already known. If the total difficulty of the newly inserted header becomes
134-
// greater than the current known TD, the canonical chain is re-routed.
132+
type headerWriteResult struct {
133+
status WriteStatus
134+
ignored int
135+
imported int
136+
lastHash common.Hash
137+
lastHeader *types.Header
138+
}
139+
140+
// WriteHeaders writes a chain of headers into the local chain, given that the parents
141+
// are already known. If the total difficulty of the newly inserted chain becomes
142+
// greater than the current known TD, the canonical chain is reorged.
135143
//
136144
// Note: This method is not concurrent-safe with inserting blocks simultaneously
137145
// into the chain, as side effects caused by reorganisations cannot be emulated
138146
// without the real blocks. Hence, writing headers directly should only be done
139147
// in two scenarios: pure-header mode of operation (light clients), or properly
140148
// separated header/block phases (non-archive clients).
141-
func (hc *HeaderChain) WriteHeader(header *types.Header) (status WriteStatus, err error) {
142-
// Cache some values to prevent constant recalculation
149+
func (hc *HeaderChain) writeHeaders(headers []*types.Header) (result *headerWriteResult, err error) {
150+
if len(headers) == 0 {
151+
return &headerWriteResult{}, nil
152+
}
153+
ptd := hc.GetTd(headers[0].ParentHash, headers[0].Number.Uint64()-1)
154+
if ptd == nil {
155+
return &headerWriteResult{}, consensus.ErrUnknownAncestor
156+
}
143157
var (
144-
hash = header.Hash()
145-
number = header.Number.Uint64()
158+
lastNumber = headers[0].Number.Uint64() - 1 // Last successfully imported number
159+
lastHash = headers[0].ParentHash // Last imported header hash
160+
newTD = new(big.Int).Set(ptd) // Total difficulty of inserted chain
161+
162+
lastHeader *types.Header
163+
inserted []numberHash // Ephemeral lookup of number/hash for the chain
164+
firstInserted = -1 // Index of the first non-ignored header
146165
)
147-
// Calculate the total difficulty of the header
148-
ptd := hc.GetTd(header.ParentHash, number-1)
149-
if ptd == nil {
150-
return NonStatTy, consensus.ErrUnknownAncestor
151-
}
152-
head := hc.CurrentHeader().Number.Uint64()
153-
localTd := hc.GetTd(hc.currentHeaderHash, head)
154-
externTd := new(big.Int).Add(header.Difficulty, ptd)
155-
156-
// Irrelevant of the canonical status, write the td and header to the database
157-
//
158-
// Note all the components of header(td, hash->number index and header) should
159-
// be written atomically.
160-
headerBatch := hc.chainDb.NewBatch()
161-
rawdb.WriteTd(headerBatch, hash, number, externTd)
162-
rawdb.WriteHeader(headerBatch, header)
163-
if err := headerBatch.Write(); err != nil {
164-
log.Crit("Failed to write header into disk", "err", err)
166+
167+
batch := hc.chainDb.NewBatch()
168+
for i, header := range headers {
169+
var hash common.Hash
170+
// The headers have already been validated at this point, so we already
171+
// know that it's a contiguous chain, where
172+
// headers[i].Hash() == headers[i+1].ParentHash
173+
if i < len(headers)-1 {
174+
hash = headers[i+1].ParentHash
175+
} else {
176+
hash = header.Hash()
177+
}
178+
number := header.Number.Uint64()
179+
newTD.Add(newTD, header.Difficulty)
180+
181+
// If the header is already known, skip it, otherwise store
182+
if !hc.HasHeader(hash, number) {
183+
// Irrelevant of the canonical status, write the TD and header to the database.
184+
rawdb.WriteTd(batch, hash, number, newTD)
185+
hc.tdCache.Add(hash, new(big.Int).Set(newTD))
186+
187+
rawdb.WriteHeader(batch, header)
188+
inserted = append(inserted, numberHash{number, hash})
189+
hc.headerCache.Add(hash, header)
190+
hc.numberCache.Add(hash, number)
191+
if firstInserted < 0 {
192+
firstInserted = i
193+
}
194+
}
195+
lastHeader, lastHash, lastNumber = header, hash, number
196+
}
197+
198+
// Skip the slow disk write of all headers if interrupted.
199+
if hc.procInterrupt() {
200+
log.Debug("Premature abort during headers import")
201+
return &headerWriteResult{}, errors.New("aborted")
165202
}
203+
// Commit to disk!
204+
if err := batch.Write(); err != nil {
205+
log.Crit("Failed to write headers", "error", err)
206+
}
207+
batch.Reset()
208+
209+
var (
210+
head = hc.CurrentHeader().Number.Uint64()
211+
localTD = hc.GetTd(hc.currentHeaderHash, head)
212+
status = SideStatTy
213+
)
166214
// If the total difficulty is higher than our known, add it to the canonical chain
167215
// Second clause in the if statement reduces the vulnerability to selfish mining.
168216
// Please refer to http://www.cs.cornell.edu/~ie53/publications/btcProcFC.pdf
169-
reorg := externTd.Cmp(localTd) > 0
170-
if !reorg && externTd.Cmp(localTd) == 0 {
171-
if header.Number.Uint64() < head {
217+
reorg := newTD.Cmp(localTD) > 0
218+
if !reorg && newTD.Cmp(localTD) == 0 {
219+
if lastNumber < head {
172220
reorg = true
173-
} else if header.Number.Uint64() == head {
221+
} else if lastNumber == head {
174222
reorg = mrand.Float64() < 0.5
175223
}
176224
}
225+
// If the parent of the (first) block is already the canon header,
226+
// we don't have to go backwards to delete canon blocks, but
227+
// simply pile them onto the existing chain
228+
chainAlreadyCanon := headers[0].ParentHash == hc.currentHeaderHash
177229
if reorg {
178230
// If the header can be added into canonical chain, adjust the
179231
// header chain markers(canonical indexes and head header flag).
180232
//
181233
// Note all markers should be written atomically.
182-
183-
// Delete any canonical number assignments above the new head
184-
markerBatch := hc.chainDb.NewBatch()
185-
for i := number + 1; ; i++ {
186-
hash := rawdb.ReadCanonicalHash(hc.chainDb, i)
187-
if hash == (common.Hash{}) {
188-
break
234+
markerBatch := batch // we can reuse the batch to keep allocs down
235+
if !chainAlreadyCanon {
236+
// Delete any canonical number assignments above the new head
237+
for i := lastNumber + 1; ; i++ {
238+
hash := rawdb.ReadCanonicalHash(hc.chainDb, i)
239+
if hash == (common.Hash{}) {
240+
break
241+
}
242+
rawdb.DeleteCanonicalHash(markerBatch, i)
243+
}
244+
// Overwrite any stale canonical number assignments, going
245+
// backwards from the first header in this import
246+
var (
247+
headHash = headers[0].ParentHash // inserted[0].parent?
248+
headNumber = headers[0].Number.Uint64() - 1 // inserted[0].num-1 ?
249+
headHeader = hc.GetHeader(headHash, headNumber)
250+
)
251+
for rawdb.ReadCanonicalHash(hc.chainDb, headNumber) != headHash {
252+
rawdb.WriteCanonicalHash(markerBatch, headHash, headNumber)
253+
headHash = headHeader.ParentHash
254+
headNumber = headHeader.Number.Uint64() - 1
255+
headHeader = hc.GetHeader(headHash, headNumber)
256+
}
257+
// If some of the older headers were already known, but obtained canon-status
258+
// during this import batch, then we need to write that now
259+
// Further down, we continue writing the staus for the ones that
260+
// were not already known
261+
for i := 0; i < firstInserted; i++ {
262+
hash := headers[i].Hash()
263+
num := headers[i].Number.Uint64()
264+
rawdb.WriteCanonicalHash(markerBatch, hash, num)
265+
rawdb.WriteHeadHeaderHash(markerBatch, hash)
189266
}
190-
rawdb.DeleteCanonicalHash(markerBatch, i)
191267
}
192-
193-
// Overwrite any stale canonical number assignments
194-
var (
195-
headHash = header.ParentHash
196-
headNumber = header.Number.Uint64() - 1
197-
headHeader = hc.GetHeader(headHash, headNumber)
198-
)
199-
for rawdb.ReadCanonicalHash(hc.chainDb, headNumber) != headHash {
200-
rawdb.WriteCanonicalHash(markerBatch, headHash, headNumber)
201-
202-
headHash = headHeader.ParentHash
203-
headNumber = headHeader.Number.Uint64() - 1
204-
headHeader = hc.GetHeader(headHash, headNumber)
268+
// Extend the canonical chain with the new headers
269+
for _, hn := range inserted {
270+
rawdb.WriteCanonicalHash(markerBatch, hn.hash, hn.number)
271+
rawdb.WriteHeadHeaderHash(markerBatch, hn.hash)
205272
}
206-
// Extend the canonical chain with the new header
207-
rawdb.WriteCanonicalHash(markerBatch, hash, number)
208-
rawdb.WriteHeadHeaderHash(markerBatch, hash)
209273
if err := markerBatch.Write(); err != nil {
210274
log.Crit("Failed to write header markers into disk", "err", err)
211275
}
276+
markerBatch.Reset()
212277
// Last step update all in-memory head header markers
213-
hc.currentHeaderHash = hash
214-
hc.currentHeader.Store(types.CopyHeader(header))
215-
headHeaderGauge.Update(header.Number.Int64())
278+
hc.currentHeaderHash = lastHash
279+
hc.currentHeader.Store(types.CopyHeader(lastHeader))
280+
headHeaderGauge.Update(lastHeader.Number.Int64())
216281

282+
// Chain status is canonical since this insert was a reorg.
283+
// Note that all inserts which have higher TD than existing are 'reorg'.
217284
status = CanonStatTy
218-
} else {
219-
status = SideStatTy
220285
}
221-
hc.tdCache.Add(hash, externTd)
222-
hc.headerCache.Add(hash, header)
223-
hc.numberCache.Add(hash, number)
224-
return
225-
}
226286

227-
// WhCallback is a callback function for inserting individual headers.
228-
// A callback is used for two reasons: first, in a LightChain, status should be
229-
// processed and light chain events sent, while in a BlockChain this is not
230-
// necessary since chain events are sent after inserting blocks. Second, the
231-
// header writes should be protected by the parent chain mutex individually.
232-
type WhCallback func(*types.Header) error
287+
if len(inserted) == 0 {
288+
status = NonStatTy
289+
}
290+
return &headerWriteResult{
291+
status: status,
292+
ignored: len(headers) - len(inserted),
293+
imported: len(inserted),
294+
lastHash: lastHash,
295+
lastHeader: lastHeader,
296+
}, nil
297+
}
233298

234299
func (hc *HeaderChain) ValidateHeaderChain(chain []*types.Header, checkFreq int) (int, error) {
235300
// Do a sanity check that the provided chain is actually ordered and linked
236301
for i := 1; i < len(chain); i++ {
237-
if chain[i].Number.Uint64() != chain[i-1].Number.Uint64()+1 || chain[i].ParentHash != chain[i-1].Hash() {
302+
parentHash := chain[i-1].Hash()
303+
if chain[i].Number.Uint64() != chain[i-1].Number.Uint64()+1 || chain[i].ParentHash != parentHash {
238304
// Chain broke ancestry, log a message (programming error) and skip insertion
239305
log.Error("Non contiguous header insert", "number", chain[i].Number, "hash", chain[i].Hash(),
240-
"parent", chain[i].ParentHash, "prevnumber", chain[i-1].Number, "prevhash", chain[i-1].Hash())
306+
"parent", chain[i].ParentHash, "prevnumber", chain[i-1].Number, "prevhash", parentHash)
241307

242308
return 0, fmt.Errorf("non contiguous insert: item %d is #%d [%x…], item %d is #%d [%x…] (parent [%x…])", i-1, chain[i-1].Number,
243-
chain[i-1].Hash().Bytes()[:4], i, chain[i].Number, chain[i].Hash().Bytes()[:4], chain[i].ParentHash[:4])
309+
parentHash.Bytes()[:4], i, chain[i].Number, chain[i].Hash().Bytes()[:4], chain[i].ParentHash[:4])
310+
}
311+
// If the header is a banned one, straight out abort
312+
if BadHashes[parentHash] {
313+
return i - 1, ErrBlacklistedHash
314+
}
315+
// If it's the last header in the cunk, we need to check it too
316+
if i == len(chain)-1 && BadHashes[chain[i].Hash()] {
317+
return i, ErrBlacklistedHash
244318
}
245319
}
246320

@@ -263,16 +337,12 @@ func (hc *HeaderChain) ValidateHeaderChain(chain []*types.Header, checkFreq int)
263337
defer close(abort)
264338

265339
// Iterate over the headers and ensure they all check out
266-
for i, header := range chain {
340+
for i := range chain {
267341
// If the chain is terminating, stop processing blocks
268342
if hc.procInterrupt() {
269343
log.Debug("Premature abort during headers verification")
270344
return 0, errors.New("aborted")
271345
}
272-
// If the header is a banned one, straight out abort
273-
if BadHashes[header.Hash()] {
274-
return i, ErrBlacklistedHash
275-
}
276346
// Otherwise wait for headers checks and ensure they pass
277347
if err := <-results; err != nil {
278348
return i, err
@@ -282,55 +352,41 @@ func (hc *HeaderChain) ValidateHeaderChain(chain []*types.Header, checkFreq int)
282352
return 0, nil
283353
}
284354

285-
// InsertHeaderChain attempts to insert the given header chain in to the local
286-
// chain, possibly creating a reorg. If an error is returned, it will return the
287-
// index number of the failing header as well an error describing what went wrong.
355+
// InsertHeaderChain inserts the given headers.
288356
//
289-
// The verify parameter can be used to fine tune whether nonce verification
290-
// should be done or not. The reason behind the optional check is because some
291-
// of the header retrieval mechanisms already need to verfy nonces, as well as
292-
// because nonces can be verified sparsely, not needing to check each.
293-
func (hc *HeaderChain) InsertHeaderChain(chain []*types.Header, writeHeader WhCallback, start time.Time) (int, error) {
294-
// Collect some import statistics to report on
295-
stats := struct{ processed, ignored int }{}
296-
// All headers passed verification, import them into the database
297-
for i, header := range chain {
298-
// Short circuit insertion if shutting down
299-
if hc.procInterrupt() {
300-
log.Debug("Premature abort during headers import")
301-
return i, errors.New("aborted")
302-
}
303-
// If the header's already known, skip it, otherwise store
304-
hash := header.Hash()
305-
if hc.HasHeader(hash, header.Number.Uint64()) {
306-
externTd := hc.GetTd(hash, header.Number.Uint64())
307-
localTd := hc.GetTd(hc.currentHeaderHash, hc.CurrentHeader().Number.Uint64())
308-
if externTd == nil || externTd.Cmp(localTd) <= 0 {
309-
stats.ignored++
310-
continue
311-
}
312-
}
313-
if err := writeHeader(header); err != nil {
314-
return i, err
315-
}
316-
stats.processed++
357+
// The validity of the headers is NOT CHECKED by this method, i.e. they need to be
358+
// validated by ValidateHeaderChain before calling InsertHeaderChain.
359+
//
360+
// This insert is all-or-nothing. If this returns an error, no headers were written,
361+
// otherwise they were all processed successfully.
362+
//
363+
// The returned 'write status' says if the inserted headers are part of the canonical chain
364+
// or a side chain.
365+
func (hc *HeaderChain) InsertHeaderChain(chain []*types.Header, start time.Time) (WriteStatus, error) {
366+
if hc.procInterrupt() {
367+
return 0, errors.New("aborted")
317368
}
318-
// Report some public statistics so the user has a clue what's going on
319-
last := chain[len(chain)-1]
369+
res, err := hc.writeHeaders(chain)
320370

371+
// Report some public statistics so the user has a clue what's going on
321372
context := []interface{}{
322-
"count", stats.processed, "elapsed", common.PrettyDuration(time.Since(start)),
323-
"number", last.Number, "hash", last.Hash(),
373+
"count", res.imported,
374+
"elapsed", common.PrettyDuration(time.Since(start)),
375+
}
376+
if err != nil {
377+
context = append(context, "err", err)
324378
}
325-
if timestamp := time.Unix(int64(last.Time), 0); time.Since(timestamp) > time.Minute {
326-
context = append(context, []interface{}{"age", common.PrettyAge(timestamp)}...)
379+
if last := res.lastHeader; last != nil {
380+
context = append(context, "number", last.Number, "hash", res.lastHash)
381+
if timestamp := time.Unix(int64(last.Time), 0); time.Since(timestamp) > time.Minute {
382+
context = append(context, []interface{}{"age", common.PrettyAge(timestamp)}...)
383+
}
327384
}
328-
if stats.ignored > 0 {
329-
context = append(context, []interface{}{"ignored", stats.ignored}...)
385+
if res.ignored > 0 {
386+
context = append(context, []interface{}{"ignored", res.ignored}...)
330387
}
331388
log.Info("Imported new block headers", context...)
332-
333-
return 0, nil
389+
return res.status, err
334390
}
335391

336392
// GetBlockHashesFromHash retrieves a number of block hashes starting at a given

0 commit comments

Comments
 (0)