-
Notifications
You must be signed in to change notification settings - Fork 0
Description
一、 可运行的geth源码阅读环境
本文档基于 go-ethereum v1.9.13,亦是bsc节点fork的eth版本起点。 目标在于以可debug的方式启动geth节点
准备config.toml
# Note: this config doesn't contain the genesis block.
[Eth]
NetworkId = 11133
SyncMode = "fast"
# DiscoveryURLs = ["enrtree://AKA3AM6LPBYEUDMVNU3BSVQJ5AD45Y7YPOHJLEF6W26QOE4VTUDPE@all.rinkeby.ethdisco.net"]
NoPruning = false
NoPrefetch = false
LightPeers = 100
UltraLightFraction = 75
DatabaseCache = 512
DatabaseFreezer = ""
TrieCleanCache = 256
TrieDirtyCache = 256
TrieTimeout = 3600000000000
EnablePreimageRecording = false
EWASMInterpreter = ""
EVMInterpreter = ""
[Eth.Miner]
GasFloor = 8000000
GasCeil = 8000000
GasPrice = 1000000000
Recommit = 3000000000
Noverify = false
[Eth.Ethash]
CacheDir = "ethash"
CachesInMem = 2
CachesOnDisk = 3
CachesLockMmap = false
DatasetDir = "/Users/lixianji/Documents/coding/play/go-ethereum/data/ethash"
DatasetsInMem = 1
DatasetsOnDisk = 2
DatasetsLockMmap = false
PowMode = 0
[Eth.TxPool]
Locals = []
NoLocals = false
Journal = "transactions.rlp"
Rejournal = 3600000000000
PriceLimit = 1
PriceBump = 10
AccountSlots = 16
GlobalSlots = 4096
AccountQueue = 64
GlobalQueue = 1024
Lifetime = 10800000000000
[Eth.GPO]
Blocks = 20
Percentile = 60
[Shh]
MaxMessageSize = 1048576
MinimumAcceptedPOW = 2e-01
RestrictConnectionBetweenLightClients = true
[Node]
DataDir = "/Users/lixianji/Documents/coding/play/go-ethereum/data/chain"
omitempty = ""
IPCPath = "geth.ipc"
HTTPPort = 18545
HTTPVirtualHosts = ["127.0.0.1"]
HTTPModules = ["net", "web3", "eth"]
WSPort = 18546
WSModules = ["net", "web3", "eth"]
GraphQLPort = 18547
GraphQLVirtualHosts = ["localhost"]
[Node.P2P]
MaxPeers = 50
NoDiscovery = false
# BootstrapNodes = ["enode://a24ac7c5484ef4ed0c5eb2d36620ba4e4aa13b8c84684e1b4aab0cebea2ae45cb4d375b77eab56516d34bfbd3c1a833fc51296ff084b770b94fb9028c4d25ccf@52.169.42.101:30303", "enode://343149e4feefa15d882d9fe4ac7d88f885bd05ebb735e547f12e12080a9fa07c8014ca6fd7f373123488102fe5e34111f8509cf0b7de3f5b44339c9f25e87cb8@52.3.158.184:30303", "enode://b6b28890b006743680c52e64e0d16db57f28124885595fa03a562be1d2bf0f3a1da297d56b13da25fb992888fd556d4c1a27b1f39d531bde7de1921c90061cc6@159.89.28.211:30303"]
# BootstrapNodesV5 = ["enode://a24ac7c5484ef4ed0c5eb2d36620ba4e4aa13b8c84684e1b4aab0cebea2ae45cb4d375b77eab56516d34bfbd3c1a833fc51296ff084b770b94fb9028c4d25ccf@52.169.42.101:30303", "enode://343149e4feefa15d882d9fe4ac7d88f885bd05ebb735e547f12e12080a9fa07c8014ca6fd7f373123488102fe5e34111f8509cf0b7de3f5b44339c9f25e87cb8@52.3.158.184:30303", "enode://b6b28890b006743680c52e64e0d16db57f28124885595fa03a562be1d2bf0f3a1da297d56b13da25fb992888fd556d4c1a27b1f39d531bde7de1921c90061cc6@159.89.28.211:30303"]
StaticNodes = []
TrustedNodes = []
ListenAddr = ":30303"
EnableMsgEvents = false
[Node.HTTPTimeouts]
ReadTimeout = 30000000000
WriteTimeout = 30000000000
IdleTimeout = 120000000000
准备 genesis.json
替换alloc中的地址
{
"config": {
"chainId": 11133,
"homesteadBlock": 0,
"eip150Block": 0,
"eip155Block": 0,
"eip158Block": 0,
"byzantiumBlock": 0,
"constantinopleBlock": 0,
"petersburgBlock": 0,
"istanbulBlock": 0
},
"alloc": {
"xx": {
"balance": "111111111000000000000000000000"
},
"xxx": {
"balance": "22222111111111000000000000000000000"
}
},
"coinbase": "0x0000000000000000000000000000000000000000",
"difficulty": "0x20000",
"extraData": "",
"gasLimit": "0x2fefd8",
"nonce": "0x0000000000000042",
"mixhash": "0x0000000000000000000000000000000000000000000000000000000000000000",
"parentHash": "0x0000000000000000000000000000000000000000000000000000000000000000",
"timestamp": "0x00"
}
运行
geth init --datadir ./data/chain genesis.json
本地启动geth
在goland中配置启动参数
尝试发送交易
参考
二、代码主流程分析
cmd/main.go
- newApp
- 若无subcommand,则进入geth方法
func geth(ctx *cli.Context) error {
if args := ctx.Args(); len(args) > 0 {
return fmt.Errorf("invalid command: %q", args[0])
}
prepare(ctx) //1
node := makeFullNode(ctx) //2
defer node.Close()
startNode(ctx, node) //3
node.Wait()
return nil
}
prepare
- 配置默认cache大小
ctx.GlobalSet(utils.CacheFlag.Name, strconv.Itoa(4096))
- set up gc
gogc := math.Max(20, math.Min(100, 100/(float64(cache)/1024)))
godebug.SetGCPercent(int(gogc))
- set up metrics
utils.SetupMetrics(ctx)
make full node
makeFullNode(ctx *cli.Context)
// 1. read config
stack, cfg := makeConfigNode(ctx)
// 2. enable eth service
utils.RegisterEthService(stack, &cfg.Eth)
// 3. enable whisper
// 涉及到 shh: whisper 协议
// 用作dapp间,借助eth的p2p层进行通信
// ref:https://eth.wiki/concepts/whisper/whisper
shhEnabled := enableWhisper(ctx)
// 4. config graphql service
utils.RegisterGraphQLService(stack, cfg.Node.GraphQLEndpoint(), cfg.Node.GraphQLCors, cfg.Node.GraphQLVirtualHosts, cfg.Node.HTTPTimeouts)
// 5. eth stats service
// 节点状态上报,通过websocket
// 配套使用 https://github.com/cubedro/eth-netstats
// https://github.com/cubedro/eth-net-intelligence-api
// https://github.com/ethereum/eth-net-intelligence-api
// https://eth.wiki/concepts/network-status
// 目前官方使用情况不佳,不过可以用作自己的节点监控
utils.RegisterEthStatsService(stack, cfg.Ethstats.URL)
上述流程中,关键点在于RegisterEthService, 展开其流程,如下
func RegisterEthService(stack *node.Node, cfg *eth.Config) {
var err error
// light sync只支持启动les节点
if cfg.SyncMode == downloader.LightSync {
err = stack.Register(func(ctx *node.ServiceContext) (node.Service, error) {
return les.New(ctx, cfg)
})
} else {
err = stack.Register(func(ctx *node.ServiceContext) (node.Service, error) {
fullNode, err := eth.New(ctx, cfg)
if fullNode != nil && cfg.LightServ > 0 {
ls, _ := les.NewLesServer(fullNode, cfg)
// 如果开启light client,则在fullnode基础上加入les功能,以支持web3.les
// les 协议:https://github.com/ethereum/devp2p/blob/master/caps/les.md
// les 介绍:https://eth.wiki/concepts/light-client-protocol
fullNode.AddLesServer(ls)
}
return fullNode, err
})
}
if err != nil {
Fatalf("Failed to register the Ethereum service: %v", err)
}
}
start node
// 1. 启动节点
utils.StartNode(stack)
// 2. 启动wallet相关服务
events := make(chan accounts.WalletEvent, 16)
stack.AccountManager().Subscribe(events)
// 3. 启动geth client
rpcClient, err := stack.Attach()
ethClient := ethclient.NewClient(rpcClient)
// 4.注册lesservice,处理les请求
// 5. 处理POA共识
最终具体的节点启动逻辑来到了node.go 中的start方法。在后续系列中详细解析
node wait
func (n *Node) Wait() {
n.lock.RLock()
if n.server == nil {
n.lock.RUnlock()
return
}
stop := n.stop
n.lock.RUnlock()
<-stop
}
三、Node 启动流程分析
主流程
Node is a container on which services can be registered.
node.go中,start方法实现了对节点的启动
- 构造services
- 启动services对应的p2p server
- 启动services对应的rpc server
// 1. create p2p server
running := &p2p.Server{Config: n.serverConfig}
// 2. start services p2p
// 其中、p2p协议会多个版本共存、该版本中eth service会同时启动63、64、65三个版本的p2p服务
// Gather the protocols and start the freshly assembled P2P server
for _, service := range services {
running.Protocols = append(running.Protocols, service.Protocols()...)
}
if err := running.Start(); err != nil {
return convertFileLockError(err)
}
// Start each of the services
var started []reflect.Type
for kind, service := range services {
// Start the next service, stopping all previous upon failure
if err := service.Start(running); err != nil {
for _, kind := range started {
services[kind].Stop()
}
running.Stop()
return err
}
// Mark the service started for potential cleanup
started = append(started, kind)
}
// 3. start services RPC
if err := n.startRPC(services); err != nil {
for _, service := range services {
service.Stop()
}
running.Stop()
return err
}
Services
对于定制化的节点,可以实现Service接口,注入node
service接口如下:
type Service interface {
// Protocols retrieves the P2P protocols the service wishes to start.
Protocols() []p2p.Protocol
// APIs retrieves the list of RPC descriptors the service provides
APIs() []rpc.API
// Start is called after all services have been constructed and the networking
// layer was also initialized to spawn any goroutines required by the service.
Start(server *p2p.Server) error
// Stop terminates all goroutines belonging to the service, blocking until they
// are all terminated.
Stop() error
}
四、ETH service解析
Package eth implements the Ethereum protocol.
概况
ethereum struct
type Ethereum struct {
config *Config
// Handlers
txPool *core.TxPool
blockchain *core.BlockChain
protocolManager *ProtocolManager
lesServer LesServer
dialCandiates enode.Iterator
// DB interfaces
chainDb ethdb.Database // Block chain database
eventMux *event.TypeMux
engine consensus.Engine
accountManager *accounts.Manager
bloomRequests chan chan *bloombits.Retrieval // Channel receiving bloom data retrieval requests
bloomIndexer *core.ChainIndexer // Bloom indexer operating during block imports
closeBloomHandler chan struct{}
APIBackend *EthAPIBackend
miner *miner.Miner
gasPrice *big.Int
etherbase common.Address
networkID uint64
netRPCService *ethapi.PublicNetAPI
lock sync.RWMutex // Protects the variadic fields (e.g. gas price and etherbase)
}
start 流程
func (s *Ethereum) Start(srvr *p2p.Server) error {
// EIP778: Ethereum Node Records (ENR)
// https://eips.ethereum.org/EIPS/eip-778
// EIP778 提供了一套节点记录的编码规范、用于p2p以及其他类似DNS等协议之上的节点发现
// EIP-2124:用于识别p2p网络中的协议分叉,可能可以用于快速扫描网络中的节点
// https://github.com/ethereum/EIPs/blob/master/EIPS/eip-2124.md
// P2P 一节可以详细讨论一下这里
s.startEthEntryUpdate(srvr.LocalNode())
// Start the bloom bits servicing goroutines
s.startBloomHandlers(params.BloomBitsBlocks)
// Start the RPC service
// ?? 这个API为什么传入的是p2p的server
s.netRPCService = ethapi.NewPublicNetAPI(srvr, s.NetVersion())
// Figure out a max peers count based on the server limits
maxPeers := srvr.MaxPeers
if s.config.LightServ > 0 {
if s.config.LightPeers >= srvr.MaxPeers {
return fmt.Errorf("invalid peer config: light peer count (%d) >= total peer count (%d)", s.config.LightPeers, srvr.MaxPeers)
}
maxPeers -= s.config.LightPeers
}
// 协议层启动发生在这个方法
// Start the networking layer and the light server if requested
s.protocolManager.Start(maxPeers)
if s.lesServer != nil {
s.lesServer.Start(srvr)
}
return nil
}
ProtocolManager
func (pm *ProtocolManager) Start(maxPeers int) {
pm.maxPeers = maxPeers
// broadcast transactions
pm.wg.Add(1)
//txChanSize默认4096
//** 这个比较关键、决定了txPool的大小,可以做一定的修改
pm.txsCh = make(chan core.NewTxsEvent, txChanSize)
// 订阅pending
pm.txsSub = pm.txpool.SubscribeNewTxsEvent(pm.txsCh)
// 广播pending给peer,** 可以只广播自己的交易
go pm.txBroadcastLoop()
// broadcast mined blocks
pm.wg.Add(1)
// ?? 不知道这个从哪儿订阅上的
pm.minedBlockSub = pm.eventMux.Subscribe(core.NewMinedBlockEvent{})
go pm.minedBroadcastLoop()
// start sync handlers
pm.wg.Add(2)
// 开始sync
go pm.chainSync.loop()
go pm.txsyncLoop64() // TODO(karalabe): Legacy initial tx echange, drop with eth/64.
}
Sync
五、交易接收流程
上一片儿失联了
然后可能要起两个节点了,通过两个节点,来figure out
- block的订阅
- 状态更新
SendTx
我们从交易发送的线索开始挖,主要关注
- 交易上链的过程
- 状态更新过程
以sendTx为例,流程如下
// 1. ethapi/backend.go 定义API接口,对接口的创建在 node 包实现
SendTx(ctx context.Context, signedTx *types.Transaction) error
// 2. eth/api_backend.go 实现了 SendTx
func (b *EthAPIBackend) SendTx(ctx context.Context, signedTx *types.Transaction) error {
return b.eth.txPool.AddLocal(signedTx)
}
// 3. 进入core/tx_pool.go
func (pool *TxPool) AddLocal(tx *types.Transaction) error {
...
pool.addTxs(txs, !pool.config.NoLocals, true)
}
// 4. 过滤已知交易、设置cache
pool.mu.Lock()
newErrs, dirtyAddrs := pool.addTxsLocked(news, local)
pool.mu.Unlock()
// 5. 加入池子 addTxsLocked
func (pool *TxPool) addTxsLocked(txs []*types.Transaction, local bool) ([]error, *accountSet) {
dirty := newAccountSet(pool.signer)
errs := make([]error, len(txs))
for i, tx := range txs {
// 把新交易加入池子
replaced, err := pool.add(tx, local)
errs[i] = err
if err == nil && !replaced {
dirty.addTx(tx)
}
}
// 6. pool.add 核心处理
// If a newly added transaction is marked as local, its sending account will be
// whitelisted, preventing any associated transaction from being dropped out of the pool
// due to pricing constraints. (*** local账户添加,防止因为价格原因drop)
func (pool *TxPool) add(tx *types.Transaction, local bool) (replaced bool, err error)
// a. If the transaction is already known, discard it
// b. If the transaction fails basic validation, discard it
// c. If the transaction pool is full, discard underpriced transactions
// d. pricing bump : Try to replace an existing transaction in the pending pool
// 对同nonce替换的处理,参数决定了交易替换的最小gas比例,本地可能可以逃过,但网络中不太好跳过,看出块节点的配置
// TxPoolPriceBumpFlag = cli.Uint64Flag{
// Name: "txpool.pricebump",
// Usage: "Price bump percentage to replace an already existing transaction",
// Value: eth.DefaultConfig.TxPool.PriceBump,
// }
// e. 送入交易队列
pool.enqueueTx(hash, tx)
func (pool *TxPool) enqueueTx(hash common.Hash, tx *types.Transaction) (bool, error) {
// Try to insert the transaction into the future queue
from, _ := types.Sender(pool.signer, tx) // already validated
if pool.queue[from] == nil {
pool.queue[from] = newTxList(false)
}
inserted, old := pool.queue[from].Add(tx, pool.config.PriceBump)
if !inserted {
// An older transaction was better, discard this
queuedDiscardMeter.Mark(1)
return false, ErrReplaceUnderpriced
}
// Discard any previous transaction and mark this
if old != nil {
pool.all.Remove(old.Hash())
pool.priced.Removed(1)
queuedReplaceMeter.Mark(1)
} else {
// Nothing was replaced, bump the queued counter
queuedGauge.Inc(1)
}
if pool.all.Get(hash) == nil {
pool.all.Add(tx)
pool.priced.Put(tx)
}
return old != nil, nil
}
接收交易后,如果是miner,就需要进行交易打包了
如果作为普通的全节点,接下来主要是广播交易,
*** 广播交易和节点处理的链路,我们可以专门优化,此处可以测试一下典型的延时,应该链路上优化空间比较大
六、block insert
顶层从InsertChain开始track
// blockchain.go
func (bc *BlockChain) InsertChain(chain types.Blocks) (int, error)
-->
func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, error)
// 处理区块
receipts, logs, usedGas, err := bc.processor.Process(block, statedb, bc.vmConfig)
// 写入区块
status, err := bc.writeBlockWithState(block, receipts, logs, statedb, false)
-->
func (p *StateProcessor) Process(block *types.Block, statedb *state.StateDB, cfg vm.Config) (types.Receipts, []*types.Log, uint64, error)
七、同步机制:SYNC
fast sync
An outline of the fast sync algorithm would be:
- Similarly to classical sync, download the block headers and bodies that make up the blockchain
- Similarly to classical sync, verify the header chain's consistency (POW, total difficulty, etc)
- Instead of processing the blocks, download the transaction receipts as defined by the header
- Store the downloaded blockchain, along with the receipt chain, enabling all historical queries
- When the chain reaches a recent enough state (head - 1024 blocks), pause for state sync:
- Retrieve the entire Merkel Patricia state trie defined by the root hash of the pivot point
- For every account found in the trie, retrieve it's contract code and internal storage state trie
- Upon successful trie download, mark the pivot point (head - 1024 blocks) as the current head
- Import all remaining blocks (1024) by fully processing them as in the classical sync
这是相对省资源的一种方式,fast sync 的模式会下载 Header, Body 和 Receipt,插入的过程不会执行交易,因此也不会产生 StateDB 的数据,然后在某一个区块高度(最高的区块高度 - 1024)的时候同步完成所有的 StateDB 中的数据,作为本节点上的初始 StateDB Root 信息,最后的 downloader.fsMinFullBlocks(当前为 64)个区块会采用 full mode 的方式来构建。这种模式会缩小区块的同步时间(不需要执行交易),同时不会产生大量的历史的 StateDB 信息(也就不会产生大量的磁盘空间),但是对于网络的消耗会更高(因为需要下载 Receipt 和 StateDB),与 full sync 相比较,fast sync 是用网络带宽换取 CPU 资源。
SNAP sync
-
snap sync 是1.10.0引入的一个新的同步机制,旨在解决fast sync带来的带宽及disk io的浪费问题
-
fast sync可以工作在snap协议上,
handler中的snapSync字段控制了这个选项的开关 -
协议详情:https://github.com/ethereum/devp2p/blob/master/caps/snap.md
-
snap 从geth 1.10.0加入
与fast sync对比
In the case of fast sync, the unforeseen bottleneck was latency, caused by Ethereum’s data model. Ethereum’s state trie is a Merkle tree, where the leaves contain the useful data and each node above is the hash of 16 children. Syncing from the root of the tree (the hash embedded in a block header), the only way to download everything is to request each node one-by-one. With 675 million nodes to download, even by batching 384 requests together, it ends up needing 1.75 million round-trips. Assuming an overly generous 50ms RTT to 10 serving peers, fast sync is essentially waiting for over 150 minutes for data to arrive. But network latency is only 1/3rd of the problem.
When a serving peer receives a request for trie nodes, it needs to retrieve them from disk. Ethereum’s Merkle trie doesn’t help here either. Since trie nodes are keyed by hash, there’s no meaningful way to store/retrieve them batched, each requiring it’s own database read. To make matters worse, LevelDB (used by Geth) stores data in 7 levels, so a random read will generally touch as many files. Multiplying it all up, a single network request of 384 nodes - at 7 reads a pop - amounts to 2.7 thousand disk reads. With the fastest SATA SSDs’ speed of 100.000 IOPS, that’s 37ms extra latency. With the same 10 serving peer assumption as above, fast sync just added an extra 108 minutes waiting time. But serving latency is only 1/3 of the problem.
Requesting that many trie nodes individually means actually uploading that many hashes to remote peers to serve. With 675 million nodes to download, that’s 675 million hashes to upload, or 675 * 32 bytes = 21GB. At a global average of 51Mbps upload speed (X Doubt), fast sync just added an extra 56 minutes waiting time. Downloads are a bit more than twice as large, so with global averages of 97Mbps, fast sync popped on a further 63 minutes. Bandwidth delays are the last 1/3 of the problem.
原理
yellow paper
- MPT
- world state 结构
八、P2P 工作流程
eth/backend.go
1. node start
err := n.openEndpoints()
--> server.Start() // server start 即为p2p模块的server
2. p2p server
- setupLocalNode
- create rlpx handshake obj
- init or load node db: level db
- initial NAT // ref NAT 网络类型 https://juejin.cn/post/6984996578587050021
- setupListening
- tcp listen
- update enr
- server.listenLoop()
- set max pending peers
- accept and check inbound
- 这里有一个"too many attempts"的控制,逻辑为同一个peer每30秒只能尝试一次handshake
- 这里使用ip控制,使用代理池可以解决这个限制
- `srv.inboundHistory.add(remoteIP.String(), now.Add(inboundThrottleTime))`
- 记录metric
- SetupConn
- doEncHandshake // 密钥协商
- srv.checkpoint(c, srv.checkpointPostHandshake) --> 检查max peer
// 是否能在未建立连接的情况下获取到hello返回,不能
- doProtoHandshake // rlpx hello msg,包含client id,即name,客户端版本
- srv.checkpoint(c, srv.checkpointAddPeer) // 检查协议的兼容性
细节见文档 p2p.md
九、mining工作流程
1. cmd/geth
threads := ctx.GlobalInt(utils.MinerThreadsFlag.Name)
if err := ethBackend.StartMining(threads); err != nil {
utils.Fatalf("Failed to start mining: %v", err)
}
2. eth/backend
// get miner account
eb, err := s.Etherbase()
if parlia, ok := s.engine.(*parlia.Parlia); ok {
wallet, err := s.accountManager.Find(accounts.Account{Address: eb})
if wallet == nil || err != nil {
log.Error("Etherbase account unavailable locally", "err", err)
return fmt.Errorf("signer missing: %v", err)
}
// sign dat func & sign tx func
parlia.Authorize(eb, wallet.SignData, wallet.SignTx)
}
// start miner
go s.miner.Start(eb)
3. miner/miner
// save start signal
func (miner *Miner) Start(coinbase common.Address) {
miner.startCh <- coinbase
}
// start signal fire
case addr := <-miner.startCh:
miner.SetEtherbase(addr)
if canStart {
// start worker
miner.worker.start()
}
shouldStart = true
4. commit to engine
// worker.go/commit: commit seal to task channel
case w.taskCh <- &task{receipts: receipts, state: s, block: block, createdAt: time.Now()}:
// worker.go/taskLoop: commit to engine
w.engine.Seal(w.chain, task.block, w.resultCh, stopCh);
5. seal in engine
parlia/Seal:
func (p *Parlia) Seal(chain consensus.ChainHeaderReader, block *types.Block, results chan<- *types.Block, stop <-chan struct{})
val, signFn := p.val, p.signFn // val 是出块的签名地址
snap, err := p.snapshot(chain, number-1, header.ParentHash, nil)
// Bail out if we're unauthorized to sign a block
if _, authorized := snap.Validators[val]; !authorized {
return errUnauthorizedValidator
}
// Sweet, the protocol permits us to sign the block, wait for our time
delay := p.delayForRamanujanFork(snap, header)
// Sign all the things!
sig, err := signFn(accounts.Account{Address: val}, accounts.MimetypeParlia, ParliaRLP(header, p.chainConfig.ChainID))
// send result
results <- block.WithSeal(header):
6. validators set 中的逻辑


