Skip to content

Commit

Permalink
perf(p2p/connection): Lower wasted re-allocations in sendRoutine (bac…
Browse files Browse the repository at this point in the history
…kport cometbft#2986) (cometbft#2995)

This PR makes sending packet messages re-use the protowriter for writing
to the channel, rather than remaking it in `writePacketMsgTo`.

In a 1 hour sync benchmark, this saves 10% of the time spent in the
`sendRoutine` (6s), and saves 13GB of heap allocation.

This is a simple fix, so I think its worth doing. Later on, I think we
should move this proto-marshalling to `mConnection.Send`, but that
change will require more robust testing, as it would be a tradeoff of
increasing the CPU time of gossipVotesRoutine and
gossipBlockPartRoutine. (I personally think it will be worth it / were
anyways lowering the CPU time of these routines in total) Will be
writing this later direction idea into an issue.

---

#### PR checklist

- [x] Tests written/updated - should be covered by existing tests
- [ ] Changelog entry added in `.changelog` (we use
[unclog](https://github.com/informalsystems/unclog) to manage our
changelog)
- [x] Updated relevant documentation (`docs/` or `spec/`) and code
comments - I actually this is simpler/dont see anything to update
- [x] Title follows the [Conventional
Commits](https://www.conventionalcommits.org/en/v1.0.0/) spec
<hr>This is an automatic backport of pull request cometbft#2986 done by
[Mergify](https://mergify.com).

---------

Co-authored-by: Dev Ojha <ValarDragon@users.noreply.github.com>
Co-authored-by: Anton Kaliaev <anton.kalyaev@gmail.com>
  • Loading branch information
3 people committed May 4, 2024
1 parent dce1bf0 commit 1c81cc9
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 15 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
- `[p2p/conn]` Speedup connection.WritePacketMsgTo, by reusing internal buffers rather than re-allocating.
([\#2986](https://github.com/cometbft/cometbft/pull/2986))
32 changes: 17 additions & 15 deletions p2p/conn/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,9 +283,10 @@ func (c *MConnection) FlushStop() {
// Send and flush all pending msgs.
// Since sendRoutine has exited, we can call this
// safely
eof := c.sendSomePacketMsgs()
w := protoio.NewDelimitedWriter(c.bufConnWriter)
eof := c.sendSomePacketMsgs(w)
for !eof {
eof = c.sendSomePacketMsgs()
eof = c.sendSomePacketMsgs(w)
}
c.flush()

Expand Down Expand Up @@ -474,7 +475,7 @@ FOR_LOOP:
break FOR_LOOP
case <-c.send:
// Send some PacketMsgs
eof := c.sendSomePacketMsgs()
eof := c.sendSomePacketMsgs(protoWriter)
if !eof {
// Keep sendRoutine awake.
select {
Expand All @@ -501,26 +502,26 @@ FOR_LOOP:

// Returns true if messages from channels were exhausted.
// Blocks in accordance to .sendMonitor throttling.
func (c *MConnection) sendSomePacketMsgs() bool {
func (c *MConnection) sendSomePacketMsgs(w protoio.Writer) bool {
// Block until .sendMonitor says we can write.
// Once we're ready we send more than we asked for,
// but amortized it should even out.
c.sendMonitor.Limit(c._maxPacketMsgSize, atomic.LoadInt64(&c.config.SendRate), true)

// Now send some PacketMsgs.
return c.sendBatchPacketMsgs(numBatchPacketMsgs)
return c.sendBatchPacketMsgs(w, numBatchPacketMsgs)
}

// Returns true if messages from channels were exhausted.
func (c *MConnection) sendBatchPacketMsgs(batchSize int) bool {
func (c *MConnection) sendBatchPacketMsgs(w protoio.Writer, batchSize int) bool {
// Send a batch of PacketMsgs.
for i := 0; i < batchSize; i++ {
channel := selectChannelToGossipOn(c.channels)
// nothing to send across any channel.
if channel == nil {
return true
}
err := c.sendPacketMsgOnChannel(channel)
err := c.sendPacketMsgOnChannel(w, channel)
if err {
return true
}
Expand Down Expand Up @@ -555,11 +556,9 @@ func selectChannelToGossipOn(channels []*Channel) *Channel {
return leastChannel
}

func (c *MConnection) sendPacketMsgOnChannel(sendChannel *Channel) bool {
// c.Logger.Info("Found a msgPacket to send")

func (c *MConnection) sendPacketMsgOnChannel(w protoio.Writer, sendChannel *Channel) bool {
// Make & send a PacketMsg from this channel
_n, err := sendChannel.writePacketMsgTo(c.bufConnWriter)
_n, err := sendChannel.writePacketMsgTo(w)
if err != nil {
c.Logger.Error("Failed to write PacketMsg", "err", err)
c.stopForError(err)
Expand Down Expand Up @@ -861,12 +860,15 @@ func (ch *Channel) nextPacketMsg() tmp2p.PacketMsg {
}

// Writes next PacketMsg to w and updates c.recentlySent.
// Not goroutine-safe
func (ch *Channel) writePacketMsgTo(w io.Writer) (n int, err error) {
// Not goroutine-safe.
func (ch *Channel) writePacketMsgTo(w protoio.Writer) (n int, err error) {
packet := ch.nextPacketMsg()
n, err = protoio.NewDelimitedWriter(w).WriteMsg(mustWrapPacket(&packet))
n, err = w.WriteMsg(mustWrapPacket(&packet))
if err != nil {
return 0, err
}
atomic.AddInt64(&ch.recentlySent, int64(n))
return
return n, nil
}

// Handles incoming PacketMsgs. It returns a message bytes if message is
Expand Down

0 comments on commit 1c81cc9

Please sign in to comment.