Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(bitswap/message) duplicate entries #216

Merged
merged 9 commits into from
Oct 28, 2014
11 changes: 5 additions & 6 deletions exchange/bitswap/bitswap.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,9 +78,8 @@ func (bs *bitswap) Block(parent context.Context, k u.Key) (*blocks.Block, error)
go func() {
message := bsmsg.New()
for _, wanted := range bs.wantlist.Keys() {
message.AppendWanted(wanted)
message.AddWanted(wanted)
}
message.AppendWanted(k)
for peerToQuery := range peersToQuery {
log.Debugf("bitswap got peersToQuery: %s", peerToQuery)
go func(p peer.Peer) {
Expand Down Expand Up @@ -168,7 +167,7 @@ func (bs *bitswap) ReceiveMessage(ctx context.Context, p peer.Peer, incoming bsm

message := bsmsg.New()
for _, wanted := range bs.wantlist.Keys() {
message.AppendWanted(wanted)
message.AddWanted(wanted)
}
for _, key := range incoming.Wantlist() {
// TODO: might be better to check if we have the block before checking
Expand All @@ -177,7 +176,7 @@ func (bs *bitswap) ReceiveMessage(ctx context.Context, p peer.Peer, incoming bsm
if block, errBlockNotFound := bs.blockstore.Get(key); errBlockNotFound != nil {
continue
} else {
message.AppendBlock(*block)
message.AddBlock(*block)
}
}
}
Expand Down Expand Up @@ -207,9 +206,9 @@ func (bs *bitswap) sendToPeersThatWant(ctx context.Context, block blocks.Block)
log.Debugf("%v wants %v", p, block.Key())
if bs.strategy.ShouldSendBlockToPeer(block.Key(), p) {
message := bsmsg.New()
message.AppendBlock(block)
message.AddBlock(block)
for _, wanted := range bs.wantlist.Keys() {
message.AppendWanted(wanted)
message.AddWanted(wanted)
}
go bs.send(ctx, p, message)
}
Expand Down
66 changes: 46 additions & 20 deletions exchange/bitswap/message/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,25 @@ import (
// TODO move bs/msg/internal/pb to bs/internal/pb and rename pb package to bitswap_pb

type BitSwapMessage interface {
// Wantlist returns a slice of unique keys that represent data wanted by
// the sender.
Wantlist() []u.Key

// Blocks returns a slice of unique blocks
Blocks() []blocks.Block
AppendWanted(k u.Key)
AppendBlock(b blocks.Block)

// AddWanted adds the key to the Wantlist.
//
// Insertion order determines priority. That is, earlier insertions are
// deemed higher priority than keys inserted later.
//
// t = 0, msg.AddWanted(A)
// t = 1, msg.AddWanted(B)
//
// implies Priority(A) > Priority(B)
AddWanted(u.Key)

AddBlock(blocks.Block)
Exportable
}

Expand All @@ -26,44 +41,55 @@ type Exportable interface {
ToNet(p peer.Peer) (nm.NetMessage, error)
}

// message wraps a proto message for convenience
type message struct {
wantlist []u.Key
blocks []blocks.Block
type impl struct {
existsInWantlist map[u.Key]struct{} // map to detect duplicates
wantlist []u.Key // slice to preserve ordering
blocks map[u.Key]blocks.Block // map to detect duplicates
}

func New() *message {
return new(message)
func New() BitSwapMessage {
return &impl{
blocks: make(map[u.Key]blocks.Block),
existsInWantlist: make(map[u.Key]struct{}),
wantlist: make([]u.Key, 0),
}
}

func newMessageFromProto(pbm pb.Message) BitSwapMessage {
m := New()
for _, s := range pbm.GetWantlist() {
m.AppendWanted(u.Key(s))
m.AddWanted(u.Key(s))
}
for _, d := range pbm.GetBlocks() {
b := blocks.NewBlock(d)
m.AppendBlock(*b)
m.AddBlock(*b)
}
return m
}

// TODO(brian): convert these into keys
func (m *message) Wantlist() []u.Key {
func (m *impl) Wantlist() []u.Key {
return m.wantlist
}

// TODO(brian): convert these into blocks
func (m *message) Blocks() []blocks.Block {
return m.blocks
func (m *impl) Blocks() []blocks.Block {
bs := make([]blocks.Block, 0)
for _, block := range m.blocks {
bs = append(bs, block)
}
return bs
}

func (m *message) AppendWanted(k u.Key) {
func (m *impl) AddWanted(k u.Key) {
_, exists := m.existsInWantlist[k]
if exists {
return
}
m.existsInWantlist[k] = struct{}{}
m.wantlist = append(m.wantlist, k)
}

func (m *message) AppendBlock(b blocks.Block) {
m.blocks = append(m.blocks, b)
func (m *impl) AddBlock(b blocks.Block) {
m.blocks[b.Key()] = b
}

func FromNet(nmsg netmsg.NetMessage) (BitSwapMessage, error) {
Expand All @@ -75,7 +101,7 @@ func FromNet(nmsg netmsg.NetMessage) (BitSwapMessage, error) {
return m, nil
}

func (m *message) ToProto() *pb.Message {
func (m *impl) ToProto() *pb.Message {
pb := new(pb.Message)
for _, k := range m.Wantlist() {
pb.Wantlist = append(pb.Wantlist, string(k))
Expand All @@ -86,6 +112,6 @@ func (m *message) ToProto() *pb.Message {
return pb
}

func (m *message) ToNet(p peer.Peer) (nm.NetMessage, error) {
func (m *impl) ToNet(p peer.Peer) (nm.NetMessage, error) {
return nm.FromObject(p, m.ToProto())
}
43 changes: 30 additions & 13 deletions exchange/bitswap/message/message_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
func TestAppendWanted(t *testing.T) {
const str = "foo"
m := New()
m.AppendWanted(u.Key(str))
m.AddWanted(u.Key(str))

if !contains(m.ToProto().GetWantlist(), str) {
t.Fail()
Expand Down Expand Up @@ -42,7 +42,7 @@ func TestAppendBlock(t *testing.T) {
m := New()
for _, str := range strs {
block := blocks.NewBlock([]byte(str))
m.AppendBlock(*block)
m.AddBlock(*block)
}

// assert strings are in proto message
Expand All @@ -58,7 +58,7 @@ func TestWantlist(t *testing.T) {
keystrs := []string{"foo", "bar", "baz", "bat"}
m := New()
for _, s := range keystrs {
m.AppendWanted(u.Key(s))
m.AddWanted(u.Key(s))
}
exported := m.Wantlist()

Expand All @@ -81,7 +81,7 @@ func TestCopyProtoByValue(t *testing.T) {
const str = "foo"
m := New()
protoBeforeAppend := m.ToProto()
m.AppendWanted(u.Key(str))
m.AddWanted(u.Key(str))
if contains(protoBeforeAppend.GetWantlist(), str) {
t.Fail()
}
Expand All @@ -101,11 +101,11 @@ func TestToNetMethodSetsPeer(t *testing.T) {

func TestToNetFromNetPreservesWantList(t *testing.T) {
original := New()
original.AppendWanted(u.Key("M"))
original.AppendWanted(u.Key("B"))
original.AppendWanted(u.Key("D"))
original.AppendWanted(u.Key("T"))
original.AppendWanted(u.Key("F"))
original.AddWanted(u.Key("M"))
original.AddWanted(u.Key("B"))
original.AddWanted(u.Key("D"))
original.AddWanted(u.Key("T"))
original.AddWanted(u.Key("F"))

p := peer.WithIDString("X")
netmsg, err := original.ToNet(p)
Expand Down Expand Up @@ -133,10 +133,10 @@ func TestToNetFromNetPreservesWantList(t *testing.T) {
func TestToAndFromNetMessage(t *testing.T) {

original := New()
original.AppendBlock(*blocks.NewBlock([]byte("W")))
original.AppendBlock(*blocks.NewBlock([]byte("E")))
original.AppendBlock(*blocks.NewBlock([]byte("F")))
original.AppendBlock(*blocks.NewBlock([]byte("M")))
original.AddBlock(*blocks.NewBlock([]byte("W")))
original.AddBlock(*blocks.NewBlock([]byte("E")))
original.AddBlock(*blocks.NewBlock([]byte("F")))
original.AddBlock(*blocks.NewBlock([]byte("M")))

p := peer.WithIDString("X")
netmsg, err := original.ToNet(p)
Expand Down Expand Up @@ -169,3 +169,20 @@ func contains(s []string, x string) bool {
}
return false
}

func TestDuplicates(t *testing.T) {
b := blocks.NewBlock([]byte("foo"))
msg := New()

msg.AddWanted(b.Key())
msg.AddWanted(b.Key())
if len(msg.Wantlist()) != 1 {
t.Fatal("Duplicate in BitSwapMessage")
}

msg.AddBlock(*b)
msg.AddBlock(*b)
if len(msg.Blocks()) != 1 {
t.Fatal("Duplicate in BitSwapMessage")
}
}
4 changes: 2 additions & 2 deletions exchange/bitswap/strategy/strategy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func TestConsistentAccounting(t *testing.T) {

m := message.New()
content := []string{"this", "is", "message", "i"}
m.AppendBlock(*blocks.NewBlock([]byte(strings.Join(content, " "))))
m.AddBlock(*blocks.NewBlock([]byte(strings.Join(content, " "))))

sender.MessageSent(receiver.Peer, m)
receiver.MessageReceived(sender.Peer, m)
Expand Down Expand Up @@ -60,7 +60,7 @@ func TestBlockRecordedAsWantedAfterMessageReceived(t *testing.T) {
block := blocks.NewBlock([]byte("data wanted by beggar"))

messageFromBeggarToChooser := message.New()
messageFromBeggarToChooser.AppendWanted(block.Key())
messageFromBeggarToChooser.AddWanted(block.Key())

chooser.MessageReceived(beggar.Peer, messageFromBeggarToChooser)
// for this test, doesn't matter if you record that beggar sent
Expand Down
8 changes: 4 additions & 4 deletions exchange/bitswap/testnet/network_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,15 +33,15 @@ func TestSendRequestToCooperativePeer(t *testing.T) {
// TODO test contents of incoming message

m := bsmsg.New()
m.AppendBlock(*blocks.NewBlock([]byte(expectedStr)))
m.AddBlock(*blocks.NewBlock([]byte(expectedStr)))

return from, m
}))

t.Log("Build a message and send a synchronous request to recipient")

message := bsmsg.New()
message.AppendBlock(*blocks.NewBlock([]byte("data")))
message.AddBlock(*blocks.NewBlock([]byte("data")))
response, err := initiator.SendRequest(
context.Background(), peer.WithID(idOfRecipient), message)
if err != nil {
Expand Down Expand Up @@ -77,7 +77,7 @@ func TestSendMessageAsyncButWaitForResponse(t *testing.T) {
peer.Peer, bsmsg.BitSwapMessage) {

msgToWaiter := bsmsg.New()
msgToWaiter.AppendBlock(*blocks.NewBlock([]byte(expectedStr)))
msgToWaiter.AddBlock(*blocks.NewBlock([]byte(expectedStr)))

return fromWaiter, msgToWaiter
}))
Expand Down Expand Up @@ -105,7 +105,7 @@ func TestSendMessageAsyncButWaitForResponse(t *testing.T) {
}))

messageSentAsync := bsmsg.New()
messageSentAsync.AppendBlock(*blocks.NewBlock([]byte("data")))
messageSentAsync.AddBlock(*blocks.NewBlock([]byte("data")))
errSending := waiter.SendMessage(
context.Background(), peer.WithID(idOfResponder), messageSentAsync)
if errSending != nil {
Expand Down