Skip to content

Commit

Permalink
Merge pull request bnb-chain#4 from Loverush/develop
Browse files Browse the repository at this point in the history
[WIP]Fast Finality: p2p protocol
  • Loading branch information
pythonberg1997 authored Mar 2, 2022
2 parents 21a3b11 + 9ae0983 commit db60920
Show file tree
Hide file tree
Showing 12 changed files with 328 additions and 29 deletions.
5 changes: 4 additions & 1 deletion core/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (
"github.com/ethereum/go-ethereum/core/types"
)

// NewTxsEvent is posted when a batch of transactions enter the transaction pool.
// NewTxsEvent is posted when a batch of transactions enters the transaction pool.
type NewTxsEvent struct{ Txs []*types.Transaction }

// ReannoTxsEvent is posted when a batch of local pending transactions exceed a specified duration.
Expand All @@ -33,6 +33,9 @@ type NewMinedBlockEvent struct{ Block *types.Block }
// RemovedLogsEvent is posted when a reorg happens
type RemovedLogsEvent struct{ Logs []*types.Log }

// NewVotesEvent is posted when a batch of votes enters the vote pool.
type NewVotesEvent struct{ Votes []*types.VoteEnvelope }

type ChainEvent struct {
Block *types.Block
Hash common.Hash
Expand Down
51 changes: 51 additions & 0 deletions core/types/vote.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package types

import (
"sync/atomic"

"github.com/ethereum/go-ethereum/common"
)

const (
BLSPublicKeyLength = 48
BLSSignatureLength = 96
)

type BLSPublicKey [BLSPublicKeyLength]byte
type BLSSignature [BLSSignatureLength]byte

type VoteData struct {
BlockNumber uint64
BlockHash common.Hash
}

type VoteEnvelope struct {
VoteAddress BLSPublicKey
Signature BLSSignature
Data VoteData

// caches
hash atomic.Value
}

type VoteEnvelopes []*VoteEnvelope

// Hash returns the vote hash.
func (v *VoteEnvelope) Hash() common.Hash {
if hash := v.hash.Load(); hash != nil {
return hash.(common.Hash)
}

h := v.calcVoteHash()
v.hash.Store(h)
return h
}

func (v *VoteEnvelope) calcVoteHash() common.Hash {
voteData := struct {
VoteAddress BLSPublicKey
Signature BLSSignature
Data VoteData
}{v.VoteAddress, v.Signature, v.Data}
return rlpHash(voteData)
}
2 changes: 1 addition & 1 deletion eth/fetcher/tx_fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,7 @@ func (f *TxFetcher) Notify(peer string, hashes []common.Hash) error {
// Enqueue imports a batch of received transaction into the transaction pool
// and the fetcher. This method may be called by both transaction broadcasts and
// direct request replies. The differentiation is important so the fetcher can
// re-shedule missing transactions as soon as possible.
// re-schedule missing transactions as soon as possible.
func (f *TxFetcher) Enqueue(peer string, txs []*types.Transaction, direct bool) error {
// Keep track of all the propagated transactions
if direct {
Expand Down
71 changes: 68 additions & 3 deletions eth/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,8 @@ import (
const (
// txChanSize is the size of channel listening to NewTxsEvent.
// The number is referenced from the size of tx pool.
txChanSize = 4096
txChanSize = 256
voteChanSize = 4096
)

var (
Expand Down Expand Up @@ -79,12 +80,25 @@ type txPool interface {
SubscribeReannoTxsEvent(chan<- core.ReannoTxsEvent) event.Subscription
}

// votePool defines the methods needed from a votes pool implementation to
// support all the operations needed by the Ethereum chain protocols.
type votePool interface {
PutVote(vote *types.VoteEnvelope) error
Get(hash common.Hash) *types.VoteEnvelope
GetVotes() types.VoteEnvelopes

// SubscribeNewVotesEvent should return an event subscription of
// NewVotesEvent and send events to the given channel.
SubscribeNewVotesEvent(chan<- core.NewVotesEvent) event.Subscription
}

// handlerConfig is the collection of initialization parameters to create a full
// node network handler.
type handlerConfig struct {
Database ethdb.Database // Database for direct sync insertions
Chain *core.BlockChain // Blockchain to serve data from
TxPool txPool // Transaction pool to propagate from
VotePool votePool // Votes pool to propagate from
Network uint64 // Network identifier to adfvertise
Sync downloader.SyncMode // Whether to fast or full sync
DiffSync bool // Whether to diff sync
Expand Down Expand Up @@ -112,6 +126,7 @@ type handler struct {

database ethdb.Database
txpool txPool
votepool votePool
chain *core.BlockChain
maxPeers int

Expand All @@ -127,6 +142,8 @@ type handler struct {
reannoTxsCh chan core.ReannoTxsEvent
reannoTxsSub event.Subscription
minedBlockSub *event.TypeMuxSubscription
votesCh chan core.NewVotesEvent
votesSub event.Subscription

whitelist map[uint64]common.Hash

Expand All @@ -152,6 +169,7 @@ func newHandler(config *handlerConfig) (*handler, error) {
eventMux: config.EventMux,
database: config.Database,
txpool: config.TxPool,
votepool: config.VotePool,
chain: config.Chain,
peers: newPeerSet(),
whitelist: config.Whitelist,
Expand Down Expand Up @@ -332,9 +350,10 @@ func (h *handler) runEthPeer(peer *eth.Peer, handler eth.Handler) error {
}
h.chainSync.handlePeerEvent(peer)

// Propagate existing transactions. new transactions appearing
// Propagate existing transactions and votes. new transactions and votes appearing
// after this will be sent via broadcasts.
h.syncTransactions(peer)
h.syncVotes(peer)

// If we have a trusted CHT, reject all peers below that (avoid fast sync eclipse)
if h.checkpointHash != (common.Hash{}) {
Expand Down Expand Up @@ -438,6 +457,12 @@ func (h *handler) Start(maxPeers int) {
h.txsSub = h.txpool.SubscribeNewTxsEvent(h.txsCh)
go h.txBroadcastLoop()

// broadcast votes
h.wg.Add(1)
h.votesCh = make(chan core.NewVotesEvent, voteChanSize)
h.votesSub = h.votepool.SubscribeNewVotesEvent(h.votesCh)
go h.voteBroadcastLoop()

// announce local pending transactions again
h.wg.Add(1)
h.reannoTxsCh = make(chan core.ReannoTxsEvent, txChanSize)
Expand Down Expand Up @@ -510,7 +535,7 @@ func (h *handler) BroadcastBlock(block *types.Block, propagate bool) {
log.Trace("Propagated block", "hash", hash, "recipients", len(transfer), "duration", common.PrettyDuration(time.Since(block.ReceivedAt)))
return
}
// Otherwise if the block is indeed in out own chain, announce it
// Otherwise if the block is indeed in our own chain, announce it
if h.chain.HasBlock(hash, block.NumberU64()) {
for _, peer := range peers {
peer.AsyncSendNewBlockHash(block)
Expand Down Expand Up @@ -580,6 +605,33 @@ func (h *handler) ReannounceTransactions(txs types.Transactions) {
"announce packs", peersCount, "announced hashes", peersCount*uint(len(hashes)))
}

// BroadcastVotes will propagate a batch of votes to all peers
// which are not known to already have the given vote.
func (h *handler) BroadcastVotes(votes types.VoteEnvelopes) {
var (
directCount int // Count of announcements made
directPeers int

voteset = make(map[*ethPeer][]*types.VoteEnvelope) // Set peer->hash to transfer directly
)

// Broadcast votes to a batch of peers not knowing about it
for _, vote := range votes {
peers := h.peers.peersWithoutVote(vote.Hash())
for _, peer := range peers {
voteset[peer] = append(voteset[peer], vote)
}
}

for peer, _votes := range voteset {
directPeers++
directCount += len(_votes)
peer.AsyncSendVotes(_votes)
}
log.Debug("Vote broadcast", "votes", len(votes),
"vote packs", directPeers, "broadcast votes", directCount)
}

// minedBroadcastLoop sends mined blocks to connected peers.
func (h *handler) minedBroadcastLoop() {
defer h.wg.Done()
Expand Down Expand Up @@ -617,3 +669,16 @@ func (h *handler) txReannounceLoop() {
}
}
}

// voteBroadcastLoop announces new transactions to connected peers.
func (h *handler) voteBroadcastLoop() {
defer h.wg.Done()
for {
select {
case event := <-h.votesCh:
h.BroadcastVotes(event.Votes)
case <-h.votesSub.Err():
return
}
}
}
16 changes: 16 additions & 0 deletions eth/handler_eth.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ type ethHandler handler
func (h *ethHandler) Chain() *core.BlockChain { return h.chain }
func (h *ethHandler) StateBloom() *trie.SyncBloom { return h.stateBloom }
func (h *ethHandler) TxPool() eth.TxPool { return h.txpool }
func (h *ethHandler) VotePool() eth.VotePool { return h.votepool }

// RunPeer is invoked when a peer joins on the `eth` protocol.
func (h *ethHandler) RunPeer(peer *eth.Peer, hand eth.Handler) error {
Expand Down Expand Up @@ -99,6 +100,9 @@ func (h *ethHandler) Handle(peer *eth.Peer, packet eth.Packet) error {

case *eth.PooledTransactionsPacket:
return h.txFetcher.Enqueue(peer.ID(), *packet, true)

case *eth.VotesPacket:
return h.handleVotesBroadcast(peer, *packet)
default:
return fmt.Errorf("unexpected eth packet type: %T", packet)
}
Expand Down Expand Up @@ -225,3 +229,15 @@ func (h *ethHandler) handleBlockBroadcast(peer *eth.Peer, block *types.Block, td
}
return nil
}

// handleVotesBroadcast is invoked from a peer's message handler when it transmits a
// votes broadcast for the local node to process.
func (h *ethHandler) handleVotesBroadcast(peer *eth.Peer, votes []*types.VoteEnvelope) error {
// Try to put votes into votepool
for _, vote := range votes {
if err := h.votepool.PutVote(vote); err != nil {
return err
}
}
return nil
}
17 changes: 16 additions & 1 deletion eth/peerset.go
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,7 @@ func (ps *peerSet) headPeers(num uint) []*ethPeer {
}

// peersWithoutBlock retrieves a list of peers that do not have a given block in
// their set of known hashes so it might be propagated to them.
// their set of known hashes, so it might be propagated to them.
func (ps *peerSet) peersWithoutBlock(hash common.Hash) []*ethPeer {
ps.lock.RLock()
defer ps.lock.RUnlock()
Expand Down Expand Up @@ -340,6 +340,21 @@ func (ps *peerSet) peersWithoutTransaction(hash common.Hash) []*ethPeer {
return list
}

// peersWithoutVote retrieves a list of peers that do not have a given
// vote in their set of known hashes.
func (ps *peerSet) peersWithoutVote(hash common.Hash) []*ethPeer {
ps.lock.RLock()
defer ps.lock.RUnlock()

list := make([]*ethPeer, 0, len(ps.peers))
for _, p := range ps.peers {
if !p.KnownVote(hash) {
list = append(list, p)
}
}
return list
}

// len returns if the current number of `eth` peers in the set. Since the `snap`
// peers are tied to the existence of an `eth` connection, that will always be a
// subset of `eth`.
Expand Down
21 changes: 21 additions & 0 deletions eth/protocols/eth/broadcast.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,3 +200,24 @@ func (p *Peer) announceTransactions() {
}
}
}

// broadcastVotes is a write loop that schedules votes broadcasts
// to the remote peer. The goal is to have an async writer that does not lock up
// node internals and at the same time rate limits queued data.
func (p *Peer) broadcastVotes() {
for {
select {
case votes := <-p.voteBroadcast:
if err := p.SendVotes(votes); err != nil {
return
}
p.Log().Trace("Sent votes", "count", len(votes))

case <-p.voteTerm:
return

case <-p.term:
return
}
}
}
36 changes: 33 additions & 3 deletions eth/protocols/eth/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,9 @@ type Backend interface {
// or if inbound transactions should simply be dropped.
AcceptTxs() bool

// VotePool retrieves the votes pool object to serve data.
VotePool() VotePool

// RunPeer is invoked when a peer joins on the `eth` protocol. The handler
// should do any peer maintenance work, handshakes and validations. If all
// is passed, control should be given back to the `handler` to process the
Expand All @@ -96,10 +99,15 @@ type Backend interface {

// TxPool defines the methods needed by the protocol handler to serve transactions.
type TxPool interface {
// Get retrieves the the transaction from the local txpool with the given hash.
// Get retrieves the transaction from the local txpool with the given hash.
Get(hash common.Hash) *types.Transaction
}

type VotePool interface {
// Get retrieves the vote from the local votepool with the given hash.
Get(hash common.Hash) *types.VoteEnvelope
}

// MakeProtocols constructs the P2P protocol definitions for `eth`.
func MakeProtocols(backend Backend, network uint64, dnsdisc enode.Iterator) []p2p.Protocol {
protocols := make([]p2p.Protocol, len(ProtocolVersions))
Expand All @@ -111,7 +119,7 @@ func MakeProtocols(backend Backend, network uint64, dnsdisc enode.Iterator) []p2
Version: version,
Length: protocolLengths[version],
Run: func(p *p2p.Peer, rw p2p.MsgReadWriter) error {
peer := NewPeer(version, p, rw, backend.TxPool())
peer := NewPeer(version, p, rw, backend.TxPool(), backend.VotePool())
defer peer.Close()

return backend.RunPeer(peer, func(peer *Peer) error {
Expand Down Expand Up @@ -206,6 +214,26 @@ var eth66 = map[uint64]msgHandler{
PooledTransactionsMsg: handlePooledTransactions66,
}

var eth68 = map[uint64]msgHandler{
NewBlockHashesMsg: handleNewBlockhashes,
NewBlockMsg: handleNewBlock,
TransactionsMsg: handleTransactions,
NewPooledTransactionHashesMsg: handleNewPooledTransactionHashes,
// eth66 messages with request-id
GetBlockHeadersMsg: handleGetBlockHeaders66,
BlockHeadersMsg: handleBlockHeaders66,
GetBlockBodiesMsg: handleGetBlockBodies66,
BlockBodiesMsg: handleBlockBodies66,
GetNodeDataMsg: handleGetNodeData66,
NodeDataMsg: handleNodeData66,
GetReceiptsMsg: handleGetReceipts66,
ReceiptsMsg: handleReceipts66,
GetPooledTransactionsMsg: handleGetPooledTransactions66,
PooledTransactionsMsg: handlePooledTransactions66,
// eth68 messages
VotesMsg: handleVotes,
}

// handleMessage is invoked whenever an inbound message is received from a remote
// peer. The remote connection is torn down upon returning any error.
func handleMessage(backend Backend, peer *Peer) error {
Expand All @@ -220,7 +248,9 @@ func handleMessage(backend Backend, peer *Peer) error {
defer msg.Discard()

var handlers = eth65
if peer.Version() >= ETH66 {
if peer.Version() >= ETH68 {
handlers = eth68
} else if peer.Version() >= ETH66 {
handlers = eth66
}
// Track the amount of time it takes to serve the request and run the handler
Expand Down
Loading

0 comments on commit db60920

Please sign in to comment.