Skip to content

Commit 54a5b2f

Browse files
committed
eth: reqid dispatcher, nuke fast sync, add beacon sync
1 parent b798cbd commit 54a5b2f

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

45 files changed

+3654
-2053
lines changed

cmd/utils/flags.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -206,7 +206,7 @@ var (
206206
defaultSyncMode = ethconfig.Defaults.SyncMode
207207
SyncModeFlag = TextMarshalerFlag{
208208
Name: "syncmode",
209-
Usage: `Blockchain sync mode ("fast", "full", "snap" or "light")`,
209+
Usage: `Blockchain sync mode ("snap", "full" or "light")`,
210210
Value: &defaultSyncMode,
211211
}
212212
GCModeFlag = cli.StringFlag{

core/rawdb/accessors_chain.go

Lines changed: 0 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -247,24 +247,6 @@ func WriteLastPivotNumber(db ethdb.KeyValueWriter, pivot uint64) {
247247
}
248248
}
249249

250-
// ReadFastTrieProgress retrieves the number of tries nodes fast synced to allow
251-
// reporting correct numbers across restarts.
252-
func ReadFastTrieProgress(db ethdb.KeyValueReader) uint64 {
253-
data, _ := db.Get(fastTrieProgressKey)
254-
if len(data) == 0 {
255-
return 0
256-
}
257-
return new(big.Int).SetBytes(data).Uint64()
258-
}
259-
260-
// WriteFastTrieProgress stores the fast sync trie process counter to support
261-
// retrieving it across restarts.
262-
func WriteFastTrieProgress(db ethdb.KeyValueWriter, count uint64) {
263-
if err := db.Put(fastTrieProgressKey, new(big.Int).SetUint64(count).Bytes()); err != nil {
264-
log.Crit("Failed to store fast sync trie progress", "err", err)
265-
}
266-
}
267-
268250
// ReadTxIndexTail retrieves the number of oldest indexed block
269251
// whose transaction indices has been indexed. If the corresponding entry
270252
// is non-existent in database it means the indexing has been finished.

core/rawdb/accessors_snapshot.go

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -208,11 +208,3 @@ func WriteSnapshotSyncStatus(db ethdb.KeyValueWriter, status []byte) {
208208
log.Crit("Failed to store snapshot sync status", "err", err)
209209
}
210210
}
211-
212-
// DeleteSnapshotSyncStatus deletes the serialized sync status saved at the last
213-
// shutdown
214-
func DeleteSnapshotSyncStatus(db ethdb.KeyValueWriter) {
215-
if err := db.Delete(snapshotSyncStatusKey); err != nil {
216-
log.Crit("Failed to remove snapshot sync status", "err", err)
217-
}
218-
}

core/rawdb/accessors_sync.go

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
// Copyright 2021 The go-ethereum Authors
2+
// This file is part of the go-ethereum library.
3+
//
4+
// The go-ethereum library is free software: you can redistribute it and/or modify
5+
// it under the terms of the GNU Lesser General Public License as published by
6+
// the Free Software Foundation, either version 3 of the License, or
7+
// (at your option) any later version.
8+
//
9+
// The go-ethereum library is distributed in the hope that it will be useful,
10+
// but WITHOUT ANY WARRANTY; without even the implied warranty of
11+
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12+
// GNU Lesser General Public License for more details.
13+
//
14+
// You should have received a copy of the GNU Lesser General Public License
15+
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
16+
17+
package rawdb
18+
19+
import (
20+
"bytes"
21+
22+
"github.com/ethereum/go-ethereum/core/types"
23+
"github.com/ethereum/go-ethereum/ethdb"
24+
"github.com/ethereum/go-ethereum/log"
25+
"github.com/ethereum/go-ethereum/rlp"
26+
)
27+
28+
// ReadSkeletonSyncStatus retrieves the serialized sync status saved at shutdown.
29+
func ReadSkeletonSyncStatus(db ethdb.KeyValueReader) []byte {
30+
data, _ := db.Get(skeletonSyncStatusKey)
31+
return data
32+
}
33+
34+
// WriteSkeletonSyncStatus stores the serialized sync status to save at shutdown.
35+
func WriteSkeletonSyncStatus(db ethdb.KeyValueWriter, status []byte) {
36+
if err := db.Put(skeletonSyncStatusKey, status); err != nil {
37+
log.Crit("Failed to store skeleton sync status", "err", err)
38+
}
39+
}
40+
41+
// DeleteSkeletonSyncStatus deletes the serialized sync status saved at the last
42+
// shutdown
43+
func DeleteSkeletonSyncStatus(db ethdb.KeyValueWriter) {
44+
if err := db.Delete(skeletonSyncStatusKey); err != nil {
45+
log.Crit("Failed to remove skeleton sync status", "err", err)
46+
}
47+
}
48+
49+
// ReadSkeletonHeader retrieves a block header from the skeleton sync store,
50+
func ReadSkeletonHeader(db ethdb.KeyValueReader, number uint64) *types.Header {
51+
data, _ := db.Get(skeletonHeaderKey(number))
52+
if len(data) == 0 {
53+
return nil
54+
}
55+
header := new(types.Header)
56+
if err := rlp.Decode(bytes.NewReader(data), header); err != nil {
57+
log.Error("Invalid skeleton header RLP", "number", number, "err", err)
58+
return nil
59+
}
60+
return header
61+
}
62+
63+
// WriteSkeletonHeader stores a block header into the skeleton sync store.
64+
func WriteSkeletonHeader(db ethdb.KeyValueWriter, header *types.Header) {
65+
data, err := rlp.EncodeToBytes(header)
66+
if err != nil {
67+
log.Crit("Failed to RLP encode header", "err", err)
68+
}
69+
key := skeletonHeaderKey(header.Number.Uint64())
70+
if err := db.Put(key, data); err != nil {
71+
log.Crit("Failed to store skeleton header", "err", err)
72+
}
73+
}
74+
75+
// DeleteSkeletonHeader removes all block header data associated with a hash.
76+
func DeleteSkeletonHeader(db ethdb.KeyValueWriter, number uint64) {
77+
if err := db.Delete(skeletonHeaderKey(number)); err != nil {
78+
log.Crit("Failed to delete skeleton header", "err", err)
79+
}
80+
}

core/rawdb/schema.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,9 @@ var (
6363
// snapshotSyncStatusKey tracks the snapshot sync status across restarts.
6464
snapshotSyncStatusKey = []byte("SnapshotSyncStatus")
6565

66+
// skeletonSyncStatusKey tracks the skeleton sync status across restarts.
67+
skeletonSyncStatusKey = []byte("SkeletonSyncStatus")
68+
6669
// txIndexTailKey tracks the oldest block whose transactions have been indexed.
6770
txIndexTailKey = []byte("TransactionIndexTail")
6871

@@ -93,6 +96,8 @@ var (
9396
SnapshotStoragePrefix = []byte("o") // SnapshotStoragePrefix + account hash + storage hash -> storage trie value
9497
CodePrefix = []byte("c") // CodePrefix + code hash -> account code
9598

99+
skeletonHeaderPrefix = []byte("S") // skeletonHeaderPrefox + num (uint64 big endian) -> header
100+
96101
preimagePrefix = []byte("secure-key-") // preimagePrefix + hash -> preimage
97102
configPrefix = []byte("ethereum-config-") // config prefix for the db
98103

@@ -210,6 +215,11 @@ func bloomBitsKey(bit uint, section uint64, hash common.Hash) []byte {
210215
return key
211216
}
212217

218+
// skeletonHeaderKey = skeletonHeaderPrefix + num (uint64 big endian)
219+
func skeletonHeaderKey(number uint64) []byte {
220+
return append(skeletonHeaderPrefix, encodeBlockNumber(number)...)
221+
}
222+
213223
// preimageKey = preimagePrefix + hash
214224
func preimageKey(hash common.Hash) []byte {
215225
return append(preimagePrefix, hash.Bytes()...)

eth/api.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -256,6 +256,16 @@ func (api *PrivateAdminAPI) ImportChain(file string) (bool, error) {
256256
return true, nil
257257
}
258258

259+
// NewHead requests the node to beacon-sync to the designated head header.
260+
func (api *PrivateAdminAPI) NewHead(blob hexutil.Bytes) error {
261+
header := new(types.Header)
262+
if err := rlp.DecodeBytes(blob, header); err != nil {
263+
return err
264+
}
265+
mode, _ := api.eth.handler.chainSync.modeAndLocalHead()
266+
return api.eth.Downloader().BeaconSync(mode, header)
267+
}
268+
259269
// PublicDebugAPI is the collection of Ethereum full node APIs exposed
260270
// over the public debugging endpoint.
261271
type PublicDebugAPI struct {

eth/downloader/beaconsync.go

Lines changed: 215 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,215 @@
1+
// Copyright 2021 The go-ethereum Authors
2+
// This file is part of the go-ethereum library.
3+
//
4+
// The go-ethereum library is free software: you can redistribute it and/or modify
5+
// it under the terms of the GNU Lesser General Public License as published by
6+
// the Free Software Foundation, either version 3 of the License, or
7+
// (at your option) any later version.
8+
//
9+
// The go-ethereum library is distributed in the hope that it will be useful,
10+
// but WITHOUT ANY WARRANTY; without even the implied warranty of
11+
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12+
// GNU Lesser General Public License for more details.
13+
//
14+
// You should have received a copy of the GNU Lesser General Public License
15+
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
16+
17+
package downloader
18+
19+
import (
20+
"sync"
21+
"sync/atomic"
22+
"time"
23+
24+
"github.com/ethereum/go-ethereum/common"
25+
"github.com/ethereum/go-ethereum/core/types"
26+
"github.com/ethereum/go-ethereum/log"
27+
)
28+
29+
// beaconBackfiller is the chain and state backfilling that can be commenced once
30+
// the skeleton syncer has successfully reverse downloaded all the headers up to
31+
// the genesis block or an existing header in the database. Its operation is fully
32+
// directed by the skeleton sync's head/tail events.
33+
type beaconBackfiller struct {
34+
downloader *Downloader // Downloader to direct via this callback implementation
35+
syncMode SyncMode // Sync mode to use for backfilling the skeleton chains
36+
success func() // Callback to run on successful sync cycle completion
37+
filling bool // Flag whether the downloader is backfilling or not
38+
lock sync.Mutex // Mutex protecting the sync lock
39+
}
40+
41+
// newBeaconBackfiller is a helper method to create the backfiller.
42+
func newBeaconBackfiller(dl *Downloader, success func()) backfiller {
43+
return &beaconBackfiller{
44+
downloader: dl,
45+
success: success,
46+
}
47+
}
48+
49+
// suspend cancels any background downloader threads.
50+
func (b *beaconBackfiller) suspend() {
51+
b.downloader.Cancel()
52+
}
53+
54+
// resume starts the downloader threads for backfilling state and chain data.
55+
func (b *beaconBackfiller) resume() {
56+
b.lock.Lock()
57+
b.filling = true
58+
mode := b.syncMode
59+
b.lock.Unlock()
60+
61+
// Start the backfilling on its own thread since the downloader does not have
62+
// its own lifecycle runloop.
63+
go func() {
64+
// Set the backfiller to non-filling when download completes
65+
defer func() {
66+
b.lock.Lock()
67+
b.filling = false
68+
b.lock.Unlock()
69+
}()
70+
// If the downloader fails, report an error as in beacon chain mode there
71+
// should be no errors as long as the chain we're syncing to is valid.
72+
if err := b.downloader.synchronise("", common.Hash{}, nil, mode, true); err != nil {
73+
log.Error("Beacon backfilling failed", "err", err)
74+
return
75+
}
76+
// Synchronization succeeded. Since this happens async, notify the outer
77+
// context to disable snap syncing and enable transaction propagation.
78+
if b.success != nil {
79+
b.success()
80+
}
81+
}()
82+
}
83+
84+
// setMode updates the sync mode from the current one to the requested one. If
85+
// there's an active sync in progress, it will be cancelled and restarted.
86+
func (b *beaconBackfiller) setMode(mode SyncMode) {
87+
// Update the old sync mode and track if it was changed
88+
b.lock.Lock()
89+
updated := b.syncMode != mode
90+
filling := b.filling
91+
b.syncMode = mode
92+
b.lock.Unlock()
93+
94+
// If the sync mode was changed mid-sync, restart. This should never ever
95+
// really happen, we just handle it to detect programming errors.
96+
if !updated || !filling {
97+
return
98+
}
99+
log.Error("Downloader sync mode changed mid-run", "old", mode.String(), "new", mode.String())
100+
b.suspend()
101+
b.resume()
102+
}
103+
104+
// BeaconSync is the Ethereum 2 version of the chain synchronization, where the
105+
// chain is not downloaded from genesis onward, rather from trusted head announces
106+
// backwards.
107+
//
108+
// Internally backfilling and state sync is done the same way, but the header
109+
// retrieval and scheduling is replaced.
110+
func (d *Downloader) BeaconSync(mode SyncMode, head *types.Header) error {
111+
// When the downloader starts a sync cycle, it needs to be aware of the sync
112+
// mode to use (full, snap). To keep the skeleton chain oblivious, inject the
113+
// mode into the backfiller directly.
114+
//
115+
// Super crazy dangerous type cast. Should be fine (TM), we're only using a
116+
// different backfiller implementation for skeleton tests.
117+
d.skeleton.filler.(*beaconBackfiller).setMode(mode)
118+
119+
// Signal the skeleton sync to switch to a new head, however it wants
120+
if err := d.skeleton.Sync(head); err != nil {
121+
return err
122+
}
123+
return nil
124+
}
125+
126+
// findBeaconAncestor tries to locate the common ancestor link of the local chain
127+
// and the beacon chain just requested. In the general case when our node was in
128+
// sync and on the correct chain, checking the top N links should already get us
129+
// a match. In the rare scenario when we ended up on a long reorganisation (i.e.
130+
// none of the head links match), we do a binary search to find the ancestor.
131+
func (d *Downloader) findBeaconAncestor() uint64 {
132+
// Figure out the current local head position
133+
var head *types.Header
134+
135+
switch d.getMode() {
136+
case FullSync:
137+
head = d.blockchain.CurrentBlock().Header()
138+
case SnapSync:
139+
head = d.blockchain.CurrentFastBlock().Header()
140+
default:
141+
head = d.lightchain.CurrentHeader()
142+
}
143+
number := head.Number.Uint64()
144+
145+
// If the head is present in the skeleton chain, return that
146+
if head.Hash() == d.skeleton.Header(number).Hash() {
147+
return number
148+
}
149+
// Head header not present, binary search to find the ancestor
150+
start, end := uint64(0), number
151+
for start+1 < end {
152+
// Split our chain interval in two, and request the hash to cross check
153+
check := (start + end) / 2
154+
155+
h := d.skeleton.Header(check)
156+
n := h.Number.Uint64()
157+
158+
var known bool
159+
switch d.getMode() {
160+
case FullSync:
161+
known = d.blockchain.HasBlock(h.Hash(), n)
162+
case SnapSync:
163+
known = d.blockchain.HasFastBlock(h.Hash(), n)
164+
default:
165+
known = d.lightchain.HasHeader(h.Hash(), n)
166+
}
167+
if !known {
168+
end = check
169+
continue
170+
}
171+
start = check
172+
}
173+
return start
174+
}
175+
176+
// fetchBeaconHeaders feeds skeleton headers to the downloader queue for scheduling
177+
// until sync errors or is finished.
178+
func (d *Downloader) fetchBeaconHeaders(from uint64) error {
179+
head, err := d.skeleton.Head()
180+
if err != nil {
181+
return err
182+
}
183+
for {
184+
// Retrieve a batch of headers and feed it to the header processor
185+
headers := make([]*types.Header, 0, maxHeadersProcess)
186+
for i := 0; i < maxHeadersProcess && from <= head.Number.Uint64(); i++ {
187+
headers = append(headers, d.skeleton.Header(from))
188+
from++
189+
}
190+
select {
191+
case d.headerProcCh <- headers:
192+
case <-d.cancelCh:
193+
return errCanceled
194+
}
195+
// If we still have headers to import, loop and keep pushing them
196+
if from <= head.Number.Uint64() {
197+
continue
198+
}
199+
// If the pivot block is committed, signal header sync termination
200+
if atomic.LoadInt32(&d.committed) == 1 {
201+
d.headerProcCh <- nil
202+
return nil
203+
}
204+
// State sync still going, wait a bit for new headers and retry
205+
select {
206+
case <-time.After(fsHeaderContCheck):
207+
case <-d.cancelCh:
208+
return errCanceled
209+
}
210+
head, err = d.skeleton.Head()
211+
if err != nil {
212+
return err
213+
}
214+
}
215+
}

0 commit comments

Comments
 (0)