diff --git a/core/network/network.go b/core/network/network.go index 66b0a1cd34..9148fc3309 100644 --- a/core/network/network.go +++ b/core/network/network.go @@ -61,10 +61,13 @@ const ( // CannotConnect means recently attempted connecting but failed to connect. // (should signal "made effort, failed") CannotConnect + + // Transient means we have a transient connection to the peer, but aren't fully connected. + Transient ) func (c Connectedness) String() string { - str := [...]string{"NotConnected", "Connected", "CanConnect", "CannotConnect"} + str := [...]string{"NotConnected", "Connected", "CanConnect", "CannotConnect", "Transient"} if c < 0 || int(c) >= len(str) { return unrecognized } diff --git a/p2p/host/basic/basic_host.go b/p2p/host/basic/basic_host.go index 8e6e8efe7c..5c75ae751b 100644 --- a/p2p/host/basic/basic_host.go +++ b/p2p/host/basic/basic_host.go @@ -723,8 +723,10 @@ func (h *BasicHost) Connect(ctx context.Context, pi peer.AddrInfo) error { h.Peerstore().AddAddrs(pi.ID, pi.Addrs, peerstore.TempAddrTTL) forceDirect, _ := network.GetForceDirectDial(ctx) + canUseTransient, _ := network.GetUseTransient(ctx) if !forceDirect { - if h.Network().Connectedness(pi.ID) == network.Connected { + connectedness := rh.Network().Connectedness(pi.ID) + if connectedness == network.Connected || (canUseTransient && connectedness == network.Transient) { return nil } } diff --git a/p2p/host/pstoremanager/pstoremanager.go b/p2p/host/pstoremanager/pstoremanager.go index 2a22b2caee..82c55e8cbf 100644 --- a/p2p/host/pstoremanager/pstoremanager.go +++ b/p2p/host/pstoremanager/pstoremanager.go @@ -103,15 +103,16 @@ func (m *PeerstoreManager) background(ctx context.Context, sub event.Subscriptio ev := e.(event.EvtPeerConnectednessChanged) p := ev.Peer switch ev.Connectedness { - case network.NotConnected: + case network.Connected, network.Transient: + // If we reconnect to the peer before we've cleared the information, + // keep it. This is an optimization to keep the disconnected map + // small. We still need to check that a peer is actually + // disconnected before removing it from the peer store. + delete(disconnected, p) + default: if _, ok := disconnected[p]; !ok { disconnected[p] = time.Now() } - case network.Connected: - // If we reconnect to the peer before we've cleared the information, keep it. - // This is an optimization to keep the disconnected map small. - // We still need to check that a peer is actually disconnected before removing it from the peer store. - delete(disconnected, p) } case <-ticker.C: now := time.Now() diff --git a/p2p/host/routed/routed.go b/p2p/host/routed/routed.go index eb8e58ee7f..a6e43703c9 100644 --- a/p2p/host/routed/routed.go +++ b/p2p/host/routed/routed.go @@ -48,8 +48,10 @@ func Wrap(h host.Host, r Routing) *RoutedHost { func (rh *RoutedHost) Connect(ctx context.Context, pi peer.AddrInfo) error { // first, check if we're already connected unless force direct dial. forceDirect, _ := network.GetForceDirectDial(ctx) + canUseTransient, _ := network.GetUseTransient(ctx) if !forceDirect { - if rh.Network().Connectedness(pi.ID) == network.Connected { + connectedness := rh.Network().Connectedness(pi.ID) + if connectedness == network.Connected || (canUseTransient && connectedness == network.Transient) { return nil } } diff --git a/p2p/net/swarm/swarm.go b/p2p/net/swarm/swarm.go index a76edce6ce..935c29626c 100644 --- a/p2p/net/swarm/swarm.go +++ b/p2p/net/swarm/swarm.go @@ -344,6 +344,7 @@ func (s *Swarm) addConn(tc transport.CapableConn, dir network.Direction) (*Conn, } stat.Direction = dir stat.Opened = time.Now() + isTransient := stat.Transient // Wrap and register the connection. c := &Conn{ @@ -383,8 +384,9 @@ func (s *Swarm) addConn(tc transport.CapableConn, dir network.Direction) (*Conn, return nil, ErrSwarmClosed } + oldState := s.connectednessUnlocked(p) + c.streams.m = make(map[*Stream]struct{}) - isFirstConnection := len(s.conns.m[p]) == 0 s.conns.m[p] = append(s.conns.m[p], c) // Add two swarm refs: @@ -397,8 +399,12 @@ func (s *Swarm) addConn(tc transport.CapableConn, dir network.Direction) (*Conn, c.notifyLk.Lock() s.conns.Unlock() - // Notify goroutines waiting for a direct connection - if !c.Stat().Transient { + newState := network.Transient + if !isTransient { + newState = network.Connected + + // Notify goroutines waiting for a direct connection + // // Go routines interested in waiting for direct connection first acquire this lock // and then acquire s.conns.RLock. Do not acquire this lock before conns.Unlock to // prevent deadlock. @@ -412,10 +418,10 @@ func (s *Swarm) addConn(tc transport.CapableConn, dir network.Direction) (*Conn, // Emit event after releasing `s.conns` lock so that a consumer can still // use swarm methods that need the `s.conns` lock. - if isFirstConnection { + if oldState != newState { s.emitter.Emit(event.EvtPeerConnectednessChanged{ Peer: p, - Connectedness: network.Connected, + Connectedness: newState, }) } @@ -646,10 +652,30 @@ func isDirectConn(c *Conn) bool { // To check if we have an open connection, use `s.Connectedness(p) == // network.Connected`. func (s *Swarm) Connectedness(p peer.ID) network.Connectedness { - if s.bestConnToPeer(p) != nil { - return network.Connected + s.conns.RLock() + defer s.conns.RUnlock() + + s.connectednessUnlocked(p) +} + +func (s *Swarm) connectednessUnlocked(p peer.ID) network.Connectedness { + var haveTransient bool + for _, c := range s.conns.m[p] { + if c.conn.IsClosed() { + // We *will* garbage collect this soon anyways. + continue + } + if c.Stat().Transient { + haveTransient = true + } else { + return network.Connected + } + } + if haveTransient { + return network.Transient + } else { + return network.NotConnected } - return network.NotConnected } // Conns returns a slice of all connections. diff --git a/p2p/protocol/identify/id.go b/p2p/protocol/identify/id.go index 6c07dbdc8e..e3a36843f3 100644 --- a/p2p/protocol/identify/id.go +++ b/p2p/protocol/identify/id.go @@ -746,7 +746,8 @@ func (ids *idService) consumeMessage(mes *pb.Identify, c network.Conn, isPush bo // Taking the lock ensures that we don't concurrently process a disconnect. ids.addrMu.Lock() ttl := peerstore.RecentlyConnectedAddrTTL - if ids.Host.Network().Connectedness(p) == network.Connected { + switch ids.Host.Network().Connectedness(p) { + case network.Transient, network.Connected: ttl = peerstore.ConnectedAddrTTL } @@ -975,13 +976,15 @@ func (nn *netNotifiee) Disconnected(_ network.Network, c network.Conn) { delete(ids.conns, c) ids.connsMu.Unlock() - if ids.Host.Network().Connectedness(c.RemotePeer()) != network.Connected { - // Last disconnect. - // Undo the setting of addresses to peer.ConnectedAddrTTL we did - ids.addrMu.Lock() - defer ids.addrMu.Unlock() - ids.Host.Peerstore().UpdateAddrs(c.RemotePeer(), peerstore.ConnectedAddrTTL, peerstore.RecentlyConnectedAddrTTL) + switch ids.Host.Network().Connectedness(c.RemotePeer()) { + case network.Connected, network.Transient: + return } + // Last disconnect. + // Undo the setting of addresses to peer.ConnectedAddrTTL we did + ids.addrMu.Lock() + defer ids.addrMu.Unlock() + ids.Host.Peerstore().UpdateAddrs(c.RemotePeer(), peerstore.ConnectedAddrTTL, peerstore.RecentlyConnectedAddrTTL) } func (nn *netNotifiee) Listen(n network.Network, a ma.Multiaddr) {}