From b7216e31b6678d798b4ace15a307fbaf8a9879c0 Mon Sep 17 00:00:00 2001
From: acud <12988138+acud@users.noreply.github.com>
Date: Wed, 19 Feb 2020 18:49:41 +0800
Subject: [PATCH] retrieval: fix memory leak (#2103)

* retrieval: call method to expire retrieval after timeout was triggered

* retrieval: add TestNoSuitablePeer test

* retrieval: address pr comments

* retrieval: fix test
---
 network/kademlia.go                |  2 +-
 network/retrieval/peer.go          |  7 ++++
 network/retrieval/retrieve.go      | 17 +++---
 network/retrieval/retrieve_test.go | 65 ++++++++++++++++++++++++++++--
 storage/feed/testutil.go           |  8 ++--
 storage/netstore.go                |  5 ++-
 6 files changed, 87 insertions(+), 17 deletions(-)

diff --git a/network/kademlia.go b/network/kademlia.go
index 09953c1081..732db21244 100644
--- a/network/kademlia.go
+++ b/network/kademlia.go
@@ -947,7 +947,7 @@ func (k *Kademlia) kademliaInfo() (ki KademliaInfo) {
 
 		row := []string{}
 		bin.ValIterator(func(val pot.Val) bool {
-			e := val.(*Peer)
+			e := val.(*entry)
 			row = append(row, hex.EncodeToString(e.Address()))
 			return true
 		})
diff --git a/network/retrieval/peer.go b/network/retrieval/peer.go
index 2148ad9189..ab6abbfaed 100644
--- a/network/retrieval/peer.go
+++ b/network/retrieval/peer.go
@@ -53,6 +53,13 @@ func (p *Peer) addRetrieval(ruid uint, addr storage.Address) {
 	p.retrievals[ruid] = addr
 }
 
+func (p *Peer) expireRetrieval(ruid uint) {
+	p.mtx.Lock()
+	defer p.mtx.Unlock()
+
+	delete(p.retrievals, ruid)
+}
+
 // checkRequest is called upon ChunkDelivery message reception
 // it is meant to identify unsolicited chunk deliveries
 func (p *Peer) checkRequest(ruid uint, addr storage.Address) error {
diff --git a/network/retrieval/retrieve.go b/network/retrieval/retrieve.go
index 303de33f66..82e6363376 100644
--- a/network/retrieval/retrieve.go
+++ b/network/retrieval/retrieve.go
@@ -383,8 +383,9 @@ func (r *Retrieval) handleChunkDelivery(ctx context.Context, p *Peer, msg *Chunk
 	return nil
 }
 
-// RequestFromPeers sends a chunk retrieve request to the next found peer
-func (r *Retrieval) RequestFromPeers(ctx context.Context, req *storage.Request, localID enode.ID) (*enode.ID, error) {
+// RequestFromPeers sends a chunk retrieve request to the next found peer.
+// It returns the ID of the peer the request was sent to, along with a cleanup
+// function that expires the retrieval if the chunk is never delivered.
+func (r *Retrieval) RequestFromPeers(ctx context.Context, req *storage.Request, localID enode.ID) (*enode.ID, func(), error) {
 	r.logger.Debug("retrieval.requestFromPeers", "req.Addr", req.Addr, "localID", localID)
 	metrics.GetOrRegisterCounter("network.retrieve.request_from_peers", nil).Inc(1)
 
@@ -395,7 +396,7 @@ FINDPEER:
 	sp, err := r.findPeerLB(ctx, req)
 	if err != nil {
 		r.logger.Trace(err.Error())
-		return nil, err
+		return nil, func() {}, err
 	}
 
 	protoPeer := r.getPeer(sp.ID())
@@ -405,7 +406,7 @@ FINDPEER:
 		retries++
 		if retries == maxFindPeerRetries {
 			r.logger.Error("max find peer retries reached", "max retries", maxFindPeerRetries, "ref", req.Addr)
-			return nil, ErrNoPeerFound
+			return nil, func() {}, ErrNoPeerFound
 		}
 
 		goto FINDPEER
@@ -417,14 +418,18 @@ FINDPEER:
 	}
 	protoPeer.logger.Trace("sending retrieve request", "ref", ret.Addr, "origin", localID, "ruid", ret.Ruid)
 	protoPeer.addRetrieval(ret.Ruid, ret.Addr)
+	cleanup := func() {
+		protoPeer.expireRetrieval(ret.Ruid)
+	}
 	err = protoPeer.Send(ctx, ret)
 	if err != nil {
 		protoPeer.logger.Error("error sending retrieve request to peer", "ruid", ret.Ruid, "err", err)
+		cleanup()
-		return nil, err
+		return nil, func() {}, err
 	}
 
 	spID := protoPeer.ID()
-	return &spID, nil
+	return &spID, cleanup, nil
 }
 
 func (r *Retrieval) Start(server *p2p.Server) error {
diff --git a/network/retrieval/retrieve_test.go b/network/retrieval/retrieve_test.go
index 6be1b6be52..001b38d3f4 100644
--- a/network/retrieval/retrieve_test.go
+++ b/network/retrieval/retrieve_test.go
@@ -41,6 +41,7 @@ import (
 	"github.com/ethereum/go-ethereum/p2p/enr"
 	"github.com/ethereum/go-ethereum/p2p/simulations/adapters"
 	"github.com/ethersphere/swarm/chunk"
+	chunktesting "github.com/ethersphere/swarm/chunk/testing"
 	"github.com/ethersphere/swarm/network"
 	"github.com/ethersphere/swarm/network/simulation"
 	"github.com/ethersphere/swarm/p2p/protocols"
@@ -137,6 +138,62 @@ func TestChunkDelivery(t *testing.T) {
 	}
 }
 
+// TestNoSuitablePeer brings up two nodes and tries to retrieve a chunk that is
+// never found, expecting an ErrNoSuitablePeer error from the netstore
+func TestNoSuitablePeer(t *testing.T) {
+	nodes := 2
+
+	sim := simulation.NewBzzInProc(map[string]simulation.ServiceFunc{
+		"bzz-retrieve": newBzzRetrieveWithLocalstore,
+	}, true)
+	defer sim.Close()
+
+	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+	defer cancel()
+	_, err := sim.AddNodesAndConnectFull(nodes)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	result := sim.Run(ctx, func(ctx context.Context, sim *simulation.Simulation) error {
+		nodeIDs := sim.UpNodeIDs()
+		if len(nodeIDs) != nodes {
+			t.Fatal("not enough nodes up")
+		}
+		// allow the two nodes time to set up the protocols, otherwise the kademlias will be empty when retrieve requests happen
+		i := 0
+		for {
+			kinfo := sim.MustNodeItem(nodeIDs[1], simulation.BucketKeyKademlia).(*network.Kademlia).KademliaInfo()
+			if kinfo.TotalConnections != 1 {
+				i++
+			} else {
+				break
+			}
+			time.Sleep(50 * time.Millisecond)
+			if i == 5 {
+				t.Fatal("timed out waiting for 1 connection")
+			}
+		}
+
+		log.Debug("fetching through node", "enode", nodeIDs[1])
+		ns := sim.MustNodeItem(nodeIDs[1], bucketKeyNetstore).(*storage.NetStore)
+		c := chunktesting.GenerateTestRandomChunk()
+
+		ref := c.Address()
+		_, err := ns.Get(context.Background(), chunk.ModeGetRequest, storage.NewRequest(ref))
+		if err == nil {
+			return errors.New("expected netstore retrieval error but got none")
+		}
+		if err != storage.ErrNoSuitablePeer {
+			return fmt.Errorf("expected ErrNoSuitablePeer but got %v instead", err)
+		}
+		return nil
+	})
+	if result.Error != nil {
+		t.Fatal(result.Error)
+	}
+}
+
 // TestUnsolicitedChunkDelivery tests that a node is dropped in response to an unsolicited chunk delivery
 // this case covers a chunk Ruid that was not previously known to the downstream peer
 func TestUnsolicitedChunkDelivery(t *testing.T) {
@@ -193,8 +250,8 @@ func TestUnsolicitedChunkDeliveryFaultyAddr(t *testing.T) {
 		t.Fatal(err)
 	}
 	defer teardown()
-	ns.RemoteGet = func(ctx context.Context, req *storage.Request, localID enode.ID) (*enode.ID, error) {
-		return &enode.ID{}, nil
+	ns.RemoteGet = func(ctx context.Context, req *storage.Request, localID enode.ID) (*enode.ID, func(), error) {
+		return &enode.ID{}, func() {}, nil
 	}
 
 	node := tester.Nodes[0]
@@ -267,8 +324,8 @@ func TestUnsolicitedChunkDeliveryDouble(t *testing.T) {
 		t.Fatal(err)
 	}
 	defer teardown()
-	ns.RemoteGet = func(ctx context.Context, req *storage.Request, localID enode.ID) (*enode.ID, error) {
-		return &enode.ID{}, nil
+	ns.RemoteGet = func(ctx context.Context, req *storage.Request, localID enode.ID) (*enode.ID, func(), error) {
+		return &enode.ID{}, func() {}, nil
 	}
 
 	node := tester.Nodes[0]
diff --git a/storage/feed/testutil.go b/storage/feed/testutil.go
index 0bff595382..9a10281e4c 100644
--- a/storage/feed/testutil.go
+++ b/storage/feed/testutil.go
@@ -53,8 +53,8 @@ func NewTestHandler(datadir string, params *HandlerParams) (*TestHandler, error)
 	localStore := chunk.NewValidatorStore(db, storage.NewContentAddressValidator(storage.MakeHashFunc(feedsHashAlgorithm)), fh)
 
 	netStore := storage.NewNetStore(localStore, network.NewBzzAddr(make([]byte, 32), nil))
-	netStore.RemoteGet = func(ctx context.Context, req *storage.Request, localID enode.ID) (*enode.ID, error) {
-		return nil, errors.New("not found")
+	netStore.RemoteGet = func(ctx context.Context, req *storage.Request, localID enode.ID) (*enode.ID, func(), error) {
+		return nil, func() {}, errors.New("not found")
 	}
 	fh.SetStore(netStore)
 	return &TestHandler{fh}, nil
@@ -69,8 +69,8 @@ func newTestHandlerWithStore(fh *Handler, datadir string, db chunk.Store, params
 	localStore := chunk.NewValidatorStore(db, storage.NewContentAddressValidator(storage.MakeHashFunc(feedsHashAlgorithm)), fh)
 
 	netStore := storage.NewNetStore(localStore, network.NewBzzAddr(make([]byte, 32), nil))
-	netStore.RemoteGet = func(ctx context.Context, req *storage.Request, localID enode.ID) (*enode.ID, error) {
-		return nil, errors.New("not found")
+	netStore.RemoteGet = func(ctx context.Context, req *storage.Request, localID enode.ID) (*enode.ID, func(), error) {
+		return nil, func() {}, errors.New("not found")
 	}
 	fh.SetStore(netStore)
 	return &TestHandler{fh}, nil
diff --git a/storage/netstore.go b/storage/netstore.go
index fb98e05c67..69c8b417c0 100644
--- a/storage/netstore.go
+++ b/storage/netstore.go
@@ -86,7 +86,7 @@ func (fi *Fetcher) SafeClose(ch chunk.Chunk) {
 	})
 }
 
-type RemoteGetFunc func(ctx context.Context, req *Request, localID enode.ID) (*enode.ID, error)
+type RemoteGetFunc func(ctx context.Context, req *Request, localID enode.ID) (*enode.ID, func(), error)
 
 // NetStore is an extension of LocalStore
 // it implements the ChunkStore interface
@@ -247,13 +247,14 @@ func (n *NetStore) RemoteFetch(ctx context.Context, req *Request, fi *Fetcher) (
 
 	log.Trace("remote.fetch", "ref", ref)
 
-	currentPeer, err := n.RemoteGet(ctx, req, n.LocalID)
+	currentPeer, cleanup, err := n.RemoteGet(ctx, req, n.LocalID)
 	if err != nil {
 		n.logger.Trace(err.Error(), "ref", ref)
 		osp.LogFields(olog.String("err", err.Error()))
 		osp.Finish()
 		return nil, ErrNoSuitablePeer
 	}
+	defer cleanup()
 
 	// add peer to the set of peers to skip from now
 	n.logger.Trace("remote.fetch, adding peer to skip", "ref", ref, "peer", currentPeer.String())
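
A note on the shape of the fix, for readers following along: the leak lived in the `Peer.retrievals` map, where `addRetrieval` inserted an entry for every outgoing request but nothing removed it when the chunk never arrived, so entries accumulated for the lifetime of the peer. The patch threads a `cleanup` closure from `RequestFromPeers` through `RemoteGetFunc` back to `NetStore.RemoteFetch`, which defers it so the entry is expired on every exit path, including timeouts. The sketch below is a minimal, self-contained illustration of that pattern, not the swarm code itself; the names `tracker`, `register`, `remoteGet`, and `send` are invented stand-ins for `Peer.retrievals`, `addRetrieval`/`expireRetrieval`, `RemoteGetFunc`, and `protoPeer.Send`.

```go
package main

import (
	"fmt"
	"sync"
)

// tracker records in-flight retrievals by request ID, in the same
// spirit as the Peer.retrievals map in the patch.
type tracker struct {
	mtx        sync.Mutex
	retrievals map[uint]string // ruid -> chunk address
}

func newTracker() *tracker {
	return &tracker{retrievals: make(map[uint]string)}
}

// register adds an entry and hands back a cleanup func that removes it.
// Before the fix, nothing deleted entries for chunks that never arrived,
// which is exactly how the map leaked.
func (t *tracker) register(ruid uint, addr string) func() {
	t.mtx.Lock()
	t.retrievals[ruid] = addr
	t.mtx.Unlock()
	return func() {
		t.mtx.Lock()
		delete(t.retrievals, ruid)
		t.mtx.Unlock()
	}
}

// send stands in for protoPeer.Send.
func send(ruid uint) error { return nil }

// remoteGet mirrors the new RemoteGetFunc shape: the cleanup func travels
// back to the caller instead of the entry being left behind forever.
func remoteGet(t *tracker, ruid uint, addr string) (string, func(), error) {
	cleanup := t.register(ruid, addr)
	if err := send(ruid); err != nil {
		cleanup() // send failed: expire the entry immediately
		return "", func() {}, err
	}
	return "peer-1", cleanup, nil
}

func main() {
	t := newTracker()
	_, cleanup, err := remoteGet(t, 42, "deadbeef")
	if err != nil {
		fmt.Println("request failed:", err)
		return
	}
	// The fetch path defers cleanup, so the entry is expired even when
	// the request times out and the chunk is never delivered.
	defer cleanup()
	fmt.Println("in-flight retrievals:", len(t.retrievals)) // 1 while pending
}
```

Returning a no-op `func() {}` alongside every error, as the patch does on its error paths, lets callers `defer cleanup()` unconditionally without nil checks.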