Skip to content
This repository has been archived by the owner on Aug 2, 2021. It is now read-only.

Fix data races in fetcher tests #1121

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions swarm/network/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,12 @@ func (f *Fetcher) Request(ctx context.Context, hopCount uint8) {

// start prepares the Fetcher
// it keeps the Fetcher alive within the lifecycle of the passed context
// If doneC is not nil it will be closed to signal that the run function has returned.
// Not nil doneC is used in tests to synchronize cleanup operations when the test is done.
func (f *Fetcher) run(ctx context.Context, peers *sync.Map) {
if testHookFetcherRun != nil {
defer testHookFetcherRun()
}
var (
doRequest bool // determines if retrieval is initiated in the current iteration
wait *time.Timer // timer for search timeout
Expand Down Expand Up @@ -250,6 +255,8 @@ func (f *Fetcher) run(ctx context.Context, peers *sync.Map) {
}
}

var testHookFetcherRun func()

// doRequest attempts at finding a peer to request the chunk from
// * first it tries to request explicitly from peers that are known to have offered the chunk
// * if there are no such peers (available) it tries to request it from a peer closest to the chunk address
Expand Down
170 changes: 167 additions & 3 deletions swarm/network/fetcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,11 @@ func (m *mockRequester) doRequest(ctx context.Context, request *Request) (*enode
m.count++
}
time.Sleep(waitTime)
m.requestC <- request
select {
case m.requestC <- request:
case <-ctx.Done():
return nil, nil, ctx.Err()
}

// if there is a Source in the request use that, if not use the global requestedPeerId
source := request.Source
Expand All @@ -77,6 +81,15 @@ func TestFetcherSingleRequest(t *testing.T) {
peersToSkip.Store(p, time.Now())
}

// fetcherDoneC will be closed when fetcher run function returns.
// Waiting for fetcher run function goroutine to terminate
// prevents the race condition on searchTimeout value change by other
// tests. The fetcherDoneC defer function will wait for a proper
// goroutine cleanup.
fetcherDoneC := make(chan struct{})
defer setTestHookFetcherRun(func() { close(fetcherDoneC) })()
defer waitFetcherDone(t, fetcherDoneC)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

Expand Down Expand Up @@ -119,6 +132,15 @@ func TestFetcherCancelStopsFetcher(t *testing.T) {

peersToSkip := &sync.Map{}

// fetcherDoneC will be closed when fetcher run function returns.
// Waiting for fetcher run function goroutine to terminate
// prevents the race condition on searchTimeout value change by other
// tests. The fetcherDoneC defer function will wait for a proper
// goroutine cleanup.
fetcherDoneC := make(chan struct{})
defer setTestHookFetcherRun(func() { close(fetcherDoneC) })()
defer waitFetcherDone(t, fetcherDoneC)

ctx, cancel := context.WithCancel(context.Background())

// we start the fetcher, and then we immediately cancel the context
Expand Down Expand Up @@ -146,6 +168,15 @@ func TestFetcherCancelStopsRequest(t *testing.T) {

peersToSkip := &sync.Map{}

// fetcherDoneC will be closed when fetcher run function returns.
// Waiting for fetcher run function goroutine to terminate
// prevents the race condition on searchTimeout value change by other
// tests. The fetcherDoneC defer function will wait for a proper
// goroutine cleanup.
fetcherDoneC := make(chan struct{})
defer setTestHookFetcherRun(func() { close(fetcherDoneC) })()
defer waitFetcherDone(t, fetcherDoneC)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

Expand Down Expand Up @@ -186,6 +217,15 @@ func TestFetcherOfferUsesSource(t *testing.T) {

peersToSkip := &sync.Map{}

// fetcherDoneC will be closed when fetcher run function returns.
// Waiting for fetcher run function goroutine to terminate
// prevents the race condition on searchTimeout value change by other
// tests. The fetcherDoneC defer function will wait for a proper
// goroutine cleanup.
fetcherDoneC := make(chan struct{})
defer setTestHookFetcherRun(func() { close(fetcherDoneC) })()
defer waitFetcherDone(t, fetcherDoneC)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

Expand Down Expand Up @@ -238,6 +278,15 @@ func TestFetcherOfferAfterRequestUsesSourceFromContext(t *testing.T) {

peersToSkip := &sync.Map{}

// fetcherDoneC will be closed when fetcher run function returns.
// Waiting for fetcher run function goroutine to terminate
// prevents the race condition on searchTimeout value change by other
// tests. The fetcherDoneC defer function will wait for a proper
// goroutine cleanup.
fetcherDoneC := make(chan struct{})
defer setTestHookFetcherRun(func() { close(fetcherDoneC) })()
defer waitFetcherDone(t, fetcherDoneC)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

Expand Down Expand Up @@ -293,6 +342,14 @@ func TestFetcherRetryOnTimeout(t *testing.T) {
}(searchTimeout)
searchTimeout = 250 * time.Millisecond

// fetcherDoneC will be closed when fetcher run function returns.
// Context cancel defer will close the context and deferred function
// that waits for fetcherDoneC will prevent searchTimeout defer function
// to change searchTimeout before fetcher run returns.
fetcherDoneC := make(chan struct{})
defer setTestHookFetcherRun(func() { close(fetcherDoneC) })()
defer waitFetcherDone(t, fetcherDoneC)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

Expand Down Expand Up @@ -341,9 +398,21 @@ func TestFetcherFactory(t *testing.T) {

peersToSkip := &sync.Map{}

fetcher := fetcherFactory.New(context.Background(), addr, peersToSkip)
// fetcherDoneC will be closed when fetcher run function returns.
// Waiting for fetcher run function goroutine to terminate
// prevents the race condition on searchTimeout value change by other
// tests. The fetcherDoneC defer function will wait for a proper
// goroutine cleanup.
fetcherDoneC := make(chan struct{})
defer setTestHookFetcherRun(func() { close(fetcherDoneC) })()
defer waitFetcherDone(t, fetcherDoneC)

fetcher.Request(context.Background(), 0)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

fetcher := fetcherFactory.New(ctx, addr, peersToSkip)

fetcher.Request(ctx, 0)

// check if the created fetchFunction really starts a fetcher and initiates a request
select {
Expand All @@ -367,6 +436,15 @@ func TestFetcherRequestQuitRetriesRequest(t *testing.T) {

peersToSkip := &sync.Map{}

// fetcherDoneC will be closed when fetcher run function returns.
// Waiting for fetcher run function goroutine to terminate
// prevents the race condition on searchTimeout value change by other
// tests. The fetcherDoneC defer function will wait for a proper
// goroutine cleanup.
fetcherDoneC := make(chan struct{})
defer setTestHookFetcherRun(func() { close(fetcherDoneC) })()
defer waitFetcherDone(t, fetcherDoneC)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

Expand Down Expand Up @@ -468,6 +546,15 @@ func TestFetcherMaxHopCount(t *testing.T) {
addr := make([]byte, 32)
fetcher := NewFetcher(addr, requester.doRequest, true)

// fetcherDoneC will be closed when fetcher run function returns.
// Waiting for fetcher run function goroutine to terminate
// prevents the race condition on searchTimeout value change by other
// tests. The fetcherDoneC defer function will wait for a proper
// goroutine cleanup.
fetcherDoneC := make(chan struct{})
defer setTestHookFetcherRun(func() { close(fetcherDoneC) })()
defer waitFetcherDone(t, fetcherDoneC)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

Expand All @@ -485,3 +572,80 @@ func TestFetcherMaxHopCount(t *testing.T) {
case <-time.After(200 * time.Millisecond):
}
}

// waitFetcherDone is a helper function that waits for
// a channel to be closed.
func waitFetcherDone(t *testing.T, doneC <-chan struct{}) {
t.Helper()
select {
case _, ok := <-doneC:
if ok {
t.Error("channel struct{} received, but channel not closed")
}
case <-time.After(10 * time.Second):
t.Error("fetcher run function did not finish")
}
}

// setTestHookFetcherRun sets testHookFetcherRun and
// returns a function that will reset it to the
// value before the change.
func setTestHookFetcherRun(h func()) (reset func()) {
current := testHookFetcherRun
reset = func() { testHookFetcherRun = current }
testHookFetcherRun = h
return reset
}

// TestSetTestHookFetcherRun tests if setTestHookFetcherRun changes
// testHookFetcherRun function correctly and if its reset function
// resets the original function.
func TestSetTestHookFetcherRun(t *testing.T) {
// Set the current function after the test finishes.
defer func(h func()) { testHookFetcherRun = h }(testHookFetcherRun)

// expected value for the unchanged function
original := 1
// expected value for the changed function
changed := 2

// this variable will be set with two different functions
var got int

// define the original (unchanged) functions
testHookFetcherRun = func() {
got = original
}

// set got variable
testHookFetcherRun()

// test if got variable is set correctly
if got != original {
t.Errorf("got hook value %v, want %v", got, original)
}

// set the new function
reset := setTestHookFetcherRun(func() {
got = changed
})

// set got variable
testHookFetcherRun()

// test if got variable is set correctly to changed value
if got != changed {
t.Errorf("got hook value %v, want %v", got, changed)
}

// set the function to the original one
reset()

// set got variable
testHookFetcherRun()

// test if got variable is set correctly to original value
if got != original {
t.Errorf("got hook value %v, want %v", got, original)
}
}