From d282e9a3662dfd105243dcffe00ec0f224abe019 Mon Sep 17 00:00:00 2001 From: Marco Munizaga Date: Wed, 5 Jul 2023 17:04:33 -0700 Subject: [PATCH] Make this test less flaky --- p2p/test/transport/transport_test.go | 31 ++++++++++++++++++++++------ 1 file changed, 25 insertions(+), 6 deletions(-) diff --git a/p2p/test/transport/transport_test.go b/p2p/test/transport/transport_test.go index 513d2d9b2b..5a1ae0cea9 100644 --- a/p2p/test/transport/transport_test.go +++ b/p2p/test/transport/transport_test.go @@ -368,10 +368,14 @@ func TestMoreStreamsThanOurLimits(t *testing.T) { })) var handledStreams atomic.Int32 + + semaphore := make(chan struct{}, streamCount) + // Start with a single stream at a time. If that works, we'll increase the number of concurrent streams. + semaphore <- struct{}{} + listener.SetStreamHandler("echo", func(s network.Stream) { io.Copy(s, s) s.Close() - handledStreams.Add(1) }) wg := sync.WaitGroup{} @@ -380,14 +384,30 @@ func TestMoreStreamsThanOurLimits(t *testing.T) { var completedStreams atomic.Int32 for i := 0; i < streamCount; i++ { go func() { + <-semaphore + var didErr bool defer wg.Done() defer completedStreams.Add(1) + defer func() { + select { + case semaphore <- struct{}{}: + default: + } + if !didErr { + // No error! We can add one more stream to our concurrency limit. + select { + case semaphore <- struct{}{}: + default: + } + } + }() var s network.Stream var err error // maxRetries is an arbitrary retry amount if there's any error. maxRetries := streamCount * 4 shouldRetry := func(err error) bool { + didErr = true maxRetries-- if maxRetries == 0 || len(errCh) > 0 { select { @@ -426,14 +446,13 @@ func TestMoreStreamsThanOurLimits(t *testing.T) { if !bytes.Equal(b, []byte("hello")) { return errors.New("received data does not match sent data") } + handledStreams.Add(1) return nil }(s) - if err != nil { - if shouldRetry(err) { - time.Sleep(50 * time.Millisecond) - continue - } + if err != nil && shouldRetry(err) { + time.Sleep(50 * time.Millisecond) + continue } return }