From 1f8764be3b43448ccfd60706c42dab09b0bc6ed3 Mon Sep 17 00:00:00 2001 From: lzhfromustc <43191155+lzhfromustc@users.noreply.github.com> Date: Thu, 5 Dec 2019 18:40:10 -0500 Subject: [PATCH] integration: prevent goroutines leaks in test (#11318) Some goroutines in test functions will be leaked in certain cases. This patch stops these leaks no matter what happens in test, by putting the blocking channel in select together with a new stopc channel, or adding 1 buffer to the blocking channel. --- integration/v3_barrier_test.go | 9 ++++++++- integration/v3_leadership_test.go | 8 +++++++- integration/v3_lock_test.go | 21 ++++++++++++++++++--- integration/v3_watch_test.go | 2 +- integration/v3election_grpc_test.go | 2 +- 5 files changed, 35 insertions(+), 7 deletions(-) diff --git a/integration/v3_barrier_test.go b/integration/v3_barrier_test.go index 1fbc78b3c6a..838f45c5998 100644 --- a/integration/v3_barrier_test.go +++ b/integration/v3_barrier_test.go @@ -47,13 +47,20 @@ func testBarrier(t *testing.T, waiters int, chooseClient func() *clientv3.Client } donec := make(chan struct{}) + stopc := make(chan struct{}) + defer close(stopc) + for i := 0; i < waiters; i++ { go func() { br := recipe.NewBarrier(chooseClient(), "test-barrier") if err := br.Wait(); err != nil { t.Errorf("could not wait on barrier (%v)", err) } - donec <- struct{}{} + select { + case donec <- struct{}{}: + case <-stopc: + } + }() } diff --git a/integration/v3_leadership_test.go b/integration/v3_leadership_test.go index 29818113a14..17368e19f12 100644 --- a/integration/v3_leadership_test.go +++ b/integration/v3_leadership_test.go @@ -39,10 +39,16 @@ func testMoveLeader(t *testing.T, auto bool) { // ensure followers go through leader transition while learship transfer idc := make(chan uint64) + stopc := make(chan struct{}) + defer close(stopc) + for i := range clus.Members { if oldLeadIdx != i { go func(m *member) { - idc <- checkLeaderTransition(m, oldLeadID) + select { + case idc <- checkLeaderTransition(m, oldLeadID): + case <-stopc: + } }(clus.Members[i]) } } diff --git a/integration/v3_lock_test.go b/integration/v3_lock_test.go index e36af2d43ce..b122ff5de5b 100644 --- a/integration/v3_lock_test.go +++ b/integration/v3_lock_test.go @@ -49,6 +49,9 @@ func TestMutexLockMultiNode(t *testing.T) { func testMutexLock(t *testing.T, waiters int, chooseClient func() *clientv3.Client) { // stream lock acquisitions lockedC := make(chan *concurrency.Mutex) + stopC := make(chan struct{}) + defer close(stopC) + for i := 0; i < waiters; i++ { go func() { session, err := concurrency.NewSession(chooseClient()) @@ -59,7 +62,11 @@ func testMutexLock(t *testing.T, waiters int, chooseClient func() *clientv3.Clie if err := m.Lock(context.TODO()); err != nil { t.Errorf("could not wait on lock (%v)", err) } - lockedC <- m + select { + case lockedC <- m: + case <-stopC: + } + }() } // unlock locked mutexes @@ -103,6 +110,8 @@ func TestMutexTryLockMultiNode(t *testing.T) { func testMutexTryLock(t *testing.T, lockers int, chooseClient func() *clientv3.Client) { lockedC := make(chan *concurrency.Mutex) notlockedC := make(chan *concurrency.Mutex) + stopC := make(chan struct{}) + defer close(stopC) for i := 0; i < lockers; i++ { go func() { session, err := concurrency.NewSession(chooseClient()) @@ -112,9 +121,15 @@ func testMutexTryLock(t *testing.T, lockers int, chooseClient func() *clientv3.C m := concurrency.NewMutex(session, "test-mutex-try-lock") err = m.TryLock(context.TODO()) if err == nil { - lockedC <- m + select { + case lockedC <- m: + case <-stopC: + } } else if err == concurrency.ErrLocked { - notlockedC <- m + select { + case notlockedC <- m: + case <-stopC: + } } else { t.Errorf("Unexpected Error %v", err) } diff --git a/integration/v3_watch_test.go b/integration/v3_watch_test.go index dd04eab4531..d5a52e790d6 100644 --- a/integration/v3_watch_test.go +++ b/integration/v3_watch_test.go @@ -1188,7 +1188,7 @@ func TestV3WatchWithPrevKV(t *testing.T) { t.Fatal(err) } - recv := make(chan *pb.WatchResponse) + recv := make(chan *pb.WatchResponse, 1) go func() { // check received PUT resp, rerr := ws.Recv() diff --git a/integration/v3election_grpc_test.go b/integration/v3election_grpc_test.go index 664b4f3139d..41a9df441e0 100644 --- a/integration/v3election_grpc_test.go +++ b/integration/v3election_grpc_test.go @@ -97,7 +97,7 @@ func TestV3ElectionObserve(t *testing.T) { lc := toGRPC(clus.Client(0)).Election // observe leadership events - observec := make(chan struct{}) + observec := make(chan struct{}, 1) go func() { defer close(observec) s, err := lc.Observe(context.Background(), &epb.LeaderRequest{Name: []byte("foo")})