diff --git a/p2p/net/mock/mock_conn.go b/p2p/net/mock/mock_conn.go index f50d1015fe..8c3dc87299 100644 --- a/p2p/net/mock/mock_conn.go +++ b/p2p/net/mock/mock_conn.go @@ -86,20 +86,12 @@ func (c *conn) Close() error { return nil } -func (c *conn) teardown() error { +func (c *conn) teardown() { for _, s := range c.allStreams() { s.Reset() } - c.net.removeConn(c) - go func() { - c.notifLk.Lock() - defer c.notifLk.Unlock() - c.net.notifyAll(func(n network.Notifiee) { - n.Disconnected(c.net, c) - }) - }() - return nil + c.net.removeConn(c) } func (c *conn) addStream(s *stream) { diff --git a/p2p/net/mock/mock_net.go b/p2p/net/mock/mock_net.go index c31bb38713..cde4052369 100644 --- a/p2p/net/mock/mock_net.go +++ b/p2p/net/mock/mock_net.go @@ -14,6 +14,7 @@ import ( "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/core/peerstore" bhost "github.com/libp2p/go-libp2p/p2p/host/basic" + "github.com/libp2p/go-libp2p/p2p/host/eventbus" "github.com/libp2p/go-libp2p/p2p/host/peerstore/pstoremem" ma "github.com/multiformats/go-multiaddr" @@ -108,7 +109,8 @@ func (mn *mocknet) AddPeer(k ic.PrivKey, a ma.Multiaddr) (host.Host, error) { } func (mn *mocknet) AddPeerWithPeerstore(p peer.ID, ps peerstore.Peerstore) (host.Host, error) { - n, err := newPeernet(mn, p, ps) + bus := eventbus.NewBus() + n, err := newPeernet(mn, p, ps, bus) if err != nil { return nil, err } @@ -116,6 +118,7 @@ func (mn *mocknet) AddPeerWithPeerstore(p peer.ID, ps peerstore.Peerstore) (host opts := &bhost.HostOpts{ NegotiationTimeout: -1, DisableSignedPeerRecord: true, + EventBus: bus, } h, err := bhost.NewHost(n, opts) diff --git a/p2p/net/mock/mock_peernet.go b/p2p/net/mock/mock_peernet.go index 85e636788f..a46ee8ddc9 100644 --- a/p2p/net/mock/mock_peernet.go +++ b/p2p/net/mock/mock_peernet.go @@ -7,10 +7,10 @@ import ( "math/rand" "sync" + "github.com/libp2p/go-libp2p/core/event" "github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/core/peerstore" - ma "github.com/multiformats/go-multiaddr" ) @@ -18,8 +18,9 @@ import ( type peernet struct { mocknet *mocknet // parent - peer peer.ID - ps peerstore.Peerstore + peer peer.ID + ps peerstore.Peerstore + emitter event.Emitter // conns are actual live connections between peers. // many conns could run over each link. @@ -37,11 +38,17 @@ type peernet struct { } // newPeernet constructs a new peernet -func newPeernet(m *mocknet, p peer.ID, ps peerstore.Peerstore) (*peernet, error) { +func newPeernet(m *mocknet, p peer.ID, ps peerstore.Peerstore, bus event.Bus) (*peernet, error) { + emitter, err := bus.Emitter(&event.EvtPeerConnectednessChanged{}) + if err != nil { + return nil, err + } + n := &peernet{ mocknet: m, peer: p, ps: ps, + emitter: emitter, connsByPeer: map[peer.ID]map[*conn]struct{}{}, connsByLink: map[*link]map[*conn]struct{}{}, @@ -57,6 +64,7 @@ func (pn *peernet) Close() error { for _, c := range pn.allConns() { c.Close() } + pn.emitter.Close() return pn.ps.Close() } @@ -192,13 +200,16 @@ func (pn *peernet) addConn(c *conn) { pn.notifyAll(func(n network.Notifiee) { n.Connected(pn, c) }) + + pn.emitter.Emit(event.EvtPeerConnectednessChanged{ + Peer: c.remote, + Connectedness: network.Connected, + }) } // removeConn removes a given conn func (pn *peernet) removeConn(c *conn) { pn.Lock() - defer pn.Unlock() - cs, found := pn.connsByLink[c.link] if !found || len(cs) < 1 { panic(fmt.Sprintf("attempting to remove a conn that doesnt exist %p", c.link)) @@ -210,6 +221,22 @@ func (pn *peernet) removeConn(c *conn) { panic(fmt.Sprintf("attempting to remove a conn that doesnt exist %v", c.remote)) } delete(cs, c) + pn.Unlock() + + // notify asynchronously to mimic Swarm + // FIXME: IIRC, we wanted to make notify for Close synchronous + go func() { + c.notifLk.Lock() + defer c.notifLk.Unlock() + pn.notifyAll(func(n network.Notifiee) { + n.Disconnected(c.net, c) + }) + }() + + c.net.emitter.Emit(event.EvtPeerConnectednessChanged{ + Peer: c.remote, + Connectedness: network.NotConnected, + }) } // LocalPeer the network's LocalPeer @@ -355,18 +382,11 @@ func (pn *peernet) StopNotify(f network.Notifiee) { // notifyAll runs the notification function on all Notifiees func (pn *peernet) notifyAll(notification func(f network.Notifiee)) { pn.notifmu.Lock() - var wg sync.WaitGroup + // notify synchronously to mimic Swarm for n := range pn.notifs { - // make sure we dont block - // and they dont block each other. - wg.Add(1) - go func(n network.Notifiee) { - defer wg.Done() - notification(n) - }(n) + notification(n) } pn.notifmu.Unlock() - wg.Wait() } func (pn *peernet) ResourceManager() network.ResourceManager { diff --git a/p2p/net/mock/mock_test.go b/p2p/net/mock/mock_test.go index e188cfd802..2ea1bf18dd 100644 --- a/p2p/net/mock/mock_test.go +++ b/p2p/net/mock/mock_test.go @@ -12,6 +12,7 @@ import ( "time" "github.com/libp2p/go-libp2p/core/crypto" + "github.com/libp2p/go-libp2p/core/event" "github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/core/protocol" @@ -600,3 +601,83 @@ func TestStreamsWithLatency(t *testing.T) { t.Fatalf("Expected write to take ~%s (+/- %s), but took %s", latency.String(), tolerance.String(), delta.String()) } } + +func TestEventBus(t *testing.T) { + const peers = 2 + + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + + mn, err := FullMeshLinked(peers) + if err != nil { + t.Fatal(err) + } + defer mn.Close() + + sub0, err := mn.Hosts()[0].EventBus().Subscribe(new(event.EvtPeerConnectednessChanged)) + if err != nil { + t.Fatal(err) + } + defer sub0.Close() + sub1, err := mn.Hosts()[1].EventBus().Subscribe(new(event.EvtPeerConnectednessChanged)) + if err != nil { + t.Fatal(err) + } + defer sub1.Close() + + id0, id1 := mn.Hosts()[0].ID(), mn.Hosts()[1].ID() + + _, err = mn.ConnectPeers(id0, id1) + if err != nil { + t.Fatal(err) + } + for range make([]int, peers) { + select { + case evt := <-sub0.Out(): + evnt := evt.(event.EvtPeerConnectednessChanged) + if evnt.Peer != id1 { + t.Fatal("wrong remote peer") + } + if evnt.Connectedness != network.Connected { + t.Fatal("wrong connectedness type") + } + case evt := <-sub1.Out(): + evnt := evt.(event.EvtPeerConnectednessChanged) + if evnt.Peer != id0 { + t.Fatal("wrong remote peer") + } + if evnt.Connectedness != network.Connected { + t.Fatal("wrong connectedness type") + } + case <-ctx.Done(): + t.Fatal("didn't get connectedness events in time") + } + } + + err = mn.DisconnectPeers(id0, id1) + if err != nil { + t.Fatal(err) + } + for range make([]int, peers) { + select { + case evt := <-sub0.Out(): + evnt := evt.(event.EvtPeerConnectednessChanged) + if evnt.Peer != id1 { + t.Fatal("wrong remote peer") + } + if evnt.Connectedness != network.NotConnected { + t.Fatal("wrong connectedness type") + } + case evt := <-sub1.Out(): + evnt := evt.(event.EvtPeerConnectednessChanged) + if evnt.Peer != id0 { + t.Fatal("wrong remote peer") + } + if evnt.Connectedness != network.NotConnected { + t.Fatal("wrong connectedness type") + } + case <-ctx.Done(): + t.Fatal("didn't get connectedness events in time") + } + } +}