Feature/correct bootstrapping #384

Merged: 9 commits, Oct 11, 2019
dht.go: 84 changes (61 additions, 23 deletions)

@@ -21,26 +21,22 @@ import (
"github.com/libp2p/go-libp2p-kad-dht/metrics"
opts "github.com/libp2p/go-libp2p-kad-dht/opts"
pb "github.com/libp2p/go-libp2p-kad-dht/pb"
providers "github.com/libp2p/go-libp2p-kad-dht/providers"
"github.com/libp2p/go-libp2p-kad-dht/providers"

proto "github.com/gogo/protobuf/proto"
cid "github.com/ipfs/go-cid"
"github.com/gogo/protobuf/proto"
"github.com/ipfs/go-cid"
ds "github.com/ipfs/go-datastore"
logging "github.com/ipfs/go-log"
goprocess "github.com/jbenet/goprocess"
goprocessctx "github.com/jbenet/goprocess/context"
"github.com/jbenet/goprocess"
"github.com/jbenet/goprocess/context"
kb "github.com/libp2p/go-libp2p-kbucket"
record "github.com/libp2p/go-libp2p-record"
"github.com/libp2p/go-libp2p-record"
recpb "github.com/libp2p/go-libp2p-record/pb"
base32 "github.com/whyrusleeping/base32"
"github.com/whyrusleeping/base32"
)

var logger = logging.Logger("dht")

// NumBootstrapQueries defines the number of random dht queries to do to
// collect members of the routing table.
const NumBootstrapQueries = 5

// IpfsDHT is an implementation of Kademlia with S/Kademlia modifications.
// It is used to implement the base Routing module.
type IpfsDHT struct {
@@ -70,6 +66,10 @@ type IpfsDHT struct {
     protocols []protocol.ID // DHT protocols

     bucketSize int
+
+    bootstrapCfg opts.BootstrapConfig
+
+    triggerBootstrap chan struct{}
 }

 // Assert that IPFS assumptions about interfaces aren't broken. These aren't a

@@ -90,6 +90,7 @@ func New(ctx context.Context, h host.Host, options ...opts.Option) (*IpfsDHT, error) {
         return nil, err
     }
     dht := makeDHT(ctx, h, cfg.Datastore, cfg.Protocols, cfg.BucketSize)
+    dht.bootstrapCfg = cfg.BootstrapConfig

     // register for network notifs.
     dht.host.Network().Notify((*netNotifiee)(dht))
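
New wires the two added fields together: bootstrapCfg carries the bootstrap options resolved from the constructor's option list, and the unbuffered triggerBootstrap channel gives callers a way to ask for an immediate bootstrap round. A minimal sketch of the producer side, assuming a consumer drains the channel elsewhere in this PR (the helper name requestBootstrap is hypothetical, not part of this diff):

    // requestBootstrap is a hypothetical helper illustrating the producer
    // side of triggerBootstrap. The channel is unbuffered, so the send
    // blocks until the bootstrap loop is ready to receive, which naturally
    // coalesces concurrent callers.
    func (dht *IpfsDHT) requestBootstrap(ctx context.Context) error {
        select {
        case dht.triggerBootstrap <- struct{}{}:
            return nil
        case <-ctx.Done():
            return ctx.Err()
        }
    }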
@@ -136,34 +137,71 @@ func NewDHTClient(ctx context.Context, h host.Host, dstore ds.Batching) *IpfsDHT

 func makeDHT(ctx context.Context, h host.Host, dstore ds.Batching, protocols []protocol.ID, bucketSize int) *IpfsDHT {
     rt := kb.NewRoutingTable(bucketSize, kb.ConvertPeerID(h.ID()), time.Minute, h.Peerstore())

     cmgr := h.ConnManager()

     rt.PeerAdded = func(p peer.ID) {
         cmgr.TagPeer(p, "kbucket", 5)
     }

     rt.PeerRemoved = func(p peer.ID) {
         cmgr.UntagPeer(p, "kbucket")
     }

     dht := &IpfsDHT{
-        datastore:    dstore,
-        self:         h.ID(),
-        peerstore:    h.Peerstore(),
-        host:         h,
-        strmap:       make(map[peer.ID]*messageSender),
-        ctx:          ctx,
-        providers:    providers.NewProviderManager(ctx, h.ID(), dstore),
-        birth:        time.Now(),
-        routingTable: rt,
-        protocols:    protocols,
-        bucketSize:   bucketSize,
+        datastore:        dstore,
+        self:             h.ID(),
+        peerstore:        h.Peerstore(),
+        host:             h,
+        strmap:           make(map[peer.ID]*messageSender),
+        ctx:              ctx,
+        providers:        providers.NewProviderManager(ctx, h.ID(), dstore),
+        birth:            time.Now(),
+        routingTable:     rt,
+        protocols:        protocols,
+        bucketSize:       bucketSize,
+        triggerBootstrap: make(chan struct{}),
     }

     dht.ctx = dht.newContextWithLocalTags(ctx)

     return dht
 }
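
On the consumer side, the loop that owns bootstrapping can multiplex the new channel with a periodic timer. The sketch below uses assumed shapes only: the actual consumer is not part of this file's diff, and the period would presumably come from bootstrapCfg.

    // bootstrapLoop is a sketch of a consumer for triggerBootstrap: it runs
    // a round either when the ticker fires or when a caller signals the
    // channel, and exits when the owning goprocess is closing.
    func (dht *IpfsDHT) bootstrapLoop(proc goprocess.Process, period time.Duration) {
        ticker := time.NewTicker(period)
        defer ticker.Stop()
        for {
            select {
            case <-ticker.C:
                // periodic bootstrap round goes here
            case <-dht.triggerBootstrap:
                // a caller requested an immediate round
            case <-proc.Closing():
                return
            }
        }
    }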

+// TODO Implement RT seeding as described in https://github.com/libp2p/go-libp2p-kad-dht/pull/384#discussion_r320994340 OR
+// come up with an alternative solution.
+// The issue is being tracked at https://github.com/libp2p/go-libp2p-kad-dht/issues/387
+/*func (dht *IpfsDHT) rtRecovery(proc goprocess.Process) {
+    writeResp := func(errorChan chan error, err error) {
+        select {
+        case <-proc.Closing():
+        case errorChan <- err:
+        }
+        close(errorChan)
+    }
+
+    for {
+        select {
+        case req := <-dht.rtRecoveryChan:
+            if dht.routingTable.Size() == 0 {
+                logger.Infof("rt recovery proc: received request with reqID=%s, RT is empty. initiating recovery", req.id)
+                // TODO Call Seeder with default bootstrap peers here once #383 is merged
+                if dht.routingTable.Size() > 0 {
+                    logger.Infof("rt recovery proc: successfully recovered RT for reqID=%s, RT size is now %d", req.id, dht.routingTable.Size())
+                    go writeResp(req.errorChan, nil)
+                } else {
+                    logger.Errorf("rt recovery proc: failed to recover RT for reqID=%s, RT is still empty", req.id)
+                    go writeResp(req.errorChan, errors.New("RT empty after seed attempt"))
+                }
+            } else {
+                logger.Infof("rt recovery proc: RT is not empty, no need to act on request with reqID=%s", req.id)
+                go writeResp(req.errorChan, nil)
+            }
+        case <-proc.Closing():
+            return
+        }
+    }
+}*/

 // putValueToPeer stores the given key/value pair at the peer 'p'
 func (dht *IpfsDHT) putValueToPeer(ctx context.Context, p peer.ID, rec *recpb.Record) error {

Review thread on the outer `if dht.routingTable.Size() == 0` check:

Contributor: This check seems redundant, as both processes that send recovery requests down the channel check before sending.

(Edit:) I'm actually a bit confused by this code generally. I get the idea of serializing recovery requests, but this will work its way through them in a pretty tight loop, so I could see multiple recovery requests being dispatched without error in a row.

Contributor: In fact, it seems like all this process does is serialize checks of the routing table, but does nothing to actually bootstrap it. It's up to the func that actually creates an rtRecoveryReq to do any work, and I see a few instances where this is essentially just an info log.

Author (@aarshkshah1992, Sep 4, 2019): The check is there to ensure that if multiple callers simultaneously observe that the RT has become empty and send requests down the channel, only one request results in the RT being seeded and the rest become no-ops.

Author (@aarshkshah1992): Apologies if this wasn't clear. Yes, we need to add a call to the default seeder implementation in #383 to seed the RT with the default bootstrap peers/known peers. There is a TODO for this in the code (line 214) and in the TODO list on this PR.

Contributor: @aarshkshah1992 Totally, it really felt like something was missing; I definitely missed the comment. With that in mind, the rest are just small comments/docs improvements. Generally looking good :)

Review thread on the inner `if dht.routingTable.Size() > 0` check:

Contributor: Unreachable?

Author (@aarshkshah1992, Sep 4, 2019): Answered above.
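
The design this thread converges on is single-flight recovery: any number of callers may observe an empty routing table and queue a request, but the worker re-checks emptiness before seeding, so only the first queued request does the work and the rest are answered as no-ops. Below is a sketch assuming the seeder from #383 lands with a signature like seedRoutingTable(ctx context.Context) error; the names rtRecoveryWorker and seedRoutingTable are hypothetical, and rtRecoveryReq is the request shape from the commented-out code above (an id plus an errorChan).

    // rtRecoveryWorker is a hypothetical single-flight recovery worker per
    // the discussion above. Requests arrive on dht.rtRecoveryChan; the
    // worker seeds the routing table only if it is still empty.
    func (dht *IpfsDHT) rtRecoveryWorker(proc goprocess.Process) {
        for {
            select {
            case req := <-dht.rtRecoveryChan:
                var err error
                // Re-check inside the worker: several callers may have queued
                // requests after seeing an empty RT; only the first should seed.
                if dht.routingTable.Size() == 0 {
                    err = dht.seedRoutingTable(dht.ctx) // hypothetical seeder from #383
                }
                select {
                case req.errorChan <- err:
                    close(req.errorChan)
                case <-proc.Closing():
                    return
                }
            case <-proc.Closing():
                return
            }
        }
    }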
