diff --git a/p2p/mock/peer.go b/p2p/mock/peer.go index 664711dc63..e34f383f34 100644 --- a/p2p/mock/peer.go +++ b/p2p/mock/peer.go @@ -42,11 +42,12 @@ func NewPeer(ip net.IP) *Peer { return mp } -func (mp *Peer) FlushStop() { mp.Stop() } //nolint:errcheck //ignore error -func (mp *Peer) TrySendEnvelope(e p2p.Envelope) bool { return true } -func (mp *Peer) SendEnvelope(e p2p.Envelope) bool { return true } -func (mp *Peer) TrySend(_ byte, _ []byte) bool { return true } -func (mp *Peer) Send(_ byte, _ []byte) bool { return true } +func (mp *Peer) FlushStop() { mp.Stop() } //nolint:errcheck //ignore error +func (mp *Peer) TrySendEnvelope(e p2p.Envelope) bool { return true } +func (mp *Peer) TrySendMarshalled(e p2p.MarshalledEnvelope) bool { return true } +func (mp *Peer) SendEnvelope(e p2p.Envelope) bool { return true } +func (mp *Peer) TrySend(_ byte, _ []byte) bool { return true } +func (mp *Peer) Send(_ byte, _ []byte) bool { return true } func (mp *Peer) NodeInfo() p2p.NodeInfo { return p2p.DefaultNodeInfo{ DefaultNodeID: mp.addr.ID, diff --git a/p2p/peer.go b/p2p/peer.go index 32ece1e233..51ea82b904 100644 --- a/p2p/peer.go +++ b/p2p/peer.go @@ -38,6 +38,7 @@ type Peer interface { SendEnvelope(Envelope) bool TrySendEnvelope(Envelope) bool + TrySendMarshalled(MarshalledEnvelope) bool Set(string, interface{}) Get(string) interface{} @@ -261,6 +262,10 @@ func (p *peer) SendEnvelope(e Envelope) bool { return p.send(e.ChannelID, e.Message, p.mconn.Send) } +func (p *peer) TrySendMarshalled(e MarshalledEnvelope) bool { + return p.sendMarshalled(e.ChannelID, getMsgType(e.Message), e.MarshalledMessage, p.mconn.TrySend) +} + // TrySendEnvelope attempts to sends the message in the envelope on the channel specified by the // envelope. Returns false immediately if the connection's internal queue is full func (p *peer) TrySendEnvelope(e Envelope) bool { @@ -268,11 +273,6 @@ func (p *peer) TrySendEnvelope(e Envelope) bool { } func (p *peer) send(chID byte, msg proto.Message, sendFunc func(byte, []byte) bool) bool { - if !p.IsRunning() { - return false - } else if !p.hasChannel(chID) { - return false - } msgType := getMsgType(msg) if w, ok := msg.(Wrapper); ok { msg = w.Wrap() @@ -282,6 +282,15 @@ func (p *peer) send(chID byte, msg proto.Message, sendFunc func(byte, []byte) bo p.Logger.Error("marshaling message to send", "error", err) return false } + return p.sendMarshalled(chID, msgType, msgBytes, sendFunc) +} + +func (p *peer) sendMarshalled(chID byte, msgType reflect.Type, msgBytes []byte, sendFunc func(byte, []byte) bool) bool { + if !p.IsRunning() { + return false + } else if !p.hasChannel(chID) { + return false + } res := sendFunc(chID, msgBytes) if res { p.pendingMetrics.AddPendingSendBytes(msgType, len(msgBytes)) diff --git a/p2p/peer_set_test.go b/p2p/peer_set_test.go index 27126cc680..7a2554426c 100644 --- a/p2p/peer_set_test.go +++ b/p2p/peer_set_test.go @@ -18,7 +18,9 @@ type mockPeer struct { id ID } -func (mp *mockPeer) FlushStop() { mp.Stop() } //nolint:errcheck // ignore error +func (mp *mockPeer) FlushStop() { mp.Stop() } //nolint:errcheck // ignore error +func (mp *mockPeer) TrySendMarshalled(e MarshalledEnvelope) bool { return true } + func (mp *mockPeer) TrySendEnvelope(e Envelope) bool { return true } func (mp *mockPeer) SendEnvelope(e Envelope) bool { return true } func (mp *mockPeer) TrySend(_ byte, _ []byte) bool { return true } diff --git a/p2p/switch.go b/p2p/switch.go index 78fcb71855..a9e3419207 100644 --- a/p2p/switch.go +++ b/p2p/switch.go @@ -285,10 +285,22 @@ func (sw *Switch) BroadcastEnvelope(e Envelope) { func (sw *Switch) TryBroadcast(e Envelope) { sw.Logger.Debug("TryBroadcast", "channel", e.ChannelID) + marshalMsg := e.Message + if wrapper, ok := e.Message.(Wrapper); ok { + marshalMsg = wrapper.Wrap() + } + marshalledMsg, err := proto.Marshal(marshalMsg) + if err != nil { + return + } + marshalledEnvelope := MarshalledEnvelope{ + Envelope: e, + MarshalledMessage: marshalledMsg, + } peers := sw.peers.List() for _, peer := range peers { go func(p Peer) { - p.TrySendEnvelope(e) + p.TrySendMarshalled(marshalledEnvelope) }(peer) } } diff --git a/p2p/types.go b/p2p/types.go index fbca80660d..ca096fdba2 100644 --- a/p2p/types.go +++ b/p2p/types.go @@ -16,6 +16,12 @@ type Envelope struct { ChannelID byte } +// MarshalledEnvelope contains a proto message, its marshalled message, with sender routing info. +type MarshalledEnvelope struct { + Envelope + MarshalledMessage []byte +} + // Unwrapper is a Protobuf message that can contain a variety of inner messages // (e.g. via oneof fields). If a Channel's message type implements Unwrapper, the // p2p layer will automatically unwrap inbound messages so that reactors do not have to do this themselves.