Skip to content

Commit

Permalink
handle second callback call in effectAsync (zio#1402)
Browse files Browse the repository at this point in the history
  • Loading branch information
ghostdogpr committed Aug 17, 2019
1 parent 59ef24b commit a737e76
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 6 deletions.
14 changes: 14 additions & 0 deletions core-tests/jvm/src/test/scala/zio/RTSSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ class RTSSpec(implicit ee: ExecutionEnv) extends TestRuntime with org.specs2.mat
sleep 0 must return $testSleepZeroReturns
shallow bind of async chain $testShallowBindOfAsyncChainIsCorrect
effectAsyncM can fail before registering $testEffectAsyncMCanFail
second callback call is ignored $testAsyncSecondCallback

RTS concurrency correctness
shallow fork/join identity $testForkJoinIsId
Expand Down Expand Up @@ -640,6 +641,19 @@ class RTSSpec(implicit ee: ExecutionEnv) extends TestRuntime with org.specs2.mat
.map(_ must_=== "Ouch")
}

def testAsyncSecondCallback =
unsafeRun(for {
_ <- IO.effectAsync[Throwable, Int] { k =>
k(IO.succeed(42))
Thread.sleep(500)
k(IO.succeed(42))
}
res <- IO.effectAsync[Throwable, String] { k =>
Thread.sleep(1000)
k(IO.succeed("ok"))
}
} yield res) must_=== "ok"

def testSleepZeroReturns =
unsafeRun(clock.sleep(1.nanos)) must_=== ((): Unit)

Expand Down
11 changes: 5 additions & 6 deletions core/shared/src/main/scala/zio/internal/FiberContext.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,14 @@

package zio.internal

import java.util.concurrent.atomic.{ AtomicLong, AtomicReference }

import java.util.concurrent.atomic.{ AtomicBoolean, AtomicLong, AtomicReference }
import com.github.ghik.silencer.silent

import scala.collection.JavaConverters._
import zio.internal.FiberContext.{ FiberRefLocals, SuperviseStatus }
import zio.Cause
import zio._
import zio.internal.stacktracer.ZTraceElement
import zio.internal.tracing.ZIOFn

import scala.annotation.{ switch, tailrec }

/**
Expand Down Expand Up @@ -609,8 +606,10 @@ private[zio] final class FiberContext[E, A](
*
* @param value The value produced by the asynchronous computation.
*/
private[this] final val resumeAsync: IO[E, Any] => Unit =
zio => if (exitAsync()) evaluateLater(zio)
private[this] final def resumeAsync: IO[E, Any] => Unit = {
val a = new AtomicBoolean(true)
zio => if (a.getAndSet(false) && exitAsync()) evaluateLater(zio)
}

final def interrupt: UIO[Exit[E, A]] = ZIO.effectAsyncMaybe[Any, Nothing, Exit[E, A]] { k =>
kill0(x => k(ZIO.done(x)))
Expand Down

0 comments on commit a737e76

Please sign in to comment.