Better RT refresh & maintenance #465

Closed
wants to merge 5 commits into from
88 changes: 64 additions & 24 deletions dht.go
@@ -41,6 +41,7 @@ import (
)

var logger = logging.Logger("dht")
var rtPvLogger = logging.Logger("dht/rt/peer-validation")

type DHTMode int

@@ -85,13 +86,15 @@ type IpfsDHT struct {
bucketSize int

subscriptions struct {
evtPeerIdentification event.Subscription
evtPeerIdentification event.Subscription
everPeerAddressChanged event.Subscription
}

autoRefresh bool
rtRefreshQueryTimeout time.Duration
rtRefreshPeriod time.Duration
triggerRtRefresh chan chan<- error
triggerSelfLookup chan chan<- error

maxRecordAge time.Duration

@@ -118,7 +121,10 @@ func New(ctx context.Context, h host.Host, options ...opts.Option) (*IpfsDHT, er
if err := cfg.Apply(append([]opts.Option{opts.Defaults}, options...)...); err != nil {
return nil, err
}
dht := makeDHT(ctx, h, cfg.Datastore, cfg.Protocols, cfg.BucketSize)
dht, err := makeDHT(ctx, h, &cfg)
if err != nil {
return nil, err
}
dht.autoRefresh = cfg.RoutingTable.AutoRefresh
dht.rtRefreshPeriod = cfg.RoutingTable.RefreshPeriod
dht.rtRefreshQueryTimeout = cfg.RoutingTable.RefreshQueryTimeout
@@ -140,6 +146,10 @@ func New(ctx context.Context, h host.Host, options ...opts.Option) (*IpfsDHT, er
if dht.subscriptions.evtPeerIdentification != nil {
_ = dht.subscriptions.evtPeerIdentification.Close()
}

if dht.subscriptions.everPeerAddressChanged != nil {
_ = dht.subscriptions.everPeerAddressChanged.Close()
}
return nil
})

@@ -152,6 +162,7 @@ func New(ctx context.Context, h host.Host, options ...opts.Option) (*IpfsDHT, er
// handle providers
dht.proc.AddChild(dht.providers.Process())

dht.startSelfLookup()
dht.startRefreshing()
return dht, nil
}
@@ -179,9 +190,24 @@ func NewDHTClient(ctx context.Context, h host.Host, dstore ds.Batching) *IpfsDHT
return dht
}

func makeDHT(ctx context.Context, h host.Host, dstore ds.Batching, protocols []protocol.ID, bucketSize int) *IpfsDHT {
func makeRoutingTable(h host.Host, cfg *opts.Options) (*kb.RoutingTable, error) {
self := kb.ConvertPeerID(h.ID())
rt := kb.NewRoutingTable(bucketSize, self, time.Minute, h.Peerstore())
// construct the routing table with a peer validation function
pvF := func(c context.Context, p peer.ID) bool {
if err := h.Connect(c, peer.AddrInfo{ID: p}); err != nil {
rtPvLogger.Errorf("failed to connect to peer %s for validation, err=%s", p, err)
return false
}
return true
}

rtOpts := []kb.Option{kb.PeerValidationFnc(pvF)}
if cfg.RoutingTableCleanup.Interval != 0 {
rtOpts = append(rtOpts, kb.TableCleanupInterval(cfg.RoutingTableCleanup.Interval))
}

rt, err := kb.NewRoutingTable(cfg.BucketSize, self, time.Minute, h.Peerstore(),
rtOpts...)
cmgr := h.ConnManager()

rt.PeerAdded = func(p peer.ID) {
@@ -193,31 +219,45 @@ func makeDHT(ctx context.Context, h host.Host, dstore ds.Batching, protocols []p
cmgr.UntagPeer(p, "kbucket")
}

return rt, err
}
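
For context, the validation hook only has to answer "is this peer still worth keeping?", and dialing the peer (as pvF does above) is one possible policy. The sketch below is purely illustrative and not part of this diff; it assumes h is the host.Host passed to makeRoutingTable and that the go-libp2p-core network package is imported, and it short-circuits for peers we are already connected to.

// Illustrative alternative validation function with the same signature as pvF above.
// A peer we are already connected to is treated as valid without re-dialing it;
// otherwise we fall back to a connect attempt, as the PR's pvF does.
altPvF := func(c context.Context, p peer.ID) bool {
	if h.Network().Connectedness(p) == network.Connected {
		return true
	}
	return h.Connect(c, peer.AddrInfo{ID: p}) == nil
}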

func makeDHT(ctx context.Context, h host.Host, cfg *opts.Options) (*IpfsDHT, error) {
rt, err := makeRoutingTable(h, cfg)
if err != nil {
return nil, fmt.Errorf("failed to construct routing table, err=%s", err)
}

dht := &IpfsDHT{
datastore: dstore,
self: h.ID(),
peerstore: h.Peerstore(),
host: h,
strmap: make(map[peer.ID]*messageSender),
ctx: ctx,
providers: providers.NewProviderManager(ctx, h.ID(), dstore),
birth: time.Now(),
routingTable: rt,
protocols: protocols,
bucketSize: bucketSize,
triggerRtRefresh: make(chan chan<- error),
}

var err error
datastore: cfg.Datastore,
self: h.ID(),
peerstore: h.Peerstore(),
host: h,
strmap: make(map[peer.ID]*messageSender),
ctx: ctx,
providers: providers.NewProviderManager(ctx, h.ID(), cfg.Datastore),
birth: time.Now(),
routingTable: rt,
protocols: cfg.Protocols,
bucketSize: cfg.BucketSize,
triggerRtRefresh: make(chan chan<- error),
triggerSelfLookup: make(chan chan<- error),
}

evts := []interface{}{&event.EvtPeerIdentificationCompleted{}, &event.EvtPeerIdentificationFailed{}}
dht.subscriptions.evtPeerIdentification, err = h.EventBus().Subscribe(evts, eventbus.BufSize(256))
if err != nil {
logger.Errorf("dht not subscribed to peer identification events; things will fail; err: %s", err)
}

dht.subscriptions.everPeerAddressChanged, err = h.EventBus().Subscribe(&event.EvtLocalAddressesUpdated{}, eventbus.BufSize(256))
if err != nil {
logger.Errorf("dht not subscribed to peer address changed event; err=%s ", err)
}

dht.ctx = dht.newContextWithLocalTags(ctx)

return dht
return dht, nil
}

// TODO Implement RT seeding as described in https://github.com/libp2p/go-libp2p-kad-dht/pull/384#discussion_r320994340 OR
@@ -372,7 +412,7 @@ func (dht *IpfsDHT) Update(ctx context.Context, p peer.ID) {
logger.Event(ctx, "updatePeer", p)
for _, c := range dht.host.Network().ConnsToPeer(p) {
if dht.shouldAddPeerToRoutingTable(c) {
dht.routingTable.Update(c.RemotePeer())
dht.routingTable.HandlePeerAlive(c.RemotePeer())
return
}
}
@@ -383,7 +423,7 @@ func (dht *IpfsDHT) UpdateConn(ctx context.Context, c network.Conn) {
return
}
logger.Event(ctx, "updatePeer", c.RemotePeer())
dht.routingTable.Update(c.RemotePeer())
dht.routingTable.HandlePeerAlive(c.RemotePeer())
}

func (dht *IpfsDHT) shouldAddPeerToRoutingTable(c network.Conn) bool {
@@ -710,9 +750,9 @@ func (dht *IpfsDHT) handleProtocolChanges(proc goprocess.Process) {
// TODO: discuss how to handle this case
Contributor (review comment): should this also trigger an update?

Contributor Author (@aarshkshah1992): @willscott Great spot! It will after this code in Adin's PR goes in.

logger.Warning("peer adding and dropping dht protocols? odd")
} else if add {
dht.RoutingTable().Update(e.Peer)
dht.RoutingTable().HandlePeerAlive(e.Peer)
} else if drop {
dht.RoutingTable().Remove(e.Peer)
dht.RoutingTable().HandlePeerDead(e.Peer)
}
case <-proc.Closing():
return
135 changes: 109 additions & 26 deletions dht_bootstrap.go
@@ -5,10 +5,12 @@ import (
"fmt"
"time"

"github.com/libp2p/go-libp2p-core/event"
"github.com/libp2p/go-libp2p-core/routing"

multierror "github.com/hashicorp/go-multierror"
process "github.com/jbenet/goprocess"
processctx "github.com/jbenet/goprocess/context"
"github.com/libp2p/go-libp2p-core/routing"
"github.com/multiformats/go-multiaddr"
_ "github.com/multiformats/go-multiaddr-dns"
)
@@ -17,7 +19,7 @@ var DefaultBootstrapPeers []multiaddr.Multiaddr

// Minimum number of peers in the routing table. If we drop below this and we
// see a new peer, we trigger a bootstrap round.
var minRTRefreshThreshold = 4
var minRTRefreshThreshold = 20
Contributor (review comment): this seems like a high minimum for triggering new bootstraps. was this for testing, or intentional?

Contributor Author (@aarshkshah1992, Mar 5, 2020): This is intentional. The current refresh interval (1 hour) is too long, and we want to be more proactive about refreshing when it is actually needed rather than refreshing too often when we don't need to. But really, we can always tune these params if we run into problems later.
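
For illustration, the threshold is meant to gate a cheap, non-blocking nudge of the refresh worker whenever the routing table looks too small. A minimal sketch of that pattern, using the triggerRtRefresh channel and the routing table's Size() accessor (the helper name is hypothetical, not part of this PR):

// Hypothetical helper: nudge the refresh worker when the routing table drops
// below the threshold. Sending nil means "fire and forget"; the worker only
// reports errors back on non-nil channels.
func (dht *IpfsDHT) maybeTriggerRefresh() {
	if dht.routingTable.Size() < minRTRefreshThreshold {
		select {
		case dht.triggerRtRefresh <- nil:
		default: // a refresh request is already queued
		}
	}
}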


func init() {
for _, s := range []string{
@@ -35,6 +37,50 @@ func init() {
}
}

// startSelfLookup starts a goroutine that listens for requests to trigger a self walk on a dedicated channel
// and then sends the error status back on the error channel sent along with the request.
// If multiple callers "simultaneously" ask for a self walk, it performs ONLY one self walk and sends the same error status to all of them.
func (dht *IpfsDHT) startSelfLookup() error {
Member (review comment): Revisiting this, I'm not sure if we need something quite this complicated. We can probably get away with just calling selfWalk from within both the refresh function and the "address changed" loop.

(although we'll need to do it asynchronously inside the "address changed" loop)

Note: this code looks fine. I'm just not sure if the complexity is warranted.

dht.proc.Go(func(proc process.Process) {
ctx := processctx.OnClosingContext(proc)
for {
var waiting []chan<- error
select {
case res := <-dht.triggerSelfLookup:
if res != nil {
waiting = append(waiting, res)
}
case <-ctx.Done():
return
}

// batch multiple refresh requests if they're all waiting at the same time.
waiting = append(waiting, collectWaitingChannels(dht.triggerSelfLookup)...)

// Do a self walk
queryCtx, cancel := context.WithTimeout(ctx, dht.rtRefreshQueryTimeout)
_, err := dht.FindPeer(queryCtx, dht.self)
cancel() // release the query context promptly; defer would accumulate inside this loop
if err == routing.ErrNotFound {
err = nil
} else if err != nil {
err = fmt.Errorf("failed to query self during routing table refresh: %s", err)
}

// send back the error status
for _, w := range waiting {
w <- err
close(w)
}
if err != nil {
logger.Warning(err)
}
}
})

return nil
}
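
For comparison with the review comment above, the simpler alternative (calling a self-walk helper directly from doRefresh and, asynchronously, from the address-changed loop) might look like the sketch below. selfWalkOnce is a hypothetical name, and the body mirrors the selfWalk function this PR removes further down in dht_bootstrap.go; the trade-off is that concurrent callers would no longer be batched into a single query.

// Hypothetical direct-call alternative to the trigger channel.
func (dht *IpfsDHT) selfWalkOnce(ctx context.Context) error {
	queryCtx, cancel := context.WithTimeout(ctx, dht.rtRefreshQueryTimeout)
	defer cancel()
	_, err := dht.FindPeer(queryCtx, dht.self)
	if err == routing.ErrNotFound {
		// running out of closer peers is how a query for our own ID normally ends
		return nil
	}
	if err != nil {
		return fmt.Errorf("failed to query self during routing table refresh: %s", err)
	}
	return nil
}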

// Start the refresh worker.
func (dht *IpfsDHT) startRefreshing() error {
// scan the RT table periodically & do a random walk for cpl's that haven't been queried since the given period
@@ -65,17 +111,7 @@ func (dht *IpfsDHT) startRefreshing() error {
}

// Batch multiple refresh requests if they're all waiting at the same time.
collectWaiting:
for {
select {
case res := <-dht.triggerRtRefresh:
if res != nil {
waiting = append(waiting, res)
}
default:
break collectWaiting
}
}
waiting = append(waiting, collectWaitingChannels(dht.triggerRtRefresh)...)

err := dht.doRefresh(ctx)
for _, w := range waiting {
Expand All @@ -88,14 +124,66 @@ func (dht *IpfsDHT) startRefreshing() error {
}
})

// when our address changes, we should proactively tell our closest peers about it so
// we become discoverable quickly. The Identify protocol will push a signed peer record
// with our new address to all peers we are connected to. However, we might not necessarily be connected
// to our closest peers & so in the true spirit of Zen, searching for ourselves in the network really is the best way
// to forge connections with those that matter.
dht.proc.Go(func(proc process.Process) {
for {
select {
case evt, more := <-dht.subscriptions.everPeerAddressChanged.Out():
if !more {
return
}
if _, ok := evt.(event.EvtLocalAddressesUpdated); ok {
// our address has changed, trigger a self walk so our closest peers know about it
select {
case dht.triggerSelfLookup <- nil:
default:

}
} else {
logger.Error("should not get an event other than EvtLocalAddressesUpdated on that subscription")
}
case <-proc.Closing():
return
}
}
})

return nil
}
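
As a rough illustration of how the address-changed path above can be exercised, a test could publish the event on the host's bus itself. This assumes the go-libp2p-core event bus Emitter API and hypothetical h (host.Host) and t (*testing.T) arguments; only the event's type matters here, not its fields.

// Illustrative test helper (hypothetical): emit an address-change event so the
// loop above schedules a non-blocking self lookup.
func emitAddrChange(t *testing.T, h host.Host) {
	emitter, err := h.EventBus().Emitter(&event.EvtLocalAddressesUpdated{})
	if err != nil {
		t.Fatal(err)
	}
	defer emitter.Close()
	if err := emitter.Emit(event.EvtLocalAddressesUpdated{}); err != nil {
		t.Fatal(err)
	}
}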

func collectWaitingChannels(source chan chan<- error) []chan<- error {
var waiting []chan<- error
for {
select {
case res := <-source:
if res != nil {
waiting = append(waiting, res)
}
default:
return waiting
}
}
}

func (dht *IpfsDHT) doRefresh(ctx context.Context) error {
var merr error
if err := dht.selfWalk(ctx); err != nil {
merr = multierror.Append(merr, err)

// wait for self walk result
selfWalkres := make(chan error, 1)
dht.triggerSelfLookup <- selfWalkres
select {
case err := <-selfWalkres:
if err != nil {
merr = multierror.Append(merr, err)
}
case <-ctx.Done():
return ctx.Err()
}

if err := dht.refreshCpls(ctx); err != nil {
merr = multierror.Append(merr, err)
}
@@ -127,6 +215,12 @@ func (dht *IpfsDHT) refreshCpls(ctx context.Context) error {
if time.Since(tcpl.LastRefreshAt) <= dht.rtRefreshPeriod {
continue
}

// do not refresh if bucket is full
if dht.routingTable.IsBucketFull(tcpl.Cpl) {
continue
}

// gen rand peer with the cpl
randPeer, err := dht.routingTable.GenRandPeerID(tcpl.Cpl)
if err != nil {
@@ -153,17 +247,6 @@ func (dht *IpfsDHT) refreshCpls(ctx context.Context) error {
return merr
}

// Traverse the DHT toward the self ID
func (dht *IpfsDHT) selfWalk(ctx context.Context) error {
queryCtx, cancel := context.WithTimeout(ctx, dht.rtRefreshQueryTimeout)
defer cancel()
_, err := dht.FindPeer(queryCtx, dht.self)
if err == routing.ErrNotFound {
return nil
}
return fmt.Errorf("failed to query self during routing table refresh: %s", err)
}

// Bootstrap tells the DHT to get into a bootstrapped state satisfying the
// IpfsRouter interface.
//