Skip to content

Commit

Permalink
query: fix error "leak"
Browse files Browse the repository at this point in the history
Long-running queries can build up large error sets that we never actually use.
This is exacerbated by libp2p/go-libp2p-swarm#115.

fixes libp2p/go-libp2p-swarm#119
  • Loading branch information
Stebalien committed Apr 24, 2019
1 parent ca611b1 commit 57e1af7
Showing 1 changed file with 8 additions and 16 deletions.
24 changes: 8 additions & 16 deletions query.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@ package dht

import (
"context"
"errors"
"sync"

u "github.com/ipfs/go-ipfs-util"
logging "github.com/ipfs/go-log"
todoctr "github.com/ipfs/go-todocounter"
process "github.com/jbenet/goprocess"
Expand All @@ -18,6 +18,9 @@ import (
notif "github.com/libp2p/go-libp2p-routing/notifications"
)

// ErrNoPeersQueried is returned when we failed to connect to any peers.
var ErrNoPeersQueried = errors.New("failed to query any peers")

var maxQueryConcurrency = AlphaValue

type dhtQuery struct {
Expand Down Expand Up @@ -77,7 +80,6 @@ type dhtQueryRunner struct {
peersRemaining todoctr.Counter // peersToQuery + currently processing

result *dhtQueryResult // query result
errs u.MultiErr // result errors. maybe should be a map[peer.ID]error

rateLimit chan struct{} // processing semaphore
log logging.EventLogger
Expand Down Expand Up @@ -158,12 +160,10 @@ func (r *dhtQueryRunner) Run(ctx context.Context, peers []peer.ID) (*dhtQueryRes
r.RLock()
defer r.RUnlock()

err = routing.ErrNotFound

// if every query to every peer failed, something must be very wrong.
if len(r.errs) > 0 && len(r.errs) == r.peersSeen.Size() {
logger.Debugf("query errs: %s", r.errs)
err = r.errs[0]
if r.peersQueried.Size() == 0 {
err = ErrNoPeersQueried
} else {
err = routing.ErrNotFound
}

case <-r.proc.Closed():
Expand Down Expand Up @@ -257,10 +257,6 @@ func (r *dhtQueryRunner) dialPeer(ctx context.Context, p peer.ID) error {
ID: p,
})

r.Lock()
r.errs = append(r.errs, err)
r.Unlock()

// This peer is dropping out of the race.
r.peersRemaining.Decrement(1)
return err
Expand Down Expand Up @@ -289,10 +285,6 @@ func (r *dhtQueryRunner) queryPeer(proc process.Process, p peer.ID) {

if err != nil {
logger.Debugf("ERROR worker for: %v %v", p, err)
r.Lock()
r.errs = append(r.errs, err)
r.Unlock()

} else if res.success {
logger.Debugf("SUCCESS worker for: %v %s", p, res)
r.Lock()
Expand Down

0 comments on commit 57e1af7

Please sign in to comment.