diff --git a/core-tests/shared/src/test/scala/zio/ZQueueSpec.scala b/core-tests/shared/src/test/scala/zio/ZQueueSpec.scala index db0ea8700db0..69ee131b7924 100644 --- a/core-tests/shared/src/test/scala/zio/ZQueueSpec.scala +++ b/core-tests/shared/src/test/scala/zio/ZQueueSpec.scala @@ -46,14 +46,15 @@ object ZQueueSpec } yield assert(l.toSet, Predicate.equals(values.toSet)) }, testM("offers are suspended by back pressure") { - (for { + for { queue <- Queue.bounded[Int](10) _ <- queue.offer(1).repeat(ZSchedule.recurs(9)) refSuspended <- Ref.make[Boolean](true) - _ <- (queue.offer(2) *> refSuspended.set(false)).fork + f <- (queue.offer(2) *> refSuspended.set(false)).fork _ <- waitForSize(queue, 11) isSuspended <- refSuspended.get - } yield assert(isSuspended, Predicate.isTrue)).interruptChildren + _ <- f.interrupt + } yield assert(isSuspended, Predicate.isTrue) }, testM("back pressured offers are retrieved") { for { @@ -204,14 +205,15 @@ object ZQueueSpec } yield assert((list1, list2), Predicate.equals((List(10, 20), List(30, 40)))) }, testM("takeUpTo doesn't return back-pressured offers") { - (for { + for { queue <- Queue.bounded[Int](4) values = List(1, 2, 3, 4) _ <- values.map(queue.offer).foldLeft(IO.succeed(false))(_ *> _) - _ <- queue.offer(5).fork + f <- queue.offer(5).fork _ <- waitForSize(queue, 5) l <- queue.takeUpTo(5) - } yield assert(l, Predicate.equals(List(1, 2, 3, 4)))).interruptChildren + _ <- f.interrupt + } yield assert(l, Predicate.equals(List(1, 2, 3, 4))) }, testM("offerAll with takeAll") { for { @@ -226,9 +228,10 @@ object ZQueueSpec for { queue <- Queue.bounded[Int](2) orders = Range.inclusive(1, 3).toList - _ <- queue.offerAll(orders).fork + f <- queue.offerAll(orders).fork size <- waitForSize(queue, 3) l <- queue.takeAll + _ <- f.interrupt } yield assert((size, l), Predicate.equals((3, List(1, 2)))) }, testM("offerAll with takeAll and back pressure + interruption") { @@ -248,9 +251,10 @@ object ZQueueSpec for { queue <- Queue.bounded[Int](64) orders = Range.inclusive(1, 128).toList - _ <- queue.offerAll(orders).fork + f <- queue.offerAll(orders).fork _ <- waitForSize(queue, 128) l <- queue.takeAll + _ <- f.interrupt } yield assert(l, Predicate.equals(Range.inclusive(1, 64).toList)) }, testM("offerAll with pending takers") { @@ -282,11 +286,12 @@ object ZQueueSpec values = Range.inclusive(1, 100).toList takers <- IO.forkAll(List.fill(100)(queue.take)) _ <- waitForSize(queue, -100) - _ <- IO.forkAll(List.fill(100)(queue.take)) + f <- IO.forkAll(List.fill(100)(queue.take)) _ <- waitForSize(queue, -200) _ <- queue.offerAll(values) l <- takers.join s <- queue.size + _ <- f.interrupt } yield assert((l.toSet, s), Predicate.equals((values.toSet, -100))) }, testM("offerAll with take and back pressure") {