From d55ba4077ae5a24c658f80d030b14614de9d4684 Mon Sep 17 00:00:00 2001 From: Roshan Date: Fri, 25 Feb 2022 12:07:33 +0800 Subject: [PATCH 1/4] [WIP]p2p protocol implement of fast finality --- core/types/vote.go | 28 ++++++++++++++ eth/fetcher/tx_fetcher.go | 2 +- eth/handler.go | 12 +++++- eth/handler_eth.go | 15 ++++++++ eth/protocols/eth/handler.go | 40 ++++++++++++++++++-- eth/protocols/eth/handlers.go | 16 +++++++- eth/protocols/eth/peer.go | 71 ++++++++++++++++++++++++++++++++--- eth/protocols/eth/protocol.go | 36 +++++++++++------- eth/sync.go | 14 +++++++ 9 files changed, 209 insertions(+), 25 deletions(-) create mode 100644 core/types/vote.go diff --git a/core/types/vote.go b/core/types/vote.go new file mode 100644 index 0000000000..7f1f825234 --- /dev/null +++ b/core/types/vote.go @@ -0,0 +1,28 @@ +package types + +import "github.com/ethereum/go-ethereum/common" + +const ( + BLSPublicKeyLength = 48 + BLSSignatureLength = 96 +) + +type BLSPublicKey [BLSPublicKeyLength]byte +type BLSSignature [BLSSignatureLength]byte + +type VoteData struct { + BlockNumber uint64 + BlockHash common.Hash +} + +type VoteRecord struct { + VoteAddress BLSPublicKey + Signature BLSSignature + Data VoteData +} + +type VoteRecords []VoteRecord + +func (v *VoteRecord) Hash() common.Hash { + return rlpHash(v) +} diff --git a/eth/fetcher/tx_fetcher.go b/eth/fetcher/tx_fetcher.go index d0c1348014..d22da56f0b 100644 --- a/eth/fetcher/tx_fetcher.go +++ b/eth/fetcher/tx_fetcher.go @@ -260,7 +260,7 @@ func (f *TxFetcher) Notify(peer string, hashes []common.Hash) error { // Enqueue imports a batch of received transaction into the transaction pool // and the fetcher. This method may be called by both transaction broadcasts and // direct request replies. The differentiation is important so the fetcher can -// re-shedule missing transactions as soon as possible. +// re-schedule missing transactions as soon as possible. func (f *TxFetcher) Enqueue(peer string, txs []*types.Transaction, direct bool) error { // Keep track of all the propagated transactions if direct { diff --git a/eth/handler.go b/eth/handler.go index cbc6eca809..414195d549 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -79,12 +79,20 @@ type txPool interface { SubscribeReannoTxsEvent(chan<- core.ReannoTxsEvent) event.Subscription } +// votePool defines the methods needed from a votes pool implementation to +// support all the operations needed by the Ethereum chain protocols. +type votePool interface { + Put(hash common.Hash, vote types.VoteRecord) error + GetVotes() *types.VoteRecords +} + // handlerConfig is the collection of initialization parameters to create a full // node network handler. type handlerConfig struct { Database ethdb.Database // Database for direct sync insertions Chain *core.BlockChain // Blockchain to serve data from TxPool txPool // Transaction pool to propagate from + VotePool votePool // Votes pool to propagate from Network uint64 // Network identifier to adfvertise Sync downloader.SyncMode // Whether to fast or full sync DiffSync bool // Whether to diff sync @@ -112,6 +120,7 @@ type handler struct { database ethdb.Database txpool txPool + votepool votePool chain *core.BlockChain maxPeers int @@ -152,6 +161,7 @@ func newHandler(config *handlerConfig) (*handler, error) { eventMux: config.EventMux, database: config.Database, txpool: config.TxPool, + votepool: config.VotePool, chain: config.Chain, peers: newPeerSet(), whitelist: config.Whitelist, @@ -510,7 +520,7 @@ func (h *handler) BroadcastBlock(block *types.Block, propagate bool) { log.Trace("Propagated block", "hash", hash, "recipients", len(transfer), "duration", common.PrettyDuration(time.Since(block.ReceivedAt))) return } - // Otherwise if the block is indeed in out own chain, announce it + // Otherwise if the block is indeed in our own chain, announce it if h.chain.HasBlock(hash, block.NumberU64()) { for _, peer := range peers { peer.AsyncSendNewBlockHash(block) diff --git a/eth/handler_eth.go b/eth/handler_eth.go index d2cf83fbfe..5eea2dc233 100644 --- a/eth/handler_eth.go +++ b/eth/handler_eth.go @@ -99,6 +99,9 @@ func (h *ethHandler) Handle(peer *eth.Peer, packet eth.Packet) error { case *eth.PooledTransactionsPacket: return h.txFetcher.Enqueue(peer.ID(), *packet, true) + + case *eth.VotesPacket: + return h.handleVotesBroadcast(peer, *packet) default: return fmt.Errorf("unexpected eth packet type: %T", packet) } @@ -225,3 +228,15 @@ func (h *ethHandler) handleBlockBroadcast(peer *eth.Peer, block *types.Block, td } return nil } + +// handleVotesBroadcast is invoked from a peer's message handler when it transmits a +// votes broadcast for the local node to process. +func (h *ethHandler) handleVotesBroadcast(peer *eth.Peer, votes []*types.VoteRecord) error { + // Try to put votes into votepool + for _, vote := range votes { + if err := h.votepool.Put(vote.Hash(), *vote); err != nil { + return err + } + } + return nil +} diff --git a/eth/protocols/eth/handler.go b/eth/protocols/eth/handler.go index 6bbaa2f555..72ccbd34a0 100644 --- a/eth/protocols/eth/handler.go +++ b/eth/protocols/eth/handler.go @@ -79,6 +79,13 @@ type Backend interface { // or if inbound transactions should simply be dropped. AcceptTxs() bool + // VotePool retrieves the votes pool object to serve data. + VotePool() VotePool + + // AcceptVotes retrieves whether votes processing is enabled on the node + // or if inbound votes should simply be dropped. + AcceptVotes() bool + // RunPeer is invoked when a peer joins on the `eth` protocol. The handler // should do any peer maintenance work, handshakes and validations. If all // is passed, control should be given back to the `handler` to process the @@ -96,10 +103,15 @@ type Backend interface { // TxPool defines the methods needed by the protocol handler to serve transactions. type TxPool interface { - // Get retrieves the the transaction from the local txpool with the given hash. + // Get retrieves the transaction from the local txpool with the given hash. Get(hash common.Hash) *types.Transaction } +type VotePool interface { + // Get retrieves the vote from the local votepool with the given hash. + Get(hash common.Hash) *types.VoteRecord +} + // MakeProtocols constructs the P2P protocol definitions for `eth`. func MakeProtocols(backend Backend, network uint64, dnsdisc enode.Iterator) []p2p.Protocol { protocols := make([]p2p.Protocol, len(ProtocolVersions)) @@ -111,7 +123,7 @@ func MakeProtocols(backend Backend, network uint64, dnsdisc enode.Iterator) []p2 Version: version, Length: protocolLengths[version], Run: func(p *p2p.Peer, rw p2p.MsgReadWriter) error { - peer := NewPeer(version, p, rw, backend.TxPool()) + peer := NewPeer(version, p, rw, backend.TxPool(), backend.VotePool()) defer peer.Close() return backend.RunPeer(peer, func(peer *Peer) error { @@ -206,6 +218,26 @@ var eth66 = map[uint64]msgHandler{ PooledTransactionsMsg: handlePooledTransactions66, } +var eth68 = map[uint64]msgHandler{ + NewBlockHashesMsg: handleNewBlockhashes, + NewBlockMsg: handleNewBlock, + TransactionsMsg: handleTransactions, + NewPooledTransactionHashesMsg: handleNewPooledTransactionHashes, + // eth66 messages with request-id + GetBlockHeadersMsg: handleGetBlockHeaders66, + BlockHeadersMsg: handleBlockHeaders66, + GetBlockBodiesMsg: handleGetBlockBodies66, + BlockBodiesMsg: handleBlockBodies66, + GetNodeDataMsg: handleGetNodeData66, + NodeDataMsg: handleNodeData66, + GetReceiptsMsg: handleGetReceipts66, + ReceiptsMsg: handleReceipts66, + GetPooledTransactionsMsg: handleGetPooledTransactions66, + PooledTransactionsMsg: handlePooledTransactions66, + // eth68 messages + VotesMsg: handleVotes, +} + // handleMessage is invoked whenever an inbound message is received from a remote // peer. The remote connection is torn down upon returning any error. func handleMessage(backend Backend, peer *Peer) error { @@ -220,7 +252,9 @@ func handleMessage(backend Backend, peer *Peer) error { defer msg.Discard() var handlers = eth65 - if peer.Version() >= ETH66 { + if peer.Version() >= ETH68 { + handlers = eth68 + } else if peer.Version() >= ETH66 { handlers = eth66 } // Track the amount of time it takes to serve the request and run the handler diff --git a/eth/protocols/eth/handlers.go b/eth/protocols/eth/handlers.go index d7d993a23d..4458a4d49f 100644 --- a/eth/protocols/eth/handlers.go +++ b/eth/protocols/eth/handlers.go @@ -410,7 +410,7 @@ func handleNewPooledTransactionHashes(backend Backend, msg Decoder, peer *Peer) } func handleGetPooledTransactions(backend Backend, msg Decoder, peer *Peer) error { - // Decode the pooled transactions retrieval message + // Decode the pooled transactions' retrieval message var query GetPooledTransactionsPacket if err := msg.Decode(&query); err != nil { return fmt.Errorf("%w: message %v: %v", errDecode, msg, err) @@ -420,7 +420,7 @@ func handleGetPooledTransactions(backend Backend, msg Decoder, peer *Peer) error } func handleGetPooledTransactions66(backend Backend, msg Decoder, peer *Peer) error { - // Decode the pooled transactions retrieval message + // Decode the pooled transactions' retrieval message var query GetPooledTransactionsPacket66 if err := msg.Decode(&query); err != nil { return fmt.Errorf("%w: message %v: %v", errDecode, msg, err) @@ -518,3 +518,15 @@ func handlePooledTransactions66(backend Backend, msg Decoder, peer *Peer) error return backend.Handle(peer, &txs.PooledTransactionsPacket) } + +func handleVotes(backend Backend, msg Decoder, peer *Peer) error { + ann := new(VotesPacket) + if err := msg.Decode(ann); err != nil { + return fmt.Errorf("%w: message %v: %v", errDecode, msg, err) + } + // Schedule all the unknown hashes for retrieval + for _, vote := range *ann { + peer.markVote(vote.Hash()) + } + return backend.Handle(peer, ann) +} diff --git a/eth/protocols/eth/peer.go b/eth/protocols/eth/peer.go index 7ab4fa1a36..d3b027ac57 100644 --- a/eth/protocols/eth/peer.go +++ b/eth/protocols/eth/peer.go @@ -34,6 +34,10 @@ const ( // before starting to randomly evict them. maxKnownTxs = 32768 + // maxKnownVotes is the maximum vote hashes to keep in the known list + // before starting to randomly evict them. + maxKnownVotes = 5376 + // maxKnownBlocks is the maximum block hashes to keep in the known list // before starting to randomly evict them. maxKnownBlocks = 1024 @@ -86,14 +90,19 @@ type Peer struct { txBroadcast chan []common.Hash // Channel used to queue transaction propagation requests txAnnounce chan []common.Hash // Channel used to queue transaction announcement requests - term chan struct{} // Termination channel to stop the broadcasters - txTerm chan struct{} // Termination channel to stop the tx broadcasters - lock sync.RWMutex // Mutex protecting the internal fields + votepool VotePool // Votes pool used by the broadcasters + knownVotes mapset.Set // Set of vote hashes known to be known by this peer + voteBroadcast chan *types.VoteRecords // Channel used to queue votes propagation requests + + term chan struct{} // Termination channel to stop the broadcasters + txTerm chan struct{} // Termination channel to stop the tx broadcasters + voteTerm chan struct{} // Termination channel to stop the votes broadcasters + lock sync.RWMutex // Mutex protecting the internal fields } // NewPeer create a wrapper for a network connection and negotiated protocol // version. -func NewPeer(version uint, p *p2p.Peer, rw p2p.MsgReadWriter, txpool TxPool) *Peer { +func NewPeer(version uint, p *p2p.Peer, rw p2p.MsgReadWriter, txpool TxPool, votepool VotePool) *Peer { peer := &Peer{ id: p.ID().String(), Peer: p, @@ -101,13 +110,17 @@ func NewPeer(version uint, p *p2p.Peer, rw p2p.MsgReadWriter, txpool TxPool) *Pe version: version, knownTxs: mapset.NewSet(), knownBlocks: mapset.NewSet(), + knownVotes: mapset.NewSet(), queuedBlocks: make(chan *blockPropagation, maxQueuedBlocks), queuedBlockAnns: make(chan *types.Block, maxQueuedBlockAnns), txBroadcast: make(chan []common.Hash), txAnnounce: make(chan []common.Hash), txpool: txpool, + voteBroadcast: make(chan *types.VoteRecords), + votepool: votepool, term: make(chan struct{}), txTerm: make(chan struct{}), + voteTerm: make(chan struct{}), } // Start up all the broadcasters go peer.broadcastBlocks() @@ -174,6 +187,11 @@ func (p *Peer) KnownTransaction(hash common.Hash) bool { return p.knownTxs.Contains(hash) } +// KnownVote returns whether peer is known to already have a vote. +func (p *Peer) KnownVote(hash common.Hash) bool { + return p.knownVotes.Contains(hash) +} + // markBlock marks a block as known for the peer, ensuring that the block will // never be propagated to this particular peer. func (p *Peer) markBlock(hash common.Hash) { @@ -194,6 +212,16 @@ func (p *Peer) markTransaction(hash common.Hash) { p.knownTxs.Add(hash) } +// markVote marks a vote as known for the peer, ensuring that it +// will never be propagated to this particular peer. +func (p *Peer) markVote(hash common.Hash) { + // If we reached the memory allowance, drop a previously known transaction hash + for p.knownVotes.Cardinality() >= maxKnownVotes { + p.knownVotes.Pop() + } + p.knownVotes.Add(hash) +} + // SendTransactions sends transactions to the peer and includes the hashes // in its transaction hash set for future reference. // @@ -369,6 +397,39 @@ func (p *Peer) AsyncSendNewBlock(block *types.Block, td *big.Int) { } } +// SendVotes propagates a batch of votes to the remote peer. +func (p *Peer) SendVotes(votes types.VoteRecords) error { + // Mark all the transactions as known, but ensure we don't overflow our limits + for p.knownVotes.Cardinality() > max(0, maxKnownTxs-len(votes)) { + p.knownVotes.Pop() + } + for _, vote := range votes { + p.knownVotes.Add(vote.Hash()) + } + return p2p.Send(p.rw, VotesMsg, votes) +} + +// AsyncSendVotes queues a batch of vote hashes for propagation to a remote peer. If +// the peer's broadcast queue is full, the event is silently dropped. +func (p *Peer) AsyncSendVotes(votes *types.VoteRecords) { + select { + case p.voteBroadcast <- votes: + // Mark all the transactions as known, but ensure we don't overflow our limits + for p.knownVotes.Cardinality() > max(0, maxKnownVotes-len(*votes)) { + p.knownVotes.Pop() + } + for _, vote := range *votes { + p.knownVotes.Add(vote.Hash()) + } + + case <-p.voteTerm: + p.Log().Debug("Dropping vote propagation", "count", len(*votes)) + + case <-p.term: + p.Log().Debug("Dropping vote propagation", "count", len(*votes)) + } +} + // SendBlockHeaders sends a batch of block headers to the remote peer. func (p *Peer) SendBlockHeaders(headers []*types.Header) error { return p2p.Send(p.rw, BlockHeadersMsg, BlockHeadersPacket(headers)) @@ -397,7 +458,7 @@ func (p *Peer) ReplyBlockBodiesRLP(id uint64, bodies []rlp.RawValue) error { }) } -// SendNodeDataRLP sends a batch of arbitrary internal data, corresponding to the +// SendNodeData sends a batch of arbitrary internal data, corresponding to the // hashes requested. func (p *Peer) SendNodeData(data [][]byte) error { return p2p.Send(p.rw, NodeDataMsg, NodeDataPacket(data)) diff --git a/eth/protocols/eth/protocol.go b/eth/protocols/eth/protocol.go index 3e0e0cf6ed..7b09c48917 100644 --- a/eth/protocols/eth/protocol.go +++ b/eth/protocols/eth/protocol.go @@ -33,6 +33,7 @@ const ( ETH65 = 65 ETH66 = 66 ETH67 = 67 + ETH68 = 68 ) // ProtocolName is the official short name of the `eth` protocol used during @@ -41,11 +42,11 @@ const ProtocolName = "eth" // ProtocolVersions are the supported versions of the `eth` protocol (first // is primary). -var ProtocolVersions = []uint{ETH67, ETH66, ETH65} +var ProtocolVersions = []uint{ETH68, ETH67, ETH66, ETH65} // protocolLengths are the number of implemented message corresponding to // different protocol versions. -var protocolLengths = map[uint]uint64{ETH67: 18, ETH66: 17, ETH65: 17} +var protocolLengths = map[uint]uint64{ETH68: 19, ETH67: 18, ETH66: 17, ETH65: 17} // maxMessageSize is the maximum cap on the size of a protocol message. const maxMessageSize = 10 * 1024 * 1024 @@ -72,6 +73,9 @@ const ( // Protocol messages overloaded in eth/66 UpgradeStatusMsg = 0x0b + + // Protocol messages overloaded in eth/68 + VotesMsg = 0x80 ) var ( @@ -161,7 +165,7 @@ type GetBlockHeadersPacket struct { Reverse bool // Query direction (false = rising towards latest, true = falling towards genesis) } -// GetBlockHeadersPacket represents a block header query over eth/66 +// GetBlockHeadersPacket66 represents a block header query over eth/66 type GetBlockHeadersPacket66 struct { RequestId uint64 *GetBlockHeadersPacket @@ -206,7 +210,7 @@ func (hn *HashOrNumber) DecodeRLP(s *rlp.Stream) error { // BlockHeadersPacket represents a block header response. type BlockHeadersPacket []*types.Header -// BlockHeadersPacket represents a block header response over eth/66. +// BlockHeadersPacket66 represents a block header response over eth/66. type BlockHeadersPacket66 struct { RequestId uint64 BlockHeadersPacket @@ -234,7 +238,7 @@ func (request *NewBlockPacket) sanityCheck() error { // GetBlockBodiesPacket represents a block body query. type GetBlockBodiesPacket []common.Hash -// GetBlockBodiesPacket represents a block body query over eth/66. +// GetBlockBodiesPacket66 represents a block body query over eth/66. type GetBlockBodiesPacket66 struct { RequestId uint64 GetBlockBodiesPacket @@ -243,7 +247,7 @@ type GetBlockBodiesPacket66 struct { // BlockBodiesPacket is the network packet for block content distribution. type BlockBodiesPacket []*BlockBody -// BlockBodiesPacket is the network packet for block content distribution over eth/66. +// BlockBodiesPacket66 is the network packet for block content distribution over eth/66. type BlockBodiesPacket66 struct { RequestId uint64 BlockBodiesPacket @@ -282,7 +286,7 @@ func (p *BlockBodiesPacket) Unpack() ([][]*types.Transaction, [][]*types.Header) // GetNodeDataPacket represents a trie node data query. type GetNodeDataPacket []common.Hash -// GetNodeDataPacket represents a trie node data query over eth/66. +// GetNodeDataPacket66 represents a trie node data query over eth/66. type GetNodeDataPacket66 struct { RequestId uint64 GetNodeDataPacket @@ -291,7 +295,7 @@ type GetNodeDataPacket66 struct { // NodeDataPacket is the network packet for trie node data distribution. type NodeDataPacket [][]byte -// NodeDataPacket is the network packet for trie node data distribution over eth/66. +// NodeDataPacket66 is the network packet for trie node data distribution over eth/66. type NodeDataPacket66 struct { RequestId uint64 NodeDataPacket @@ -300,7 +304,7 @@ type NodeDataPacket66 struct { // GetReceiptsPacket represents a block receipts query. type GetReceiptsPacket []common.Hash -// GetReceiptsPacket represents a block receipts query over eth/66. +// GetReceiptsPacket66 represents a block receipts query over eth/66. type GetReceiptsPacket66 struct { RequestId uint64 GetReceiptsPacket @@ -309,7 +313,7 @@ type GetReceiptsPacket66 struct { // ReceiptsPacket is the network packet for block receipts distribution. type ReceiptsPacket [][]*types.Receipt -// ReceiptsPacket is the network packet for block receipts distribution over eth/66. +// ReceiptsPacket66 is the network packet for block receipts distribution over eth/66. type ReceiptsPacket66 struct { RequestId uint64 ReceiptsPacket @@ -318,7 +322,7 @@ type ReceiptsPacket66 struct { // ReceiptsRLPPacket is used for receipts, when we already have it encoded type ReceiptsRLPPacket []rlp.RawValue -// ReceiptsPacket66 is the eth-66 version of ReceiptsRLPPacket +// ReceiptsRLPPacket66 is the eth-66 version of ReceiptsRLPPacket type ReceiptsRLPPacket66 struct { RequestId uint64 ReceiptsRLPPacket @@ -338,13 +342,13 @@ type GetPooledTransactionsPacket66 struct { // PooledTransactionsPacket is the network packet for transaction distribution. type PooledTransactionsPacket []*types.Transaction -// PooledTransactionsPacket is the network packet for transaction distribution over eth/66. +// PooledTransactionsPacket66 is the network packet for transaction distribution over eth/66. type PooledTransactionsPacket66 struct { RequestId uint64 PooledTransactionsPacket } -// PooledTransactionsPacket is the network packet for transaction distribution, used +// PooledTransactionsRLPPacket is the network packet for transaction distribution, used // in the cases we already have them in rlp-encoded form type PooledTransactionsRLPPacket []rlp.RawValue @@ -354,6 +358,12 @@ type PooledTransactionsRLPPacket66 struct { PooledTransactionsRLPPacket } +// VotesPacket is the network packet for votes record. +type VotesPacket []*types.VoteRecord + +func (*VotesPacket) Name() string { return "Votes" } +func (*VotesPacket) Kind() byte { return VotesMsg } + func (*StatusPacket) Name() string { return "Status" } func (*StatusPacket) Kind() byte { return StatusMsg } diff --git a/eth/sync.go b/eth/sync.go index 2256c7cb99..d251427a79 100644 --- a/eth/sync.go +++ b/eth/sync.go @@ -80,6 +80,20 @@ func (h *handler) syncTransactions(p *eth.Peer) { } } +func (h *handler) syncVotes(p *eth.Peer) { + votes := h.votepool.GetVotes() + if len(*votes) == 0 { + return + } + // The eth/65 protocol introduces proper transaction announcements, so instead + // of dripping transactions across multiple peers, just send the entire list as + // an announcement and let the remote side decide what they need (likely nothing). + if p.Version() >= eth.ETH68 { + p.AsyncSendVotes(votes) + return + } +} + // txsyncLoop64 takes care of the initial transaction sync for each new // connection. When a new peer appears, we relay all currently pending // transactions. In order to minimise egress bandwidth usage, we send From 20ce599636243d33629f06d1a0549b0bb52bdecb Mon Sep 17 00:00:00 2001 From: Roshan Date: Mon, 28 Feb 2022 19:57:26 +0800 Subject: [PATCH 2/4] vote broadcast --- core/events.go | 5 ++- core/types/vote.go | 2 +- eth/handler.go | 60 ++++++++++++++++++++++++++++++++-- eth/handler_eth.go | 2 +- eth/peerset.go | 17 +++++++++- eth/protocols/eth/broadcast.go | 18 ++++++++++ eth/protocols/eth/peer.go | 21 +++++++----- eth/sync.go | 2 +- 8 files changed, 110 insertions(+), 17 deletions(-) diff --git a/core/events.go b/core/events.go index 5e730a24a7..fcf1da360e 100644 --- a/core/events.go +++ b/core/events.go @@ -21,7 +21,7 @@ import ( "github.com/ethereum/go-ethereum/core/types" ) -// NewTxsEvent is posted when a batch of transactions enter the transaction pool. +// NewTxsEvent is posted when a batch of transactions enters the transaction pool. type NewTxsEvent struct{ Txs []*types.Transaction } // ReannoTxsEvent is posted when a batch of local pending transactions exceed a specified duration. @@ -33,6 +33,9 @@ type NewMinedBlockEvent struct{ Block *types.Block } // RemovedLogsEvent is posted when a reorg happens type RemovedLogsEvent struct{ Logs []*types.Log } +// NewVotesEvent is posted when a batch of votes enters the vote pool. +type NewVotesEvent struct{ Votes []*types.VoteRecord } + type ChainEvent struct { Block *types.Block Hash common.Hash diff --git a/core/types/vote.go b/core/types/vote.go index 7f1f825234..05a9f9c333 100644 --- a/core/types/vote.go +++ b/core/types/vote.go @@ -21,7 +21,7 @@ type VoteRecord struct { Data VoteData } -type VoteRecords []VoteRecord +type VoteRecords []*VoteRecord func (v *VoteRecord) Hash() common.Hash { return rlpHash(v) diff --git a/eth/handler.go b/eth/handler.go index 414195d549..023c3e5bce 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -45,7 +45,8 @@ import ( const ( // txChanSize is the size of channel listening to NewTxsEvent. // The number is referenced from the size of tx pool. - txChanSize = 4096 + txChanSize = 4096 + voteChanSize = 4096 ) var ( @@ -82,8 +83,12 @@ type txPool interface { // votePool defines the methods needed from a votes pool implementation to // support all the operations needed by the Ethereum chain protocols. type votePool interface { - Put(hash common.Hash, vote types.VoteRecord) error - GetVotes() *types.VoteRecords + PutVote(vote types.VoteRecord) error + GetVotes() types.VoteRecords + + // SubscribeNewVotesEvent should return an event subscription of + // NewVotesEvent and send events to the given channel. + SubscribeNewVotesEvent(chan<- core.NewVotesEvent) event.Subscription } // handlerConfig is the collection of initialization parameters to create a full @@ -136,6 +141,8 @@ type handler struct { reannoTxsCh chan core.ReannoTxsEvent reannoTxsSub event.Subscription minedBlockSub *event.TypeMuxSubscription + votesCh chan core.NewVotesEvent + votesSub event.Subscription whitelist map[uint64]common.Hash @@ -448,6 +455,12 @@ func (h *handler) Start(maxPeers int) { h.txsSub = h.txpool.SubscribeNewTxsEvent(h.txsCh) go h.txBroadcastLoop() + // broadcast votes + h.wg.Add(1) + h.votesCh = make(chan core.NewVotesEvent, voteChanSize) + h.votesSub = h.votepool.SubscribeNewVotesEvent(h.votesCh) + go h.voteBroadcastLoop() + // announce local pending transactions again h.wg.Add(1) h.reannoTxsCh = make(chan core.ReannoTxsEvent, txChanSize) @@ -590,6 +603,34 @@ func (h *handler) ReannounceTransactions(txs types.Transactions) { "announce packs", peersCount, "announced hashes", peersCount*uint(len(hashes))) } +// BroadcastVotes will propagate a batch of votes to a square root of all peers +// which are not known to already have the given vote. +func (h *handler) BroadcastVotes(votes types.VoteRecords) { + var ( + directCount int // Count of announcements made + directPeers int + + voteset = make(map[*ethPeer][]*types.VoteRecord) // Set peer->hash to transfer directly + ) + + // Broadcast votes to a batch of peers not knowing about it + for _, vote := range votes { + peers := h.peers.peersWithoutVote(vote.Hash()) + numDirect := int(math.Sqrt(float64(len(peers)))) + for _, peer := range peers[:numDirect] { + voteset[peer] = append(voteset[peer], vote) + } + } + + for peer, _votes := range voteset { + directPeers++ + directCount += len(_votes) + peer.AsyncSendVotes(_votes) + } + log.Debug("Vote broadcast", "votes", len(votes), + "vote packs", directPeers, "broadcast votes", directCount) +} + // minedBroadcastLoop sends mined blocks to connected peers. func (h *handler) minedBroadcastLoop() { defer h.wg.Done() @@ -627,3 +668,16 @@ func (h *handler) txReannounceLoop() { } } } + +// voteBroadcastLoop announces new transactions to connected peers. +func (h *handler) voteBroadcastLoop() { + defer h.wg.Done() + for { + select { + case event := <-h.votesCh: + h.BroadcastVotes(event.Votes) + case <-h.votesSub.Err(): + return + } + } +} diff --git a/eth/handler_eth.go b/eth/handler_eth.go index 5eea2dc233..a677027479 100644 --- a/eth/handler_eth.go +++ b/eth/handler_eth.go @@ -234,7 +234,7 @@ func (h *ethHandler) handleBlockBroadcast(peer *eth.Peer, block *types.Block, td func (h *ethHandler) handleVotesBroadcast(peer *eth.Peer, votes []*types.VoteRecord) error { // Try to put votes into votepool for _, vote := range votes { - if err := h.votepool.Put(vote.Hash(), *vote); err != nil { + if err := h.votepool.PutVote(*vote); err != nil { return err } } diff --git a/eth/peerset.go b/eth/peerset.go index 0f5245a05e..fb2109bb01 100644 --- a/eth/peerset.go +++ b/eth/peerset.go @@ -311,7 +311,7 @@ func (ps *peerSet) headPeers(num uint) []*ethPeer { } // peersWithoutBlock retrieves a list of peers that do not have a given block in -// their set of known hashes so it might be propagated to them. +// their set of known hashes, so it might be propagated to them. func (ps *peerSet) peersWithoutBlock(hash common.Hash) []*ethPeer { ps.lock.RLock() defer ps.lock.RUnlock() @@ -340,6 +340,21 @@ func (ps *peerSet) peersWithoutTransaction(hash common.Hash) []*ethPeer { return list } +// peersWithoutVote retrieves a list of peers that do not have a given +// vote in their set of known hashes. +func (ps *peerSet) peersWithoutVote(hash common.Hash) []*ethPeer { + ps.lock.RLock() + defer ps.lock.RUnlock() + + list := make([]*ethPeer, 0, len(ps.peers)) + for _, p := range ps.peers { + if !p.KnownVote(hash) { + list = append(list, p) + } + } + return list +} + // len returns if the current number of `eth` peers in the set. Since the `snap` // peers are tied to the existence of an `eth` connection, that will always be a // subset of `eth`. diff --git a/eth/protocols/eth/broadcast.go b/eth/protocols/eth/broadcast.go index 132eac0102..06f2a71c9d 100644 --- a/eth/protocols/eth/broadcast.go +++ b/eth/protocols/eth/broadcast.go @@ -200,3 +200,21 @@ func (p *Peer) announceTransactions() { } } } + +// broadcastVotes is a write loop that schedules votes broadcasts +// to the remote peer. The goal is to have an async writer that does not lock up +// node internals and at the same time rate limits queued data. +func (p *Peer) broadcastVotes() { + for { + select { + case prop := <-p.queuedBlocks: + if err := p.SendNewBlock(prop.block, prop.td); err != nil { + return + } + p.Log().Trace("Propagated block", "number", prop.block.Number(), "hash", prop.block.Hash(), "td", prop.td) + + case <-p.term: + return + } + } +} diff --git a/eth/protocols/eth/peer.go b/eth/protocols/eth/peer.go index d3b027ac57..c6ff8e0bbf 100644 --- a/eth/protocols/eth/peer.go +++ b/eth/protocols/eth/peer.go @@ -90,9 +90,9 @@ type Peer struct { txBroadcast chan []common.Hash // Channel used to queue transaction propagation requests txAnnounce chan []common.Hash // Channel used to queue transaction announcement requests - votepool VotePool // Votes pool used by the broadcasters - knownVotes mapset.Set // Set of vote hashes known to be known by this peer - voteBroadcast chan *types.VoteRecords // Channel used to queue votes propagation requests + votepool VotePool // Votes pool used by the broadcasters + knownVotes mapset.Set // Set of vote hashes known to be known by this peer + voteBroadcast chan types.VoteRecords // Channel used to queue votes propagation requests term chan struct{} // Termination channel to stop the broadcasters txTerm chan struct{} // Termination channel to stop the tx broadcasters @@ -116,7 +116,7 @@ func NewPeer(version uint, p *p2p.Peer, rw p2p.MsgReadWriter, txpool TxPool, vot txBroadcast: make(chan []common.Hash), txAnnounce: make(chan []common.Hash), txpool: txpool, - voteBroadcast: make(chan *types.VoteRecords), + voteBroadcast: make(chan types.VoteRecords), votepool: votepool, term: make(chan struct{}), txTerm: make(chan struct{}), @@ -128,6 +128,9 @@ func NewPeer(version uint, p *p2p.Peer, rw p2p.MsgReadWriter, txpool TxPool, vot if version >= ETH65 { go peer.announceTransactions() } + if version >= ETH68 { + go peer.broadcastVotes() + } return peer } @@ -411,22 +414,22 @@ func (p *Peer) SendVotes(votes types.VoteRecords) error { // AsyncSendVotes queues a batch of vote hashes for propagation to a remote peer. If // the peer's broadcast queue is full, the event is silently dropped. -func (p *Peer) AsyncSendVotes(votes *types.VoteRecords) { +func (p *Peer) AsyncSendVotes(votes types.VoteRecords) { select { case p.voteBroadcast <- votes: // Mark all the transactions as known, but ensure we don't overflow our limits - for p.knownVotes.Cardinality() > max(0, maxKnownVotes-len(*votes)) { + for p.knownVotes.Cardinality() > max(0, maxKnownVotes-len(votes)) { p.knownVotes.Pop() } - for _, vote := range *votes { + for _, vote := range votes { p.knownVotes.Add(vote.Hash()) } case <-p.voteTerm: - p.Log().Debug("Dropping vote propagation", "count", len(*votes)) + p.Log().Debug("Dropping vote propagation", "count", len(votes)) case <-p.term: - p.Log().Debug("Dropping vote propagation", "count", len(*votes)) + p.Log().Debug("Dropping vote propagation", "count", len(votes)) } } diff --git a/eth/sync.go b/eth/sync.go index d251427a79..e1684b890e 100644 --- a/eth/sync.go +++ b/eth/sync.go @@ -82,7 +82,7 @@ func (h *handler) syncTransactions(p *eth.Peer) { func (h *handler) syncVotes(p *eth.Peer) { votes := h.votepool.GetVotes() - if len(*votes) == 0 { + if len(votes) == 0 { return } // The eth/65 protocol introduces proper transaction announcements, so instead From 733d404e9a8024a85161d66acd374cbe07c7e929 Mon Sep 17 00:00:00 2001 From: Roshan Date: Mon, 28 Feb 2022 19:57:26 +0800 Subject: [PATCH 3/4] vote broadcast --- core/events.go | 5 ++- core/types/vote.go | 2 +- eth/handler.go | 61 ++++++++++++++++++++++++++++++++-- eth/handler_eth.go | 3 +- eth/peerset.go | 17 +++++++++- eth/protocols/eth/broadcast.go | 18 ++++++++++ eth/protocols/eth/handler.go | 4 --- eth/protocols/eth/peer.go | 21 +++++++----- eth/sync.go | 2 +- 9 files changed, 112 insertions(+), 21 deletions(-) diff --git a/core/events.go b/core/events.go index 5e730a24a7..fcf1da360e 100644 --- a/core/events.go +++ b/core/events.go @@ -21,7 +21,7 @@ import ( "github.com/ethereum/go-ethereum/core/types" ) -// NewTxsEvent is posted when a batch of transactions enter the transaction pool. +// NewTxsEvent is posted when a batch of transactions enters the transaction pool. type NewTxsEvent struct{ Txs []*types.Transaction } // ReannoTxsEvent is posted when a batch of local pending transactions exceed a specified duration. @@ -33,6 +33,9 @@ type NewMinedBlockEvent struct{ Block *types.Block } // RemovedLogsEvent is posted when a reorg happens type RemovedLogsEvent struct{ Logs []*types.Log } +// NewVotesEvent is posted when a batch of votes enters the vote pool. +type NewVotesEvent struct{ Votes []*types.VoteRecord } + type ChainEvent struct { Block *types.Block Hash common.Hash diff --git a/core/types/vote.go b/core/types/vote.go index 7f1f825234..05a9f9c333 100644 --- a/core/types/vote.go +++ b/core/types/vote.go @@ -21,7 +21,7 @@ type VoteRecord struct { Data VoteData } -type VoteRecords []VoteRecord +type VoteRecords []*VoteRecord func (v *VoteRecord) Hash() common.Hash { return rlpHash(v) diff --git a/eth/handler.go b/eth/handler.go index 414195d549..2b22074669 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -45,7 +45,8 @@ import ( const ( // txChanSize is the size of channel listening to NewTxsEvent. // The number is referenced from the size of tx pool. - txChanSize = 4096 + txChanSize = 4096 + voteChanSize = 4096 ) var ( @@ -82,8 +83,13 @@ type txPool interface { // votePool defines the methods needed from a votes pool implementation to // support all the operations needed by the Ethereum chain protocols. type votePool interface { - Put(hash common.Hash, vote types.VoteRecord) error - GetVotes() *types.VoteRecords + PutVote(vote types.VoteRecord) error + Get(hash common.Hash) *types.VoteRecord + GetVotes() types.VoteRecords + + // SubscribeNewVotesEvent should return an event subscription of + // NewVotesEvent and send events to the given channel. + SubscribeNewVotesEvent(chan<- core.NewVotesEvent) event.Subscription } // handlerConfig is the collection of initialization parameters to create a full @@ -136,6 +142,8 @@ type handler struct { reannoTxsCh chan core.ReannoTxsEvent reannoTxsSub event.Subscription minedBlockSub *event.TypeMuxSubscription + votesCh chan core.NewVotesEvent + votesSub event.Subscription whitelist map[uint64]common.Hash @@ -448,6 +456,12 @@ func (h *handler) Start(maxPeers int) { h.txsSub = h.txpool.SubscribeNewTxsEvent(h.txsCh) go h.txBroadcastLoop() + // broadcast votes + h.wg.Add(1) + h.votesCh = make(chan core.NewVotesEvent, voteChanSize) + h.votesSub = h.votepool.SubscribeNewVotesEvent(h.votesCh) + go h.voteBroadcastLoop() + // announce local pending transactions again h.wg.Add(1) h.reannoTxsCh = make(chan core.ReannoTxsEvent, txChanSize) @@ -590,6 +604,34 @@ func (h *handler) ReannounceTransactions(txs types.Transactions) { "announce packs", peersCount, "announced hashes", peersCount*uint(len(hashes))) } +// BroadcastVotes will propagate a batch of votes to a square root of all peers +// which are not known to already have the given vote. +func (h *handler) BroadcastVotes(votes types.VoteRecords) { + var ( + directCount int // Count of announcements made + directPeers int + + voteset = make(map[*ethPeer][]*types.VoteRecord) // Set peer->hash to transfer directly + ) + + // Broadcast votes to a batch of peers not knowing about it + for _, vote := range votes { + peers := h.peers.peersWithoutVote(vote.Hash()) + numDirect := int(math.Sqrt(float64(len(peers)))) + for _, peer := range peers[:numDirect] { + voteset[peer] = append(voteset[peer], vote) + } + } + + for peer, _votes := range voteset { + directPeers++ + directCount += len(_votes) + peer.AsyncSendVotes(_votes) + } + log.Debug("Vote broadcast", "votes", len(votes), + "vote packs", directPeers, "broadcast votes", directCount) +} + // minedBroadcastLoop sends mined blocks to connected peers. func (h *handler) minedBroadcastLoop() { defer h.wg.Done() @@ -627,3 +669,16 @@ func (h *handler) txReannounceLoop() { } } } + +// voteBroadcastLoop announces new transactions to connected peers. +func (h *handler) voteBroadcastLoop() { + defer h.wg.Done() + for { + select { + case event := <-h.votesCh: + h.BroadcastVotes(event.Votes) + case <-h.votesSub.Err(): + return + } + } +} diff --git a/eth/handler_eth.go b/eth/handler_eth.go index 5eea2dc233..46fd52511c 100644 --- a/eth/handler_eth.go +++ b/eth/handler_eth.go @@ -40,6 +40,7 @@ type ethHandler handler func (h *ethHandler) Chain() *core.BlockChain { return h.chain } func (h *ethHandler) StateBloom() *trie.SyncBloom { return h.stateBloom } func (h *ethHandler) TxPool() eth.TxPool { return h.txpool } +func (h *ethHandler) VotePool() eth.VotePool { return h.votepool } // RunPeer is invoked when a peer joins on the `eth` protocol. func (h *ethHandler) RunPeer(peer *eth.Peer, hand eth.Handler) error { @@ -234,7 +235,7 @@ func (h *ethHandler) handleBlockBroadcast(peer *eth.Peer, block *types.Block, td func (h *ethHandler) handleVotesBroadcast(peer *eth.Peer, votes []*types.VoteRecord) error { // Try to put votes into votepool for _, vote := range votes { - if err := h.votepool.Put(vote.Hash(), *vote); err != nil { + if err := h.votepool.PutVote(*vote); err != nil { return err } } diff --git a/eth/peerset.go b/eth/peerset.go index 0f5245a05e..fb2109bb01 100644 --- a/eth/peerset.go +++ b/eth/peerset.go @@ -311,7 +311,7 @@ func (ps *peerSet) headPeers(num uint) []*ethPeer { } // peersWithoutBlock retrieves a list of peers that do not have a given block in -// their set of known hashes so it might be propagated to them. +// their set of known hashes, so it might be propagated to them. func (ps *peerSet) peersWithoutBlock(hash common.Hash) []*ethPeer { ps.lock.RLock() defer ps.lock.RUnlock() @@ -340,6 +340,21 @@ func (ps *peerSet) peersWithoutTransaction(hash common.Hash) []*ethPeer { return list } +// peersWithoutVote retrieves a list of peers that do not have a given +// vote in their set of known hashes. +func (ps *peerSet) peersWithoutVote(hash common.Hash) []*ethPeer { + ps.lock.RLock() + defer ps.lock.RUnlock() + + list := make([]*ethPeer, 0, len(ps.peers)) + for _, p := range ps.peers { + if !p.KnownVote(hash) { + list = append(list, p) + } + } + return list +} + // len returns if the current number of `eth` peers in the set. Since the `snap` // peers are tied to the existence of an `eth` connection, that will always be a // subset of `eth`. diff --git a/eth/protocols/eth/broadcast.go b/eth/protocols/eth/broadcast.go index 132eac0102..06f2a71c9d 100644 --- a/eth/protocols/eth/broadcast.go +++ b/eth/protocols/eth/broadcast.go @@ -200,3 +200,21 @@ func (p *Peer) announceTransactions() { } } } + +// broadcastVotes is a write loop that schedules votes broadcasts +// to the remote peer. The goal is to have an async writer that does not lock up +// node internals and at the same time rate limits queued data. +func (p *Peer) broadcastVotes() { + for { + select { + case prop := <-p.queuedBlocks: + if err := p.SendNewBlock(prop.block, prop.td); err != nil { + return + } + p.Log().Trace("Propagated block", "number", prop.block.Number(), "hash", prop.block.Hash(), "td", prop.td) + + case <-p.term: + return + } + } +} diff --git a/eth/protocols/eth/handler.go b/eth/protocols/eth/handler.go index 72ccbd34a0..a8278e71c0 100644 --- a/eth/protocols/eth/handler.go +++ b/eth/protocols/eth/handler.go @@ -82,10 +82,6 @@ type Backend interface { // VotePool retrieves the votes pool object to serve data. VotePool() VotePool - // AcceptVotes retrieves whether votes processing is enabled on the node - // or if inbound votes should simply be dropped. - AcceptVotes() bool - // RunPeer is invoked when a peer joins on the `eth` protocol. The handler // should do any peer maintenance work, handshakes and validations. If all // is passed, control should be given back to the `handler` to process the diff --git a/eth/protocols/eth/peer.go b/eth/protocols/eth/peer.go index d3b027ac57..c6ff8e0bbf 100644 --- a/eth/protocols/eth/peer.go +++ b/eth/protocols/eth/peer.go @@ -90,9 +90,9 @@ type Peer struct { txBroadcast chan []common.Hash // Channel used to queue transaction propagation requests txAnnounce chan []common.Hash // Channel used to queue transaction announcement requests - votepool VotePool // Votes pool used by the broadcasters - knownVotes mapset.Set // Set of vote hashes known to be known by this peer - voteBroadcast chan *types.VoteRecords // Channel used to queue votes propagation requests + votepool VotePool // Votes pool used by the broadcasters + knownVotes mapset.Set // Set of vote hashes known to be known by this peer + voteBroadcast chan types.VoteRecords // Channel used to queue votes propagation requests term chan struct{} // Termination channel to stop the broadcasters txTerm chan struct{} // Termination channel to stop the tx broadcasters @@ -116,7 +116,7 @@ func NewPeer(version uint, p *p2p.Peer, rw p2p.MsgReadWriter, txpool TxPool, vot txBroadcast: make(chan []common.Hash), txAnnounce: make(chan []common.Hash), txpool: txpool, - voteBroadcast: make(chan *types.VoteRecords), + voteBroadcast: make(chan types.VoteRecords), votepool: votepool, term: make(chan struct{}), txTerm: make(chan struct{}), @@ -128,6 +128,9 @@ func NewPeer(version uint, p *p2p.Peer, rw p2p.MsgReadWriter, txpool TxPool, vot if version >= ETH65 { go peer.announceTransactions() } + if version >= ETH68 { + go peer.broadcastVotes() + } return peer } @@ -411,22 +414,22 @@ func (p *Peer) SendVotes(votes types.VoteRecords) error { // AsyncSendVotes queues a batch of vote hashes for propagation to a remote peer. If // the peer's broadcast queue is full, the event is silently dropped. -func (p *Peer) AsyncSendVotes(votes *types.VoteRecords) { +func (p *Peer) AsyncSendVotes(votes types.VoteRecords) { select { case p.voteBroadcast <- votes: // Mark all the transactions as known, but ensure we don't overflow our limits - for p.knownVotes.Cardinality() > max(0, maxKnownVotes-len(*votes)) { + for p.knownVotes.Cardinality() > max(0, maxKnownVotes-len(votes)) { p.knownVotes.Pop() } - for _, vote := range *votes { + for _, vote := range votes { p.knownVotes.Add(vote.Hash()) } case <-p.voteTerm: - p.Log().Debug("Dropping vote propagation", "count", len(*votes)) + p.Log().Debug("Dropping vote propagation", "count", len(votes)) case <-p.term: - p.Log().Debug("Dropping vote propagation", "count", len(*votes)) + p.Log().Debug("Dropping vote propagation", "count", len(votes)) } } diff --git a/eth/sync.go b/eth/sync.go index d251427a79..e1684b890e 100644 --- a/eth/sync.go +++ b/eth/sync.go @@ -82,7 +82,7 @@ func (h *handler) syncTransactions(p *eth.Peer) { func (h *handler) syncVotes(p *eth.Peer) { votes := h.votepool.GetVotes() - if len(*votes) == 0 { + if len(votes) == 0 { return } // The eth/65 protocol introduces proper transaction announcements, so instead From 9ae0983ca167286e191f69b5821a8c47557bbd61 Mon Sep 17 00:00:00 2001 From: Roshan Date: Tue, 1 Mar 2022 15:13:43 +0800 Subject: [PATCH 4/4] Fix review comments --- core/events.go | 2 +- core/types/vote.go | 33 ++++++++++++++++++++++++++++----- eth/handler.go | 20 ++++++++++---------- eth/handler_eth.go | 4 ++-- eth/protocols/eth/broadcast.go | 9 ++++++--- eth/protocols/eth/handler.go | 2 +- eth/protocols/eth/peer.go | 14 +++++++------- eth/protocols/eth/protocol.go | 2 +- eth/sync.go | 14 ++++++-------- 9 files changed, 62 insertions(+), 38 deletions(-) diff --git a/core/events.go b/core/events.go index fcf1da360e..60e929c982 100644 --- a/core/events.go +++ b/core/events.go @@ -34,7 +34,7 @@ type NewMinedBlockEvent struct{ Block *types.Block } type RemovedLogsEvent struct{ Logs []*types.Log } // NewVotesEvent is posted when a batch of votes enters the vote pool. -type NewVotesEvent struct{ Votes []*types.VoteRecord } +type NewVotesEvent struct{ Votes []*types.VoteEnvelope } type ChainEvent struct { Block *types.Block diff --git a/core/types/vote.go b/core/types/vote.go index 05a9f9c333..85a80649fe 100644 --- a/core/types/vote.go +++ b/core/types/vote.go @@ -1,6 +1,10 @@ package types -import "github.com/ethereum/go-ethereum/common" +import ( + "sync/atomic" + + "github.com/ethereum/go-ethereum/common" +) const ( BLSPublicKeyLength = 48 @@ -15,14 +19,33 @@ type VoteData struct { BlockHash common.Hash } -type VoteRecord struct { +type VoteEnvelope struct { VoteAddress BLSPublicKey Signature BLSSignature Data VoteData + + // caches + hash atomic.Value } -type VoteRecords []*VoteRecord +type VoteEnvelopes []*VoteEnvelope + +// Hash returns the vote hash. +func (v *VoteEnvelope) Hash() common.Hash { + if hash := v.hash.Load(); hash != nil { + return hash.(common.Hash) + } + + h := v.calcVoteHash() + v.hash.Store(h) + return h +} -func (v *VoteRecord) Hash() common.Hash { - return rlpHash(v) +func (v *VoteEnvelope) calcVoteHash() common.Hash { + voteData := struct { + VoteAddress BLSPublicKey + Signature BLSSignature + Data VoteData + }{v.VoteAddress, v.Signature, v.Data} + return rlpHash(voteData) } diff --git a/eth/handler.go b/eth/handler.go index 2b22074669..9d928134f3 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -45,7 +45,7 @@ import ( const ( // txChanSize is the size of channel listening to NewTxsEvent. // The number is referenced from the size of tx pool. - txChanSize = 4096 + txChanSize = 256 voteChanSize = 4096 ) @@ -83,9 +83,9 @@ type txPool interface { // votePool defines the methods needed from a votes pool implementation to // support all the operations needed by the Ethereum chain protocols. type votePool interface { - PutVote(vote types.VoteRecord) error - Get(hash common.Hash) *types.VoteRecord - GetVotes() types.VoteRecords + PutVote(vote *types.VoteEnvelope) error + Get(hash common.Hash) *types.VoteEnvelope + GetVotes() types.VoteEnvelopes // SubscribeNewVotesEvent should return an event subscription of // NewVotesEvent and send events to the given channel. @@ -350,9 +350,10 @@ func (h *handler) runEthPeer(peer *eth.Peer, handler eth.Handler) error { } h.chainSync.handlePeerEvent(peer) - // Propagate existing transactions. new transactions appearing + // Propagate existing transactions and votes. new transactions and votes appearing // after this will be sent via broadcasts. h.syncTransactions(peer) + h.syncVotes(peer) // If we have a trusted CHT, reject all peers below that (avoid fast sync eclipse) if h.checkpointHash != (common.Hash{}) { @@ -604,21 +605,20 @@ func (h *handler) ReannounceTransactions(txs types.Transactions) { "announce packs", peersCount, "announced hashes", peersCount*uint(len(hashes))) } -// BroadcastVotes will propagate a batch of votes to a square root of all peers +// BroadcastVotes will propagate a batch of votes to all peers // which are not known to already have the given vote. -func (h *handler) BroadcastVotes(votes types.VoteRecords) { +func (h *handler) BroadcastVotes(votes types.VoteEnvelopes) { var ( directCount int // Count of announcements made directPeers int - voteset = make(map[*ethPeer][]*types.VoteRecord) // Set peer->hash to transfer directly + voteset = make(map[*ethPeer][]*types.VoteEnvelope) // Set peer->hash to transfer directly ) // Broadcast votes to a batch of peers not knowing about it for _, vote := range votes { peers := h.peers.peersWithoutVote(vote.Hash()) - numDirect := int(math.Sqrt(float64(len(peers)))) - for _, peer := range peers[:numDirect] { + for _, peer := range peers { voteset[peer] = append(voteset[peer], vote) } } diff --git a/eth/handler_eth.go b/eth/handler_eth.go index 46fd52511c..be0858da73 100644 --- a/eth/handler_eth.go +++ b/eth/handler_eth.go @@ -232,10 +232,10 @@ func (h *ethHandler) handleBlockBroadcast(peer *eth.Peer, block *types.Block, td // handleVotesBroadcast is invoked from a peer's message handler when it transmits a // votes broadcast for the local node to process. -func (h *ethHandler) handleVotesBroadcast(peer *eth.Peer, votes []*types.VoteRecord) error { +func (h *ethHandler) handleVotesBroadcast(peer *eth.Peer, votes []*types.VoteEnvelope) error { // Try to put votes into votepool for _, vote := range votes { - if err := h.votepool.PutVote(*vote); err != nil { + if err := h.votepool.PutVote(vote); err != nil { return err } } diff --git a/eth/protocols/eth/broadcast.go b/eth/protocols/eth/broadcast.go index 06f2a71c9d..9976ff70b5 100644 --- a/eth/protocols/eth/broadcast.go +++ b/eth/protocols/eth/broadcast.go @@ -207,11 +207,14 @@ func (p *Peer) announceTransactions() { func (p *Peer) broadcastVotes() { for { select { - case prop := <-p.queuedBlocks: - if err := p.SendNewBlock(prop.block, prop.td); err != nil { + case votes := <-p.voteBroadcast: + if err := p.SendVotes(votes); err != nil { return } - p.Log().Trace("Propagated block", "number", prop.block.Number(), "hash", prop.block.Hash(), "td", prop.td) + p.Log().Trace("Sent votes", "count", len(votes)) + + case <-p.voteTerm: + return case <-p.term: return diff --git a/eth/protocols/eth/handler.go b/eth/protocols/eth/handler.go index a8278e71c0..be7b2f2bdc 100644 --- a/eth/protocols/eth/handler.go +++ b/eth/protocols/eth/handler.go @@ -105,7 +105,7 @@ type TxPool interface { type VotePool interface { // Get retrieves the vote from the local votepool with the given hash. - Get(hash common.Hash) *types.VoteRecord + Get(hash common.Hash) *types.VoteEnvelope } // MakeProtocols constructs the P2P protocol definitions for `eth`. diff --git a/eth/protocols/eth/peer.go b/eth/protocols/eth/peer.go index c6ff8e0bbf..bd0071d224 100644 --- a/eth/protocols/eth/peer.go +++ b/eth/protocols/eth/peer.go @@ -90,9 +90,9 @@ type Peer struct { txBroadcast chan []common.Hash // Channel used to queue transaction propagation requests txAnnounce chan []common.Hash // Channel used to queue transaction announcement requests - votepool VotePool // Votes pool used by the broadcasters - knownVotes mapset.Set // Set of vote hashes known to be known by this peer - voteBroadcast chan types.VoteRecords // Channel used to queue votes propagation requests + votepool VotePool // Votes pool used by the broadcasters + knownVotes mapset.Set // Set of vote hashes known to be known by this peer + voteBroadcast chan types.VoteEnvelopes // Channel used to queue votes propagation requests term chan struct{} // Termination channel to stop the broadcasters txTerm chan struct{} // Termination channel to stop the tx broadcasters @@ -116,7 +116,7 @@ func NewPeer(version uint, p *p2p.Peer, rw p2p.MsgReadWriter, txpool TxPool, vot txBroadcast: make(chan []common.Hash), txAnnounce: make(chan []common.Hash), txpool: txpool, - voteBroadcast: make(chan types.VoteRecords), + voteBroadcast: make(chan types.VoteEnvelopes), votepool: votepool, term: make(chan struct{}), txTerm: make(chan struct{}), @@ -218,7 +218,7 @@ func (p *Peer) markTransaction(hash common.Hash) { // markVote marks a vote as known for the peer, ensuring that it // will never be propagated to this particular peer. func (p *Peer) markVote(hash common.Hash) { - // If we reached the memory allowance, drop a previously known transaction hash + // If we reached the memory allowance, drop a previously known vote hash for p.knownVotes.Cardinality() >= maxKnownVotes { p.knownVotes.Pop() } @@ -401,7 +401,7 @@ func (p *Peer) AsyncSendNewBlock(block *types.Block, td *big.Int) { } // SendVotes propagates a batch of votes to the remote peer. -func (p *Peer) SendVotes(votes types.VoteRecords) error { +func (p *Peer) SendVotes(votes types.VoteEnvelopes) error { // Mark all the transactions as known, but ensure we don't overflow our limits for p.knownVotes.Cardinality() > max(0, maxKnownTxs-len(votes)) { p.knownVotes.Pop() @@ -414,7 +414,7 @@ func (p *Peer) SendVotes(votes types.VoteRecords) error { // AsyncSendVotes queues a batch of vote hashes for propagation to a remote peer. If // the peer's broadcast queue is full, the event is silently dropped. -func (p *Peer) AsyncSendVotes(votes types.VoteRecords) { +func (p *Peer) AsyncSendVotes(votes types.VoteEnvelopes) { select { case p.voteBroadcast <- votes: // Mark all the transactions as known, but ensure we don't overflow our limits diff --git a/eth/protocols/eth/protocol.go b/eth/protocols/eth/protocol.go index 7b09c48917..8d311c3c04 100644 --- a/eth/protocols/eth/protocol.go +++ b/eth/protocols/eth/protocol.go @@ -359,7 +359,7 @@ type PooledTransactionsRLPPacket66 struct { } // VotesPacket is the network packet for votes record. -type VotesPacket []*types.VoteRecord +type VotesPacket []*types.VoteEnvelope func (*VotesPacket) Name() string { return "Votes" } func (*VotesPacket) Kind() byte { return VotesMsg } diff --git a/eth/sync.go b/eth/sync.go index e1684b890e..1c7eb36262 100644 --- a/eth/sync.go +++ b/eth/sync.go @@ -81,17 +81,15 @@ func (h *handler) syncTransactions(p *eth.Peer) { } func (h *handler) syncVotes(p *eth.Peer) { - votes := h.votepool.GetVotes() - if len(votes) == 0 { - return - } - // The eth/65 protocol introduces proper transaction announcements, so instead - // of dripping transactions across multiple peers, just send the entire list as - // an announcement and let the remote side decide what they need (likely nothing). + // Vote is introduces since the eth/68 protocol. if p.Version() >= eth.ETH68 { + votes := h.votepool.GetVotes() + if len(votes) == 0 { + return + } p.AsyncSendVotes(votes) - return } + return } // txsyncLoop64 takes care of the initial transaction sync for each new