Skip to content

Commit

Permalink
Don't remarshal within broadcast
Browse files Browse the repository at this point in the history
  • Loading branch information
ValarDragon committed Jul 4, 2024
1 parent 1dc2ac9 commit 0dfcdca
Show file tree
Hide file tree
Showing 5 changed files with 42 additions and 12 deletions.
11 changes: 6 additions & 5 deletions p2p/mock/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,12 @@ func NewPeer(ip net.IP) *Peer {
return mp
}

func (mp *Peer) FlushStop() { mp.Stop() } //nolint:errcheck //ignore error
func (mp *Peer) TrySendEnvelope(e p2p.Envelope) bool { return true }
func (mp *Peer) SendEnvelope(e p2p.Envelope) bool { return true }
func (mp *Peer) TrySend(_ byte, _ []byte) bool { return true }
func (mp *Peer) Send(_ byte, _ []byte) bool { return true }
func (mp *Peer) FlushStop() { mp.Stop() } //nolint:errcheck //ignore error
func (mp *Peer) TrySendEnvelope(e p2p.Envelope) bool { return true }
func (mp *Peer) TrySendMarshalled(e p2p.MarshalledEnvelope) bool { return true }
func (mp *Peer) SendEnvelope(e p2p.Envelope) bool { return true }
func (mp *Peer) TrySend(_ byte, _ []byte) bool { return true }
func (mp *Peer) Send(_ byte, _ []byte) bool { return true }
func (mp *Peer) NodeInfo() p2p.NodeInfo {
return p2p.DefaultNodeInfo{
DefaultNodeID: mp.addr.ID,
Expand Down
19 changes: 14 additions & 5 deletions p2p/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ type Peer interface {

SendEnvelope(Envelope) bool
TrySendEnvelope(Envelope) bool
TrySendMarshalled(MarshalledEnvelope) bool

Set(string, interface{})
Get(string) interface{}
Expand Down Expand Up @@ -261,18 +262,17 @@ func (p *peer) SendEnvelope(e Envelope) bool {
return p.send(e.ChannelID, e.Message, p.mconn.Send)
}

func (p *peer) TrySendMarshalled(e MarshalledEnvelope) bool {
return p.sendMarshalled(e.ChannelID, getMsgType(e.Message), e.MarshalledMessage, p.mconn.TrySend)
}

// TrySendEnvelope attempts to sends the message in the envelope on the channel specified by the
// envelope. Returns false immediately if the connection's internal queue is full
func (p *peer) TrySendEnvelope(e Envelope) bool {
return p.send(e.ChannelID, e.Message, p.mconn.TrySend)
}

func (p *peer) send(chID byte, msg proto.Message, sendFunc func(byte, []byte) bool) bool {
if !p.IsRunning() {
return false
} else if !p.hasChannel(chID) {
return false
}
msgType := getMsgType(msg)
if w, ok := msg.(Wrapper); ok {
msg = w.Wrap()
Expand All @@ -282,6 +282,15 @@ func (p *peer) send(chID byte, msg proto.Message, sendFunc func(byte, []byte) bo
p.Logger.Error("marshaling message to send", "error", err)
return false
}
return p.sendMarshalled(chID, msgType, msgBytes, sendFunc)
}

func (p *peer) sendMarshalled(chID byte, msgType reflect.Type, msgBytes []byte, sendFunc func(byte, []byte) bool) bool {
if !p.IsRunning() {
return false
} else if !p.hasChannel(chID) {
return false
}
res := sendFunc(chID, msgBytes)
if res {
p.pendingMetrics.AddPendingSendBytes(msgType, len(msgBytes))
Expand Down
4 changes: 3 additions & 1 deletion p2p/peer_set_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@ type mockPeer struct {
id ID
}

func (mp *mockPeer) FlushStop() { mp.Stop() } //nolint:errcheck // ignore error
func (mp *mockPeer) FlushStop() { mp.Stop() } //nolint:errcheck // ignore error
func (mp *mockPeer) TrySendMarshalled(e MarshalledEnvelope) bool { return true }

func (mp *mockPeer) TrySendEnvelope(e Envelope) bool { return true }
func (mp *mockPeer) SendEnvelope(e Envelope) bool { return true }
func (mp *mockPeer) TrySend(_ byte, _ []byte) bool { return true }
Expand Down
14 changes: 13 additions & 1 deletion p2p/switch.go
Original file line number Diff line number Diff line change
Expand Up @@ -285,10 +285,22 @@ func (sw *Switch) BroadcastEnvelope(e Envelope) {
func (sw *Switch) TryBroadcast(e Envelope) {
sw.Logger.Debug("TryBroadcast", "channel", e.ChannelID)

marshalMsg := e.Message
if wrapper, ok := e.Message.(Wrapper); ok {
marshalMsg = wrapper.Wrap()
}
marshalledMsg, err := proto.Marshal(marshalMsg)
if err != nil {
return
}
marshalledEnvelope := MarshalledEnvelope{
Envelope: e,
MarshalledMessage: marshalledMsg,
}
peers := sw.peers.List()
for _, peer := range peers {
go func(p Peer) {
p.TrySendEnvelope(e)
p.TrySendMarshalled(marshalledEnvelope)
}(peer)
}
}
Expand Down
6 changes: 6 additions & 0 deletions p2p/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,12 @@ type Envelope struct {
ChannelID byte
}

// MarshalledEnvelope contains a proto message, its marshalled message, with sender routing info.
type MarshalledEnvelope struct {
Envelope
MarshalledMessage []byte
}

// Unwrapper is a Protobuf message that can contain a variety of inner messages
// (e.g. via oneof fields). If a Channel's message type implements Unwrapper, the
// p2p layer will automatically unwrap inbound messages so that reactors do not have to do this themselves.
Expand Down

0 comments on commit 0dfcdca

Please sign in to comment.