Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 8 additions & 8 deletions p2p/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
"errors"
"fmt"
"io"
"log"
"sync/atomic"
"time"

Expand Down Expand Up @@ -68,12 +67,14 @@ func (msg Msg) String() string {
return fmt.Sprintf("msg #%v (%v bytes)", msg.Code, msg.Size)
}

// Discard reads any remaining payload data into a black hole.
func (msg Msg) Discard() {
_, err := io.Copy(io.Discard, msg.Payload)
if err != nil {
log.Fatal(err)
// Discard reads any remaining payload data into a black hole and reports failures
// so callers can decide how to react instead of killing the entire process.
func (msg Msg) Discard() error {
if msg.Payload == nil {
return nil
}
_, err := io.Copy(io.Discard, msg.Payload)
return err
}

func (msg Msg) Time() time.Time {
Expand Down Expand Up @@ -238,8 +239,7 @@ func ExpectMsg(r MsgReader, code uint64, content interface{}) error {
return fmt.Errorf("message code mismatch: got %d, expected %d", msg.Code, code)
}
if content == nil {
msg.Discard()
return nil
return msg.Discard()
}
contentEnc, err := rlp.EncodeToBytes(content)
if err != nil {
Expand Down
8 changes: 6 additions & 2 deletions p2p/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,9 @@ func (p *Peer) readLoop(errc chan<- error) {
func (p *Peer) handle(msg Msg) error {
switch {
case msg.Code == pingMsg:
msg.Discard()
if err := msg.Discard(); err != nil {
return NewPeerError(PeerErrorInvalidMessage, DiscNetworkError, err, "failed to discard ping payload")
}
select {
case p.pingRecv <- struct{}{}:
case <-p.closed:
Expand All @@ -372,7 +374,9 @@ func (p *Peer) handle(msg Msg) error {
return reason
case msg.Code < baseProtocolLength:
// ignore other base protocol messages
msg.Discard()
if err := msg.Discard(); err != nil {
return NewPeerError(PeerErrorInvalidMessage, DiscNetworkError, err, "failed to discard base protocol payload")
}
return nil
default:
// it's a subprotocol message
Expand Down
4 changes: 3 additions & 1 deletion p2p/peer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,9 @@ var discard = Protocol{
return NewPeerError(PeerErrorTest, DiscProtocolError, err, "peer_test: 'discard' protocol ReadMsg error")
}
fmt.Printf("discarding %d\n", msg.Code)
msg.Discard()
if err := msg.Discard(); err != nil {
return NewPeerError(PeerErrorInvalidMessage, DiscNetworkError, err, "peer_test: failed to discard message payload")
}
}
},
}
Expand Down
4 changes: 3 additions & 1 deletion p2p/sentry/eth_handshake.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,9 @@ func readAndValidatePeerStatus[T StatusPacket](
if err != nil {
return zero, p2p.NewPeerError(p2p.PeerErrorStatusReceive, p2p.DiscNetworkError, err, "readAndValidatePeerStatus rw.ReadMsg error")
}
defer msg.Discard()
defer func() {
_ = msg.Discard()
}()
if msg.Code != eth.StatusMsg {
return zero, p2p.NewPeerError(p2p.PeerErrorStatusDecode, p2p.DiscProtocolError, fmt.Errorf("first msg has code %x (!= %x)", msg.Code, eth.StatusMsg), "readAndValidatePeerStatus wrong code")
}
Expand Down
16 changes: 12 additions & 4 deletions p2p/sentry/sentry_grpc_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -430,14 +430,18 @@ func runPeer(
}

if msg.Size > eth.ProtocolMaxMsgSize {
msg.Discard()
if err := msg.Discard(); err != nil {
return p2p.NewPeerError(p2p.PeerErrorMessageSizeLimit, p2p.DiscNetworkError, fmt.Errorf("sentry.runPeer: discard oversized payload: %w", err), "sentry.runPeer: failed to discard oversized message payload")
}
return p2p.NewPeerError(p2p.PeerErrorMessageSizeLimit, p2p.DiscSubprotocolError, nil, fmt.Sprintf("sentry.runPeer: message is too large %d, limit %d", msg.Size, eth.ProtocolMaxMsgSize))
}

givePermit := false
switch msg.Code {
case eth.StatusMsg:
msg.Discard()
if err := msg.Discard(); err != nil {
return p2p.NewPeerError(p2p.PeerErrorStatusUnexpected, p2p.DiscNetworkError, fmt.Errorf("sentry.runPeer: discard unexpected status payload: %w", err), "sentry.runPeer: failed to discard unexpected status message")
}
// Status messages should never arrive after the handshake
return p2p.NewPeerError(p2p.PeerErrorStatusUnexpected, p2p.DiscSubprotocolError, nil, "sentry.runPeer: unexpected status message")
case eth.GetBlockHeadersMsg:
Expand Down Expand Up @@ -578,7 +582,9 @@ func runPeer(

trackPeerStatistics(peerInfo.peer.Fullname(), peerInfo.peer.ID().String(), true, msgType.String(), msgCap, int(msg.Size))

msg.Discard()
if err := msg.Discard(); err != nil {
return p2p.NewPeerError(p2p.PeerErrorInvalidMessage, p2p.DiscNetworkError, fmt.Errorf("sentry.runPeer: discard payload: %w", err), "sentry.runPeer: failed to discard message payload")
}
peerInfo.ClearDeadlines(time.Now(), givePermit)
}
}
Expand Down Expand Up @@ -630,7 +636,9 @@ func runWitPeer(
}

if msg.Size > wit.MaxMessageSize {
msg.Discard()
if err := msg.Discard(); err != nil {
return p2p.NewPeerError(p2p.PeerErrorMessageSizeLimit, p2p.DiscNetworkError, fmt.Errorf("sentry.runPeer: discard oversized payload: %w", err), "sentry.runPeer: failed to discard oversized wit message payload")
}
return p2p.NewPeerError(p2p.PeerErrorMessageSizeLimit, p2p.DiscSubprotocolError, nil, fmt.Sprintf("sentry.runPeer: message is too large %d, limit %d", msg.Size, eth.ProtocolMaxMsgSize))
}

Expand Down