
Commit 99b2b1d

karalabe authored and jagdeep sidhu committed
eth/downloader: remove stale beacon headers as backfilling progresses (ethereum#24670)
* eth/downloader: remove stale beacon headers as backfilling progresses
* eth/downloader: remove leftover from a previous design
* eth/downloader: do partial beacon cleanups if chain is large
* eth/downloader: linter != heart
1 parent 165ffaf commit 99b2b1d
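
In short: when the skeleton syncer suspends the backfiller, the backfiller now reports the last header it successfully filled, and the skeleton uses that to delete skeleton headers that have become stale. A minimal sketch of that flow under assumed, simplified types (header, filler, skeletonSketch and pruneBelow are hypothetical, not the real downloader structures):

package sketch

import "fmt"

// Sketch only: simplified stand-ins for the real downloader types; none of
// these names exist in go-ethereum.
type header struct{ number uint64 }

type filler interface {
	resume()
	// suspend stops backfilling and reports the last header it completed,
	// or nil if it never ran (mirrors the new backfiller contract).
	suspend() *header
}

type skeletonSketch struct{ filler filler }

// pruneBelow stands in for the real cleanStales: drop skeleton headers whose
// number is below the last backfilled one.
func (s *skeletonSketch) pruneBelow(filled uint64) {
	fmt.Println("pruning stale skeleton headers below", filled)
}

// syncCycle mirrors the shape of skeleton.sync after this commit: resume the
// filler, and on exit ask it how far it got so stale headers can be removed.
func (s *skeletonSketch) syncCycle() {
	s.filler.resume()
	defer func() {
		if filled := s.filler.suspend(); filled != nil {
			s.pruneBelow(filled.number)
		}
	}()
	// ... header sync work ...
}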

File tree

3 files changed: +147 -16 lines changed


eth/downloader/beaconsync.go

Lines changed: 12 additions & 3 deletions
@@ -36,6 +36,7 @@ type beaconBackfiller struct {
     syncMode SyncMode      // Sync mode to use for backfilling the skeleton chains
     success  func()        // Callback to run on successful sync cycle completion
     filling  bool          // Flag whether the downloader is backfilling or not
+    filled   *types.Header // Last header filled by the last terminated sync loop
     started  chan struct{} // Notification channel whether the downloader inited
     lock     sync.Mutex    // Mutex protecting the sync lock
 }
@@ -48,16 +49,18 @@ func newBeaconBackfiller(dl *Downloader, success func()) backfiller {
     }
 }

-// suspend cancels any background downloader threads.
-func (b *beaconBackfiller) suspend() {
+// suspend cancels any background downloader threads and returns the last header
+// that has been successfully backfilled.
+func (b *beaconBackfiller) suspend() *types.Header {
     // If no filling is running, don't waste cycles
     b.lock.Lock()
     filling := b.filling
+    filled := b.filled
     started := b.started
     b.lock.Unlock()

     if !filling {
-        return
+        return filled // Return the filled header on the previous sync completion
     }
     // A previous filling should be running, though it may happen that it hasn't
     // yet started (being done on a new goroutine). Many concurrent beacon head
@@ -69,6 +72,10 @@ func (b *beaconBackfiller) suspend() {
     // Now that we're sure the downloader successfully started up, we can cancel
     // it safely without running the risk of data races.
     b.downloader.Cancel()
+
+    // Sync cycle was just terminated, retrieve and return the last filled header.
+    // Can't use `filled` as that contains a stale value from before cancellation.
+    return b.downloader.blockchain.CurrentFastBlock().Header()
 }

 // resume starts the downloader threads for backfilling state and chain data.
@@ -81,6 +88,7 @@ func (b *beaconBackfiller) resume() {
         return
     }
     b.filling = true
+    b.filled = nil
     b.started = make(chan struct{})
     mode := b.syncMode
     b.lock.Unlock()
@@ -92,6 +100,7 @@ func (b *beaconBackfiller) resume() {
     defer func() {
         b.lock.Lock()
         b.filling = false
+        b.filled = b.downloader.blockchain.CurrentFastBlock().Header()
         b.lock.Unlock()
     }()
     // If the downloader fails, report an error as in beacon chain mode there
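
The suspend path above snapshots its fields under the mutex but deliberately does not return the snapshotted filled value after cancelling an in-flight run, since that value predates the cancellation. A minimal, self-contained sketch of that pattern under hypothetical names (worker, lastDone, current), not the real beaconBackfiller fields:

package sketch

import "sync"

// Sketch only: illustrates the locking pattern in beaconBackfiller.suspend.
type worker struct {
	mu       sync.Mutex
	running  bool
	lastDone uint64        // progress recorded by a run that already finished
	cancel   func()        // stops an in-flight run
	current  func() uint64 // reads live progress after cancellation
}

// suspend snapshots the flags under the lock, but only trusts the snapshot if
// nothing is running: once an in-flight run is cancelled, the snapshotted
// value is stale, so the live progress is re-read instead.
func (w *worker) suspend() uint64 {
	w.mu.Lock()
	running, done := w.running, w.lastDone
	w.mu.Unlock()

	if !running {
		return done // the previous run already recorded its final progress
	}
	w.cancel()         // terminate the in-flight run
	return w.current() // `done` predates the cancellation, so re-read
}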

eth/downloader/skeleton.go

Lines changed: 132 additions & 10 deletions
@@ -19,6 +19,7 @@ package downloader
 import (
     "encoding/json"
     "errors"
+    "fmt"
     "math/rand"
     "sort"
     "time"
@@ -148,11 +149,15 @@ type backfiller interface {
     // based on the skeleton chain as it might be invalid. The backfiller should
     // gracefully handle multiple consecutive suspends without a resume, even
     // on initial sartup.
-    suspend()
+    //
+    // The method should return the last block header that has been successfully
+    // backfilled, or nil if the backfiller was not resumed.
+    suspend() *types.Header

     // resume requests the backfiller to start running fill or snap sync based on
     // the skeleton chain as it has successfully been linked. Appending new heads
     // to the end of the chain will not result in suspend/resume cycles.
+    // leaking too much sync logic out to the filler.
     resume()
 }

@@ -358,8 +363,17 @@ func (s *skeleton) sync(head *types.Header) (*types.Header, error) {
     if linked {
         s.filler.resume()
     }
-    defer s.filler.suspend()
-
+    defer func() {
+        if filled := s.filler.suspend(); filled != nil {
+            // If something was filled, try to delete stale sync helpers. If
+            // unsuccessful, warn the user, but not much else we can do (it's
+            // a programming error, just let users report an issue and don't
+            // choke in the meantime).
+            if err := s.cleanStales(filled); err != nil {
+                log.Error("Failed to clean stale beacon headers", "err", err)
+            }
+        }
+    }()
     // Create a set of unique channels for this sync cycle. We need these to be
     // ephemeral so a data race doesn't accidentally deliver something stale on
     // a persistent channel across syncs (yup, this happened)
@@ -582,8 +596,16 @@ func (s *skeleton) processNewHead(head *types.Header, force bool) bool {

     lastchain := s.progress.Subchains[0]
     if lastchain.Tail >= number {
+        // If the chain is down to a single beacon header, and it is re-announced
+        // once more, ignore it instead of tearing down sync for a noop.
+        if lastchain.Head == lastchain.Tail {
+            if current := rawdb.ReadSkeletonHeader(s.db, number); current.Hash() == head.Hash() {
+                return false
+            }
+        }
+        // Not a noop / double head announce, abort with a reorg
         if force {
-            log.Warn("Beacon chain reorged", "tail", lastchain.Tail, "newHead", number)
+            log.Warn("Beacon chain reorged", "tail", lastchain.Tail, "head", lastchain.Head, "newHead", number)
         }
         return true
     }
@@ -943,12 +965,44 @@ func (s *skeleton) processResponse(res *headerResponse) (linked bool, merged boo
     // If the beacon chain was linked to the local chain, completely swap out
     // all internal progress and abort header synchronization.
     if linked {
-        // Note, linking into the local chain should also mean that there are
-        // no leftover subchains, but just in case there's some junk due to
-        // strange conditions or bugs, clean up all internal state.
-        if len(s.progress.Subchains) > 1 {
-            log.Error("Cleaning up leftovers after beacon link")
-            s.progress.Subchains = s.progress.Subchains[:1]
+        // Linking into the local chain should also mean that there are no
+        // leftover subchains, but in the case of importing the blocks via
+        // the engine API, we will not push the subchains forward. This will
+        // lead to a gap between an old sync cycle and a future one.
+        if subchains := len(s.progress.Subchains); subchains > 1 {
+            switch {
+            // If there are only 2 subchains - the current one and an older
+            // one - and the old one consists of a single block, then it's
+            // the expected new sync cycle after some propagated blocks. Log
+            // it for debugging purposes, explicitly clean and don't escalate.
+            case subchains == 2 && s.progress.Subchains[1].Head == s.progress.Subchains[1].Tail:
+                log.Debug("Cleaning previous beacon sync state", "head", s.progress.Subchains[1].Head)
+                rawdb.DeleteSkeletonHeader(batch, s.progress.Subchains[1].Head)
+                s.progress.Subchains = s.progress.Subchains[:1]
+
+            // If we have more than one header or more than one leftover chain,
+            // the syncer's internal state is corrupted. Do try to fix it, but
+            // be very vocal about the fault.
+            default:
+                var context []interface{}
+
+                for i := range s.progress.Subchains[1:] {
+                    context = append(context, fmt.Sprintf("stale_head_%d", i+1))
+                    context = append(context, s.progress.Subchains[i+1].Head)
+                    context = append(context, fmt.Sprintf("stale_tail_%d", i+1))
+                    context = append(context, s.progress.Subchains[i+1].Tail)
+                    context = append(context, fmt.Sprintf("stale_next_%d", i+1))
+                    context = append(context, s.progress.Subchains[i+1].Next)
+                }
+                log.Error("Cleaning spurious beacon sync leftovers", context...)
+                s.progress.Subchains = s.progress.Subchains[:1]
+
+                // Note, here we didn't actually delete the headers at all,
+                // just the metadata. We could implement a cleanup mechanism,
+                // but further modifying corrupted state is kind of asking
+                // for it. Unless there's a good enough reason to risk it,
+                // better to live with the small database junk.
+            }
         }
         break
     }
@@ -1023,6 +1077,74 @@ func (s *skeleton) processResponse(res *headerResponse) (linked bool, merged boo
     return linked, merged
 }

+// cleanStales removes previously synced beacon headers that have become stale
+// due to the downloader backfilling past the tracked tail.
+func (s *skeleton) cleanStales(filled *types.Header) error {
+    number := filled.Number.Uint64()
+    log.Trace("Cleaning stale beacon headers", "filled", number, "hash", filled.Hash())
+
+    // If the filled header is below the linked subchain, something's
+    // corrupted internally. Report and error and refuse to do anything.
+    if number < s.progress.Subchains[0].Tail {
+        return fmt.Errorf("filled header below beacon header tail: %d < %d", number, s.progress.Subchains[0].Tail)
+    }
+    // Subchain seems trimmable, push the tail forward up to the last
+    // filled header and delete everything before it - if available. In
+    // case we filled past the head, recreate the subchain with a new
+    // head to keep it consistent with the data on disk.
+    var (
+        start = s.progress.Subchains[0].Tail // start deleting from the first known header
+        end   = number                       // delete until the requested threshold
+    )
+    s.progress.Subchains[0].Tail = number
+    s.progress.Subchains[0].Next = filled.ParentHash
+
+    if s.progress.Subchains[0].Head < number {
+        // If more headers were filled than available, push the entire
+        // subchain forward to keep tracking the node's block imports
+        end = s.progress.Subchains[0].Head + 1 // delete the entire original range, including the head
+        s.progress.Subchains[0].Head = number  // assign a new head (tail is already assigned to this)
+    }
+    // Execute the trimming and the potential rewiring of the progress
+    batch := s.db.NewBatch()
+
+    if end != number {
+        // The entire original skeleton chain was deleted and a new one
+        // defined. Make sure the new single-header chain gets pushed to
+        // disk to keep internal state consistent.
+        rawdb.WriteSkeletonHeader(batch, filled)
+    }
+    s.saveSyncStatus(batch)
+    for n := start; n < end; n++ {
+        // If the batch grew too big, flush it and continue with a new batch.
+        // The catch is that the sync metadata needs to reflect the actually
+        // flushed state, so temporarily change the subchain progress and
+        // revert after the flush.
+        if batch.ValueSize() >= ethdb.IdealBatchSize {
+            tmpTail := s.progress.Subchains[0].Tail
+            tmpNext := s.progress.Subchains[0].Next
+
+            s.progress.Subchains[0].Tail = n
+            s.progress.Subchains[0].Next = rawdb.ReadSkeletonHeader(s.db, n).ParentHash
+            s.saveSyncStatus(batch)
+
+            if err := batch.Write(); err != nil {
+                log.Crit("Failed to write beacon trim data", "err", err)
+            }
+            batch.Reset()
+
+            s.progress.Subchains[0].Tail = tmpTail
+            s.progress.Subchains[0].Next = tmpNext
+            s.saveSyncStatus(batch)
+        }
+        rawdb.DeleteSkeletonHeader(batch, n)
+    }
+    if err := batch.Write(); err != nil {
+        log.Crit("Failed to write beacon trim data", "err", err)
+    }
+    return nil
+}
+
 // Bounds retrieves the current head and tail tracked by the skeleton syncer.
 // This method is used by the backfiller, whose life cycle is controlled by the
 // skeleton syncer.
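
The batching in cleanStales above is a general chunked-delete pattern: trim a potentially large key range in pieces, and make every intermediate flush carry progress metadata that matches exactly what that flush makes durable. A rough, self-contained sketch of the same idea with a toy batch type (kvBatch, flush and idealBatchSize are hypothetical stand-ins, not the ethdb API):

package sketch

// Sketch only: the chunked-delete pattern used by cleanStales, with a toy
// in-memory batch instead of the real ethdb batch.
type kvBatch struct {
	deletes []uint64 // header numbers queued for deletion
	tail    uint64   // progress marker persisted alongside the deletions
}

// flush pretends to commit the queued deletions and the tail marker atomically.
func flush(b *kvBatch) {
	b.deletes = nil
}

const idealBatchSize = 1024 // stand-in for ethdb.IdealBatchSize

// trim deletes headers in [start, end) while keeping the persisted tail marker
// in step with what has actually been flushed, so an interruption mid-trim
// never leaves the marker ahead of (or behind) the durable deletions.
func trim(start, end uint64) {
	batch := &kvBatch{tail: end} // intended final state
	for n := start; n < end; n++ {
		if len(batch.deletes) >= idealBatchSize {
			batch.tail = n // only [start, n) is covered by the flushes so far
			flush(batch)
			batch.tail = end // restore the intended final state
		}
		batch.deletes = append(batch.deletes, n)
	}
	flush(batch)
}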

eth/downloader/skeleton_test.go

Lines changed: 3 additions & 3 deletions
@@ -55,10 +55,11 @@ func newHookedBackfiller() backfiller {
 // based on the skeleton chain as it might be invalid. The backfiller should
 // gracefully handle multiple consecutive suspends without a resume, even
 // on initial sartup.
-func (hf *hookedBackfiller) suspend() {
+func (hf *hookedBackfiller) suspend() *types.Header {
     if hf.suspendHook != nil {
         hf.suspendHook()
     }
+    return nil // we don't really care about header cleanups for now
 }

 // resume requests the backfiller to start running fill or snap sync based on
@@ -426,7 +427,6 @@ func TestSkeletonSyncExtend(t *testing.T) {
             newstate: []*subchain{
                 {Head: 49, Tail: 49},
             },
-            err: errReorgDenied,
         },
         // Initialize a sync and try to extend it with a sibling block.
         {
@@ -489,7 +489,7 @@ func TestSkeletonSyncExtend(t *testing.T) {

         <-wait
         if err := skeleton.Sync(tt.extend, false); err != tt.err {
-            t.Errorf("extension failure mismatch: have %v, want %v", err, tt.err)
+            t.Errorf("test %d: extension failure mismatch: have %v, want %v", i, err, tt.err)
         }
         skeleton.Terminate()

0 commit comments
