diff --git a/config/other.go b/config/other.go index c6d5542..a570c2c 100644 --- a/config/other.go +++ b/config/other.go @@ -142,7 +142,7 @@ func setDefaults(conf *Config, bus awlevent.Bus) { conf.P2pNode.BootstrapPeers = make([]string, 0) } if conf.P2pNode.ReconnectionIntervalSec == 0 { - conf.P2pNode.ReconnectionIntervalSec = 10 + conf.P2pNode.ReconnectionIntervalSec = 20 } if conf.P2pNode.ParallelSendingStreamsCount == 0 { conf.P2pNode.ParallelSendingStreamsCount = 1 diff --git a/p2p/p2p.go b/p2p/p2p.go index cfd3c2d..b77808f 100644 --- a/p2p/p2p.go +++ b/p2p/p2p.go @@ -193,6 +193,11 @@ func (p *P2p) ConnectPeer(ctx context.Context, peerID peer.ID) error { if p.IsConnected(peerID) { return nil } + + // FindPeer runs until peer is found in DHT or context is cancelled, so a timeout is mandatory + ctx, cancel := context.WithTimeout(p.ctx, 10*time.Second) + defer cancel() + peerInfo, err := p.FindPeer(ctx, peerID) if err != nil { return fmt.Errorf("could not find peer %s: %v", peerID.String(), err) @@ -299,17 +304,18 @@ func (p *P2p) MaintainBackgroundConnections(ctx context.Context, interval time.D } p.connectToKnownPeers(ctx, interval, knownPeersIdsFunc()) - t := time.NewTicker(interval) - defer t.Stop() + ticker := time.NewTicker(interval) + defer ticker.Stop() for { select { case <-ctx.Done(): return - case <-t.C: + case <-ticker.C: } p.connectToKnownPeers(ctx, interval, knownPeersIdsFunc()) + ticker.Reset(interval) } } diff --git a/service/auth_status.go b/service/auth_status.go index 4fd3af8..cd89ffa 100644 --- a/service/auth_status.go +++ b/service/auth_status.go @@ -3,6 +3,7 @@ package service import ( "context" "fmt" + "maps" "strings" "sync" "time" @@ -91,7 +92,7 @@ func (s *AuthStatus) StatusStreamHandler(stream network.Stream) { s.logger.Errorf("sending status info to %s as an answer: %v", peerID, err) } - s.logger.Infof("successfully exchanged status info with %s (%s)", knownPeer.DisplayName(), peerID) + s.logger.Infof("successfully exchanged status info (inbound) with %s (%s)", knownPeer.DisplayName(), peerID) if isBlocked { return } @@ -133,6 +134,7 @@ func (s *AuthStatus) ExchangeNewStatusInfo(ctx context.Context, remotePeerID pee return fmt.Errorf("receiving status info: %v", err) } + s.logger.Infof("successfully exchanged status info (outbound) with %s (%s)", knownPeer.DisplayName(), remotePeerID.String()) if isBlocked { return nil } @@ -346,10 +348,7 @@ func (s *AuthStatus) ExchangeStatusInfoWithAllKnownPeers(ctx context.Context) { func (s *AuthStatus) BackgroundRetryAuthRequests(ctx context.Context) { f := func() { s.authsLock.RLock() - outgoingAuthsCopy := make(map[peer.ID]protocol.AuthPeer, len(s.outgoingAuths)) - for key, val := range s.outgoingAuths { - outgoingAuthsCopy[key] = val - } + outgoingAuthsCopy := maps.Clone(s.outgoingAuths) s.authsLock.RUnlock() for peerID, auth := range outgoingAuthsCopy { @@ -366,6 +365,7 @@ func (s *AuthStatus) BackgroundRetryAuthRequests(ctx context.Context) { return case <-ticker.C: f() + ticker.Reset(backgroundRetryAuthRequests) } } } @@ -380,6 +380,7 @@ func (s *AuthStatus) BackgroundExchangeStatusInfo(ctx context.Context) { return case <-ticker.C: s.ExchangeStatusInfoWithAllKnownPeers(ctx) + ticker.Reset(backgroundExchangeStatusInfoInterval) } } }