Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bitswap #32

Merged
merged 7 commits into from
Aug 30, 2014
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
bitswap first working commit!
  • Loading branch information
whyrusleeping committed Aug 26, 2014
commit cfdf01d58a5e7b6de6e5b71a92f896083aeef74f
18 changes: 16 additions & 2 deletions bitswap/bitswap.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ func NewBitSwap(p *peer.Peer, net swarm.Network, d ds.Datastore, r routing.IpfsR
routing: r.(*dht.IpfsDHT),
meschan: net.GetChannel(swarm.PBWrapper_BITSWAP),
haltChan: make(chan struct{}),
listener: swarm.NewMesListener(),
}

go bs.handleMessages()
Expand All @@ -90,7 +91,7 @@ func (bs *BitSwap) GetBlock(k u.Key, timeout time.Duration) (
ledger := bs.GetLedger(pr.Key())
blk, err := bs.getBlock(k, pr, tleft)
if err != nil {
u.PErr("%v\n", err)
u.PErr("getBlock returned: %v\n", err)
return
}
// NOTE: this credits everyone who sends us a block,
Expand All @@ -106,13 +107,15 @@ func (bs *BitSwap) GetBlock(k u.Key, timeout time.Duration) (

select {
case blkdata := <-valchan:
close(valchan)
return blocks.NewBlock(blkdata)
case <-after:
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Go is so cool.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It satisfies me so much to be able to write code in patterns like this.

return nil, u.ErrTimeout
}
}

func (bs *BitSwap) getBlock(k u.Key, p *peer.Peer, timeout time.Duration) ([]byte, error) {
u.DOut("[%s] getBlock '%s' from [%s]\n", bs.peer.ID.Pretty(), k.Pretty(), p.ID.Pretty())
//
mes := new(PBMessage)
mes.Id = proto.Uint64(swarm.GenerateMessageID())
Expand Down Expand Up @@ -161,6 +164,7 @@ func (bs *BitSwap) handleMessages() {
}
if pmes.GetResponse() {
bs.listener.Respond(pmes.GetId(), mes)
continue
}

switch pmes.GetType() {
Expand All @@ -176,30 +180,39 @@ func (bs *BitSwap) handleMessages() {
}

func (bs *BitSwap) handleGetBlock(p *peer.Peer, pmes *PBMessage) {
u.DOut("handleGetBlock.\n")
ledger := bs.GetLedger(p.Key())

u.DOut("finding [%s] in datastore.\n", u.Key(pmes.GetKey()).Pretty())
idata, err := bs.datastore.Get(ds.NewKey(pmes.GetKey()))
if err != nil {
u.PErr("handleGetBlock datastore returned: %v\n", err)
if err == ds.ErrNotFound {
return
}
u.PErr("%v\n", err)
return
}

u.DOut("found value!\n")
data, ok := idata.([]byte)
if !ok {
u.PErr("Failed casting data from datastore.")
return
}

if ledger.ShouldSend() {
u.DOut("Sending value back!\n")
resp := &Message{
Value: data,
Response: true,
ID: pmes.GetId(),
Type: PBMessage_GET_BLOCK,
Success: true,
}
bs.meschan.Outgoing <- swarm.NewMessage(p, resp.ToProtobuf())
ledger.SentBytes(uint64(len(data)))
} else {
u.DOut("Ledger decided not to send anything...\n")
}
}

Expand All @@ -210,6 +223,7 @@ func (bs *BitSwap) GetLedger(k u.Key) *Ledger {
}

l = new(Ledger)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the ledger may be in the datastore, should check there first before making a new one. (partners is only for currently active connections, not all ledgers known)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what do we want to use as a key for the ledgers?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could be something like /ledger/<peerid>

l.Strategy = StandardStrategy
l.Partner = peer.ID(k)
bs.partners[k] = l
return l
Expand Down
6 changes: 3 additions & 3 deletions bitswap/ledger.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package bitswap

import (
"math/rand"

peer "github.com/jbenet/go-ipfs/peer"
u "github.com/jbenet/go-ipfs/util"

Expand All @@ -26,13 +24,15 @@ type Ledger struct {

// WantList is a (bounded, small) set of keys that Partner desires.
WantList KeySet

Strategy StrategyFunc
}

// LedgerMap lists Ledgers by their Partner key.
type LedgerMap map[u.Key]*Ledger

func (l *Ledger) ShouldSend() bool {
return rand.Float64() <= probabilitySend(l.Accounting.Value())
return l.Strategy(l.Accounting)
}

func (l *Ledger) SentBytes(n uint64) {
Expand Down
2 changes: 2 additions & 0 deletions bitswap/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
)

type Message struct {
Type PBMessage_MessageType
ID uint64
Response bool
Key u.Key
Expand All @@ -16,6 +17,7 @@ type Message struct {
func (m *Message) ToProtobuf() *PBMessage {
pmes := new(PBMessage)
pmes.Id = &m.ID
pmes.Type = &m.Type
if m.Response {
pmes.Response = proto.Bool(true)
}
Expand Down
11 changes: 11 additions & 0 deletions bitswap/strategy.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,19 @@ package bitswap

import (
"math"
"math/rand"
)

type StrategyFunc func(debtRatio) bool

func StandardStrategy(db debtRatio) bool {
return rand.Float64() <= probabilitySend(db.Value())
}

func YesManStrategy(db debtRatio) bool {
return true
}

func probabilitySend(ratio float64) float64 {
x := 1 + math.Exp(6-3*ratio)
y := 1 / x
Expand Down
45 changes: 30 additions & 15 deletions blockservice/blockservice.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package blockservice

import (
"fmt"
"time"

ds "github.com/jbenet/datastore.go"
bitswap "github.com/jbenet/go-ipfs/bitswap"
Expand All @@ -19,36 +20,50 @@ type BlockService struct {
}

// NewBlockService creates a BlockService with given datastore instance.
func NewBlockService(d ds.Datastore) (*BlockService, error) {
func NewBlockService(d ds.Datastore, rem *bitswap.BitSwap) (*BlockService, error) {
if d == nil {
return nil, fmt.Errorf("BlockService requires valid datastore")
}
return &BlockService{Datastore: d}, nil
if rem == nil {
return nil, fmt.Errorf("BlockService requires a valid bitswap")
}
return &BlockService{Datastore: d, Remote: rem}, nil
}

// AddBlock adds a particular block to the service, Putting it into the datastore.
func (s *BlockService) AddBlock(b *blocks.Block) (u.Key, error) {
k := b.Key()
dsk := ds.NewKey(string(k))
return k, s.Datastore.Put(dsk, b.Data)
u.DOut("storing [%s] in datastore\n", k.Pretty())
err := s.Datastore.Put(dsk, b.Data)
if err != nil {
return k, err
}
err = s.Remote.HaveBlock(b.Key())
return k, err
}

// GetBlock retrieves a particular block from the service,
// Getting it from the datastore using the key (hash).
func (s *BlockService) GetBlock(k u.Key) (*blocks.Block, error) {
dsk := ds.NewKey(string(k))
datai, err := s.Datastore.Get(dsk)
if err != nil {
return nil, err
if err == nil {
bdata, ok := datai.([]byte)
if !ok {
return nil, fmt.Errorf("data associated with %s is not a []byte", k)
}
return &blocks.Block{
Multihash: mh.Multihash(k),
Data: bdata,
}, nil
} else if err == ds.ErrNotFound {
blk, err := s.Remote.GetBlock(k, time.Second*5)
if err != nil {
return nil, err
}
return blk, nil
} else {
return nil, u.ErrNotFound
}

data, ok := datai.([]byte)
if !ok {
return nil, fmt.Errorf("data associated with %s is not a []byte", k)
}

return &blocks.Block{
Multihash: mh.Multihash(k),
Data: data,
}, nil
}
2 changes: 1 addition & 1 deletion core/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func NewIpfsNode(cfg *config.Config) (*IpfsNode, error) {
return nil, err
}

bs, err := bserv.NewBlockService(d)
bs, err := bserv.NewBlockService(d, nil)
if err != nil {
return nil, err
}
Expand Down
22 changes: 14 additions & 8 deletions routing/dht/dht.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,11 @@ type IpfsDHT struct {
}

// NewDHT creates a new DHT object with the given peer as the 'local' host
func NewDHT(p *peer.Peer, net swarm.Network) *IpfsDHT {
func NewDHT(p *peer.Peer, net swarm.Network, dstore ds.Datastore) *IpfsDHT {
dht := new(IpfsDHT)
dht.network = net
dht.netChan = net.GetChannel(swarm.PBWrapper_DHT_MESSAGE)
dht.datastore = ds.NewMapDatastore()
dht.datastore = dstore
dht.self = p
dht.providers = NewProviderManager()
dht.shutdown = make(chan struct{})
Expand Down Expand Up @@ -322,6 +322,7 @@ type providerInfo struct {

func (dht *IpfsDHT) handleAddProvider(p *peer.Peer, pmes *PBDHTMessage) {
key := u.Key(pmes.GetKey())
u.DOut("[%s] Adding [%s] as a provider for '%s'\n", dht.self.ID.Pretty(), p.ID.Pretty(), peer.ID(key).Pretty())
dht.providers.AddProvider(key, p)
}

Expand Down Expand Up @@ -615,12 +616,8 @@ func (dht *IpfsDHT) addPeerList(key u.Key, peers []*PBDHTMessage_PBPeer) []*peer
p := dht.network.Find(u.Key(prov.GetId()))
if p == nil {
u.DOut("given provider %s was not in our network already.\n", peer.ID(prov.GetId()).Pretty())
maddr, err := ma.NewMultiaddr(prov.GetAddr())
if err != nil {
u.PErr("error connecting to new peer: %s\n", err)
continue
}
p, err = dht.network.GetConnection(peer.ID(prov.GetId()), maddr)
var err error
p, err = dht.peerFromInfo(prov)
if err != nil {
u.PErr("error connecting to new peer: %s\n", err)
continue
Expand All @@ -631,3 +628,12 @@ func (dht *IpfsDHT) addPeerList(key u.Key, peers []*PBDHTMessage_PBPeer) []*peer
}
return provArr
}

func (dht *IpfsDHT) peerFromInfo(pbp *PBDHTMessage_PBPeer) (*peer.Peer, error) {
maddr, err := ma.NewMultiaddr(pbp.GetAddr())
if err != nil {
return nil, err
}

return dht.network.GetConnection(peer.ID(pbp.GetId()), maddr)
}
11 changes: 6 additions & 5 deletions routing/dht/dht_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package dht
import (
"testing"

ds "github.com/jbenet/datastore.go"
peer "github.com/jbenet/go-ipfs/peer"
swarm "github.com/jbenet/go-ipfs/swarm"
u "github.com/jbenet/go-ipfs/util"
Expand Down Expand Up @@ -37,7 +38,7 @@ func setupDHTS(n int, t *testing.T) ([]*ma.Multiaddr, []*peer.Peer, []*IpfsDHT)
if err != nil {
t.Fatal(err)
}
d := NewDHT(peers[i], net)
d := NewDHT(peers[i], net, ds.NewMapDatastore())
dhts = append(dhts, d)
d.Start()
}
Expand Down Expand Up @@ -69,14 +70,14 @@ func TestPing(t *testing.T) {
if err != nil {
t.Fatal(err)
}
dhtA := NewDHT(peerA, neta)
dhtA := NewDHT(peerA, neta, ds.NewMapDatastore())

netb := swarm.NewSwarm(peerB)
err = netb.Listen()
if err != nil {
t.Fatal(err)
}
dhtB := NewDHT(peerB, netb)
dhtB := NewDHT(peerB, netb, ds.NewMapDatastore())

dhtA.Start()
dhtB.Start()
Expand Down Expand Up @@ -120,14 +121,14 @@ func TestValueGetSet(t *testing.T) {
if err != nil {
t.Fatal(err)
}
dhtA := NewDHT(peerA, neta)
dhtA := NewDHT(peerA, neta, ds.NewMapDatastore())

netb := swarm.NewSwarm(peerB)
err = netb.Listen()
if err != nil {
t.Fatal(err)
}
dhtB := NewDHT(peerB, netb)
dhtB := NewDHT(peerB, netb, ds.NewMapDatastore())

dhtA.Start()
dhtB.Start()
Expand Down
7 changes: 4 additions & 3 deletions routing/dht/ext_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (

"code.google.com/p/goprotobuf/proto"

ds "github.com/jbenet/datastore.go"
peer "github.com/jbenet/go-ipfs/peer"
swarm "github.com/jbenet/go-ipfs/swarm"
u "github.com/jbenet/go-ipfs/util"
Expand Down Expand Up @@ -89,7 +90,7 @@ func TestGetFailures(t *testing.T) {
local := new(peer.Peer)
local.ID = peer.ID("test_peer")

d := NewDHT(local, fn)
d := NewDHT(local, fn, ds.NewMapDatastore())

other := &peer.Peer{ID: peer.ID("other_peer")}

Expand Down Expand Up @@ -177,7 +178,7 @@ func TestNotFound(t *testing.T) {
local := new(peer.Peer)
local.ID = peer.ID("test_peer")

d := NewDHT(local, fn)
d := NewDHT(local, fn, ds.NewMapDatastore())
d.Start()

var ps []*peer.Peer
Expand Down Expand Up @@ -239,7 +240,7 @@ func TestLessThanKResponses(t *testing.T) {
local := new(peer.Peer)
local.ID = peer.ID("test_peer")

d := NewDHT(local, fn)
d := NewDHT(local, fn, ds.NewMapDatastore())
d.Start()

var ps []*peer.Peer
Expand Down
Loading