Skip to content

Commit

Permalink
Fix queue flakiness (zio#1439)
Browse files Browse the repository at this point in the history
  • Loading branch information
ghostdogpr authored and jdegoes committed Aug 19, 2019
1 parent 21bb6f2 commit f5ead75
Showing 1 changed file with 14 additions and 9 deletions.
23 changes: 14 additions & 9 deletions core-tests/shared/src/test/scala/zio/ZQueueSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -46,14 +46,15 @@ object ZQueueSpec
} yield assert(l.toSet, Predicate.equals(values.toSet))
},
testM("offers are suspended by back pressure") {
(for {
for {
queue <- Queue.bounded[Int](10)
_ <- queue.offer(1).repeat(ZSchedule.recurs(9))
refSuspended <- Ref.make[Boolean](true)
_ <- (queue.offer(2) *> refSuspended.set(false)).fork
f <- (queue.offer(2) *> refSuspended.set(false)).fork
_ <- waitForSize(queue, 11)
isSuspended <- refSuspended.get
} yield assert(isSuspended, Predicate.isTrue)).interruptChildren
_ <- f.interrupt
} yield assert(isSuspended, Predicate.isTrue)
},
testM("back pressured offers are retrieved") {
for {
Expand Down Expand Up @@ -204,14 +205,15 @@ object ZQueueSpec
} yield assert((list1, list2), Predicate.equals((List(10, 20), List(30, 40))))
},
testM("takeUpTo doesn't return back-pressured offers") {
(for {
for {
queue <- Queue.bounded[Int](4)
values = List(1, 2, 3, 4)
_ <- values.map(queue.offer).foldLeft(IO.succeed(false))(_ *> _)
_ <- queue.offer(5).fork
f <- queue.offer(5).fork
_ <- waitForSize(queue, 5)
l <- queue.takeUpTo(5)
} yield assert(l, Predicate.equals(List(1, 2, 3, 4)))).interruptChildren
_ <- f.interrupt
} yield assert(l, Predicate.equals(List(1, 2, 3, 4)))
},
testM("offerAll with takeAll") {
for {
Expand All @@ -226,9 +228,10 @@ object ZQueueSpec
for {
queue <- Queue.bounded[Int](2)
orders = Range.inclusive(1, 3).toList
_ <- queue.offerAll(orders).fork
f <- queue.offerAll(orders).fork
size <- waitForSize(queue, 3)
l <- queue.takeAll
_ <- f.interrupt
} yield assert((size, l), Predicate.equals((3, List(1, 2))))
},
testM("offerAll with takeAll and back pressure + interruption") {
Expand All @@ -248,9 +251,10 @@ object ZQueueSpec
for {
queue <- Queue.bounded[Int](64)
orders = Range.inclusive(1, 128).toList
_ <- queue.offerAll(orders).fork
f <- queue.offerAll(orders).fork
_ <- waitForSize(queue, 128)
l <- queue.takeAll
_ <- f.interrupt
} yield assert(l, Predicate.equals(Range.inclusive(1, 64).toList))
},
testM("offerAll with pending takers") {
Expand Down Expand Up @@ -282,11 +286,12 @@ object ZQueueSpec
values = Range.inclusive(1, 100).toList
takers <- IO.forkAll(List.fill(100)(queue.take))
_ <- waitForSize(queue, -100)
_ <- IO.forkAll(List.fill(100)(queue.take))
f <- IO.forkAll(List.fill(100)(queue.take))
_ <- waitForSize(queue, -200)
_ <- queue.offerAll(values)
l <- takers.join
s <- queue.size
_ <- f.interrupt
} yield assert((l.toSet, s), Predicate.equals((values.toSet, -100)))
},
testM("offerAll with take and back pressure") {
Expand Down

0 comments on commit f5ead75

Please sign in to comment.