Skip to content

Commit

Permalink
Update Pubsub Library To Use Gossipsub 1.2 (prysmaticlabs#14428)
Browse files Browse the repository at this point in the history
* Update Libp2p Packages

* Update CHANGELOG

* Finally Fix All Tests

* Fix Build

* Fix Build

* Fix TestP2P Connection Initialization

* Fix TestP2P Host Options

* Fix Test By Removing WaitGroup
  • Loading branch information
nisdas authored Sep 24, 2024
1 parent 315c05b commit 7e5738b
Show file tree
Hide file tree
Showing 9 changed files with 207 additions and 233 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ The format is based on Keep a Changelog, and this project adheres to Semantic Ve
- Updated k8s-io/client-go to v0.30.4 and k8s-io/apimachinery to v0.30.4
- Migrated tracing library from opencensus to opentelemetry for both the beacon node and validator.
- Refactored light client code to make it more readable and make future PRs easier.
- Updated Libp2p Dependencies to allow prysm to use gossipsub v1.2 .

### Deprecated
- `--disable-grpc-gateway` flag is deprecated due to grpc gateway removal.
Expand Down
2 changes: 0 additions & 2 deletions beacon-chain/p2p/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -179,8 +179,6 @@ go_test(
"@com_github_libp2p_go_libp2p//core/network:go_default_library",
"@com_github_libp2p_go_libp2p//core/peer:go_default_library",
"@com_github_libp2p_go_libp2p//core/protocol:go_default_library",
"@com_github_libp2p_go_libp2p//p2p/host/blank:go_default_library",
"@com_github_libp2p_go_libp2p//p2p/net/swarm/testing:go_default_library",
"@com_github_libp2p_go_libp2p//p2p/security/noise:go_default_library",
"@com_github_libp2p_go_libp2p_pubsub//:go_default_library",
"@com_github_libp2p_go_libp2p_pubsub//pb:go_default_library",
Expand Down
10 changes: 6 additions & 4 deletions beacon-chain/p2p/dial_relay_node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ import (
"fmt"
"testing"

bh "github.com/libp2p/go-libp2p/p2p/host/blank"
swarmt "github.com/libp2p/go-libp2p/p2p/net/swarm/testing"
"github.com/libp2p/go-libp2p"
"github.com/libp2p/go-libp2p/core/network"
"github.com/prysmaticlabs/prysm/v5/testing/assert"
"github.com/prysmaticlabs/prysm/v5/testing/require"
)
Expand All @@ -29,8 +29,10 @@ func TestDialRelayNode_InvalidPeerString(t *testing.T) {

func TestDialRelayNode_OK(t *testing.T) {
ctx := context.Background()
relay := bh.NewBlankHost(swarmt.GenSwarm(t))
host := bh.NewBlankHost(swarmt.GenSwarm(t))
relay, err := libp2p.New(libp2p.ResourceManager(&network.NullResourceManager{}))
require.NoError(t, err)
host, err := libp2p.New(libp2p.ResourceManager(&network.NullResourceManager{}))
require.NoError(t, err)
relayAddr := fmt.Sprintf("%s/p2p/%s", relay.Addrs()[0], relay.ID().String())

assert.NoError(t, dialRelayNode(ctx, host, relayAddr), "Unexpected error when dialing relay node")
Expand Down
5 changes: 3 additions & 2 deletions beacon-chain/p2p/testing/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,11 @@ go_library(
"//beacon-chain/p2p/peers/scorers:go_default_library",
"//proto/prysm/v1alpha1:go_default_library",
"//proto/prysm/v1alpha1/metadata:go_default_library",
"//testing/require:go_default_library",
"@com_github_ethereum_go_ethereum//crypto:go_default_library",
"@com_github_ethereum_go_ethereum//p2p/enode:go_default_library",
"@com_github_ethereum_go_ethereum//p2p/enr:go_default_library",
"@com_github_libp2p_go_libp2p//:go_default_library",
"@com_github_libp2p_go_libp2p//core:go_default_library",
"@com_github_libp2p_go_libp2p//core/connmgr:go_default_library",
"@com_github_libp2p_go_libp2p//core/control:go_default_library",
Expand All @@ -34,8 +36,7 @@ go_library(
"@com_github_libp2p_go_libp2p//core/peer:go_default_library",
"@com_github_libp2p_go_libp2p//core/peerstore:go_default_library",
"@com_github_libp2p_go_libp2p//core/protocol:go_default_library",
"@com_github_libp2p_go_libp2p//p2p/host/blank:go_default_library",
"@com_github_libp2p_go_libp2p//p2p/net/swarm/testing:go_default_library",
"@com_github_libp2p_go_libp2p//p2p/transport/tcp:go_default_library",
"@com_github_libp2p_go_libp2p_pubsub//:go_default_library",
"@com_github_multiformats_go_multiaddr//:go_default_library",
"@com_github_prysmaticlabs_fastssz//:go_default_library",
Expand Down
19 changes: 13 additions & 6 deletions beacon-chain/p2p/testing/p2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,22 +11,23 @@ import (
"time"

"github.com/ethereum/go-ethereum/p2p/enr"
"github.com/libp2p/go-libp2p"
pubsub "github.com/libp2p/go-libp2p-pubsub"
core "github.com/libp2p/go-libp2p/core"
"github.com/libp2p/go-libp2p/core/control"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/protocol"
bhost "github.com/libp2p/go-libp2p/p2p/host/blank"
swarmt "github.com/libp2p/go-libp2p/p2p/net/swarm/testing"
"github.com/libp2p/go-libp2p/p2p/transport/tcp"
"github.com/multiformats/go-multiaddr"
ssz "github.com/prysmaticlabs/fastssz"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p/encoder"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p/peers"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p/peers/scorers"
ethpb "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1"
"github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1/metadata"
"github.com/prysmaticlabs/prysm/v5/testing/require"
"github.com/sirupsen/logrus"
"google.golang.org/protobuf/proto"
)
Expand All @@ -52,7 +53,8 @@ type TestP2P struct {
// NewTestP2P initializes a new p2p test service.
func NewTestP2P(t *testing.T) *TestP2P {
ctx := context.Background()
h := bhost.NewBlankHost(swarmt.GenSwarm(t, swarmt.OptDisableQUIC))
h, err := libp2p.New(libp2p.ResourceManager(&network.NullResourceManager{}), libp2p.Transport(tcp.NewTCPTransport), libp2p.DefaultListenAddrs)
require.NoError(t, err)
ps, err := pubsub.NewFloodSub(ctx, h,
pubsub.WithMessageSigning(false),
pubsub.WithStrictSignatureVerification(false),
Expand Down Expand Up @@ -86,13 +88,17 @@ func (p *TestP2P) Connect(b *TestP2P) {
}

func connect(a, b host.Host) error {
pinfo := b.Peerstore().PeerInfo(b.ID())
pinfo := peer.AddrInfo{
ID: b.ID(),
Addrs: b.Addrs(),
}
return a.Connect(context.Background(), pinfo)
}

// ReceiveRPC simulates an incoming RPC.
func (p *TestP2P) ReceiveRPC(topic string, msg proto.Message) {
h := bhost.NewBlankHost(swarmt.GenSwarm(p.t))
h, err := libp2p.New(libp2p.ResourceManager(&network.NullResourceManager{}))
require.NoError(p.t, err)
if err := connect(h, p.BHost); err != nil {
p.t.Fatalf("Failed to connect two peers for RPC: %v", err)
}
Expand Down Expand Up @@ -122,7 +128,8 @@ func (p *TestP2P) ReceiveRPC(topic string, msg proto.Message) {

// ReceivePubSub simulates an incoming message over pubsub on a given topic.
func (p *TestP2P) ReceivePubSub(topic string, msg proto.Message) {
h := bhost.NewBlankHost(swarmt.GenSwarm(p.t))
h, err := libp2p.New(libp2p.ResourceManager(&network.NullResourceManager{}))
require.NoError(p.t, err)
ps, err := pubsub.NewFloodSub(context.Background(), h,
pubsub.WithMessageSigning(false),
pubsub.WithStrictSignatureVerification(false),
Expand Down
12 changes: 6 additions & 6 deletions beacon-chain/sync/rpc_beacon_blocks_by_range_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -433,11 +433,12 @@ func TestRPCBeaconBlocksByRange_RPCHandlerRateLimitOverflow(t *testing.T) {
}
sendRequest := func(p1, p2 *p2ptest.TestP2P, r *Service,
req *ethpb.BeaconBlocksByRangeRequest, validateBlocks bool, success bool) error {
var wg sync.WaitGroup
wg.Add(1)
pcl := protocol.ID(p2p.RPCBlocksByRangeTopicV1)
reqAnswered := false
p2.BHost.SetStreamHandler(pcl, func(stream network.Stream) {
defer wg.Done()
defer func() {
reqAnswered = true
}()
if !validateBlocks {
return
}
Expand All @@ -458,9 +459,8 @@ func TestRPCBeaconBlocksByRange_RPCHandlerRateLimitOverflow(t *testing.T) {
if err := r.beaconBlocksByRangeRPCHandler(context.Background(), req, stream); err != nil {
return err
}
if util.WaitTimeout(&wg, 1*time.Second) {
t.Fatal("Did not receive stream within 1 sec")
}
time.Sleep(100 * time.Millisecond)
assert.Equal(t, reqAnswered, true)
return nil
}

Expand Down
Loading

0 comments on commit 7e5738b

Please sign in to comment.