Skip to content

Commit

Permalink
Make this test less flaky
Browse files Browse the repository at this point in the history
  • Loading branch information
MarcoPolo committed Jul 6, 2023
1 parent 87c2561 commit d282e9a
Showing 1 changed file with 25 additions and 6 deletions.
31 changes: 25 additions & 6 deletions p2p/test/transport/transport_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -368,10 +368,14 @@ func TestMoreStreamsThanOurLimits(t *testing.T) {
}))

var handledStreams atomic.Int32

semaphore := make(chan struct{}, streamCount)
// Start with a single stream at a time. If that works, we'll increase the number of concurrent streams.
semaphore <- struct{}{}

listener.SetStreamHandler("echo", func(s network.Stream) {
io.Copy(s, s)
s.Close()
handledStreams.Add(1)
})

wg := sync.WaitGroup{}
Expand All @@ -380,14 +384,30 @@ func TestMoreStreamsThanOurLimits(t *testing.T) {
var completedStreams atomic.Int32
for i := 0; i < streamCount; i++ {
go func() {
<-semaphore
var didErr bool
defer wg.Done()
defer completedStreams.Add(1)
defer func() {
select {
case semaphore <- struct{}{}:
default:
}
if !didErr {
// No error! We can add one more stream to our concurrency limit.
select {
case semaphore <- struct{}{}:
default:
}
}
}()

var s network.Stream
var err error
// maxRetries is an arbitrary retry amount if there's any error.
maxRetries := streamCount * 4
shouldRetry := func(err error) bool {
didErr = true
maxRetries--
if maxRetries == 0 || len(errCh) > 0 {
select {
Expand Down Expand Up @@ -426,14 +446,13 @@ func TestMoreStreamsThanOurLimits(t *testing.T) {
if !bytes.Equal(b, []byte("hello")) {
return errors.New("received data does not match sent data")
}
handledStreams.Add(1)

return nil
}(s)
if err != nil {
if shouldRetry(err) {
time.Sleep(50 * time.Millisecond)
continue
}
if err != nil && shouldRetry(err) {
time.Sleep(50 * time.Millisecond)
continue
}
return
}
Expand Down

0 comments on commit d282e9a

Please sign in to comment.