Skip to content

Commit b58fe2d

Browse files
committed
core, light: write headerchains in batches
1 parent 7ac6513 commit b58fe2d

File tree

4 files changed

+170
-130
lines changed

4 files changed

+170
-130
lines changed

core/blockchain.go

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2438,12 +2438,7 @@ func (bc *BlockChain) InsertHeaderChain(chain []*types.Header, checkFreq int) (i
24382438

24392439
bc.wg.Add(1)
24402440
defer bc.wg.Done()
2441-
2442-
whFunc := func(header *types.Header) error {
2443-
_, err := bc.hc.WriteHeader(header)
2444-
return err
2445-
}
2446-
return bc.hc.InsertHeaderChain(chain, whFunc, start)
2441+
return bc.hc.InsertHeaderChain(chain, nil, start)
24472442
}
24482443

24492444
// CurrentHeader retrieves the current head header of the canonical chain. The

core/headerchain.go

Lines changed: 161 additions & 108 deletions
Original file line numberDiff line numberDiff line change
@@ -129,107 +129,182 @@ func (hc *HeaderChain) GetBlockNumber(hash common.Hash) *uint64 {
129129
return number
130130
}
131131

132-
// WriteHeader writes a header into the local chain, given that its parent is
133-
// already known. If the total difficulty of the newly inserted header becomes
134-
// greater than the current known TD, the canonical chain is re-routed.
132+
type headerWriteResult struct {
133+
ignored int
134+
imported int
135+
status WriteStatus
136+
lastHash common.Hash
137+
lastHeader *types.Header
138+
}
139+
140+
// WriteHeaders writes a chain of headers into the local chain, given that the parents
141+
// are already known. If the total difficulty of the newly inserted chain becomes
142+
// greater than the current known TD, the canonical chain is reorged.
135143
//
136144
// Note: This method is not concurrent-safe with inserting blocks simultaneously
137145
// into the chain, as side effects caused by reorganisations cannot be emulated
138146
// without the real blocks. Hence, writing headers directly should only be done
139147
// in two scenarios: pure-header mode of operation (light clients), or properly
140148
// separated header/block phases (non-archive clients).
141-
func (hc *HeaderChain) WriteHeader(header *types.Header) (status WriteStatus, err error) {
142-
// Cache some values to prevent constant recalculation
149+
func (hc *HeaderChain) WriteHeaders(headers []*types.Header, postWriteFn PostWriteCallback) (result *headerWriteResult, err error) {
150+
if len(headers) == 0 {
151+
return &headerWriteResult{}, nil
152+
}
153+
ptd := hc.GetTd(headers[0].ParentHash, headers[0].Number.Uint64()-1)
154+
if ptd == nil {
155+
return &headerWriteResult{}, consensus.ErrUnknownAncestor
156+
}
143157
var (
144-
hash = header.Hash()
145-
number = header.Number.Uint64()
158+
lastHeader *types.Header // Last successfully imported header
159+
lastNumber uint64 // Last successfully imported number
160+
lastHash common.Hash
161+
externTd *big.Int // TD of successfully imported chain
162+
inserted []numberHash // Ephemeral lookup of number/hash for the chain
163+
firstInsertedIndex = -1 // Index of the first non-ignored header
146164
)
147-
// Calculate the total difficulty of the header
148-
ptd := hc.GetTd(header.ParentHash, number-1)
149-
if ptd == nil {
150-
return NonStatTy, consensus.ErrUnknownAncestor
151-
}
152-
head := hc.CurrentHeader().Number.Uint64()
153-
localTd := hc.GetTd(hc.currentHeaderHash, head)
154-
externTd := new(big.Int).Add(header.Difficulty, ptd)
155-
156-
// Irrelevant of the canonical status, write the td and header to the database
157-
//
158-
// Note all the components of header(td, hash->number index and header) should
159-
// be written atomically.
160-
headerBatch := hc.chainDb.NewBatch()
161-
rawdb.WriteTd(headerBatch, hash, number, externTd)
162-
rawdb.WriteHeader(headerBatch, header)
163-
if err := headerBatch.Write(); err != nil {
164-
log.Crit("Failed to write header into disk", "err", err)
165+
lastHash, lastNumber = headers[0].ParentHash, headers[0].Number.Uint64()-1 // Already validated above
166+
batch := hc.chainDb.NewBatch()
167+
for i, header := range headers {
168+
// Short circuit insertion if shutting down
169+
if hc.procInterrupt() {
170+
log.Debug("Premature abort during headers import")
171+
// if we haven't done anything yet, we can return
172+
if i == 0 {
173+
return &headerWriteResult{}, errors.New("aborted")
174+
}
175+
// We only 'break' here - since we want to try and keep the
176+
// db consistent
177+
break
178+
}
179+
hash, number := header.Hash(), header.Number.Uint64()
180+
if header.ParentHash != lastHash {
181+
log.Warn("Non-contiguous header insertion", "header.parent", header.ParentHash, "expected", hash, "number", number)
182+
break
183+
}
184+
externTd = new(big.Int).Add(header.Difficulty, ptd)
185+
186+
// If the header is already known, skip it, otherwise store
187+
if !hc.HasHeader(hash, number) {
188+
// Irrelevant of the canonical status, write the td and header to the database
189+
rawdb.WriteTd(batch, hash, number, externTd)
190+
hc.tdCache.Add(hash, new(big.Int).Set(externTd))
191+
192+
rawdb.WriteHeader(batch, header)
193+
inserted = append(inserted, numberHash{number, hash})
194+
hc.headerCache.Add(hash, header)
195+
hc.numberCache.Add(hash, number)
196+
if firstInsertedIndex < 0 {
197+
firstInsertedIndex = i
198+
}
199+
}
200+
lastHeader, lastHash, lastNumber, ptd = header, hash, number, externTd
165201
}
202+
batch.Write()
203+
batch.Reset()
204+
var (
205+
head = hc.CurrentHeader().Number.Uint64()
206+
localTd = hc.GetTd(hc.currentHeaderHash, head)
207+
status = SideStatTy
208+
)
166209
// If the total difficulty is higher than our known, add it to the canonical chain
167210
// Second clause in the if statement reduces the vulnerability to selfish mining.
168211
// Please refer to http://www.cs.cornell.edu/~ie53/publications/btcProcFC.pdf
169212
reorg := externTd.Cmp(localTd) > 0
170213
if !reorg && externTd.Cmp(localTd) == 0 {
171-
if header.Number.Uint64() < head {
214+
if lastNumber < head {
172215
reorg = true
173-
} else if header.Number.Uint64() == head {
216+
} else if lastNumber == head {
174217
reorg = mrand.Float64() < 0.5
175218
}
176219
}
220+
// If the parent of the (first) block is already the canon header,
221+
// we don't have to go backwards to delete canon blocks, but
222+
// simply pile them onto the existing chain
223+
chainAlreadyCanon := headers[0].ParentHash == hc.currentHeaderHash
177224
if reorg {
178225
// If the header can be added into canonical chain, adjust the
179226
// header chain markers(canonical indexes and head header flag).
180227
//
181228
// Note all markers should be written atomically.
182-
183-
// Delete any canonical number assignments above the new head
184-
markerBatch := hc.chainDb.NewBatch()
185-
for i := number + 1; ; i++ {
186-
hash := rawdb.ReadCanonicalHash(hc.chainDb, i)
187-
if hash == (common.Hash{}) {
188-
break
229+
markerBatch := batch // we can reuse the batch to keep allocs down
230+
if !chainAlreadyCanon {
231+
// Delete any canonical number assignments above the new head
232+
for i := lastNumber + 1; ; i++ {
233+
hash := rawdb.ReadCanonicalHash(hc.chainDb, i)
234+
if hash == (common.Hash{}) {
235+
break
236+
}
237+
rawdb.DeleteCanonicalHash(markerBatch, i)
238+
}
239+
// Overwrite any stale canonical number assignments, going
240+
// backwards from the first header in this import
241+
var (
242+
headHash = headers[0].ParentHash // inserted[0].parent?
243+
headNumber = headers[0].Number.Uint64() - 1 // inserted[0].num-1 ?
244+
headHeader = hc.GetHeader(headHash, headNumber)
245+
)
246+
for rawdb.ReadCanonicalHash(hc.chainDb, headNumber) != headHash {
247+
rawdb.WriteCanonicalHash(markerBatch, headHash, headNumber)
248+
headHash = headHeader.ParentHash
249+
headNumber = headHeader.Number.Uint64() - 1
250+
headHeader = hc.GetHeader(headHash, headNumber)
251+
}
252+
// If some of the older headers were already known, but obtained canon-status
253+
// during this import batch, then we need to write that now
254+
// Further down, we continue writing the staus for the ones that
255+
// were not already known
256+
for i := 0; i < firstInsertedIndex; i++ {
257+
hash := headers[i].Hash()
258+
num := headers[i].Number.Uint64()
259+
rawdb.WriteCanonicalHash(markerBatch, hash, num)
260+
rawdb.WriteHeadHeaderHash(markerBatch, hash)
189261
}
190-
rawdb.DeleteCanonicalHash(markerBatch, i)
191262
}
192-
193-
// Overwrite any stale canonical number assignments
194-
var (
195-
headHash = header.ParentHash
196-
headNumber = header.Number.Uint64() - 1
197-
headHeader = hc.GetHeader(headHash, headNumber)
198-
)
199-
for rawdb.ReadCanonicalHash(hc.chainDb, headNumber) != headHash {
200-
rawdb.WriteCanonicalHash(markerBatch, headHash, headNumber)
201-
202-
headHash = headHeader.ParentHash
203-
headNumber = headHeader.Number.Uint64() - 1
204-
headHeader = hc.GetHeader(headHash, headNumber)
263+
// Extend the canonical chain with the new headers
264+
for _, hn := range inserted {
265+
rawdb.WriteCanonicalHash(markerBatch, hn.hash, hn.number)
266+
rawdb.WriteHeadHeaderHash(markerBatch, hn.hash)
205267
}
206-
// Extend the canonical chain with the new header
207-
rawdb.WriteCanonicalHash(markerBatch, hash, number)
208-
rawdb.WriteHeadHeaderHash(markerBatch, hash)
209268
if err := markerBatch.Write(); err != nil {
210269
log.Crit("Failed to write header markers into disk", "err", err)
211270
}
271+
markerBatch.Reset()
212272
// Last step update all in-memory head header markers
213-
hc.currentHeaderHash = hash
214-
hc.currentHeader.Store(types.CopyHeader(header))
215-
headHeaderGauge.Update(header.Number.Int64())
273+
hc.currentHeaderHash = lastHash
274+
hc.currentHeader.Store(types.CopyHeader(lastHeader))
275+
headHeaderGauge.Update(lastHeader.Number.Int64())
216276

217277
status = CanonStatTy
218-
} else {
219-
status = SideStatTy
220278
}
221-
hc.tdCache.Add(hash, externTd)
222-
hc.headerCache.Add(hash, header)
223-
hc.numberCache.Add(hash, number)
224-
return
279+
// Execute any post-write callback function
280+
// - unless we're exiting
281+
// - and unless we ignored everything
282+
if postWriteFn != nil && !hc.procInterrupt() && firstInsertedIndex >= 0 {
283+
// TODO: Is it really necessary to invoke this N times, instead of just
284+
// invoking it for the last header?
285+
// It is only used for lightchain event aggregation
286+
for _, header := range headers[firstInsertedIndex:] {
287+
if header.Number.Uint64() > lastNumber {
288+
break
289+
}
290+
postWriteFn(header, status)
291+
}
292+
}
293+
return &headerWriteResult{
294+
ignored: len(headers) - len(inserted),
295+
imported: len(inserted),
296+
status: status,
297+
lastHash: lastHash,
298+
lastHeader: lastHeader,
299+
}, nil
225300
}
226301

227-
// WhCallback is a callback function for inserting individual headers.
228-
// A callback is used for two reasons: first, in a LightChain, status should be
229-
// processed and light chain events sent, while in a BlockChain this is not
230-
// necessary since chain events are sent after inserting blocks. Second, the
231-
// header writes should be protected by the parent chain mutex individually.
232-
type WhCallback func(*types.Header) error
302+
// PostWriteCallback is a callback function for inserting headers,
303+
// which is called after each header is inserted.
304+
// The reson for having it is:
305+
// In light-chain mode, status should be processed and light chain events sent,
306+
// whereas in a non-light mode this is not necessary since chain events are sent after inserting blocks.
307+
type PostWriteCallback func(header *types.Header, status WriteStatus)
233308

234309
func (hc *HeaderChain) ValidateHeaderChain(chain []*types.Header, checkFreq int) (int, error) {
235310
// Do a sanity check that the provided chain is actually ordered and linked
@@ -282,55 +357,33 @@ func (hc *HeaderChain) ValidateHeaderChain(chain []*types.Header, checkFreq int)
282357
return 0, nil
283358
}
284359

285-
// InsertHeaderChain attempts to insert the given header chain in to the local
286-
// chain, possibly creating a reorg. If an error is returned, it will return the
287-
// index number of the failing header as well an error describing what went wrong.
288-
//
289-
// The verify parameter can be used to fine tune whether nonce verification
290-
// should be done or not. The reason behind the optional check is because some
291-
// of the header retrieval mechanisms already need to verfy nonces, as well as
292-
// because nonces can be verified sparsely, not needing to check each.
293-
func (hc *HeaderChain) InsertHeaderChain(chain []*types.Header, writeHeader WhCallback, start time.Time) (int, error) {
294-
// Collect some import statistics to report on
295-
stats := struct{ processed, ignored int }{}
296-
// All headers passed verification, import them into the database
297-
for i, header := range chain {
298-
// Short circuit insertion if shutting down
299-
if hc.procInterrupt() {
300-
log.Debug("Premature abort during headers import")
301-
return i, errors.New("aborted")
302-
}
303-
// If the header's already known, skip it, otherwise store
304-
hash := header.Hash()
305-
if hc.HasHeader(hash, header.Number.Uint64()) {
306-
externTd := hc.GetTd(hash, header.Number.Uint64())
307-
localTd := hc.GetTd(hc.currentHeaderHash, hc.CurrentHeader().Number.Uint64())
308-
if externTd == nil || externTd.Cmp(localTd) <= 0 {
309-
stats.ignored++
310-
continue
311-
}
312-
}
313-
if err := writeHeader(header); err != nil {
314-
return i, err
315-
}
316-
stats.processed++
360+
// InsertHeaderChain inserts the given headers, and returns the
361+
// index at which something went wrong, and the error (if any)
362+
// The caller should hold applicable mutexes
363+
func (hc *HeaderChain) InsertHeaderChain(chain []*types.Header, postWriteFn PostWriteCallback, start time.Time) (int, error) {
364+
if hc.procInterrupt() {
365+
return 0, errors.New("aborted")
317366
}
367+
res, err := hc.WriteHeaders(chain, postWriteFn)
318368
// Report some public statistics so the user has a clue what's going on
319-
last := chain[len(chain)-1]
320-
321369
context := []interface{}{
322-
"count", stats.processed, "elapsed", common.PrettyDuration(time.Since(start)),
323-
"number", last.Number, "hash", last.Hash(),
370+
"count", res.imported,
371+
"elapsed", common.PrettyDuration(time.Since(start)),
324372
}
325-
if timestamp := time.Unix(int64(last.Time), 0); time.Since(timestamp) > time.Minute {
326-
context = append(context, []interface{}{"age", common.PrettyAge(timestamp)}...)
373+
if err != nil {
374+
context = append(context, "err", err)
375+
}
376+
if last := res.lastHeader; last != nil {
377+
context = append(context, "number", last.Number, "hash", res.lastHash)
378+
if timestamp := time.Unix(int64(last.Time), 0); time.Since(timestamp) > time.Minute {
379+
context = append(context, []interface{}{"age", common.PrettyAge(timestamp)}...)
380+
}
327381
}
328-
if stats.ignored > 0 {
329-
context = append(context, []interface{}{"ignored", stats.ignored}...)
382+
if res.ignored > 0 {
383+
context = append(context, []interface{}{"ignored", res.ignored}...)
330384
}
331385
log.Info("Imported new block headers", context...)
332-
333-
return 0, nil
386+
return 0, err
334387
}
335388

336389
// GetBlockHashesFromHash retrieves a number of block hashes starting at a given

core/headerchain_test.go

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -54,20 +54,14 @@ func testInsert(t *testing.T, hc *HeaderChain, chain []*types.Header, expInsert,
5454
t.Helper()
5555
gotInsert, gotCanon, gotSide := 0, 0, 0
5656

57-
_, err := hc.InsertHeaderChain(chain, func(header *types.Header) error {
58-
status, err := hc.WriteHeader(header)
59-
if err != nil{
60-
return err
61-
}
57+
_, err := hc.InsertHeaderChain(chain, func(header *types.Header, status WriteStatus) {
6258
gotInsert++
6359
switch status {
6460
case CanonStatTy:
6561
gotCanon++
6662
default:
6763
gotSide++
6864
}
69-
return nil
70-
7165
}, time.Now())
7266

7367
if gotInsert != expInsert {
@@ -109,7 +103,7 @@ func TestHeaderInsertion(t *testing.T) {
109103
t.Fatal(err)
110104
}
111105

112-
// Inserting 64 indentical headers, expecting
106+
// Inserting 64 inentical headers, expecting
113107
// 0 callbacks, 0 canon-status, 0 sidestatus,
114108
if err := testInsert(t, hc, chainA[:64], 0, 0, 0); err != nil {
115109
t.Fatal(err)

light/lightchain.go

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -397,21 +397,19 @@ func (lc *LightChain) InsertHeaderChain(chain []*types.Header, checkFreq int) (i
397397
defer lc.wg.Done()
398398

399399
var events []interface{}
400-
whFunc := func(header *types.Header) error {
401-
status, err := lc.hc.WriteHeader(header)
402-
400+
postWriteCallback := func(header *types.Header, status core.WriteStatus) {
401+
hash := header.Hash()
403402
switch status {
404403
case core.CanonStatTy:
405-
log.Debug("Inserted new header", "number", header.Number, "hash", header.Hash())
406-
events = append(events, core.ChainEvent{Block: types.NewBlockWithHeader(header), Hash: header.Hash()})
404+
log.Debug("Inserted new header", "number", header.Number, "hash", hash)
405+
events = append(events, core.ChainEvent{Block: types.NewBlockWithHeader(header), Hash: hash})
407406

408407
case core.SideStatTy:
409-
log.Debug("Inserted forked header", "number", header.Number, "hash", header.Hash())
408+
log.Debug("Inserted forked header", "number", header.Number, "hash", hash)
410409
events = append(events, core.ChainSideEvent{Block: types.NewBlockWithHeader(header)})
411410
}
412-
return err
413411
}
414-
i, err := lc.hc.InsertHeaderChain(chain, whFunc, start)
412+
i, err := lc.hc.InsertHeaderChain(chain, postWriteCallback, start)
415413
lc.postChainEvents(events)
416414
return i, err
417415
}

0 commit comments

Comments
 (0)