Skip to content

Commit

Permalink
Java's CompletionStage: make cancellable only the async part (zio#6717)
Browse files Browse the repository at this point in the history
  • Loading branch information
sideeffffect authored Apr 25, 2022
1 parent d733233 commit 3cb59d3
Showing 1 changed file with 30 additions and 20 deletions.
50 changes: 30 additions & 20 deletions core/jvm/src/main/scala/zio/interop/javaz.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ package zio.interop
import _root_.java.nio.channels.CompletionHandler
import _root_.java.util.concurrent.{CompletableFuture, CompletionException, CompletionStage, Future}
import zio._
import zio.blocking.{Blocking, blocking}
import zio.blocking.Blocking

import java.util.concurrent.CancellationException
import scala.concurrent.ExecutionException
Expand Down Expand Up @@ -64,20 +64,24 @@ private[zio] object javaz {
} catch catchFromGet(isFatal)

def fromCompletionStage[A](thunk: => CompletionStage[A]): Task[A] =
Task.effect(thunk).flatMap { cs =>
Task.effectSuspendTotalWith { (p, _) =>
val cf = cs.toCompletableFuture
if (cf.isDone) {
unwrapDone(p.fatal)(cf)
} else {
Task.effectAsyncInterrupt { cb =>
val _ = cs.handle[Unit] { (v: A, t: Throwable) =>
val io = Option(t).fold[Task[A]](Task.succeed(v)) { t =>
catchFromGet(p.fatal).lift(t).getOrElse(Task.die(t))
Task.uninterruptibleMask { restore =>
Task.effect(thunk).flatMap { cs =>
Task.effectSuspendTotalWith { (p, _) =>
val cf = cs.toCompletableFuture
if (cf.isDone) {
unwrapDone(p.fatal)(cf)
} else {
restore {
Task.effectAsyncInterrupt[A] { cb =>
val _ = cs.handle[Unit] { (v: A, t: Throwable) =>
val io = Option(t).fold[Task[A]](Task.succeed(v)) { t =>
catchFromGet(p.fatal).lift(t).getOrElse(Task.die(t))
}
cb(io)
}
Left(UIO(cf.cancel(false)))
}
cb(io)
}
Left(UIO(cf.cancel(false)))
}.onInterrupt(UIO(cf.cancel(false)))
}
}
}
Expand All @@ -88,12 +92,18 @@ private[zio] object javaz {
* `fromCompletionStage`
*/
def fromFutureJava[A](thunk: => Future[A]): RIO[Blocking, A] =
RIO.effect(thunk).flatMap { future =>
RIO.effectSuspendTotalWith { (p, _) =>
if (future.isDone) {
unwrapDone(p.fatal)(future)
} else {
blocking(Task.effectSuspend(unwrapDone(p.fatal)(future))).onInterrupt(UIO(future.cancel(false)))
ZIO.service[Blocking.Service].flatMap { blocking =>
Task.uninterruptibleMask { restore =>
RIO.effect(thunk).flatMap { future =>
RIO.effectSuspendTotalWith { (p, _) =>
if (future.isDone) {
unwrapDone(p.fatal)(future)
} else {
restore {
blocking.blocking(Task.effectSuspend(unwrapDone(p.fatal)(future)))
}.onInterrupt(UIO(future.cancel(false)))
}
}
}
}
}
Expand Down

0 comments on commit 3cb59d3

Please sign in to comment.