Skip to content
This repository has been archived by the owner on Aug 2, 2021. It is now read-only.

Commit

Permalink
retrieval: fix memory leak (#2103)
Browse files Browse the repository at this point in the history
* retrieval: call method to expire retrieval after timeout was triggered

* retrieval: add TestNoSuitablePeer test

* retrieval: address pr comments

* retrieval: fix test
  • Loading branch information
acud authored Feb 19, 2020
1 parent 9c010bf commit b7216e3
Show file tree
Hide file tree
Showing 6 changed files with 87 additions and 17 deletions.
2 changes: 1 addition & 1 deletion network/kademlia.go
Original file line number Diff line number Diff line change
Expand Up @@ -947,7 +947,7 @@ func (k *Kademlia) kademliaInfo() (ki KademliaInfo) {

row := []string{}
bin.ValIterator(func(val pot.Val) bool {
e := val.(*Peer)
e := val.(*entry)
row = append(row, hex.EncodeToString(e.Address()))
return true
})
Expand Down
7 changes: 7 additions & 0 deletions network/retrieval/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,13 @@ func (p *Peer) addRetrieval(ruid uint, addr storage.Address) {
p.retrievals[ruid] = addr
}

// expireRetrieval forgets the retrieval associated with the given ruid,
// so that a request that was never answered does not stay in the
// retrievals map forever (this is the memory-leak fix this commit makes).
func (p *Peer) expireRetrieval(ruid uint) {
	p.mtx.Lock()
	delete(p.retrievals, ruid)
	p.mtx.Unlock()
}

// chunkReceived is called upon ChunkDelivery message reception
// it is meant to identify unsolicited chunk deliveries
func (p *Peer) checkRequest(ruid uint, addr storage.Address) error {
Expand Down
17 changes: 11 additions & 6 deletions network/retrieval/retrieve.go
Original file line number Diff line number Diff line change
Expand Up @@ -383,8 +383,9 @@ func (r *Retrieval) handleChunkDelivery(ctx context.Context, p *Peer, msg *Chunk
return nil
}

// RequestFromPeers sends a chunk retrieve request to the next found peer
func (r *Retrieval) RequestFromPeers(ctx context.Context, req *storage.Request, localID enode.ID) (*enode.ID, error) {
// RequestFromPeers sends a chunk retrieve request to the next found peer.
// It returns the next peer to try and a cleanup function that expires retrievals which were never delivered.
func (r *Retrieval) RequestFromPeers(ctx context.Context, req *storage.Request, localID enode.ID) (*enode.ID, func(), error) {
r.logger.Debug("retrieval.requestFromPeers", "req.Addr", req.Addr, "localID", localID)
metrics.GetOrRegisterCounter("network.retrieve.request_from_peers", nil).Inc(1)

Expand All @@ -395,7 +396,7 @@ FINDPEER:
sp, err := r.findPeerLB(ctx, req)
if err != nil {
r.logger.Trace(err.Error())
return nil, err
return nil, func() {}, err
}

protoPeer := r.getPeer(sp.ID())
Expand All @@ -405,7 +406,7 @@ FINDPEER:
retries++
if retries == maxFindPeerRetries {
r.logger.Error("max find peer retries reached", "max retries", maxFindPeerRetries, "ref", req.Addr)
return nil, ErrNoPeerFound
return nil, func() {}, ErrNoPeerFound
}

goto FINDPEER
Expand All @@ -417,14 +418,18 @@ FINDPEER:
}
protoPeer.logger.Trace("sending retrieve request", "ref", ret.Addr, "origin", localID, "ruid", ret.Ruid)
protoPeer.addRetrieval(ret.Ruid, ret.Addr)
cleanup := func() {
protoPeer.expireRetrieval(ret.Ruid)
}
err = protoPeer.Send(ctx, ret)
if err != nil {
protoPeer.logger.Error("error sending retrieve request to peer", "ruid", ret.Ruid, "err", err)
return nil, err
cleanup()
return nil, func() {}, err
}

spID := protoPeer.ID()
return &spID, nil
return &spID, cleanup, nil
}

func (r *Retrieval) Start(server *p2p.Server) error {
Expand Down
65 changes: 61 additions & 4 deletions network/retrieval/retrieve_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import (
"github.com/ethereum/go-ethereum/p2p/enr"
"github.com/ethereum/go-ethereum/p2p/simulations/adapters"
"github.com/ethersphere/swarm/chunk"
chunktesting "github.com/ethersphere/swarm/chunk/testing"
"github.com/ethersphere/swarm/network"
"github.com/ethersphere/swarm/network/simulation"
"github.com/ethersphere/swarm/p2p/protocols"
Expand Down Expand Up @@ -137,6 +138,62 @@ func TestChunkDelivery(t *testing.T) {
}
}

// TestNoSuitablePeer brings up two nodes, tries to retrieve a chunk which is never
// found, expecting a NoSuitablePeer error from netstore.
func TestNoSuitablePeer(t *testing.T) {
	nodes := 2

	sim := simulation.NewBzzInProc(map[string]simulation.ServiceFunc{
		"bzz-retrieve": newBzzRetrieveWithLocalstore,
	}, true)
	defer sim.Close()

	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	_, err := sim.AddNodesAndConnectFull(nodes)
	if err != nil {
		t.Fatal(err)
	}

	result := sim.Run(ctx, func(ctx context.Context, sim *simulation.Simulation) error {
		// NOTE: failures inside this callback are returned as errors rather
		// than calling t.Fatal — t.Fatal is only safe from the test
		// goroutine, and result.Error is checked below anyway.
		nodeIDs := sim.UpNodeIDs()
		if len(nodeIDs) != nodes {
			return errors.New("not enough nodes up")
		}
		// Allow the two nodes time to set up the protocols, otherwise the
		// kademlias will be empty when retrieve requests happen. Poll the
		// second node's kademlia until it reports exactly one connection,
		// giving up after 5 polls of 50ms each.
		for i := 0; ; i++ {
			kinfo := sim.MustNodeItem(nodeIDs[1], simulation.BucketKeyKademlia).(*network.Kademlia).KademliaInfo()
			if kinfo.TotalConnections == 1 {
				break
			}
			if i == 5 {
				return errors.New("timed out waiting for 1 connection")
			}
			time.Sleep(50 * time.Millisecond)
		}

		log.Debug("fetching through node", "enode", nodeIDs[1])
		ns := sim.MustNodeItem(nodeIDs[1], bucketKeyNetstore).(*storage.NetStore)
		// A freshly generated random chunk is stored nowhere in the
		// simulation, so the retrieval must fail.
		c := chunktesting.GenerateTestRandomChunk()

		ref := c.Address()
		_, err := ns.Get(context.Background(), chunk.ModeGetRequest, storage.NewRequest(ref))
		if err == nil {
			return errors.New("expected netstore retrieval error but got none")
		}
		if err != storage.ErrNoSuitablePeer {
			return fmt.Errorf("expected ErrNoSuitablePeer but got %v instead", err)
		}
		return nil
	})
	if result.Error != nil {
		t.Fatal(result.Error)
	}
}

// TestUnsolicitedChunkDelivery tests that a node is dropped in response to an unsolicited chunk delivery
// this case covers a chunk Ruid that was not previously known to the downstream peer
func TestUnsolicitedChunkDelivery(t *testing.T) {
Expand Down Expand Up @@ -193,8 +250,8 @@ func TestUnsolicitedChunkDeliveryFaultyAddr(t *testing.T) {
t.Fatal(err)
}
defer teardown()
ns.RemoteGet = func(ctx context.Context, req *storage.Request, localID enode.ID) (*enode.ID, error) {
return &enode.ID{}, nil
ns.RemoteGet = func(ctx context.Context, req *storage.Request, localID enode.ID) (*enode.ID, func(), error) {
return &enode.ID{}, func() {}, nil
}
node := tester.Nodes[0]

Expand Down Expand Up @@ -267,8 +324,8 @@ func TestUnsolicitedChunkDeliveryDouble(t *testing.T) {
t.Fatal(err)
}
defer teardown()
ns.RemoteGet = func(ctx context.Context, req *storage.Request, localID enode.ID) (*enode.ID, error) {
return &enode.ID{}, nil
ns.RemoteGet = func(ctx context.Context, req *storage.Request, localID enode.ID) (*enode.ID, func(), error) {
return &enode.ID{}, func() {}, nil
}
node := tester.Nodes[0]

Expand Down
8 changes: 4 additions & 4 deletions storage/feed/testutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,8 @@ func NewTestHandler(datadir string, params *HandlerParams) (*TestHandler, error)
localStore := chunk.NewValidatorStore(db, storage.NewContentAddressValidator(storage.MakeHashFunc(feedsHashAlgorithm)), fh)

netStore := storage.NewNetStore(localStore, network.NewBzzAddr(make([]byte, 32), nil))
netStore.RemoteGet = func(ctx context.Context, req *storage.Request, localID enode.ID) (*enode.ID, error) {
return nil, errors.New("not found")
netStore.RemoteGet = func(ctx context.Context, req *storage.Request, localID enode.ID) (*enode.ID, func(), error) {
return nil, func() {}, errors.New("not found")
}
fh.SetStore(netStore)
return &TestHandler{fh}, nil
Expand All @@ -69,8 +69,8 @@ func newTestHandlerWithStore(fh *Handler, datadir string, db chunk.Store, params
localStore := chunk.NewValidatorStore(db, storage.NewContentAddressValidator(storage.MakeHashFunc(feedsHashAlgorithm)), fh)

netStore := storage.NewNetStore(localStore, network.NewBzzAddr(make([]byte, 32), nil))
netStore.RemoteGet = func(ctx context.Context, req *storage.Request, localID enode.ID) (*enode.ID, error) {
return nil, errors.New("not found")
netStore.RemoteGet = func(ctx context.Context, req *storage.Request, localID enode.ID) (*enode.ID, func(), error) {
return nil, func() {}, errors.New("not found")
}
fh.SetStore(netStore)
return &TestHandler{fh}, nil
Expand Down
5 changes: 3 additions & 2 deletions storage/netstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func (fi *Fetcher) SafeClose(ch chunk.Chunk) {
})
}

type RemoteGetFunc func(ctx context.Context, req *Request, localID enode.ID) (*enode.ID, error)
type RemoteGetFunc func(ctx context.Context, req *Request, localID enode.ID) (*enode.ID, func(), error)

// NetStore is an extension of LocalStore
// it implements the ChunkStore interface
Expand Down Expand Up @@ -247,13 +247,14 @@ func (n *NetStore) RemoteFetch(ctx context.Context, req *Request, fi *Fetcher) (

log.Trace("remote.fetch", "ref", ref)

currentPeer, err := n.RemoteGet(ctx, req, n.LocalID)
currentPeer, cleanup, err := n.RemoteGet(ctx, req, n.LocalID)
if err != nil {
n.logger.Trace(err.Error(), "ref", ref)
osp.LogFields(olog.String("err", err.Error()))
osp.Finish()
return nil, ErrNoSuitablePeer
}
defer cleanup()

// add peer to the set of peers to skip from now
n.logger.Trace("remote.fetch, adding peer to skip", "ref", ref, "peer", currentPeer.String())
Expand Down

0 comments on commit b7216e3

Please sign in to comment.