Bitswap #32

Merged: 7 commits, Aug 30, 2014
Changes from 4 commits
4 changes: 4 additions & 0 deletions .gitignore
@@ -0,0 +1,4 @@
*.swp
.ipfsconfig
*.out
*.test
9 changes: 9 additions & 0 deletions README.md
@@ -62,6 +62,15 @@ Guidelines:
- if you'd like to work on ipfs part-time (20+ hrs/wk) or full-time (40+ hrs/wk), contact [@jbenet](https://github.com/jbenet)
- have fun!

## Todo

Ipfs is still under heavy development; there is a lot to be done!

- [ ] Finish Bitswap
- [ ] Connect fuse interface to Blockservice
- [ ] Write tests for bitswap
- [ ] Come up with more TODO items

## Development Dependencies

If you make changes to the protocol buffers, you will need to install the [protoc compiler](https://code.google.com/p/protobuf/downloads/list).
180 changes: 172 additions & 8 deletions bitswap/bitswap.go
@@ -1,14 +1,16 @@
package bitswap

import (
"code.google.com/p/goprotobuf/proto"
blocks "github.com/jbenet/go-ipfs/blocks"
peer "github.com/jbenet/go-ipfs/peer"
routing "github.com/jbenet/go-ipfs/routing"
dht "github.com/jbenet/go-ipfs/routing/dht"
swarm "github.com/jbenet/go-ipfs/swarm"
u "github.com/jbenet/go-ipfs/util"

ds "github.com/jbenet/datastore.go"

"errors"
"time"
)

@@ -27,12 +29,18 @@ type BitSwap struct {
peer *peer.Peer

// net holds the connections to all peers.
net swarm.Network
net swarm.Network
meschan *swarm.Chan

// datastore is the local database
// Ledgers of known
datastore ds.Datastore

// routing interface for communication
routing *dht.IpfsDHT

listener *swarm.MesListener

// partners is a map of currently active bitswap relationships.
// The Ledger has the peer.ID, and the peer connection works through net.
// Ledgers of known relationships (active or inactive) stored in datastore.
@@ -44,27 +52,183 @@ type BitSwap struct {

// wantList is the set of keys we want values for. a map for fast lookups.
wantList KeySet

haltChan chan struct{}
}

// NewBitSwap creates a new BitSwap instance. It does not check its parameters.
func NewBitSwap(p *peer.Peer, net swarm.Network, d ds.Datastore) *BitSwap {
return &BitSwap{
func NewBitSwap(p *peer.Peer, net swarm.Network, d ds.Datastore, r routing.IpfsRouting) *BitSwap {
bs := &BitSwap{
peer: p,
net: net,
datastore: d,
partners: LedgerMap{},
wantList: KeySet{},
routing: r.(*dht.IpfsDHT),
meschan: net.GetChannel(swarm.PBWrapper_BITSWAP),
haltChan: make(chan struct{}),
listener: swarm.NewMesListener(),
}

go bs.handleMessages()
return bs
}

// GetBlock attempts to retrieve a particular block from peers, within timeout.
func (s *BitSwap) GetBlock(k u.Key, timeout time.Time) (
func (bs *BitSwap) GetBlock(k u.Key, timeout time.Duration) (
Member Author:

should BitSwap.GetBlock check datastore? or should the block service? not sure.

Also, this will be determined once we see how fast the DHT is in practice, but may want to just broadcast the new wanted key to existing bs.partners

Member:

right now, the block service is accessing the datastore, so checking that in the bitswap would be rather... redundant and time wasteful.

As for broadcasting the new key, right now, GetBlock sends a request out, and then waits for the other peers to respond with the block. We could change this to just broadcasting the want list, and waiting for the other peers to decide to send those blocks back. This is something we need to discuss before we go much further.

Member Author:

> right now, the block service is accessing the datastore, so checking that in the bitswap would be rather... redundant and time wasteful.

Yeah, I meant do it here instead. *shrug*

> GetBlock sends a request out, and then waits for the other peers to respond with the block. We could change this to just broadcasting the want list, and waiting for the other peers to decide to send those blocks back. This is something we need to discuss before we go much further.

It really depends on how big the wantlist is. If it spans many packets, then it makes sense to send a condensed version. Yeah, let's discuss.

*blocks.Block, error) {
return nil, errors.New("not implemented")
begin := time.Now()
tleft := timeout - time.Now().Sub(begin)
provs_ch := bs.routing.FindProvidersAsync(k, 20, timeout)
Member Author:

wondering if timeout here should be a fraction (given that after routing returns, still need to go and use bitswap peers)

Member:

maybe... but my worry is that shortening that timeout may result in it actually timing out, and if it times out, then we have no reason to actually continue with the rest of the function call, which... makes this function "timeout" before the given timeout.

Member Author:

fair. either way doesn't matter much atm.


valchan := make(chan []byte)
after := time.After(tleft)

// TODO: when the data is received, shut down this for loop
go func() {
Member Author:

not sure why this outer go func() { is needed? can't we just go into the for-loop (which makes goroutines)?

Member:

The outer goroutine is needed because the for loop is looping over the range of a channel; the channel comes from a FindProviders call that is sending providers in on the channel as it receives them from the DHT. So if the first provider we try gives us our value, we want to be able to finish and return from the function before the rest of the providers come in off the channel.

for p := range provs_ch {
go func(pr *peer.Peer) {
ledger := bs.GetLedger(pr.Key())
blk, err := bs.getBlock(k, pr, tleft)
if err != nil {
u.PErr("getBlock returned: %v\n", err)
return
}
// NOTE: this credits everyone who sends us a block,
// even if we don't use it
Member Author:

yeah, TRTTD. we can worry about rewarding low latency later.

ledger.ReceivedBytes(uint64(len(blk)))
select {
case valchan <- blk:
default:
}
Member Author:

what does the select do here? why not just valchan <- blk? (i don't know Go well enough to know there isn't some subtlety here)

Member Author:

Oh, will default happen if valchan is closed?

Member:

Yep, this basically says that once we have received from valchan once, the default path will occur, because if you try to send on a closed channel, Go panics.
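A minimal standalone demonstration of those select semantics (not PR code). One caveat worth flagging for the code above: `default` only covers sends that would *block*. A send on a closed channel is always ready and panics even inside a select with a `default` case, so a late sender can still panic after `close(valchan)` unless the panic is recovered or the channel stays open (e.g. buffered, never closed by the receiver).

```go
package main

import "fmt"

// trySend performs the non-blocking send used in GetBlock: it reports
// whether the value was delivered, and recovers the panic that a send
// on a closed channel raises even inside select/default.
func trySend(ch chan int, v int) (sent bool, panicked bool) {
	defer func() {
		if recover() != nil {
			panicked = true
		}
	}()
	select {
	case ch <- v:
		sent = true
	default:
	}
	return
}

func main() {
	ch := make(chan int)

	// No receiver ready on an unbuffered channel: default runs.
	sent, panicked := trySend(ch, 1)
	fmt.Println(sent, panicked) // false false

	// After close, the send case is chosen and panics; default does
	// NOT protect against sending on a closed channel.
	close(ch)
	sent, panicked = trySend(ch, 2)
	fmt.Println(sent, panicked) // false true
}
```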

}(p)
}
}()

select {
case blkdata := <-valchan:
close(valchan)
return blocks.NewBlock(blkdata)
case <-after:
Member Author:

Go is so cool.

Member:

It satisfies me so much to be able to write code in patterns like this.

return nil, u.ErrTimeout
}
}

func (bs *BitSwap) getBlock(k u.Key, p *peer.Peer, timeout time.Duration) ([]byte, error) {
u.DOut("[%s] getBlock '%s' from [%s]\n", bs.peer.ID.Pretty(), k.Pretty(), p.ID.Pretty())
//
mes := new(PBMessage)
mes.Id = proto.Uint64(swarm.GenerateMessageID())
mes.Key = proto.String(string(k))
typ := PBMessage_GET_BLOCK
mes.Type = &typ
//

after := time.After(timeout)
resp := bs.listener.Listen(mes.GetId(), 1, timeout)
smes := swarm.NewMessage(p, mes)
bs.meschan.Outgoing <- smes

select {
case resp_mes := <-resp:
pmes := new(PBMessage)
err := proto.Unmarshal(resp_mes.Data, pmes)
if err != nil {
return nil, err
}
if pmes.GetSuccess() {
return pmes.GetValue(), nil
}
return nil, u.ErrNotFound
case <-after:
u.PErr("getBlock for '%s' timed out.\n", k)
return nil, u.ErrTimeout
}
}
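The Listen/Respond flow in getBlock can be sketched in isolation. This simplified `MesListener` is a hypothetical stand-in for `swarm.MesListener` (whose real `Listen` also takes a count and timeout): callers register interest in a message ID, and the message handler routes a matching response to them.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// MesListener routes responses to whoever registered the message ID.
type MesListener struct {
	mu   sync.Mutex
	subs map[uint64]chan string
}

func NewMesListener() *MesListener {
	return &MesListener{subs: map[uint64]chan string{}}
}

// Listen registers interest in responses carrying the given ID.
func (l *MesListener) Listen(id uint64) <-chan string {
	l.mu.Lock()
	defer l.mu.Unlock()
	ch := make(chan string, 1) // buffered so Respond never blocks
	l.subs[id] = ch
	return ch
}

// Respond delivers data to the listener registered for id, if any.
func (l *MesListener) Respond(id uint64, data string) {
	l.mu.Lock()
	ch, ok := l.subs[id]
	delete(l.subs, id)
	l.mu.Unlock()
	if ok {
		ch <- data
	}
}

func main() {
	lis := NewMesListener()
	resp := lis.Listen(42)
	go lis.Respond(42, "block-data") // handleMessages would do this
	select {
	case d := <-resp:
		fmt.Println(d)
	case <-time.After(time.Second):
		fmt.Println("timeout")
	}
}
```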

// HaveBlock announces the existence of a block to BitSwap, potentially sending
// it to peers (Partners) whose WantLists include it.
func (s *BitSwap) HaveBlock(k u.Key) (*blocks.Block, error) {
return nil, errors.New("not implemented")
func (bs *BitSwap) HaveBlock(k u.Key) error {
return bs.routing.Provide(k)
}
Member Author:

Missing here is that we could be tracking our partners' last-received wantlists, so we could see who wants this block and send it right away. I'm not too sure about this, may be premature optimization, because it could waste lots of bandwidth for reducing latency. This will be important to test/model.

Member:

agreed. We can probably just spin off a goroutine to check and manage sending all that information.

Member Author:

(fix to above: ledger already has ledger.WantList)
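Since the ledger already carries a wantlist, the push-on-HaveBlock idea could look roughly like this. A self-contained sketch: `KeySet`, `Ledger`, and `notifyWanters` here are trimmed-down stand-ins, not the PR's real types.

```go
package main

import "fmt"

// KeySet is a stand-in for the PR's wantlist set type.
type KeySet map[string]struct{}

func (ks KeySet) Has(k string) bool { _, ok := ks[k]; return ok }

// Ledger is a stand-in carrying only what this sketch needs.
type Ledger struct {
	Partner  string
	WantList KeySet
}

// notifyWanters sketches the optimization discussed: when we obtain a
// block, find active partners whose last-known wantlist includes it,
// so the block could be pushed to them immediately.
func notifyWanters(partners map[string]*Ledger, k string) []string {
	var out []string
	for _, l := range partners {
		if l.WantList.Has(k) {
			out = append(out, l.Partner) // would send the block here
		}
	}
	return out
}

func main() {
	partners := map[string]*Ledger{
		"a": {Partner: "a", WantList: KeySet{"k1": {}}},
		"b": {Partner: "b", WantList: KeySet{"k2": {}}},
	}
	fmt.Println(notifyWanters(partners, "k1"))
}
```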


func (bs *BitSwap) handleMessages() {
for {
select {
case mes := <-bs.meschan.Incoming:
pmes := new(PBMessage)
err := proto.Unmarshal(mes.Data, pmes)
if err != nil {
u.PErr("%v\n", err)
continue
}
if pmes.GetResponse() {
bs.listener.Respond(pmes.GetId(), mes)
continue
}

switch pmes.GetType() {
case PBMessage_GET_BLOCK:
go bs.handleGetBlock(mes.Peer, pmes)
default:
u.PErr("Invalid message type.\n")
}
case <-bs.haltChan:
return
}
}
}

func (bs *BitSwap) handleGetBlock(p *peer.Peer, pmes *PBMessage) {
u.DOut("handleGetBlock.\n")
ledger := bs.GetLedger(p.Key())

u.DOut("finding [%s] in datastore.\n", u.Key(pmes.GetKey()).Pretty())
idata, err := bs.datastore.Get(ds.NewKey(pmes.GetKey()))
if err != nil {
u.PErr("handleGetBlock datastore returned: %v\n", err)
if err == ds.ErrNotFound {
return
}
return
}

u.DOut("found value!\n")
data, ok := idata.([]byte)
if !ok {
u.PErr("Failed casting data from datastore.")
return
}

if ledger.ShouldSend() {
u.DOut("Sending value back!\n")
resp := &Message{
Value: data,
Response: true,
ID: pmes.GetId(),
Type: PBMessage_GET_BLOCK,
Success: true,
}
bs.meschan.Outgoing <- swarm.NewMessage(p, resp.ToProtobuf())
ledger.SentBytes(uint64(len(data)))
} else {
u.DOut("Ledger decided not to send anything...\n")
}
}

func (bs *BitSwap) GetLedger(k u.Key) *Ledger {
l, ok := bs.partners[k]
if ok {
return l
}

l = new(Ledger)
Member Author:

the ledger may be in the datastore, should check there first before making a new one. (partners is only for currently active connections, not all ledgers known)

Member:

what do we want to use as a key for the ledgers?

Member Author:

could be something like /ledger/<peerid>

l.Strategy = StandardStrategy
l.Partner = peer.ID(k)
bs.partners[k] = l
return l
}

func (bs *BitSwap) Halt() {
bs.haltChan <- struct{}{}
}
21 changes: 16 additions & 5 deletions bitswap/ledger.go
@@ -13,11 +13,8 @@ type Ledger struct {
// Partner is the ID of the remote Peer.
Partner peer.ID

// BytesSent counts the number of bytes the local peer sent to Partner
BytesSent uint64

// BytesReceived counts the number of bytes local peer received from Partner
BytesReceived uint64
// Accounting tracks bytes sent and received.
Accounting debtRatio

// FirstExchange is the time of the first data exchange.
FirstExchange *time.Time
@@ -27,7 +24,21 @@

// WantList is a (bounded, small) set of keys that Partner desires.
WantList KeySet

Strategy StrategyFunc
}

// LedgerMap lists Ledgers by their Partner key.
type LedgerMap map[u.Key]*Ledger

func (l *Ledger) ShouldSend() bool {
return l.Strategy(l.Accounting)
}

func (l *Ledger) SentBytes(n uint64) {
l.Accounting.BytesSent += n
}

func (l *Ledger) ReceivedBytes(n uint64) {
l.Accounting.BytesRecv += n
}
Member Author:

should these functions commit to the datastore? what happens if there's a crash right after? our ledgers are unsynced.

Member Author:

(or maybe commit outside of them, when called, since ledger doesn't have a datastore ptr)

Member:

Yeah, I'm not sure how to store the ledger in the datastore yet.
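One possible shape for this, following the `/ledger/<peerid>` key suggestion from the GetLedger thread. A sketch under stated assumptions: JSON encoding, a trimmed-down `Ledger`, and a plain map standing in for the datastore; real code would call `datastore.Put(ds.NewKey(...), bytes)`.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Ledger here is a trimmed-down stand-in; the real one lives in
// bitswap/ledger.go and has more fields.
type Ledger struct {
	Partner   string
	BytesSent uint64
	BytesRecv uint64
}

// ledgerKey builds the "/ledger/<peerid>" key suggested in the review.
func ledgerKey(peerID string) string {
	return "/ledger/" + peerID
}

// persistLedger sketches committing a ledger under that key, as the
// SentBytes/ReceivedBytes callers might do after each update.
func persistLedger(store map[string][]byte, l *Ledger) error {
	data, err := json.Marshal(l)
	if err != nil {
		return err
	}
	store[ledgerKey(l.Partner)] = data
	return nil
}

func main() {
	store := map[string][]byte{}
	l := &Ledger{Partner: "QmPeer", BytesSent: 42}
	if err := persistLedger(store, l); err != nil {
		panic(err)
	}
	fmt.Println(ledgerKey("QmPeer"))
}
```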

32 changes: 32 additions & 0 deletions bitswap/message.go
@@ -0,0 +1,32 @@
package bitswap

import (
"code.google.com/p/goprotobuf/proto"
u "github.com/jbenet/go-ipfs/util"
)

type Message struct {
Type PBMessage_MessageType
ID uint64
Response bool
Key u.Key
Value []byte
Success bool
}

func (m *Message) ToProtobuf() *PBMessage {
pmes := new(PBMessage)
pmes.Id = &m.ID
pmes.Type = &m.Type
if m.Response {
pmes.Response = proto.Bool(true)
}

if m.Success {
pmes.Success = proto.Bool(true)
}

pmes.Key = proto.String(string(m.Key))
pmes.Value = m.Value
return pmes
}