Persisting/seeding a routing table #383

Closed

Changes from 14 commits
102 changes: 81 additions & 21 deletions dht.go
@@ -8,13 +8,14 @@ import (
"sync"
"time"

"github.com/jbenet/goprocess/periodic"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/peerstore"
"github.com/libp2p/go-libp2p-core/protocol"
"github.com/libp2p/go-libp2p-core/routing"

"github.com/libp2p/go-libp2p-kad-dht/persist"
"go.opencensus.io/tag"
"golang.org/x/xerrors"

@@ -33,10 +34,15 @@ import (
record "github.com/libp2p/go-libp2p-record"
recpb "github.com/libp2p/go-libp2p-record/pb"
"github.com/multiformats/go-base32"
pkgerr "github.com/pkg/errors"
)

var logger = logging.Logger("dht")

// NumBootstrapQueries defines the number of random DHT queries to run in order
// to collect members of the routing table.
const NumBootstrapQueries = 5

const BaseConnMgrScore = 5

// IpfsDHT is an implementation of Kademlia with S/Kademlia modifications.
@@ -72,8 +78,17 @@ type IpfsDHT struct {
autoRefresh bool
rtRefreshQueryTimeout time.Duration
rtRefreshPeriod time.Duration
bootstrapCfg opts.BootstrapConfig
triggerRtRefresh chan chan<- error

triggerBootstrap chan struct{}

seedsProposer persist.SeedsProposer
seederRTSizeTarget int
seederDialTimeout time.Duration
seederConcurrentDials int
totalSeederTimeout time.Duration

maxRecordAge time.Duration

// Allows disabling dht subsystems. These should _only_ be set on
@@ -94,31 +109,61 @@ var (

// New creates a new DHT with the specified host and options.
func New(ctx context.Context, h host.Host, options ...opts.Option) (*IpfsDHT, error) {
var cfg opts.Options
cfg := &opts.Options{}
cfg.BucketSize = KValue
if err := cfg.Apply(append([]opts.Option{opts.Defaults}, options...)...); err != nil {
return nil, err
}
dht := makeDHT(ctx, h, cfg.Datastore, cfg.Protocols, cfg.BucketSize)
dht.autoRefresh = cfg.RoutingTable.AutoRefresh
dht.rtRefreshPeriod = cfg.RoutingTable.RefreshPeriod
dht.rtRefreshQueryTimeout = cfg.RoutingTable.RefreshQueryTimeout

dht.maxRecordAge = cfg.MaxRecordAge
dht.enableProviders = cfg.EnableProviders
dht.enableValues = cfg.EnableValues
// set seedsProposer, snapshotter & fallback peers if not set
if cfg.Persistence.SeedsProposer == nil {
cfg.Persistence.SeedsProposer = persist.NewRandomSeedsProposer()
}
snapshotter := cfg.Persistence.Snapshotter
if snapshotter == nil {
s, err := persist.NewDatastoreSnapshotter(cfg.Datastore, persist.DefaultSnapshotNS)
// should never happen
if err != nil {
logger.Errorf("failed to initialize the default datastore backed snapshotter, err: %s", err)
return nil, pkgerr.WithMessage(err, "failed to initialize the default datastore backed snapshotter")
}
snapshotter = s
}

// register for network notifs.
dht.host.Network().Notify((*netNotifiee)(dht))
if len(cfg.Persistence.FallbackPeers) == 0 {
cfg.Persistence.FallbackPeers = getDefaultBootstrapPeerIDs()
}

dht := makeDHT(ctx, h, cfg)

dht.proc = goprocessctx.WithContextAndTeardown(ctx, func() error {
// remove ourselves from network notifs.
dht.host.Network().StopNotify((*netNotifiee)(dht))
return nil
})

// fetch the last snapshot & try to seed RT
candidates, err := snapshotter.Load()
if err != nil {
logger.Warningf("error while loading snapshot of DHT routing table: %s, cannot seed dht", err)
} else if err := dht.seedRoutingTable(candidates, cfg.Persistence.FallbackPeers); err != nil {
logger.Warningf("error while seeding candidates to the routing table: %s", err)
}

// schedule periodic snapshots
sproc := periodicproc.Tick(cfg.Persistence.SnapshotInterval, func(proc goprocess.Process) {
Member:

I'd really love to store the routing table before we shut down. However, because we don't control the order in which libp2p components shut down, we might end up storing the routing table after other components that are also running their shutdown logic have disconnected peers. As a result, we'd end up storing a crippled routing table.

On the other hand, I guess a degree of randomness here is good. Otherwise, if an attacker found a way to both poison the table and force a shutdown, they could permanently bork the routing table if the peer saved the poisoned one every time.

@aarshkshah1992 (Contributor, Author), Oct 12, 2019:

I see what you mean. So, in its current form, this code could persist an empty RT if the snapshotting goroutine fires after all peers have been dropped from the RT but the DHT hasn't been closed yet.

However, this can also prove to be a blessing in disguise, because storing an empty RT and then seeding with bootstrap peers after we restart could save us from ALWAYS storing a poisoned RT if an attacker messed up our RT and found a way to immediately shut us down.

So, let's see how the current implementation works in practice and fix it if required?

logger.Debugf("storing snapshot of DHT routing table")
err := snapshotter.Store(dht.routingTable)
if err != nil {
logger.Warningf("error while storing snapshot of DHT routing table snapshot: %s", err)
}
})
dht.proc.AddChild(sproc)

// register for network notifs.
dht.host.Network().Notify((*netNotifiee)(dht))

dht.proc.AddChild(dht.providers.Process())
dht.Validator = cfg.Validator

if !cfg.Client {
for _, p := range cfg.Protocols {
@@ -152,9 +197,9 @@ func NewDHTClient(ctx context.Context, h host.Host, dstore ds.Batching) *IpfsDHT
return dht
}

func makeDHT(ctx context.Context, h host.Host, dstore ds.Batching, protocols []protocol.ID, bucketSize int) *IpfsDHT {
func makeDHT(ctx context.Context, h host.Host, cfg *opts.Options) *IpfsDHT {
self := kb.ConvertPeerID(h.ID())
rt := kb.NewRoutingTable(bucketSize, self, time.Minute, h.Peerstore())
rt := kb.NewRoutingTable(cfg.BucketSize, self, time.Minute, h.Peerstore())
cmgr := h.ConnManager()

rt.PeerAdded = func(p peer.ID) {
@@ -167,22 +212,38 @@ func makeDHT(ctx context.Context, h host.Host, dstore ds.Batching, protocols []p
}

dht := &IpfsDHT{
datastore: dstore,
datastore: cfg.Datastore,
self: h.ID(),
peerstore: h.Peerstore(),
host: h,
strmap: make(map[peer.ID]*messageSender),
ctx: ctx,
providers: providers.NewProviderManager(ctx, h.ID(), dstore),
providers: providers.NewProviderManager(ctx, h.ID(), cfg.Datastore),
birth: time.Now(),
routingTable: rt,
protocols: protocols,
bucketSize: bucketSize,
protocols: cfg.Protocols,
bucketSize: cfg.BucketSize,
triggerRtRefresh: make(chan chan<- error),
}

dht.ctx = dht.newContextWithLocalTags(ctx)

dht.Validator = cfg.Validator
dht.autoRefresh = cfg.RoutingTable.AutoRefresh
dht.rtRefreshPeriod = cfg.RoutingTable.RefreshPeriod
dht.rtRefreshQueryTimeout = cfg.RoutingTable.RefreshQueryTimeout

dht.maxRecordAge = cfg.MaxRecordAge
dht.enableProviders = cfg.EnableProviders
dht.enableValues = cfg.EnableValues
dht.bootstrapCfg = cfg.BootstrapConfig

dht.seedsProposer = cfg.Persistence.SeedsProposer
dht.seederDialTimeout = cfg.Persistence.SeederDialTimeout
dht.seederConcurrentDials = cfg.Persistence.SeederConcurrentDials
dht.seederRTSizeTarget = cfg.Persistence.SeederRTSizeTarget
dht.totalSeederTimeout = cfg.Persistence.TotalSeederTimeout

return dht
}

@@ -193,7 +254,7 @@ func makeDHT(ctx context.Context, h host.Host, dstore ds.Batching, protocols []p
writeResp := func(errorChan chan error, err error) {
select {
case <-proc.Closing():
case errorChan <- errChan:
case errorChan <- err:
}
close(errorChan)
}
@@ -203,7 +264,7 @@ func makeDHT(ctx context.Context, h host.Host, dstore ds.Batching, protocols []p
case req := <-dht.rtRecoveryChan:
if dht.routingTable.Size() == 0 {
logger.Infof("rt recovery proc: received request with reqID=%s, RT is empty. initiating recovery", req.id)
// TODO Call Seeder with default bootstrap peers here once #383 is merged
// TODO Call SeedsProposer with default bootstrap peers here once #383 is merged
if dht.routingTable.Size() > 0 {
logger.Infof("rt recovery proc: successfully recovered RT for reqID=%s, RT size is now %d", req.id, dht.routingTable.Size())
go writeResp(req.errorChan, nil)
@@ -223,7 +284,6 @@ func makeDHT(ctx context.Context, h host.Host, dstore ds.Batching, protocols []p

// putValueToPeer stores the given key/value pair at the peer 'p'
func (dht *IpfsDHT) putValueToPeer(ctx context.Context, p peer.ID, rec *recpb.Record) error {

pmes := pb.NewMessage(pb.Message_PUT_VALUE, rec.Key, 0)
pmes.Record = rec
rpmes, err := dht.sendRequest(ctx, p, pmes)
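Relating to the review thread above about persisting the table at shutdown, the following is a minimal sketch (not part of this PR) of how a final, best-effort snapshot could be wired into the teardown hook that New already installs. It reuses only identifiers in scope in New above (dht, ctx, snapshotter, logger) and inherits the caveat from the thread: other libp2p components may have disconnected peers before this runs, so the stored table can be partially drained.

// Sketch only: take a final, best-effort snapshot when the DHT process shuts down.
dht.proc = goprocessctx.WithContextAndTeardown(ctx, func() error {
	// remove ourselves from network notifs.
	dht.host.Network().StopNotify((*netNotifiee)(dht))

	// may persist a partially drained table if other components disconnected peers first
	if err := snapshotter.Store(dht.routingTable); err != nil {
		logger.Warningf("error while storing final snapshot of DHT routing table: %s", err)
	}
	return nil
})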
14 changes: 14 additions & 0 deletions dht_bootstrap.go
@@ -8,6 +8,7 @@ import (
multierror "github.com/hashicorp/go-multierror"
process "github.com/jbenet/goprocess"
processctx "github.com/jbenet/goprocess/context"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/routing"
"github.com/multiformats/go-multiaddr"
_ "github.com/multiformats/go-multiaddr-dns"
@@ -43,6 +44,19 @@ func init() {
}
}

func getDefaultBootstrapPeerIDs() []peer.ID {
var defaultBootstrapPeerIDs []peer.ID
for i := range DefaultBootstrapPeers {
info, err := peer.AddrInfoFromP2pAddr(DefaultBootstrapPeers[i])
if err != nil {
logger.Errorf("failed to get peerID for peer with multiaddress %s: error is %s", DefaultBootstrapPeers[i].String(), err)
continue
}
defaultBootstrapPeerIDs = append(defaultBootstrapPeerIDs, info.ID)
}
return defaultBootstrapPeerIDs
}

// Start the refresh worker.
func (dht *IpfsDHT) startRefreshing() error {
// scan the routing table periodically & do a random walk on k-buckets that haven't been queried since the given bucket period
110 changes: 110 additions & 0 deletions dht_rt_seeder.go
@@ -0,0 +1,110 @@
package dht

import (
"context"
"sync"

"github.com/libp2p/go-libp2p-core/peer"
)

func (dht *IpfsDHT) seedRoutingTable(candidates, fallbacks []peer.ID) error {
seederCtx, cancel := context.WithTimeout(dht.ctx, dht.totalSeederTimeout)
defer cancel()

// filter out peers that are either NOT in the peer store OR already in the RT
findEligible := func(peers []peer.ID) []peer.ID {
var eligiblePeers []peer.ID
for _, p := range peers {

if dht.routingTable.Find(p) == p {
logger.Info("discarding candidate as it is already in the RT: %s", p)
continue
}

if addrs := dht.host.Peerstore().Addrs(p); len(addrs) == 0 {
logger.Infof("discarding candidate as we no longer have addresses: %s", p)
continue
}

eligiblePeers = append(eligiblePeers, p)
}
return eligiblePeers
}

// result of a dial attempt
type result struct {
p peer.ID
err error
}

// rate-limit dials
semaphore := make(chan struct{}, dht.seederConcurrentDials)

// attempts to dial a given peer to verify it's available
dialFn := func(ctx context.Context, p peer.ID, res chan<- result) {
childCtx, cancel := context.WithTimeout(ctx, dht.seederDialTimeout)
defer cancel()
_, err := dht.host.Network().DialPeer(childCtx, p)
select {
case <-ctx.Done(): // caller has already hung up & gone away
case res <- result{p, err}:
}
}

// ask the proposer to start proposing peers & write them on the peer channel
peersChan := dht.seedsProposer.Propose(seederCtx, dht.routingTable, findEligible(candidates), findEligible(fallbacks))

resCh := make(chan result) // dial results.

// dial the peers received on the proposal channel, reporting results on resCh
go func() {
var wg sync.WaitGroup
// deferred calls run last-in-first-out: wait for all in-flight dials to finish
// before closing resCh, so a cancelled seeder context cannot cause a send on a closed channel
defer close(resCh)
defer wg.Wait()
for p := range peersChan {
select {
case <-seederCtx.Done():
return
default:
// start dialing
semaphore <- struct{}{}
wg.Add(1)
go func(p peer.ID, res chan<- result) {
dialFn(seederCtx, p, res)
<-semaphore
wg.Done()
}(p, resCh)
}
}
}()

LOOP:
for {
select {
case res, hasMore := <-resCh:
if !hasMore {
logger.Infof("dht rt seeder: finished seeding RT with proposed peer set; RT size is now %d ",
dht.routingTable.Size())
break LOOP
}
if res.err != nil {
logger.Infof("dht rt seeder: discarded proposed peer due to dial error; peer ID: %s, err: %s", res.p, res.err)
} else {
if _, err := dht.routingTable.Update(res.p); err != nil {
logger.Warningf("dht rt seeder: failed to add proposed peer to routing table; peer ID: %s, err: %s", res.p, err)
}
if dht.routingTable.Size() >= dht.seederRTSizeTarget {
break LOOP
}
}

case <-seederCtx.Done():
logger.Info("dht rt seeder: finishing as we have exceeded the seeder timeout")
break LOOP
}
}

return nil
}
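The persist.SeedsProposer interface itself does not appear in this diff. Inferred from the single call site above (dht.seedsProposer.Propose(seederCtx, dht.routingTable, findEligible(candidates), findEligible(fallbacks)), whose result is ranged over as peer IDs), its assumed shape is roughly the following; treat the exact signature as an assumption, not a copy from the persist package.

package persist // assumed package, mirroring the import path used in dht.go

import (
	"context"

	"github.com/libp2p/go-libp2p-core/peer"
	kbucket "github.com/libp2p/go-libp2p-kbucket"
)

// SeedsProposer (assumed shape): given the current routing table plus eligible
// candidate and fallback peers, it streams proposed peer IDs until it runs out
// of proposals or the context is cancelled.
type SeedsProposer interface {
	Propose(ctx context.Context, rt *kbucket.RoutingTable, candidates, fallbacks []peer.ID) <-chan peer.ID
}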