Skip to content

Commit

Permalink
rework bitswap to reflect discussion on PR #32
Browse files Browse the repository at this point in the history
  • Loading branch information
whyrusleeping committed Aug 28, 2014
1 parent af2f04a commit fcff5a5
Show file tree
Hide file tree
Showing 11 changed files with 141 additions and 243 deletions.
159 changes: 89 additions & 70 deletions bitswap/bitswap.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ type BitSwap struct {
// wantList is the set of keys we want values for. a map for fast lookups.
wantList KeySet

strategy StrategyFunc

haltChan chan struct{}
}

Expand Down Expand Up @@ -87,15 +89,11 @@ func (bs *BitSwap) GetBlock(k u.Key, timeout time.Duration) (
go func() {
for p := range provs_ch {
go func(pr *peer.Peer) {
ledger := bs.GetLedger(pr)
blk, err := bs.getBlock(k, pr, tleft)
if err != nil {
u.PErr("getBlock returned: %v\n", err)
return
}
// NOTE: this credits everyone who sends us a block,
// even if we dont use it
ledger.ReceivedBytes(uint64(len(blk)))
select {
case valchan <- blk:
default:
Expand All @@ -115,30 +113,18 @@ func (bs *BitSwap) GetBlock(k u.Key, timeout time.Duration) (

func (bs *BitSwap) getBlock(k u.Key, p *peer.Peer, timeout time.Duration) ([]byte, error) {
u.DOut("[%s] getBlock '%s' from [%s]\n", bs.peer.ID.Pretty(), k.Pretty(), p.ID.Pretty())
//
mes := new(PBMessage)
mes.Id = proto.Uint64(swarm.GenerateMessageID())
mes.Key = proto.String(string(k))
typ := PBMessage_GET_BLOCK
mes.Type = &typ
//

pmes := new(PBMessage)
pmes.Wantlist = []string{string(k)}

after := time.After(timeout)
resp := bs.listener.Listen(mes.GetId(), 1, timeout)
smes := swarm.NewMessage(p, mes)
resp := bs.listener.Listen(string(k), 1, timeout)
smes := swarm.NewMessage(p, pmes)
bs.meschan.Outgoing <- smes

select {
case resp_mes := <-resp:
pmes := new(PBMessage)
err := proto.Unmarshal(resp_mes.Data, pmes)
if err != nil {
return nil, err
}
if pmes.GetSuccess() {
return pmes.GetValue(), nil
}
return nil, u.ErrNotFound
return resp_mes.Data, nil
case <-after:
u.PErr("getBlock for '%s' timed out.\n", k)
return nil, u.ErrTimeout
Expand All @@ -147,8 +133,26 @@ func (bs *BitSwap) getBlock(k u.Key, p *peer.Peer, timeout time.Duration) ([]byt

// HaveBlock announces the existance of a block to BitSwap, potentially sending
// it to peers (Partners) whose WantLists include it.
func (bs *BitSwap) HaveBlock(k u.Key) error {
return bs.routing.Provide(k)
func (bs *BitSwap) HaveBlock(blk *blocks.Block) error {
go func() {
for _, ledger := range bs.partners {
if _, ok := ledger.WantList[blk.Key()]; ok {
//send block to node
if ledger.ShouldSend() {
bs.SendBlock(ledger.Partner, blk)
}
}
}
}()
return bs.routing.Provide(blk.Key())
}

func (bs *BitSwap) SendBlock(p *peer.Peer, b *blocks.Block) {
pmes := new(PBMessage)
pmes.Blocks = [][]byte{b.Data}

swarm_mes := swarm.NewMessage(p, pmes)
bs.meschan.Outgoing <- swarm_mes
}

func (bs *BitSwap) handleMessages() {
Expand All @@ -161,71 +165,81 @@ func (bs *BitSwap) handleMessages() {
u.PErr("%v\n", err)
continue
}
if pmes.GetResponse() {
bs.listener.Respond(pmes.GetId(), mes)
continue
if pmes.Blocks != nil {
for _, blkData := range pmes.Blocks {
blk, err := blocks.NewBlock(blkData)
if err != nil {
u.PErr("%v\n", err)
continue
}
go bs.blockReceive(mes.Peer, blk)
}
}

switch pmes.GetType() {
case PBMessage_GET_BLOCK:
go bs.handleGetBlock(mes.Peer, pmes)
case PBMessage_WANT_BLOCK:
go bs.handleWantBlock(mes.Peer, pmes)
default:
u.PErr("Invalid message type.\n")
if pmes.Wantlist != nil {
for _, want := range pmes.Wantlist {
go bs.peerWantsBlock(mes.Peer, want)
}
}
case <-bs.haltChan:
return
}
}
}

func (bs *BitSwap) handleWantBlock(p *peer.Peer, pmes *PBMessage) {
wants := pmes.GetWantlist()
// peerWantsBlock will check if we have the block in question,
// and then if we do, check the ledger for whether or not we should send it.
func (bs *BitSwap) peerWantsBlock(p *peer.Peer, want string) {
u.DOut("peer [%s] wants block [%s]\n", p.ID.Pretty(), u.Key(want).Pretty())
ledg := bs.GetLedger(p)
for _, s := range wants {
// TODO: this needs to be different. We need timeouts.
ledg.WantList[u.Key(s)] = struct{}{}
}
}

func (bs *BitSwap) handleGetBlock(p *peer.Peer, pmes *PBMessage) {
u.DOut("handleGetBlock.\n")
ledger := bs.GetLedger(p)

u.DOut("finding [%s] in datastore.\n", u.Key(pmes.GetKey()).Pretty())
idata, err := bs.datastore.Get(ds.NewKey(pmes.GetKey()))
dsk := ds.NewKey(want)
blk_i, err := bs.datastore.Get(dsk)
if err != nil {
u.PErr("handleGetBlock datastore returned: %v\n", err)
if err == ds.ErrNotFound {
return
// TODO: this needs to be different. We need timeouts.
ledg.WantList[u.Key(want)] = struct{}{}
}
u.PErr("datastore get error: %v\n", err)
return
}

u.DOut("found value!\n")
data, ok := idata.([]byte)
blk, ok := blk_i.([]byte)
if !ok {
u.PErr("Failed casting data from datastore.")
u.PErr("data conversion error.\n")
return
}

if ledger.ShouldSend() {
u.DOut("Sending value back!\n")
resp := &Message{
Value: data,
Response: true,
ID: pmes.GetId(),
Type: PBMessage_GET_BLOCK,
Success: true,
if ledg.ShouldSend() {
u.DOut("Sending block to peer.\n")
bblk, err := blocks.NewBlock(blk)
if err != nil {
u.PErr("newBlock error: %v\n", err)
return
}
bs.meschan.Outgoing <- swarm.NewMessage(p, resp.ToProtobuf())
ledger.SentBytes(uint64(len(data)))
} else {
u.DOut("Ledger decided not to send anything...\n")
bs.SendBlock(p, bblk)
ledg.SentBytes(len(blk))
}
}

func (bs *BitSwap) blockReceive(p *peer.Peer, blk *blocks.Block) {
u.DOut("blockReceive: %s\n", blk.Key().Pretty())
err := bs.datastore.Put(ds.NewKey(string(blk.Key())), blk.Data)
if err != nil {
u.PErr("blockReceive error: %v\n", err)
return
}

mes := &swarm.Message{
Peer: p,
Data: blk.Data,
}
bs.listener.Respond(string(blk.Key()), mes)

ledger := bs.GetLedger(p)
ledger.ReceivedBytes(len(blk.Data))
}

func (bs *BitSwap) GetLedger(p *peer.Peer) *Ledger {
l, ok := bs.partners[p.Key()]
if ok {
Expand All @@ -240,16 +254,14 @@ func (bs *BitSwap) GetLedger(p *peer.Peer) *Ledger {
}

func (bs *BitSwap) SendWantList(wl KeySet) error {
mes := Message{
ID: swarm.GenerateMessageID(),
Type: PBMessage_WANT_BLOCK,
WantList: bs.wantList,
pmes := new(PBMessage)
for k, _ := range wl {
pmes.Wantlist = append(pmes.Wantlist, string(k))
}

pbmes := mes.ToProtobuf()
// Lets just ping everybody all at once
for _, ledger := range bs.partners {
bs.meschan.Outgoing <- swarm.NewMessage(ledger.Partner, pbmes)
bs.meschan.Outgoing <- swarm.NewMessage(ledger.Partner, pmes)
}

return nil
Expand All @@ -258,3 +270,10 @@ func (bs *BitSwap) SendWantList(wl KeySet) error {
func (bs *BitSwap) Halt() {
bs.haltChan <- struct{}{}
}

func (bs *BitSwap) SetStrategy(sf StrategyFunc) {
bs.strategy = sf
for _, ledg := range bs.partners {
ledg.Strategy = sf
}
}
15 changes: 10 additions & 5 deletions bitswap/ledger.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ type Ledger struct {
// LastExchange is the time of the last data exchange.
LastExchange time.Time

// Number of exchanges with this peer
ExchangeCount uint64

// WantList is a (bounded, small) set of keys that Partner desires.
WantList KeySet

Expand All @@ -32,15 +35,17 @@ type Ledger struct {
type LedgerMap map[u.Key]*Ledger

func (l *Ledger) ShouldSend() bool {
return l.Strategy(l.Accounting)
return l.Strategy(l)
}

func (l *Ledger) SentBytes(n uint64) {
func (l *Ledger) SentBytes(n int) {
l.ExchangeCount++
l.LastExchange = time.Now()
l.Accounting.BytesSent += n
l.Accounting.BytesSent += uint64(n)
}

func (l *Ledger) ReceivedBytes(n uint64) {
func (l *Ledger) ReceivedBytes(n int) {
l.ExchangeCount++
l.LastExchange = time.Now()
l.Accounting.BytesRecv += n
l.Accounting.BytesRecv += uint64(n)
}
41 changes: 0 additions & 41 deletions bitswap/message.go

This file was deleted.

Loading

0 comments on commit fcff5a5

Please sign in to comment.