Skip to content

Commit

Permalink
eth: check snap satelliteness, delegate drop to eth (ethereum#22235)
Browse files Browse the repository at this point in the history
* eth: check snap satelliteness, delegate drop to eth

* eth: better handle eth/snap satellite relation, merge reg/unreg paths
  • Loading branch information
karalabe authored Feb 2, 2021
1 parent 3c728fb commit e3430ac
Show file tree
Hide file tree
Showing 13 changed files with 201 additions and 236 deletions.
106 changes: 58 additions & 48 deletions eth/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ func newHandler(config *handlerConfig) (*handler, error) {
h.blockFetcher = fetcher.NewBlockFetcher(false, nil, h.chain.GetBlockByHash, validator, h.BroadcastBlock, heighter, nil, inserter, h.removePeer)

fetchTx := func(peer string, hashes []common.Hash) error {
p := h.peers.ethPeer(peer)
p := h.peers.peer(peer)
if p == nil {
return errors.New("unknown peer")
}
Expand All @@ -229,8 +229,17 @@ func newHandler(config *handlerConfig) (*handler, error) {
return h, nil
}

// runEthPeer
// runEthPeer registers an eth peer into the joint eth/snap peerset, adds it to
// various subsistems and starts handling messages.
func (h *handler) runEthPeer(peer *eth.Peer, handler eth.Handler) error {
// If the peer has a `snap` extension, wait for it to connect so we can have
// a uniform initialization/teardown mechanism
snap, err := h.peers.waitSnapExtension(peer)
if err != nil {
peer.Log().Error("Snapshot extension barrier failed", "err", err)
return err
}
// TODO(karalabe): Not sure why this is needed
if !h.chainSync.handlePeerEvent(peer) {
return p2p.DiscQuitting
}
Expand All @@ -251,37 +260,46 @@ func (h *handler) runEthPeer(peer *eth.Peer, handler eth.Handler) error {
return err
}
reject := false // reserved peer slots
if atomic.LoadUint32(&h.snapSync) == 1 && !peer.SupportsCap("snap", 1) {
// If we are running snap-sync, we want to reserve roughly half the peer
// slots for peers supporting the snap protocol.
// The logic here is; we only allow up to 5 more non-snap peers than snap-peers.
if all, snp := h.peers.Len(), h.peers.SnapLen(); all-snp > snp+5 {
reject = true
if atomic.LoadUint32(&h.snapSync) == 1 {
if snap == nil {
// If we are running snap-sync, we want to reserve roughly half the peer
// slots for peers supporting the snap protocol.
// The logic here is; we only allow up to 5 more non-snap peers than snap-peers.
if all, snp := h.peers.len(), h.peers.snapLen(); all-snp > snp+5 {
reject = true
}
}
}
// Ignore maxPeers if this is a trusted peer
if !peer.Peer.Info().Network.Trusted {
if reject || h.peers.Len() >= h.maxPeers {
if reject || h.peers.len() >= h.maxPeers {
return p2p.DiscTooManyPeers
}
}
peer.Log().Debug("Ethereum peer connected", "name", peer.Name())

// Register the peer locally
if err := h.peers.registerEthPeer(peer); err != nil {
if err := h.peers.registerPeer(peer, snap); err != nil {
peer.Log().Error("Ethereum peer registration failed", "err", err)
return err
}
defer h.removePeer(peer.ID())

p := h.peers.ethPeer(peer.ID())
p := h.peers.peer(peer.ID())
if p == nil {
return errors.New("peer dropped during handling")
}
// Register the peer in the downloader. If the downloader considers it banned, we disconnect
if err := h.downloader.RegisterPeer(peer.ID(), peer.Version(), peer); err != nil {
peer.Log().Error("Failed to register peer in eth syncer", "err", err)
return err
}
if snap != nil {
if err := h.downloader.SnapSyncer.Register(snap); err != nil {
peer.Log().Error("Failed to register peer in snap syncer", "err", err)
return err
}
}
h.chainSync.handlePeerEvent(peer)

// Propagate existing transactions. new transactions appearing
Expand Down Expand Up @@ -317,25 +335,23 @@ func (h *handler) runEthPeer(peer *eth.Peer, handler eth.Handler) error {
return handler(peer)
}

// runSnapPeer
func (h *handler) runSnapPeer(peer *snap.Peer, handler snap.Handler) error {
// runSnapExtension registers a `snap` peer into the joint eth/snap peerset and
// starts handling inbound messages. As `snap` is only a satellite protocol to
// `eth`, all subsystem registrations and lifecycle management will be done by
// the main `eth` handler to prevent strange races.
func (h *handler) runSnapExtension(peer *snap.Peer, handler snap.Handler) error {
h.peerWG.Add(1)
defer h.peerWG.Done()

// Register the peer locally
if err := h.peers.registerSnapPeer(peer); err != nil {
peer.Log().Error("Snapshot peer registration failed", "err", err)
return err
}
defer h.removePeer(peer.ID())

if err := h.downloader.SnapSyncer.Register(peer); err != nil {
if err := h.peers.registerSnapExtension(peer); err != nil {
peer.Log().Error("Snapshot extension registration failed", "err", err)
return err
}
// Handle incoming messages until the connection is torn down
return handler(peer)
}

// removePeer unregisters a peer from the downloader and fetchers, removes it from
// the set of tracked peers and closes the network connection to it.
func (h *handler) removePeer(id string) {
// Create a custom logger to avoid printing the entire id
var logger log.Logger
Expand All @@ -345,33 +361,27 @@ func (h *handler) removePeer(id string) {
} else {
logger = log.New("peer", id[:8])
}
// Remove the eth peer if it exists
eth := h.peers.ethPeer(id)
if eth != nil {
logger.Debug("Removing Ethereum peer")
h.downloader.UnregisterPeer(id)
h.txFetcher.Drop(id)

if err := h.peers.unregisterEthPeer(id); err != nil {
logger.Error("Ethereum peer removal failed", "err", err)
}
// Abort if the peer does not exist
peer := h.peers.peer(id)
if peer == nil {
logger.Error("Ethereum peer removal failed", "err", errPeerNotRegistered)
return
}
// Remove the snap peer if it exists
snap := h.peers.snapPeer(id)
if snap != nil {
logger.Debug("Removing Snapshot peer")
// Remove the `eth` peer if it exists
logger.Debug("Removing Ethereum peer", "snap", peer.snapExt != nil)

// Remove the `snap` extension if it exists
if peer.snapExt != nil {
h.downloader.SnapSyncer.Unregister(id)
if err := h.peers.unregisterSnapPeer(id); err != nil {
logger.Error("Snapshot peer removel failed", "err", err)
}
}
// Hard disconnect at the networking layer
if eth != nil {
eth.Peer.Disconnect(p2p.DiscUselessPeer)
}
if snap != nil {
snap.Peer.Disconnect(p2p.DiscUselessPeer)
h.downloader.UnregisterPeer(id)
h.txFetcher.Drop(id)

if err := h.peers.unregisterPeer(id); err != nil {
logger.Error("Ethereum peer removal failed", "err", err)
}
// Hard disconnect at the networking layer
peer.Peer.Disconnect(p2p.DiscUselessPeer)
}

func (h *handler) Start(maxPeers int) {
Expand Down Expand Up @@ -417,7 +427,7 @@ func (h *handler) Stop() {
// will only announce its availability (depending what's requested).
func (h *handler) BroadcastBlock(block *types.Block, propagate bool) {
hash := block.Hash()
peers := h.peers.ethPeersWithoutBlock(hash)
peers := h.peers.peersWithoutBlock(hash)

// If propagation is requested, send to a subset of the peer
if propagate {
Expand Down Expand Up @@ -456,7 +466,7 @@ func (h *handler) BroadcastTransactions(txs types.Transactions, propagate bool)
// Broadcast transactions to a batch of peers not knowing about it
if propagate {
for _, tx := range txs {
peers := h.peers.ethPeersWithoutTransaction(tx.Hash())
peers := h.peers.peersWithoutTransaction(tx.Hash())

// Send the block to a subset of our peers
transfer := peers[:int(math.Sqrt(float64(len(peers))))]
Expand All @@ -472,7 +482,7 @@ func (h *handler) BroadcastTransactions(txs types.Transactions, propagate bool)
}
// Otherwise only broadcast the announcement to peers
for _, tx := range txs {
peers := h.peers.ethPeersWithoutTransaction(tx.Hash())
peers := h.peers.peersWithoutTransaction(tx.Hash())
for _, peer := range peers {
annos[peer] = append(annos[peer], tx.Hash())
}
Expand Down
4 changes: 2 additions & 2 deletions eth/handler_eth.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func (h *ethHandler) RunPeer(peer *eth.Peer, hand eth.Handler) error {

// PeerInfo retrieves all known `eth` information about a peer.
func (h *ethHandler) PeerInfo(id enode.ID) interface{} {
if p := h.peers.ethPeer(id.String()); p != nil {
if p := h.peers.peer(id.String()); p != nil {
return p.info()
}
return nil
Expand Down Expand Up @@ -107,7 +107,7 @@ func (h *ethHandler) Handle(peer *eth.Peer, packet eth.Packet) error {
// handleHeaders is invoked from a peer's message handler when it transmits a batch
// of headers for the local node to process.
func (h *ethHandler) handleHeaders(peer *eth.Peer, headers []*types.Header) error {
p := h.peers.ethPeer(peer.ID())
p := h.peers.peer(peer.ID())
if p == nil {
return errors.New("unregistered during callback")
}
Expand Down
4 changes: 2 additions & 2 deletions eth/handler_eth_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -574,11 +574,11 @@ func testCheckpointChallenge(t *testing.T, syncmode downloader.SyncMode, checkpo

// Verify that the remote peer is maintained or dropped
if drop {
if peers := handler.handler.peers.Len(); peers != 0 {
if peers := handler.handler.peers.len(); peers != 0 {
t.Fatalf("peer count mismatch: have %d, want %d", peers, 0)
}
} else {
if peers := handler.handler.peers.Len(); peers != 1 {
if peers := handler.handler.peers.len(); peers != 1 {
t.Fatalf("peer count mismatch: have %d, want %d", peers, 1)
}
}
Expand Down
8 changes: 5 additions & 3 deletions eth/handler_snap.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,15 @@ func (h *snapHandler) Chain() *core.BlockChain { return h.chain }

// RunPeer is invoked when a peer joins on the `snap` protocol.
func (h *snapHandler) RunPeer(peer *snap.Peer, hand snap.Handler) error {
return (*handler)(h).runSnapPeer(peer, hand)
return (*handler)(h).runSnapExtension(peer, hand)
}

// PeerInfo retrieves all known `snap` information about a peer.
func (h *snapHandler) PeerInfo(id enode.ID) interface{} {
if p := h.peers.snapPeer(id.String()); p != nil {
return p.info()
if p := h.peers.peer(id.String()); p != nil {
if p.snapExt != nil {
return p.snapExt.info()
}
}
return nil
}
Expand Down
9 changes: 4 additions & 5 deletions eth/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,11 @@ type ethPeerInfo struct {
// ethPeer is a wrapper around eth.Peer to maintain a few extra metadata.
type ethPeer struct {
*eth.Peer
snapExt *snapPeer // Satellite `snap` connection

syncDrop *time.Timer // Connection dropper if `eth` sync progress isn't validated in time
lock sync.RWMutex // Mutex protecting the internal fields
syncDrop *time.Timer // Connection dropper if `eth` sync progress isn't validated in time
snapWait chan struct{} // Notification channel for snap connections
lock sync.RWMutex // Mutex protecting the internal fields
}

// info gathers and returns some `eth` protocol metadata known about a peer.
Expand All @@ -61,9 +63,6 @@ type snapPeerInfo struct {
// snapPeer is a wrapper around snap.Peer to maintain a few extra metadata.
type snapPeer struct {
*snap.Peer

ethDrop *time.Timer // Connection dropper if `eth` doesn't connect in time
lock sync.RWMutex // Mutex protecting the internal fields
}

// info gathers and returns some `snap` protocol metadata known about a peer.
Expand Down
Loading

0 comments on commit e3430ac

Please sign in to comment.