Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(bitswap) extract notifications #53

Closed
wants to merge 8 commits into from
Prev Previous commit
Next Next commit
refactor(bitswap) meslistener -> notifications
  • Loading branch information
Brian Tiger Chow committed Sep 14, 2014
commit 8ce7ff1f421ef1669b3e20faf959333e37423b4d
52 changes: 24 additions & 28 deletions bitswap/bitswap.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package bitswap
import (
"time"

context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
proto "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/goprotobuf/proto"
ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/datastore.go"

Expand Down Expand Up @@ -38,7 +39,7 @@ type BitSwap struct {
// routing interface for communication
routing *dht.IpfsDHT

listener *swarm.MessageListener
notifications *notifications

// partners is a map of currently active bitswap relationships.
// The Ledger has the peer.ID, and the peer connection works through net.
Expand All @@ -60,15 +61,15 @@ type BitSwap struct {
// NewBitSwap creates a new BitSwap instance. It does not check its parameters.
func NewBitSwap(p *peer.Peer, net swarm.Network, d ds.Datastore, r routing.IpfsRouting) *BitSwap {
bs := &BitSwap{
peer: p,
net: net,
datastore: d,
partners: LedgerMap{},
wantList: KeySet{},
routing: r.(*dht.IpfsDHT),
meschan: net.GetChannel(swarm.PBWrapper_BITSWAP),
haltChan: make(chan struct{}),
listener: swarm.NewMessageListener(),
peer: p,
net: net,
datastore: d,
partners: LedgerMap{},
wantList: KeySet{},
routing: r.(*dht.IpfsDHT),
meschan: net.GetChannel(swarm.PBWrapper_BITSWAP),
haltChan: make(chan struct{}),
notifications: newNotifications(),
}

go bs.handleMessages()
Expand All @@ -83,7 +84,7 @@ func (bs *BitSwap) GetBlock(k u.Key, timeout time.Duration) (
tleft := timeout - time.Now().Sub(begin)
provs_ch := bs.routing.FindProvidersAsync(k, 20, timeout)

valchan := make(chan []byte)
blockChannel := make(chan *blocks.Block)
after := time.After(tleft)

// TODO: when the data is received, shut down this for loop ASAP
Expand All @@ -96,39 +97,38 @@ func (bs *BitSwap) GetBlock(k u.Key, timeout time.Duration) (
return
}
select {
case valchan <- blk:
case blockChannel <- blk:
default:
}
}(p)
}
}()

select {
case blkdata := <-valchan:
close(valchan)
return blocks.NewBlock(blkdata)
case block := <-blockChannel:
close(blockChannel)
return block, nil
case <-after:
return nil, u.ErrTimeout
}
}

func (bs *BitSwap) getBlock(k u.Key, p *peer.Peer, timeout time.Duration) ([]byte, error) {
func (bs *BitSwap) getBlock(k u.Key, p *peer.Peer, timeout time.Duration) (*blocks.Block, error) {
u.DOut("[%s] getBlock '%s' from [%s]\n", bs.peer.ID.Pretty(), k.Pretty(), p.ID.Pretty())

ctx, _ := context.WithTimeout(context.Background(), timeout)
blockChannel := bs.notifications.Subscribe(ctx, k)

message := newMessage()
message.AppendWanted(k)

after := time.After(timeout)
resp := bs.listener.Listen(string(k), 1, timeout)
bs.meschan.Outgoing <- message.ToSwarm(p)

select {
case resp_mes := <-resp:
return resp_mes.Data, nil
case <-after:
block, ok := <-blockChannel
if !ok {
u.PErr("getBlock for '%s' timed out.\n", k.Pretty())
return nil, u.ErrTimeout
}
return block, nil
}

// HaveBlock announces the existance of a block to BitSwap, potentially sending
Expand Down Expand Up @@ -229,11 +229,7 @@ func (bs *BitSwap) blockReceive(p *peer.Peer, blk *blocks.Block) {
return
}

mes := &swarm.Message{
Peer: p,
Data: blk.Data,
}
bs.listener.Respond(string(blk.Key()), mes)
bs.notifications.Publish(blk)

ledger := bs.getLedger(p)
ledger.ReceivedBytes(len(blk.Data))
Expand Down