Skip to content

Commit

Permalink
Dialer for dht
Browse files Browse the repository at this point in the history
dht doesn't need the whole network interface, only needs a Dialer.
(much reduced surface of possible errors)
  • Loading branch information
jbenet committed Oct 21, 2014
1 parent 93942d8 commit 7f22750
Show file tree
Hide file tree
Showing 4 changed files with 22 additions and 17 deletions.
9 changes: 9 additions & 0 deletions net/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,3 +48,12 @@ type Handler srv.Handler

// Service interface for network resources.
type Service srv.Service

// Dialer service that can dial to peers
// (this is usually just a Network, but other services may not need the whole
// thing, and thus it becomes easier to mock)
type Dialer interface {

// DialPeer attempts to establish a connection to a given peer
DialPeer(peer.Peer) error
}
14 changes: 7 additions & 7 deletions routing/dht/dht.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,9 @@ type IpfsDHT struct {
// NOTE: (currently, only a single table is used)
routingTables []*kb.RoutingTable

// the network interface. service
network inet.Network
sender inet.Sender
// the network services we need
dialer inet.Dialer
sender inet.Sender

// Local peer (yourself)
self peer.Peer
Expand All @@ -59,9 +59,9 @@ type IpfsDHT struct {
}

// NewDHT creates a new DHT object with the given peer as the 'local' host
func NewDHT(ctx context.Context, p peer.Peer, ps peer.Peerstore, net inet.Network, sender inet.Sender, dstore ds.Datastore) *IpfsDHT {
func NewDHT(ctx context.Context, p peer.Peer, ps peer.Peerstore, dialer inet.Dialer, sender inet.Sender, dstore ds.Datastore) *IpfsDHT {
dht := new(IpfsDHT)
dht.network = net
dht.dialer = dialer
dht.sender = sender
dht.datastore = dstore
dht.self = p
Expand Down Expand Up @@ -95,7 +95,7 @@ func (dht *IpfsDHT) Connect(ctx context.Context, npeer peer.Peer) (peer.Peer, er
//
// /ip4/10.20.30.40/tcp/1234/ipfs/Qxhxxchxzcncxnzcnxzcxzm
//
err := dht.network.DialPeer(npeer)
err := dht.dialer.DialPeer(npeer)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -499,7 +499,7 @@ func (dht *IpfsDHT) ensureConnectedToPeer(pbp *Message_Peer) (peer.Peer, error)
}

// dial connection
err = dht.network.DialPeer(p)
err = dht.dialer.DialPeer(p)
return p, err
}

Expand Down
10 changes: 3 additions & 7 deletions routing/dht/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package dht
import (
"sync"

inet "github.com/jbenet/go-ipfs/net"
peer "github.com/jbenet/go-ipfs/peer"
queue "github.com/jbenet/go-ipfs/peer/queue"
kb "github.com/jbenet/go-ipfs/routing/kbucket"
Expand All @@ -14,17 +15,12 @@ import (

var maxQueryConcurrency = AlphaValue

type dhtDialer interface {
// DialPeer attempts to establish a connection to a given peer
DialPeer(peer.Peer) error
}

type dhtQuery struct {
// the key we're querying for
key u.Key

// dialer used to ensure we're connected to peers
dialer dhtDialer
dialer inet.Dialer

// the function to execute per peer
qfunc queryFunc
Expand All @@ -42,7 +38,7 @@ type dhtQueryResult struct {
}

// constructs query
func newQuery(k u.Key, d dhtDialer, f queryFunc) *dhtQuery {
func newQuery(k u.Key, d inet.Dialer, f queryFunc) *dhtQuery {
return &dhtQuery{
key: k,
dialer: d,
Expand Down
6 changes: 3 additions & 3 deletions routing/dht/routing.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func (dht *IpfsDHT) PutValue(ctx context.Context, key u.Key, value []byte) error
peers = append(peers, npeers...)
}

query := newQuery(key, dht.network, func(ctx context.Context, p peer.Peer) (*dhtQueryResult, error) {
query := newQuery(key, dht.dialer, func(ctx context.Context, p peer.Peer) (*dhtQueryResult, error) {
log.Debug("%s PutValue qry part %v", dht.self, p)
err := dht.putValueToNetwork(ctx, p, string(key), value)
if err != nil {
Expand Down Expand Up @@ -65,7 +65,7 @@ func (dht *IpfsDHT) GetValue(ctx context.Context, key u.Key) ([]byte, error) {
}

// setup the Query
query := newQuery(key, dht.network, func(ctx context.Context, p peer.Peer) (*dhtQueryResult, error) {
query := newQuery(key, dht.dialer, func(ctx context.Context, p peer.Peer) (*dhtQueryResult, error) {

val, peers, err := dht.getValueOrPeers(ctx, p, key, routeLevel)
if err != nil {
Expand Down Expand Up @@ -230,7 +230,7 @@ func (dht *IpfsDHT) findPeerMultiple(ctx context.Context, id peer.ID) (peer.Peer
}

// setup query function
query := newQuery(u.Key(id), dht.network, func(ctx context.Context, p peer.Peer) (*dhtQueryResult, error) {
query := newQuery(u.Key(id), dht.dialer, func(ctx context.Context, p peer.Peer) (*dhtQueryResult, error) {
pmes, err := dht.findPeerSingle(ctx, p, id, routeLevel)
if err != nil {
log.Error("%s getPeer error: %v", dht.self, err)
Expand Down

0 comments on commit 7f22750

Please sign in to comment.