Skip to content

Commit

Permalink
perf(p2p/secretconn): Buffer secret connection writes cometbft#3346 (#…
Browse files Browse the repository at this point in the history
…115)

* Buffer secret connection writes

* Add changelog

* Add changelog v2
  • Loading branch information
ValarDragon authored Jun 26, 2024
1 parent 158bcbe commit 9f773de
Show file tree
Hide file tree
Showing 4 changed files with 45 additions and 21 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
- `[p2p/secretconn]` Speedup secretconnection large writes, by buffering the write to the underlying connection.
([\#3346](https://github.com/cometbft/cometbft/pull/3346))
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
# CHANGELOG

## v0.37.4-v25-osmo-10

* [#115](https://github.com/osmosis-labs/cometbft/pull/115) perf(p2p/secretconn): Buffer secret connection writes (#3346)


## v0.37.4-v25-osmo-9

* [#112](https://github.com/osmosis-labs/cometbft/pull/112) perf(mempool): Remove expensive debug logs + repeated hashing in mempool. Fix some v0.37.x line hard-to-reach race bugs. (Not in upstream)
Expand Down
15 changes: 9 additions & 6 deletions p2p/conn/evil_secret_connection_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package conn

import (
"bufio"
"bytes"
"errors"
"io"
Expand Down Expand Up @@ -221,12 +222,14 @@ func (c *evilConn) signChallenge() []byte {

b := &buffer{}
c.secretConn = &SecretConnection{
conn: b,
recvBuffer: nil,
recvNonce: new([aeadNonceSize]byte),
sendNonce: new([aeadNonceSize]byte),
recvAead: recvAead,
sendAead: sendAead,
underlyingConn: b,
connReader: b,
connWriter: bufio.NewWriterSize(b, 65536),
recvBuffer: nil,
recvNonce: new([aeadNonceSize]byte),
sendNonce: new([aeadNonceSize]byte),
recvAead: recvAead,
sendAead: sendAead,
}
c.buffer = b

Expand Down
44 changes: 29 additions & 15 deletions p2p/conn/secret_connection.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package conn

import (
"bufio"
"bytes"
"crypto/cipher"
crand "crypto/rand"
Expand Down Expand Up @@ -43,6 +44,9 @@ const (
labelEphemeralUpperPublicKey = "EPHEMERAL_UPPER_PUBLIC_KEY"
labelDHSecret = "DH_SECRET"
labelSecretConnectionMac = "SECRET_CONNECTION_MAC"

defaultWriteBufferSize = 1024 * 1024
defaultReadBufferSize = 65536
)

var (
Expand All @@ -67,7 +71,10 @@ type SecretConnection struct {
sendAead cipher.AEAD

remPubKey crypto.PubKey
conn io.ReadWriteCloser

underlyingConn io.ReadWriteCloser
connWriter *bufio.Writer
connReader io.Reader

// net.Conn must be thread safe:
// https://golang.org/pkg/net/#Conn.
Expand Down Expand Up @@ -144,12 +151,14 @@ func MakeSecretConnection(conn io.ReadWriteCloser, locPrivKey crypto.PrivKey) (*
}

sc := &SecretConnection{
conn: conn,
recvBuffer: nil,
recvNonce: new([aeadNonceSize]byte),
sendNonce: new([aeadNonceSize]byte),
recvAead: recvAead,
sendAead: sendAead,
underlyingConn: conn,
connWriter: bufio.NewWriterSize(conn, defaultWriteBufferSize),
connReader: conn,
recvBuffer: nil,
recvNonce: new([aeadNonceSize]byte),
sendNonce: new([aeadNonceSize]byte),
recvAead: recvAead,
sendAead: sendAead,
}

// Sign the challenge bytes for authentication.
Expand Down Expand Up @@ -213,7 +222,7 @@ func (sc *SecretConnection) Write(data []byte) (n int, err error) {
incrNonce(sc.sendNonce)
// end encryption

_, err = sc.conn.Write(sealedFrame)
_, err = sc.connWriter.Write(sealedFrame)
if err != nil {
return err
}
Expand All @@ -223,6 +232,7 @@ func (sc *SecretConnection) Write(data []byte) (n int, err error) {
return n, err
}
}
sc.connWriter.Flush()
return n, err
}

Expand All @@ -241,7 +251,7 @@ func (sc *SecretConnection) Read(data []byte) (n int, err error) {
// read off the conn
var sealedFrame = pool.Get(aeadSizeOverhead + totalFrameSize)
defer pool.Put(sealedFrame)
_, err = io.ReadFull(sc.conn, sealedFrame)
_, err = io.ReadFull(sc.connReader, sealedFrame)
if err != nil {
return
}
Expand Down Expand Up @@ -273,15 +283,19 @@ func (sc *SecretConnection) Read(data []byte) (n int, err error) {
}

// Implements net.Conn
func (sc *SecretConnection) Close() error { return sc.conn.Close() }
func (sc *SecretConnection) LocalAddr() net.Addr { return sc.conn.(net.Conn).LocalAddr() }
func (sc *SecretConnection) RemoteAddr() net.Addr { return sc.conn.(net.Conn).RemoteAddr() }
func (sc *SecretConnection) SetDeadline(t time.Time) error { return sc.conn.(net.Conn).SetDeadline(t) }
func (sc *SecretConnection) Close() error { return sc.underlyingConn.Close() }
func (sc *SecretConnection) LocalAddr() net.Addr { return sc.underlyingConn.(net.Conn).LocalAddr() }
func (sc *SecretConnection) RemoteAddr() net.Addr { return sc.underlyingConn.(net.Conn).RemoteAddr() }
func (sc *SecretConnection) SetDeadline(t time.Time) error {
return sc.underlyingConn.(net.Conn).SetDeadline(t)
}

func (sc *SecretConnection) SetReadDeadline(t time.Time) error {
return sc.conn.(net.Conn).SetReadDeadline(t)
return sc.underlyingConn.(net.Conn).SetReadDeadline(t)
}

func (sc *SecretConnection) SetWriteDeadline(t time.Time) error {
return sc.conn.(net.Conn).SetWriteDeadline(t)
return sc.underlyingConn.(net.Conn).SetWriteDeadline(t)
}

func genEphKeys() (ephPub, ephPriv *[32]byte) {
Expand Down

0 comments on commit 9f773de

Please sign in to comment.