-
-
Notifications
You must be signed in to change notification settings - Fork 3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Bitswap #32
Bitswap #32
Changes from 4 commits
c5e7273
91e4675
678db4f
cfdf01d
af2f04a
fcff5a5
691d1b3
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,4 @@ | ||
*.swp | ||
.ipfsconfig | ||
*.out | ||
*.test |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,14 +1,16 @@ | ||
package bitswap | ||
|
||
import ( | ||
"code.google.com/p/goprotobuf/proto" | ||
blocks "github.com/jbenet/go-ipfs/blocks" | ||
peer "github.com/jbenet/go-ipfs/peer" | ||
routing "github.com/jbenet/go-ipfs/routing" | ||
dht "github.com/jbenet/go-ipfs/routing/dht" | ||
swarm "github.com/jbenet/go-ipfs/swarm" | ||
u "github.com/jbenet/go-ipfs/util" | ||
|
||
ds "github.com/jbenet/datastore.go" | ||
|
||
"errors" | ||
"time" | ||
) | ||
|
||
|
@@ -27,12 +29,18 @@ type BitSwap struct { | |
peer *peer.Peer | ||
|
||
// net holds the connections to all peers. | ||
net swarm.Network | ||
net swarm.Network | ||
meschan *swarm.Chan | ||
|
||
// datastore is the local database | ||
// Ledgers of known | ||
datastore ds.Datastore | ||
|
||
// routing interface for communication | ||
routing *dht.IpfsDHT | ||
|
||
listener *swarm.MesListener | ||
|
||
// partners is a map of currently active bitswap relationships. | ||
// The Ledger has the peer.ID, and the peer connection works through net. | ||
// Ledgers of known relationships (active or inactive) stored in datastore. | ||
|
@@ -44,27 +52,183 @@ type BitSwap struct { | |
|
||
// wantList is the set of keys we want values for. a map for fast lookups. | ||
wantList KeySet | ||
|
||
haltChan chan struct{} | ||
} | ||
|
||
// NewBitSwap creates a new BitSwap instance. It does not check its parameters. | ||
func NewBitSwap(p *peer.Peer, net swarm.Network, d ds.Datastore) *BitSwap { | ||
return &BitSwap{ | ||
func NewBitSwap(p *peer.Peer, net swarm.Network, d ds.Datastore, r routing.IpfsRouting) *BitSwap { | ||
bs := &BitSwap{ | ||
peer: p, | ||
net: net, | ||
datastore: d, | ||
partners: LedgerMap{}, | ||
wantList: KeySet{}, | ||
routing: r.(*dht.IpfsDHT), | ||
meschan: net.GetChannel(swarm.PBWrapper_BITSWAP), | ||
haltChan: make(chan struct{}), | ||
listener: swarm.NewMesListener(), | ||
} | ||
|
||
go bs.handleMessages() | ||
return bs | ||
} | ||
|
||
// GetBlock attempts to retrieve a particular block from peers, within timeout. | ||
func (s *BitSwap) GetBlock(k u.Key, timeout time.Time) ( | ||
func (bs *BitSwap) GetBlock(k u.Key, timeout time.Duration) ( | ||
*blocks.Block, error) { | ||
return nil, errors.New("not implemented") | ||
begin := time.Now() | ||
tleft := timeout - time.Now().Sub(begin) | ||
provs_ch := bs.routing.FindProvidersAsync(k, 20, timeout) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. wondering if There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. maybe... but my worry is that shortening that timeout may result in it actually timing out, and if it times out, then we have no reason to actually continue with the rest of the function call, which... makes this function "timeout" before the given timeout. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. fair. either way doesn't matter much atm. |
||
|
||
valchan := make(chan []byte) | ||
after := time.After(tleft) | ||
|
||
// TODO: when the data is received, shut down this for loop | ||
go func() { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. not sure why this outer There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The outer goroutine is needed because the for loop is looping over the range of a channel, the channel comes from a FindProviders call that is sending providers in on the channel as it recieves them from the dht. So if the first provider we try gives us our value, we want to be able to finish and return from the function before the rest of the providers come in off the channel. |
||
for p := range provs_ch { | ||
go func(pr *peer.Peer) { | ||
ledger := bs.GetLedger(pr.Key()) | ||
blk, err := bs.getBlock(k, pr, tleft) | ||
if err != nil { | ||
u.PErr("getBlock returned: %v\n", err) | ||
return | ||
} | ||
// NOTE: this credits everyone who sends us a block, | ||
// even if we dont use it | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. yeah, TRTTD. we can worry about rewarding low latency later. |
||
ledger.ReceivedBytes(uint64(len(blk))) | ||
select { | ||
case valchan <- blk: | ||
default: | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. what does the select do here? why not just There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Oh, will There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. yeap, this basically says that once we have recieved from valchan once, the default path will occur. because if you try to send on a closed channel, go panics. |
||
}(p) | ||
} | ||
}() | ||
|
||
select { | ||
case blkdata := <-valchan: | ||
close(valchan) | ||
return blocks.NewBlock(blkdata) | ||
case <-after: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Go is so cool. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It satisfies me so much to be able to write code in patterns like this. |
||
return nil, u.ErrTimeout | ||
} | ||
} | ||
|
||
func (bs *BitSwap) getBlock(k u.Key, p *peer.Peer, timeout time.Duration) ([]byte, error) { | ||
u.DOut("[%s] getBlock '%s' from [%s]\n", bs.peer.ID.Pretty(), k.Pretty(), p.ID.Pretty()) | ||
// | ||
mes := new(PBMessage) | ||
mes.Id = proto.Uint64(swarm.GenerateMessageID()) | ||
mes.Key = proto.String(string(k)) | ||
typ := PBMessage_GET_BLOCK | ||
mes.Type = &typ | ||
// | ||
|
||
after := time.After(timeout) | ||
resp := bs.listener.Listen(mes.GetId(), 1, timeout) | ||
smes := swarm.NewMessage(p, mes) | ||
bs.meschan.Outgoing <- smes | ||
|
||
select { | ||
case resp_mes := <-resp: | ||
pmes := new(PBMessage) | ||
err := proto.Unmarshal(resp_mes.Data, pmes) | ||
if err != nil { | ||
return nil, err | ||
} | ||
if pmes.GetSuccess() { | ||
return pmes.GetValue(), nil | ||
} | ||
return nil, u.ErrNotFound | ||
case <-after: | ||
u.PErr("getBlock for '%s' timed out.\n", k) | ||
return nil, u.ErrTimeout | ||
} | ||
} | ||
|
||
// HaveBlock announces the existance of a block to BitSwap, potentially sending | ||
// it to peers (Partners) whose WantLists include it. | ||
func (s *BitSwap) HaveBlock(k u.Key) (*blocks.Block, error) { | ||
return nil, errors.New("not implemented") | ||
func (bs *BitSwap) HaveBlock(k u.Key) error { | ||
return bs.routing.Provide(k) | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Missing here is that we could be tracking our There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. agreed. We can probably just spin off a goroutine to check and manage sending all that information. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. (fix to above: ledger already has |
||
|
||
func (bs *BitSwap) handleMessages() { | ||
for { | ||
select { | ||
case mes := <-bs.meschan.Incoming: | ||
pmes := new(PBMessage) | ||
err := proto.Unmarshal(mes.Data, pmes) | ||
if err != nil { | ||
u.PErr("%v\n", err) | ||
continue | ||
} | ||
if pmes.GetResponse() { | ||
bs.listener.Respond(pmes.GetId(), mes) | ||
continue | ||
} | ||
|
||
switch pmes.GetType() { | ||
case PBMessage_GET_BLOCK: | ||
go bs.handleGetBlock(mes.Peer, pmes) | ||
default: | ||
u.PErr("Invalid message type.\n") | ||
} | ||
case <-bs.haltChan: | ||
return | ||
} | ||
} | ||
} | ||
|
||
func (bs *BitSwap) handleGetBlock(p *peer.Peer, pmes *PBMessage) { | ||
u.DOut("handleGetBlock.\n") | ||
ledger := bs.GetLedger(p.Key()) | ||
|
||
u.DOut("finding [%s] in datastore.\n", u.Key(pmes.GetKey()).Pretty()) | ||
idata, err := bs.datastore.Get(ds.NewKey(pmes.GetKey())) | ||
if err != nil { | ||
u.PErr("handleGetBlock datastore returned: %v\n", err) | ||
if err == ds.ErrNotFound { | ||
return | ||
} | ||
return | ||
} | ||
|
||
u.DOut("found value!\n") | ||
data, ok := idata.([]byte) | ||
if !ok { | ||
u.PErr("Failed casting data from datastore.") | ||
return | ||
} | ||
|
||
if ledger.ShouldSend() { | ||
u.DOut("Sending value back!\n") | ||
resp := &Message{ | ||
Value: data, | ||
Response: true, | ||
ID: pmes.GetId(), | ||
Type: PBMessage_GET_BLOCK, | ||
Success: true, | ||
} | ||
bs.meschan.Outgoing <- swarm.NewMessage(p, resp.ToProtobuf()) | ||
ledger.SentBytes(uint64(len(data))) | ||
} else { | ||
u.DOut("Ledger decided not to send anything...\n") | ||
} | ||
} | ||
|
||
func (bs *BitSwap) GetLedger(k u.Key) *Ledger { | ||
l, ok := bs.partners[k] | ||
if ok { | ||
return l | ||
} | ||
|
||
l = new(Ledger) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. the ledger may be in the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. what do we want to use as a key for the ledgers? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. could be something like |
||
l.Strategy = StandardStrategy | ||
l.Partner = peer.ID(k) | ||
bs.partners[k] = l | ||
return l | ||
} | ||
|
||
func (bs *BitSwap) Halt() { | ||
bs.haltChan <- struct{}{} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -13,11 +13,8 @@ type Ledger struct { | |
// Partner is the ID of the remote Peer. | ||
Partner peer.ID | ||
|
||
// BytesSent counts the number of bytes the local peer sent to Partner | ||
BytesSent uint64 | ||
|
||
// BytesReceived counts the number of bytes local peer received from Partner | ||
BytesReceived uint64 | ||
// Accounting tracks bytes sent and recieved. | ||
Accounting debtRatio | ||
|
||
// FirstExchnage is the time of the first data exchange. | ||
FirstExchange *time.Time | ||
|
@@ -27,7 +24,21 @@ type Ledger struct { | |
|
||
// WantList is a (bounded, small) set of keys that Partner desires. | ||
WantList KeySet | ||
|
||
Strategy StrategyFunc | ||
} | ||
|
||
// LedgerMap lists Ledgers by their Partner key. | ||
type LedgerMap map[u.Key]*Ledger | ||
|
||
func (l *Ledger) ShouldSend() bool { | ||
return l.Strategy(l.Accounting) | ||
} | ||
|
||
func (l *Ledger) SentBytes(n uint64) { | ||
l.Accounting.BytesSent += n | ||
} | ||
|
||
func (l *Ledger) ReceivedBytes(n uint64) { | ||
l.Accounting.BytesRecv += n | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. should these functions commit to the datastore? what happens if there's a crash right after? our ledgers are unsynced. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. (or maybe commit outside of them, when called, since ledger doesn't have a datastore ptr) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah, im not sure how to store the ledger in the datastore yet. |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,32 @@ | ||
package bitswap | ||
|
||
import ( | ||
"code.google.com/p/goprotobuf/proto" | ||
u "github.com/jbenet/go-ipfs/util" | ||
) | ||
|
||
type Message struct { | ||
Type PBMessage_MessageType | ||
ID uint64 | ||
Response bool | ||
Key u.Key | ||
Value []byte | ||
Success bool | ||
} | ||
|
||
func (m *Message) ToProtobuf() *PBMessage { | ||
pmes := new(PBMessage) | ||
pmes.Id = &m.ID | ||
pmes.Type = &m.Type | ||
if m.Response { | ||
pmes.Response = proto.Bool(true) | ||
} | ||
|
||
if m.Success { | ||
pmes.Success = proto.Bool(true) | ||
} | ||
|
||
pmes.Key = proto.String(string(m.Key)) | ||
pmes.Value = m.Value | ||
return pmes | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should
BitSwap.GetBlock
check datastore? or should the block service? not sure.Also, this will be determined once we see how fast the DHT is in practice, but may want to just broadcast the new wanted key to existing
bs.partners
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
right now, the block service is accessing the datastore, so checking that in the bitswap would be rather... redundant and time wasteful.
As for broadcasting the new key, right now, GetBlock sends a request out, and then waits for the other peers to respond with the block. We could change this to just broadcasting the want list, and waiting for the other peers to decide to send those blocks back. This is something we need to discuss before we go much further.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, I'd mean do it here instead. shrug
It really depends on how big the wantlist is. if it spans many packets, then makes sense to send a condensed version. yeah let's discuss