Better RT refresh & maintenance #465
@@ -5,10 +5,12 @@ import (
    "fmt"
    "time"

    "github.com/libp2p/go-libp2p-core/event"
    "github.com/libp2p/go-libp2p-core/routing"

    multierror "github.com/hashicorp/go-multierror"
    process "github.com/jbenet/goprocess"
    processctx "github.com/jbenet/goprocess/context"
    "github.com/libp2p/go-libp2p-core/routing"
    "github.com/multiformats/go-multiaddr"
    _ "github.com/multiformats/go-multiaddr-dns"
)

@@ -17,7 +19,7 @@ var DefaultBootstrapPeers []multiaddr.Multiaddr

// Minimum number of peers in the routing table. If we drop below this and we
// see a new peer, we trigger a bootstrap round.
var minRTRefreshThreshold = 4
var minRTRefreshThreshold = 20
Review comment: this seems like a high minimum for triggering new bootstraps. Was this for testing, or intentional?

Reply: This is intentional. The current interval for a refresh is too long (1 hour) & we want to be more proactive about doing refreshes when they are needed rather than doing them too often when they are not. But really, we can always tune these params if we run into problems later.
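For context, a minimal sketch of how a threshold like this can gate the trigger, assuming a hypothetical handler that runs when a new peer is seen; the actual wiring is not part of this hunk and the helper name is invented:

    // Hypothetical helper: when a new peer is seen and the routing table has
    // shrunk below the threshold, queue a refresh without blocking.
    func (dht *IpfsDHT) maybeTriggerRefresh() {
        if dht.routingTable.Size() < minRTRefreshThreshold {
            select {
            case dht.triggerRtRefresh <- nil:
            default: // a refresh is already pending
            }
        }
    }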

func init() {
    for _, s := range []string{
@@ -35,6 +37,50 @@ func init() {
    }
}

// startSelfLookup starts a go-routine that listens for requests to trigger a self walk on a dedicated channel
// and then sends the error status back on the error channel sent along with the request.
// if multiple callers "simultaneously" ask for a self walk, it performs ONLY one self walk and sends the same error status to all of them.
func (dht *IpfsDHT) startSelfLookup() error {
Review comment: Revisiting this, I'm not sure if we need something quite this complicated. We can probably get away with just calling the lookup directly (although we'll need to do it asynchronously inside the "address changed" loop). Note: this code looks fine, I'm just not sure if the complexity is warranted. (A sketch of that simpler alternative follows this function.)
    dht.proc.Go(func(proc process.Process) {
        ctx := processctx.OnClosingContext(proc)
        for {
            var waiting []chan<- error
            select {
            case res := <-dht.triggerSelfLookup:
                if res != nil {
                    waiting = append(waiting, res)
                }
            case <-ctx.Done():
                return
            }

            // batch multiple refresh requests if they're all waiting at the same time.
            waiting = append(waiting, collectWaitingChannels(dht.triggerSelfLookup)...)

            // Do a self walk
            queryCtx, cancel := context.WithTimeout(ctx, dht.rtRefreshQueryTimeout)
            defer cancel()
            _, err := dht.FindPeer(queryCtx, dht.self)
            if err == routing.ErrNotFound {
                err = nil
            } else if err != nil {
                err = fmt.Errorf("failed to query self during routing table refresh: %s", err)
            }

            // send back the error status
            for _, w := range waiting {
                w <- err
                close(w)
            }
            if err != nil {
                logger.Warning(err)
            }
        }
    })

    return nil
}
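Regarding the review comment above: a minimal sketch of the simpler alternative being suggested, assuming the self query is simply kicked off asynchronously from the address-changed handler instead of being funnelled through a trigger channel. This is illustrative only and not part of the diff:

    // Hypothetical simplification (not in this PR): run the self query in a
    // goroutine whenever our addresses change, with no request batching.
    go func() {
        queryCtx, cancel := context.WithTimeout(ctx, dht.rtRefreshQueryTimeout)
        defer cancel()
        if _, err := dht.FindPeer(queryCtx, dht.self); err != nil && err != routing.ErrNotFound {
            logger.Warning("failed to query self after address change: ", err)
        }
    }()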

// Start the refresh worker.
func (dht *IpfsDHT) startRefreshing() error {
    // scan the RT table periodically & do a random walk for cpl's that haven't been queried since the given period
@@ -65,17 +111,7 @@ func (dht *IpfsDHT) startRefreshing() error {
            }

            // Batch multiple refresh requests if they're all waiting at the same time.
        collectWaiting:
            for {
                select {
                case res := <-dht.triggerRtRefresh:
                    if res != nil {
                        waiting = append(waiting, res)
                    }
                default:
                    break collectWaiting
                }
            }
            waiting = append(waiting, collectWaitingChannels(dht.triggerSelfLookup)...)

            err := dht.doRefresh(ctx)
            for _, w := range waiting {
@@ -88,14 +124,66 @@ func (dht *IpfsDHT) startRefreshing() error {
        }
    })

    // when our address changes, we should proactively tell our closest peers about it so
    // we become discoverable quickly. The Identify protocol will push a signed peer record
    // with our new address to all peers we are connected to. However, we might not necessarily be connected
    // to our closest peers & so in the true spirit of Zen, searching for ourselves in the network really is the best way
    // to forge connections with those that matter.
    dht.proc.Go(func(proc process.Process) {
        for {
            select {
            case evt, more := <-dht.subscriptions.everPeerAddressChanged.Out():
                if !more {
                    return
                }
                if _, ok := evt.(event.EvtLocalAddressesUpdated); ok {
                    // our address has changed, trigger a self walk so our closest peers know about it
                    select {
                    case dht.triggerSelfLookup <- nil:
                    default:
                    }
                } else {
                    logger.Error("should not get an event other than EvtLocalAddressesUpdated on that subscription")
                }
            case <-proc.Closing():
                return
            }
        }
    })

    return nil
}
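The goroutine above consumes dht.subscriptions.everPeerAddressChanged, whose construction is not part of this diff. A hedged sketch of how such a subscription is typically created from the host's event bus, assuming access to the libp2p host (the field wiring and error message here are illustrative):

    // Illustrative only: subscribe to local address change events on the event bus.
    sub, err := host.EventBus().Subscribe(new(event.EvtLocalAddressesUpdated))
    if err != nil {
        return fmt.Errorf("failed to subscribe to EvtLocalAddressesUpdated: %s", err)
    }
    dht.subscriptions.everPeerAddressChanged = sub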

func collectWaitingChannels(source chan chan<- error) []chan<- error {
    var waiting []chan<- error
    for {
        select {
        case res := <-source:
            if res != nil {
                waiting = append(waiting, res)
            }
        default:
            return waiting
        }
    }
}

func (dht *IpfsDHT) doRefresh(ctx context.Context) error {
    var merr error
    if err := dht.selfWalk(ctx); err != nil {
        merr = multierror.Append(merr, err)

    // wait for self walk result
    selfWalkres := make(chan error, 1)
    dht.triggerSelfLookup <- selfWalkres
    select {
    case err := <-selfWalkres:
        if err != nil {
            merr = multierror.Append(merr, err)
        }
    case <-ctx.Done():
        return ctx.Err()
    }

    if err := dht.refreshCpls(ctx); err != nil {
        merr = multierror.Append(merr, err)
    }
@@ -127,6 +215,12 @@ func (dht *IpfsDHT) refreshCpls(ctx context.Context) error {
        if time.Since(tcpl.LastRefreshAt) <= dht.rtRefreshPeriod {
            continue
        }

        // do not refresh if bucket is full
        if dht.routingTable.IsBucketFull(tcpl.Cpl) {
            continue
        }

        // gen rand peer with the cpl
        randPeer, err := dht.routingTable.GenRandPeerID(tcpl.Cpl)
        if err != nil {
@@ -153,17 +247,6 @@ func (dht *IpfsDHT) refreshCpls(ctx context.Context) error {
    return merr
}

// Traverse the DHT toward the self ID
func (dht *IpfsDHT) selfWalk(ctx context.Context) error {
    queryCtx, cancel := context.WithTimeout(ctx, dht.rtRefreshQueryTimeout)
    defer cancel()
    _, err := dht.FindPeer(queryCtx, dht.self)
    if err == routing.ErrNotFound {
        return nil
    }
    return fmt.Errorf("failed to query self during routing table refresh: %s", err)
}

// Bootstrap tells the DHT to get into a bootstrapped state satisfying the
// IpfsRouter interface.
//
Review comment: should this also trigger an update?

Reply: @willscott Great spot! It will after this code in Adin's PR goes in.