Skip to content

Commit

Permalink
refactor(p2p)!: Extract TCP transport (cometbft#4148)
Browse files Browse the repository at this point in the history
Closes cometbft#4301

This PR refactors the peer-to-peer (p2p) module by extracting the TCP
transport into its own component, laying groundwork for potential QUIC
transport integration. It introduces a breaking change to the Go API,
particularly in the handling of peer connections and transport
interfaces.

**Important: no logic has been changed; just moving stuff and updating
interfaces.**

# Changes

## Public API

* Updated `Transport` interface

```go
// Transport emits and connects to Peers. The implementation of Peer is left to
// the transport. Each transport is also responsible to filter establishing
// peers specific to its domain.
type Transport interface {
	// NetAddr returns the network address of the local node.
	NetAddr() na.Addr

	// Accept waits for and returns the next connection to the local node.
	Accept() (net.Conn, *na.Addr, error)

	// Dial dials the given address and returns a connection.
	Dial(addr na.Addr) (net.Conn, error)

	// Cleanup any resources associated with the given connection.
	//
	// Must be run when the peer is dropped for any reason.
	Cleanup(conn net.Conn) error
}
```

## New Go packages (extracted from `p2p`)

* `netaddress` - network address
* `nodeinfo` - node's info
*  `nodekey` - ID of the node
*  `transport/tcp` - TCP transport

## Internalized Go packages

* `fuzz` - fuzzing connection

#go-api-breaking

---------

Co-authored-by: Andy Nogueira <me@andynogueira.dev>
  • Loading branch information
melekes and andynog authored Oct 24, 2024
1 parent 88824cd commit eef3464
Show file tree
Hide file tree
Showing 98 changed files with 2,718 additions and 2,515 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
- `[p2p]` Extracted TCP transport into its own package - `transport/tcp`
* Updated `Transport` interface;
* Moved `NetAddress`, `NodeInfo` and `NodeKey` into separate packages -
`netaddress`, `nodeinfo`, `nodekey` accordingly;
* Internalized `fuzz` package.
[\#4301](https://github.com/cometbft/cometbft/issues/4301)
4 changes: 2 additions & 2 deletions abci/tutorials/abci-v2-forum-app/forum.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import (
cmtflags "github.com/cometbft/cometbft/libs/cli/flags"
cmtlog "github.com/cometbft/cometbft/libs/log"
nm "github.com/cometbft/cometbft/node"
"github.com/cometbft/cometbft/p2p"
"github.com/cometbft/cometbft/p2p/nodekey"
"github.com/cometbft/cometbft/privval"
"github.com/cometbft/cometbft/proxy"
)
Expand Down Expand Up @@ -57,7 +57,7 @@ func main() {
panic(fmt.Errorf("failed to create Forum Application: %w", err))
}

nodeKey, err := p2p.LoadNodeKey(config.NodeKeyFile())
nodeKey, err := nodekey.LoadNodeKey(config.NodeKeyFile())
if err != nil {
panic(fmt.Errorf("failed to load node key: %w", err))
}
Expand Down
6 changes: 3 additions & 3 deletions cmd/cometbft/commands/gen_node_key.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
"github.com/spf13/cobra"

cmtos "github.com/cometbft/cometbft/internal/os"
"github.com/cometbft/cometbft/p2p"
"github.com/cometbft/cometbft/p2p/nodekey"
)

// GenNodeKeyCmd allows the generation of a node key. It prints node's ID to
Expand All @@ -24,10 +24,10 @@ func genNodeKey(*cobra.Command, []string) error {
return fmt.Errorf("node key at %s already exists", nodeKeyFile)
}

nodeKey, err := p2p.LoadOrGenNodeKey(nodeKeyFile)
nk, err := nodekey.LoadOrGenNodeKey(nodeKeyFile)
if err != nil {
return err
}
fmt.Println(nodeKey.ID())
fmt.Println(nk.ID())
return nil
}
4 changes: 2 additions & 2 deletions cmd/cometbft/commands/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
kt "github.com/cometbft/cometbft/internal/keytypes"
cmtos "github.com/cometbft/cometbft/internal/os"
cmtrand "github.com/cometbft/cometbft/internal/rand"
"github.com/cometbft/cometbft/p2p"
"github.com/cometbft/cometbft/p2p/nodekey"
"github.com/cometbft/cometbft/privval"
"github.com/cometbft/cometbft/types"
cmttime "github.com/cometbft/cometbft/types/time"
Expand Down Expand Up @@ -55,7 +55,7 @@ func initFilesWithConfig(config *cfg.Config) error {
if cmtos.FileExists(nodeKeyFile) {
logger.Info("Found node key", "path", nodeKeyFile)
} else {
if _, err := p2p.LoadOrGenNodeKey(nodeKeyFile); err != nil {
if _, err := nodekey.LoadOrGenNodeKey(nodeKeyFile); err != nil {
return err
}
logger.Info("Generated node key", "path", nodeKeyFile)
Expand Down
6 changes: 3 additions & 3 deletions cmd/cometbft/commands/show_node_id.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (

"github.com/spf13/cobra"

"github.com/cometbft/cometbft/p2p"
"github.com/cometbft/cometbft/p2p/nodekey"
)

// ShowNodeIDCmd dumps node's ID to the standard output.
Expand All @@ -17,11 +17,11 @@ var ShowNodeIDCmd = &cobra.Command{
}

func showNodeID(*cobra.Command, []string) error {
nodeKey, err := p2p.LoadNodeKey(config.NodeKeyFile())
nk, err := nodekey.LoadNodeKey(config.NodeKeyFile())
if err != nil {
return err
}

fmt.Println(nodeKey.ID())
fmt.Println(nk.ID())
return nil
}
7 changes: 4 additions & 3 deletions cmd/cometbft/commands/testnet.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ import (
cfg "github.com/cometbft/cometbft/config"
cmtrand "github.com/cometbft/cometbft/internal/rand"
"github.com/cometbft/cometbft/libs/bytes"
"github.com/cometbft/cometbft/p2p"
na "github.com/cometbft/cometbft/p2p/netaddress"
"github.com/cometbft/cometbft/p2p/nodekey"
"github.com/cometbft/cometbft/privval"
"github.com/cometbft/cometbft/types"
cmttime "github.com/cometbft/cometbft/types/time"
Expand Down Expand Up @@ -251,11 +252,11 @@ func persistentPeersString(config *cfg.Config) (string, error) {
for i := 0; i < nValidators+nNonValidators; i++ {
nodeDir := filepath.Join(outputDir, fmt.Sprintf("%s%d", nodeDirPrefix, i))
config.SetRoot(nodeDir)
nodeKey, err := p2p.LoadNodeKey(config.NodeKeyFile())
nk, err := nodekey.LoadNodeKey(config.NodeKeyFile())
if err != nil {
return "", err
}
persistentPeers[i] = p2p.IDAddressString(nodeKey.ID(), fmt.Sprintf("%s:%d", hostnameOrIP(i), p2pPort))
persistentPeers[i] = na.IDAddressString(nk.ID(), fmt.Sprintf("%s:%d", hostnameOrIP(i), p2pPort))
}
return strings.Join(persistentPeers, ","), nil
}
Expand Down
4 changes: 2 additions & 2 deletions docs/tutorials/forum-application/6.main.md
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func main() {
os.Exit(1)
}

nodeKey, err := p2p.LoadNodeKey(config.NodeKeyFile())
nodeKey, err := nodekey.LoadNodeKey(config.NodeKeyFile())
if err != nil {
log.Printf("failed to load node key: %v", err)
os.Exit(1)
Expand Down Expand Up @@ -144,7 +144,7 @@ The program then sets up logging using `cmtlog.NewTMLogger(cmtlog.NewSyncWriter(
using `cmtflags.ParseLogLevel(config.LogLevel, logger, cfg.DefaultLogLevel).` If there is an error during log level parsing,
it logs the failure and exits.

The program loads the node key using `p2p.LoadNodeKey(config.NodeKeyFile()).` If there is an error during the loading
The program loads the node key using `nodekey.LoadNodeKey(config.NodeKeyFile()).` If there is an error during the loading
of the node key, it logs the failure and exits.

Next, it loads the private validator using `privval.LoadFilePV(config.PrivValidatorKeyFile(), config.PrivValidatorStateFile()).`
Expand Down
4 changes: 2 additions & 2 deletions docs/tutorials/go-built-in.md
Original file line number Diff line number Diff line change
Expand Up @@ -649,7 +649,7 @@ func main() {
config.PrivValidatorStateFile(),
)

nodeKey, err := p2p.LoadNodeKey(config.NodeKeyFile())
nodeKey, err := nodekey.LoadNodeKey(config.NodeKeyFile())
if err != nil {
log.Fatalf("failed to load node's key: %v", err)
}
Expand Down Expand Up @@ -741,7 +741,7 @@ pv := privval.LoadFilePV(
`nodeKey` is needed to identify the node in a p2p network.

```go
nodeKey, err := p2p.LoadNodeKey(config.NodeKeyFile())
nodeKey, err := nodekey.LoadNodeKey(config.NodeKeyFile())
if err != nil {
return nil, fmt.Errorf("failed to load node's key: %w", err)
}
Expand Down
64 changes: 32 additions & 32 deletions internal/blocksync/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
"github.com/cometbft/cometbft/libs/log"
"github.com/cometbft/cometbft/libs/service"
cmtsync "github.com/cometbft/cometbft/libs/sync"
"github.com/cometbft/cometbft/p2p"
"github.com/cometbft/cometbft/p2p/nodekey"
"github.com/cometbft/cometbft/types"
cmttime "github.com/cometbft/cometbft/types/time"
)
Expand Down Expand Up @@ -79,8 +79,8 @@ type BlockPool struct {
requesters map[int64]*bpRequester
height int64 // the lowest key in requesters.
// peers
peers map[p2p.ID]*bpPeer
bannedPeers map[p2p.ID]time.Time
peers map[nodekey.ID]*bpPeer
bannedPeers map[nodekey.ID]time.Time
sortedPeers []*bpPeer // sorted by curRate, highest first
maxPeerHeight int64 // the biggest reported height

Expand All @@ -92,8 +92,8 @@ type BlockPool struct {
// requests and errors will be sent to requestsCh and errorsCh accordingly.
func NewBlockPool(start int64, requestsCh chan<- BlockRequest, errorsCh chan<- peerError) *BlockPool {
bp := &BlockPool{
peers: make(map[p2p.ID]*bpPeer),
bannedPeers: make(map[p2p.ID]time.Time),
peers: make(map[nodekey.ID]*bpPeer),
bannedPeers: make(map[nodekey.ID]time.Time),
requesters: make(map[int64]*bpRequester),
height: start,
startHeight: start,
Expand Down Expand Up @@ -255,7 +255,7 @@ func (pool *BlockPool) PopRequest() {
// RemovePeerAndRedoAllPeerRequests retries the request at the given height and
// all the requests made to the same peer. The peer is removed from the pool.
// Returns the ID of the removed peer.
func (pool *BlockPool) RemovePeerAndRedoAllPeerRequests(height int64) p2p.ID {
func (pool *BlockPool) RemovePeerAndRedoAllPeerRequests(height int64) nodekey.ID {
pool.mtx.Lock()
defer pool.mtx.Unlock()

Expand All @@ -269,7 +269,7 @@ func (pool *BlockPool) RemovePeerAndRedoAllPeerRequests(height int64) p2p.ID {

// RedoRequestFrom retries the request at the given height. It does not remove the
// peer.
func (pool *BlockPool) RedoRequestFrom(height int64, peerID p2p.ID) {
func (pool *BlockPool) RedoRequestFrom(height int64, peerID nodekey.ID) {
pool.mtx.Lock()
defer pool.mtx.Unlock()

Expand All @@ -281,7 +281,7 @@ func (pool *BlockPool) RedoRequestFrom(height int64, peerID p2p.ID) {
}

// Deprecated: use RemovePeerAndRedoAllPeerRequests instead.
func (pool *BlockPool) RedoRequest(height int64) p2p.ID {
func (pool *BlockPool) RedoRequest(height int64) nodekey.ID {
return pool.RemovePeerAndRedoAllPeerRequests(height)
}

Expand All @@ -293,7 +293,7 @@ func (pool *BlockPool) RedoRequest(height int64) p2p.ID {
// need to switch over from block sync to consensus at this height. If the
// height of the extended commit and the height of the block do not match, we
// do not add the block and return an error.
func (pool *BlockPool) AddBlock(peerID p2p.ID, block *types.Block, extCommit *types.ExtendedCommit, blockSize int) error {
func (pool *BlockPool) AddBlock(peerID nodekey.ID, block *types.Block, extCommit *types.ExtendedCommit, blockSize int) error {
if extCommit != nil && block.Height != extCommit.Height {
err := fmt.Errorf("block height %d != extCommit height %d", block.Height, extCommit.Height)
// Peer sent us an invalid block => remove it.
Expand Down Expand Up @@ -349,7 +349,7 @@ func (pool *BlockPool) MaxPeerHeight() int64 {
}

// SetPeerRange sets the peer's alleged blockchain base and height.
func (pool *BlockPool) SetPeerRange(peerID p2p.ID, base int64, height int64) {
func (pool *BlockPool) SetPeerRange(peerID nodekey.ID, base int64, height int64) {
pool.mtx.Lock()
defer pool.mtx.Unlock()

Expand Down Expand Up @@ -377,14 +377,14 @@ func (pool *BlockPool) SetPeerRange(peerID p2p.ID, base int64, height int64) {

// RemovePeer removes the peer with peerID from the pool. If there's no peer
// with peerID, function is a no-op.
func (pool *BlockPool) RemovePeer(peerID p2p.ID) {
func (pool *BlockPool) RemovePeer(peerID nodekey.ID) {
pool.mtx.Lock()
defer pool.mtx.Unlock()

pool.removePeer(peerID)
}

func (pool *BlockPool) removePeer(peerID p2p.ID) {
func (pool *BlockPool) removePeer(peerID nodekey.ID) {
for _, requester := range pool.requesters {
if requester.didRequestFrom(peerID) {
requester.redo(peerID)
Expand Down Expand Up @@ -424,18 +424,18 @@ func (pool *BlockPool) updateMaxPeerHeight() {
pool.maxPeerHeight = max
}

func (pool *BlockPool) isPeerBanned(peerID p2p.ID) bool {
func (pool *BlockPool) isPeerBanned(peerID nodekey.ID) bool {
return cmttime.Since(pool.bannedPeers[peerID]) < time.Second*60
}

func (pool *BlockPool) banPeer(peerID p2p.ID) {
func (pool *BlockPool) banPeer(peerID nodekey.ID) {
pool.Logger.Debug("Banning peer", peerID)
pool.bannedPeers[peerID] = cmttime.Now()
}

// Pick an available peer with the given height available.
// If no peers are available, returns nil.
func (pool *BlockPool) pickIncrAvailablePeer(height int64, excludePeerID p2p.ID) *bpPeer {
func (pool *BlockPool) pickIncrAvailablePeer(height int64, excludePeerID nodekey.ID) *bpPeer {
pool.mtx.Lock()
defer pool.mtx.Unlock()

Expand Down Expand Up @@ -481,15 +481,15 @@ func (pool *BlockPool) makeNextRequester(nextHeight int64) {
}

// thread-safe.
func (pool *BlockPool) sendRequest(height int64, peerID p2p.ID) {
func (pool *BlockPool) sendRequest(height int64, peerID nodekey.ID) {
if !pool.IsRunning() {
return
}
pool.requestsCh <- BlockRequest{height, peerID}
}

// thread-safe.
func (pool *BlockPool) sendError(err error, peerID p2p.ID) {
func (pool *BlockPool) sendError(err error, peerID nodekey.ID) {
if !pool.IsRunning() {
return
}
Expand Down Expand Up @@ -526,15 +526,15 @@ type bpPeer struct {
height int64
base int64
pool *BlockPool
id p2p.ID
id nodekey.ID
recvMonitor *flow.Monitor

timeout *time.Timer

logger log.Logger
}

func newBPPeer(pool *BlockPool, peerID p2p.ID, base int64, height int64) *bpPeer {
func newBPPeer(pool *BlockPool, peerID nodekey.ID, base int64, height int64) *bpPeer {
peer := &bpPeer{
pool: pool,
id: peerID,
Expand Down Expand Up @@ -608,13 +608,13 @@ type bpRequester struct {
pool *BlockPool
height int64
gotBlockCh chan struct{}
redoCh chan p2p.ID // redo may got multiple messages, add peerId to identify repeat
redoCh chan nodekey.ID // redo may got multiple messages, add peerId to identify repeat
newHeightCh chan int64

mtx cmtsync.Mutex
peerID p2p.ID
secondPeerID p2p.ID // alternative peer to request from (if close to pool's height)
gotBlockFrom p2p.ID
peerID nodekey.ID
secondPeerID nodekey.ID // alternative peer to request from (if close to pool's height)
gotBlockFrom nodekey.ID
block *types.Block
extCommit *types.ExtendedCommit
}
Expand All @@ -624,7 +624,7 @@ func newBPRequester(pool *BlockPool, height int64) *bpRequester {
pool: pool,
height: height,
gotBlockCh: make(chan struct{}, 1),
redoCh: make(chan p2p.ID, 1),
redoCh: make(chan nodekey.ID, 1),
newHeightCh: make(chan int64, 1),

peerID: "",
Expand All @@ -641,7 +641,7 @@ func (bpr *bpRequester) OnStart() error {
}

// Returns true if the peer(s) match and block doesn't already exist.
func (bpr *bpRequester) setBlock(block *types.Block, extCommit *types.ExtendedCommit, peerID p2p.ID) bool {
func (bpr *bpRequester) setBlock(block *types.Block, extCommit *types.ExtendedCommit, peerID nodekey.ID) bool {
bpr.mtx.Lock()
if bpr.peerID != peerID && bpr.secondPeerID != peerID {
bpr.mtx.Unlock()
Expand Down Expand Up @@ -677,10 +677,10 @@ func (bpr *bpRequester) getExtendedCommit() *types.ExtendedCommit {
}

// Returns the IDs of peers we've requested a block from.
func (bpr *bpRequester) requestedFrom() []p2p.ID {
func (bpr *bpRequester) requestedFrom() []nodekey.ID {
bpr.mtx.Lock()
defer bpr.mtx.Unlock()
peerIDs := make([]p2p.ID, 0, 2)
peerIDs := make([]nodekey.ID, 0, 2)
if bpr.peerID != "" {
peerIDs = append(peerIDs, bpr.peerID)
}
Expand All @@ -691,21 +691,21 @@ func (bpr *bpRequester) requestedFrom() []p2p.ID {
}

// Returns true if we've requested a block from the given peer.
func (bpr *bpRequester) didRequestFrom(peerID p2p.ID) bool {
func (bpr *bpRequester) didRequestFrom(peerID nodekey.ID) bool {
bpr.mtx.Lock()
defer bpr.mtx.Unlock()
return bpr.peerID == peerID || bpr.secondPeerID == peerID
}

// Returns the ID of the peer who sent us the block.
func (bpr *bpRequester) gotBlockFromPeerID() p2p.ID {
func (bpr *bpRequester) gotBlockFromPeerID() nodekey.ID {
bpr.mtx.Lock()
defer bpr.mtx.Unlock()
return bpr.gotBlockFrom
}

// Removes the block (IF we got it from the given peer) and resets the peer.
func (bpr *bpRequester) reset(peerID p2p.ID) (removedBlock bool) {
func (bpr *bpRequester) reset(peerID nodekey.ID) (removedBlock bool) {
bpr.mtx.Lock()
defer bpr.mtx.Unlock()

Expand All @@ -729,7 +729,7 @@ func (bpr *bpRequester) reset(peerID p2p.ID) (removedBlock bool) {
// Tells bpRequester to pick another peer and try again.
// NOTE: Nonblocking, and does nothing if another redo
// was already requested.
func (bpr *bpRequester) redo(peerID p2p.ID) {
func (bpr *bpRequester) redo(peerID nodekey.ID) {
select {
case bpr.redoCh <- peerID:
default:
Expand Down Expand Up @@ -864,5 +864,5 @@ OUTER_LOOP:
// delivering the block.
type BlockRequest struct {
Height int64
PeerID p2p.ID
PeerID nodekey.ID
}
Loading

0 comments on commit eef3464

Please sign in to comment.