Skip to content

Commit

Permalink
Removed OpenedStream/ClosedStream since they were removed from networ…
Browse files Browse the repository at this point in the history
…k.Notifee
  • Loading branch information
alexsporn committed Sep 14, 2022
1 parent 5b35e2a commit f6ab41b
Show file tree
Hide file tree
Showing 2 changed files with 0 additions and 42 deletions.
2 changes: 0 additions & 2 deletions pkg/p2p/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -1099,5 +1099,3 @@ func (m *netNotifiee) Disconnected(net network.Network, conn network.Conn) {
}
m.disconnectedChan <- &disconnectmsg{net: net, conn: conn, reason: errors.New("connection closed by libp2p network event")}
}
func (m *netNotifiee) OpenedStream(_ network.Network, _ network.Stream) {}
func (m *netNotifiee) ClosedStream(_ network.Network, _ network.Stream) {}
40 changes: 0 additions & 40 deletions pkg/protocol/gossip/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/protocol"
"github.com/multiformats/go-multiaddr"
"github.com/pkg/errors"

"github.com/iotaledger/hive.go/core/events"
Expand Down Expand Up @@ -176,7 +175,6 @@ type Service struct {
connectedChan chan *connectionmsg
closeStreamChan chan *closestreammsg
disconnectedChan chan *connectionmsg
streamClosedChan chan *streamclosedmsg
relationUpdatedChan chan *relationupdatedmsg
streamReqChan chan *streamreqmsg
forEachChan chan *foreachmsg
Expand Down Expand Up @@ -224,7 +222,6 @@ func NewService(
connectedChan: make(chan *connectionmsg, 10),
closeStreamChan: make(chan *closestreammsg, 10),
disconnectedChan: make(chan *connectionmsg, 10),
streamClosedChan: make(chan *streamclosedmsg, 10),
relationUpdatedChan: make(chan *relationupdatedmsg, 10),
streamReqChan: make(chan *streamreqmsg, 10),
forEachChan: make(chan *foreachmsg, 10),
Expand Down Expand Up @@ -304,17 +301,11 @@ func (s *Service) Start(ctx context.Context) {
s.inboundStreamChan <- stream
})

// manage libp2p network events
s.host.Network().Notify((*netNotifiee)(s))

s.eventLoop(ctx)

// libp2p stream handler
s.host.RemoveStreamHandler(s.protocol)

// de-register libp2p network events
s.host.Network().StopNotify((*netNotifiee)(s))

s.detachEvents()
s.peeringMngWP.Stop()
}
Expand All @@ -338,8 +329,6 @@ drainLoop:

case <-s.disconnectedChan:

case <-s.streamClosedChan:

case <-s.relationUpdatedChan:

case streamReqMsg := <-s.streamReqChan:
Expand Down Expand Up @@ -369,11 +358,6 @@ type streamreqmsg struct {
back chan *Protocol
}

type streamclosedmsg struct {
peerID peer.ID
stream network.Stream
}

type relationupdatedmsg struct {
peer *p2p.Peer
oldRelation p2p.PeerRelation
Expand Down Expand Up @@ -410,11 +394,6 @@ func (s *Service) eventLoop(ctx context.Context) {
s.Events.Error.Trigger(err)
}

case streamClosedMsg := <-s.streamClosedChan:
if err := s.deregisterProtocol(streamClosedMsg.peerID); err != nil && !errors.Is(err, ErrProtocolDoesNotExist) {
s.Events.Error.Trigger(err)
}

case relationUpdatedMsg := <-s.relationUpdatedChan:
s.handleRelationUpdated(ctx, relationUpdatedMsg.peer, relationUpdatedMsg.oldRelation)

Expand Down Expand Up @@ -729,22 +708,3 @@ func (s *Service) detachEvents() {
s.Events.InboundStreamCanceled.Detach(s.onGossipServiceInboundStreamCanceled)
s.Events.Error.Detach(s.onGossipServiceError)
}

// lets Service implement network.Notifiee in order to automatically
// clean up ongoing reset streams.
type netNotifiee Service

func (m *netNotifiee) Listen(net network.Network, multiaddr multiaddr.Multiaddr) {}
func (m *netNotifiee) ListenClose(net network.Network, multiaddr multiaddr.Multiaddr) {}
func (m *netNotifiee) Connected(net network.Network, conn network.Conn) {}
func (m *netNotifiee) Disconnected(net network.Network, conn network.Conn) {}
func (m *netNotifiee) OpenedStream(net network.Network, stream network.Stream) {}
func (m *netNotifiee) ClosedStream(net network.Network, stream network.Stream) {
if stream.Protocol() != m.protocol {
return
}
if m.stopped.IsSet() {
return
}
m.streamClosedChan <- &streamclosedmsg{peerID: stream.Conn().RemotePeer(), stream: stream}
}

0 comments on commit f6ab41b

Please sign in to comment.