Skip to content

Commit 0c10d37

Browse files
nolashnonsense
authored andcommitted
swarm/network, swarm/storage: Preserve opentracing contexts (ethereum#19022)
1 parent 0436412 commit 0c10d37

File tree

10 files changed

+122
-107
lines changed

10 files changed

+122
-107
lines changed

swarm/network/fetcher.go

Lines changed: 19 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ type Fetcher struct {
5252
requestC chan uint8 // channel for incoming requests (with the hopCount value in it)
5353
searchTimeout time.Duration
5454
skipCheck bool
55+
ctx context.Context
5556
}
5657

5758
type Request struct {
@@ -109,29 +110,30 @@ func NewFetcherFactory(request RequestFunc, skipCheck bool) *FetcherFactory {
109110
// contain the peers which are actively requesting this chunk, to make sure we
110111
// don't request back the chunks from them.
111112
// The created Fetcher is started and returned.
112-
func (f *FetcherFactory) New(ctx context.Context, source storage.Address, peersToSkip *sync.Map) storage.NetFetcher {
113-
fetcher := NewFetcher(source, f.request, f.skipCheck)
114-
go fetcher.run(ctx, peersToSkip)
113+
func (f *FetcherFactory) New(ctx context.Context, source storage.Address, peers *sync.Map) storage.NetFetcher {
114+
fetcher := NewFetcher(ctx, source, f.request, f.skipCheck)
115+
go fetcher.run(peers)
115116
return fetcher
116117
}
117118

118119
// NewFetcher creates a new Fetcher for the given chunk address using the given request function.
119-
func NewFetcher(addr storage.Address, rf RequestFunc, skipCheck bool) *Fetcher {
120+
func NewFetcher(ctx context.Context, addr storage.Address, rf RequestFunc, skipCheck bool) *Fetcher {
120121
return &Fetcher{
121122
addr: addr,
122123
protoRequestFunc: rf,
123124
offerC: make(chan *enode.ID),
124125
requestC: make(chan uint8),
125126
searchTimeout: defaultSearchTimeout,
126127
skipCheck: skipCheck,
128+
ctx: ctx,
127129
}
128130
}
129131

130132
// Offer is called when an upstream peer offers the chunk via syncing as part of `OfferedHashesMsg` and the node does not have the chunk locally.
131-
func (f *Fetcher) Offer(ctx context.Context, source *enode.ID) {
133+
func (f *Fetcher) Offer(source *enode.ID) {
132134
// First we need to have this select to make sure that we return if context is done
133135
select {
134-
case <-ctx.Done():
136+
case <-f.ctx.Done():
135137
return
136138
default:
137139
}
@@ -140,15 +142,15 @@ func (f *Fetcher) Offer(ctx context.Context, source *enode.ID) {
140142
// push to offerC instead if offerC is available (see number 2 in https://golang.org/ref/spec#Select_statements)
141143
select {
142144
case f.offerC <- source:
143-
case <-ctx.Done():
145+
case <-f.ctx.Done():
144146
}
145147
}
146148

147149
// Request is called when an upstream peer request the chunk as part of `RetrieveRequestMsg`, or from a local request through FileStore, and the node does not have the chunk locally.
148-
func (f *Fetcher) Request(ctx context.Context, hopCount uint8) {
150+
func (f *Fetcher) Request(hopCount uint8) {
149151
// First we need to have this select to make sure that we return if context is done
150152
select {
151-
case <-ctx.Done():
153+
case <-f.ctx.Done():
152154
return
153155
default:
154156
}
@@ -162,13 +164,13 @@ func (f *Fetcher) Request(ctx context.Context, hopCount uint8) {
162164
// push to offerC instead if offerC is available (see number 2 in https://golang.org/ref/spec#Select_statements)
163165
select {
164166
case f.requestC <- hopCount + 1:
165-
case <-ctx.Done():
167+
case <-f.ctx.Done():
166168
}
167169
}
168170

169171
// start prepares the Fetcher
170172
// it keeps the Fetcher alive within the lifecycle of the passed context
171-
func (f *Fetcher) run(ctx context.Context, peers *sync.Map) {
173+
func (f *Fetcher) run(peers *sync.Map) {
172174
var (
173175
doRequest bool // determines if retrieval is initiated in the current iteration
174176
wait *time.Timer // timer for search timeout
@@ -219,7 +221,7 @@ func (f *Fetcher) run(ctx context.Context, peers *sync.Map) {
219221
doRequest = requested
220222

221223
// all Fetcher context closed, can quit
222-
case <-ctx.Done():
224+
case <-f.ctx.Done():
223225
log.Trace("terminate fetcher", "request addr", f.addr)
224226
// TODO: send cancellations to all peers left over in peers map (i.e., those we requested from)
225227
return
@@ -228,7 +230,7 @@ func (f *Fetcher) run(ctx context.Context, peers *sync.Map) {
228230
// need to issue a new request
229231
if doRequest {
230232
var err error
231-
sources, err = f.doRequest(ctx, gone, peers, sources, hopCount)
233+
sources, err = f.doRequest(gone, peers, sources, hopCount)
232234
if err != nil {
233235
log.Info("unable to request", "request addr", f.addr, "err", err)
234236
}
@@ -266,7 +268,7 @@ func (f *Fetcher) run(ctx context.Context, peers *sync.Map) {
266268
// * the peer's address is added to the set of peers to skip
267269
// * the peer's address is removed from prospective sources, and
268270
// * a go routine is started that reports on the gone channel if the peer is disconnected (or terminated their streamer)
269-
func (f *Fetcher) doRequest(ctx context.Context, gone chan *enode.ID, peersToSkip *sync.Map, sources []*enode.ID, hopCount uint8) ([]*enode.ID, error) {
271+
func (f *Fetcher) doRequest(gone chan *enode.ID, peersToSkip *sync.Map, sources []*enode.ID, hopCount uint8) ([]*enode.ID, error) {
270272
var i int
271273
var sourceID *enode.ID
272274
var quit chan struct{}
@@ -283,7 +285,7 @@ func (f *Fetcher) doRequest(ctx context.Context, gone chan *enode.ID, peersToSki
283285
for i = 0; i < len(sources); i++ {
284286
req.Source = sources[i]
285287
var err error
286-
sourceID, quit, err = f.protoRequestFunc(ctx, req)
288+
sourceID, quit, err = f.protoRequestFunc(f.ctx, req)
287289
if err == nil {
288290
// remove the peer from known sources
289291
// Note: we can modify the source although we are looping on it, because we break from the loop immediately
@@ -297,7 +299,7 @@ func (f *Fetcher) doRequest(ctx context.Context, gone chan *enode.ID, peersToSki
297299
if !foundSource {
298300
req.Source = nil
299301
var err error
300-
sourceID, quit, err = f.protoRequestFunc(ctx, req)
302+
sourceID, quit, err = f.protoRequestFunc(f.ctx, req)
301303
if err != nil {
302304
// if no peers found to request from
303305
return sources, err
@@ -314,7 +316,7 @@ func (f *Fetcher) doRequest(ctx context.Context, gone chan *enode.ID, peersToSki
314316
select {
315317
case <-quit:
316318
gone <- sourceID
317-
case <-ctx.Done():
319+
case <-f.ctx.Done():
318320
}
319321
}()
320322
return sources, nil

swarm/network/fetcher_test.go

Lines changed: 54 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -69,21 +69,21 @@ func (m *mockRequester) doRequest(ctx context.Context, request *Request) (*enode
6969
func TestFetcherSingleRequest(t *testing.T) {
7070
requester := newMockRequester()
7171
addr := make([]byte, 32)
72-
fetcher := NewFetcher(addr, requester.doRequest, true)
72+
73+
ctx, cancel := context.WithCancel(context.Background())
74+
defer cancel()
75+
76+
fetcher := NewFetcher(ctx, addr, requester.doRequest, true)
7377

7478
peers := []string{"a", "b", "c", "d"}
7579
peersToSkip := &sync.Map{}
7680
for _, p := range peers {
7781
peersToSkip.Store(p, time.Now())
7882
}
7983

80-
ctx, cancel := context.WithCancel(context.Background())
81-
defer cancel()
82-
83-
go fetcher.run(ctx, peersToSkip)
84+
go fetcher.run(peersToSkip)
8485

85-
rctx := context.Background()
86-
fetcher.Request(rctx, 0)
86+
fetcher.Request(0)
8787

8888
select {
8989
case request := <-requester.requestC:
@@ -115,20 +115,19 @@ func TestFetcherSingleRequest(t *testing.T) {
115115
func TestFetcherCancelStopsFetcher(t *testing.T) {
116116
requester := newMockRequester()
117117
addr := make([]byte, 32)
118-
fetcher := NewFetcher(addr, requester.doRequest, true)
119-
120-
peersToSkip := &sync.Map{}
121118

122119
ctx, cancel := context.WithCancel(context.Background())
123120

121+
fetcher := NewFetcher(ctx, addr, requester.doRequest, true)
122+
123+
peersToSkip := &sync.Map{}
124+
124125
// we start the fetcher, and then we immediately cancel the context
125-
go fetcher.run(ctx, peersToSkip)
126+
go fetcher.run(peersToSkip)
126127
cancel()
127128

128-
rctx, rcancel := context.WithTimeout(ctx, 100*time.Millisecond)
129-
defer rcancel()
130129
// we call Request with an active context
131-
fetcher.Request(rctx, 0)
130+
fetcher.Request(0)
132131

133132
// fetcher should not initiate request, we can only check by waiting a bit and making sure no request is happening
134133
select {
@@ -140,23 +139,23 @@ func TestFetcherCancelStopsFetcher(t *testing.T) {
140139

141140
// TestFetchCancelStopsRequest tests that calling a Request function with a cancelled context does not initiate a request
142141
func TestFetcherCancelStopsRequest(t *testing.T) {
142+
t.Skip("since context is now per fetcher, this test is likely redundant")
143+
143144
requester := newMockRequester(100 * time.Millisecond)
144145
addr := make([]byte, 32)
145-
fetcher := NewFetcher(addr, requester.doRequest, true)
146-
147-
peersToSkip := &sync.Map{}
148146

149147
ctx, cancel := context.WithCancel(context.Background())
150148
defer cancel()
151149

152-
// we start the fetcher with an active context
153-
go fetcher.run(ctx, peersToSkip)
150+
fetcher := NewFetcher(ctx, addr, requester.doRequest, true)
154151

155-
rctx, rcancel := context.WithCancel(context.Background())
156-
rcancel()
152+
peersToSkip := &sync.Map{}
153+
154+
// we start the fetcher with an active context
155+
go fetcher.run(peersToSkip)
157156

158157
// we call Request with a cancelled context
159-
fetcher.Request(rctx, 0)
158+
fetcher.Request(0)
160159

161160
// fetcher should not initiate request, we can only check by waiting a bit and making sure no request is happening
162161
select {
@@ -166,8 +165,7 @@ func TestFetcherCancelStopsRequest(t *testing.T) {
166165
}
167166

168167
// if there is another Request with active context, there should be a request, because the fetcher itself is not cancelled
169-
rctx = context.Background()
170-
fetcher.Request(rctx, 0)
168+
fetcher.Request(0)
171169

172170
select {
173171
case <-requester.requestC:
@@ -182,19 +180,19 @@ func TestFetcherCancelStopsRequest(t *testing.T) {
182180
func TestFetcherOfferUsesSource(t *testing.T) {
183181
requester := newMockRequester(100 * time.Millisecond)
184182
addr := make([]byte, 32)
185-
fetcher := NewFetcher(addr, requester.doRequest, true)
186-
187-
peersToSkip := &sync.Map{}
188183

189184
ctx, cancel := context.WithCancel(context.Background())
190185
defer cancel()
191186

187+
fetcher := NewFetcher(ctx, addr, requester.doRequest, true)
188+
189+
peersToSkip := &sync.Map{}
190+
192191
// start the fetcher
193-
go fetcher.run(ctx, peersToSkip)
192+
go fetcher.run(peersToSkip)
194193

195-
rctx := context.Background()
196194
// call the Offer function with the source peer
197-
fetcher.Offer(rctx, &sourcePeerID)
195+
fetcher.Offer(&sourcePeerID)
198196

199197
// fetcher should not initiate request
200198
select {
@@ -204,8 +202,7 @@ func TestFetcherOfferUsesSource(t *testing.T) {
204202
}
205203

206204
// call Request after the Offer
207-
rctx = context.Background()
208-
fetcher.Request(rctx, 0)
205+
fetcher.Request(0)
209206

210207
// there should be exactly 1 request coming from fetcher
211208
var request *Request
@@ -234,19 +231,19 @@ func TestFetcherOfferUsesSource(t *testing.T) {
234231
func TestFetcherOfferAfterRequestUsesSourceFromContext(t *testing.T) {
235232
requester := newMockRequester(100 * time.Millisecond)
236233
addr := make([]byte, 32)
237-
fetcher := NewFetcher(addr, requester.doRequest, true)
238-
239-
peersToSkip := &sync.Map{}
240234

241235
ctx, cancel := context.WithCancel(context.Background())
242236
defer cancel()
243237

238+
fetcher := NewFetcher(ctx, addr, requester.doRequest, true)
239+
240+
peersToSkip := &sync.Map{}
241+
244242
// start the fetcher
245-
go fetcher.run(ctx, peersToSkip)
243+
go fetcher.run(peersToSkip)
246244

247245
// call Request first
248-
rctx := context.Background()
249-
fetcher.Request(rctx, 0)
246+
fetcher.Request(0)
250247

251248
// there should be a request coming from fetcher
252249
var request *Request
@@ -260,7 +257,7 @@ func TestFetcherOfferAfterRequestUsesSourceFromContext(t *testing.T) {
260257
}
261258

262259
// after the Request call Offer
263-
fetcher.Offer(context.Background(), &sourcePeerID)
260+
fetcher.Offer(&sourcePeerID)
264261

265262
// there should be a request coming from fetcher
266263
select {
@@ -283,21 +280,21 @@ func TestFetcherOfferAfterRequestUsesSourceFromContext(t *testing.T) {
283280
func TestFetcherRetryOnTimeout(t *testing.T) {
284281
requester := newMockRequester()
285282
addr := make([]byte, 32)
286-
fetcher := NewFetcher(addr, requester.doRequest, true)
283+
284+
ctx, cancel := context.WithCancel(context.Background())
285+
defer cancel()
286+
287+
fetcher := NewFetcher(ctx, addr, requester.doRequest, true)
287288
// set searchTimeOut to low value so the test is quicker
288289
fetcher.searchTimeout = 250 * time.Millisecond
289290

290291
peersToSkip := &sync.Map{}
291292

292-
ctx, cancel := context.WithCancel(context.Background())
293-
defer cancel()
294-
295293
// start the fetcher
296-
go fetcher.run(ctx, peersToSkip)
294+
go fetcher.run(peersToSkip)
297295

298296
// call the fetch function with an active context
299-
rctx := context.Background()
300-
fetcher.Request(rctx, 0)
297+
fetcher.Request(0)
301298

302299
// after 100ms the first request should be initiated
303300
time.Sleep(100 * time.Millisecond)
@@ -339,7 +336,7 @@ func TestFetcherFactory(t *testing.T) {
339336

340337
fetcher := fetcherFactory.New(context.Background(), addr, peersToSkip)
341338

342-
fetcher.Request(context.Background(), 0)
339+
fetcher.Request(0)
343340

344341
// check if the created fetchFunction really starts a fetcher and initiates a request
345342
select {
@@ -353,21 +350,21 @@ func TestFetcherFactory(t *testing.T) {
353350
func TestFetcherRequestQuitRetriesRequest(t *testing.T) {
354351
requester := newMockRequester()
355352
addr := make([]byte, 32)
356-
fetcher := NewFetcher(addr, requester.doRequest, true)
353+
354+
ctx, cancel := context.WithCancel(context.Background())
355+
defer cancel()
356+
357+
fetcher := NewFetcher(ctx, addr, requester.doRequest, true)
357358

358359
// make sure the searchTimeout is long so it is sure the request is not
359360
// retried because of timeout
360361
fetcher.searchTimeout = 10 * time.Second
361362

362363
peersToSkip := &sync.Map{}
363364

364-
ctx, cancel := context.WithCancel(context.Background())
365-
defer cancel()
366-
367-
go fetcher.run(ctx, peersToSkip)
365+
go fetcher.run(peersToSkip)
368366

369-
rctx := context.Background()
370-
fetcher.Request(rctx, 0)
367+
fetcher.Request(0)
371368

372369
select {
373370
case <-requester.requestC:
@@ -460,17 +457,15 @@ func TestRequestSkipPeerPermanent(t *testing.T) {
460457
func TestFetcherMaxHopCount(t *testing.T) {
461458
requester := newMockRequester()
462459
addr := make([]byte, 32)
463-
fetcher := NewFetcher(addr, requester.doRequest, true)
464460

465461
ctx, cancel := context.WithCancel(context.Background())
466462
defer cancel()
467463

468-
peersToSkip := &sync.Map{}
464+
fetcher := NewFetcher(ctx, addr, requester.doRequest, true)
469465

470-
go fetcher.run(ctx, peersToSkip)
466+
peersToSkip := &sync.Map{}
471467

472-
rctx := context.Background()
473-
fetcher.Request(rctx, maxHopCount)
468+
go fetcher.run(peersToSkip)
474469

475470
// if hopCount is already at max no request should be initiated
476471
select {

0 commit comments

Comments
 (0)