From a6ed3d229c3e8db102a81628fed7d52b2d80d9b2 Mon Sep 17 00:00:00 2001 From: jules Ivanic Date: Wed, 7 Jun 2023 17:17:27 +0400 Subject: [PATCH] Use a Promise in the `RunloopState.Started` as suggested by @erikvanoosten See https://github.com/zio/zio-kafka/pull/857#discussion_r1218434608 --- .../scala/zio/kafka/consumer/Consumer.scala | 2 +- .../zio/kafka/consumer/internal/Runloop.scala | 2 +- .../consumer/internal/RunloopAccess.scala | 64 +++++++++---------- .../consumer/internal/RunloopExecutor.scala | 8 +-- 4 files changed, 37 insertions(+), 39 deletions(-) diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala index 8f83525915..f785bf86ce 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala @@ -196,7 +196,7 @@ object Consumer { */ override def stopConsumption: UIO[Unit] = ZIO.logDebug("stopConsumption called") *> - runloopAccess.stopConsumption() + runloopAccess.stopConsumption override def listTopics(timeout: Duration = Duration.Infinity): Task[Map[String, List[PartitionInfo]]] = consumer.withConsumer(_.listTopics(timeout.asJava).asScala.map { case (k, v) => k -> v.asScala.toList }.toMap) diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala index ef72e17924..58f2e84dff 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala @@ -577,7 +577,7 @@ private[consumer] object Runloop { restartStreamsOnRebalancing: Boolean, partitionsHub: Hub[Take[Throwable, PartitionAssignment]], consumerSettings: ConsumerSettings - ): ZIO[Scope, Throwable, Runloop] = + ): URIO[Scope, Runloop] = for { _ <- ZIO.addFinalizer(diagnostics.emit(Finalization.RunloopFinalized)) commandQueue <- ZIO.acquireRelease(Queue.bounded[RunloopCommand](commandQueueSize))(_.shutdown) diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/RunloopAccess.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/RunloopAccess.scala index bd49ad696a..ca161a6370 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/RunloopAccess.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/RunloopAccess.scala @@ -7,13 +7,25 @@ import zio.kafka.consumer.internal.Runloop.ByteArrayCommittableRecord import zio.kafka.consumer.internal.RunloopAccess.PartitionAssignment import zio.kafka.consumer.{ ConsumerSettings, Subscription } import zio.stream.{ Stream, Take, UStream, ZStream } -import zio.{ durationInt, Hub, RIO, Ref, Scope, Task, UIO, ZIO, ZLayer } +import zio.{ Hub, Promise, Ref, Scope, UIO, ZIO, ZLayer } private[internal] sealed trait RunloopState private[internal] object RunloopState { - case object NotStarted extends RunloopState - final case class Started(runloop: Runloop) extends RunloopState - case object Stopped extends RunloopState + + /** + * Why do we need a Promise here? + * + * If the user starts a forked consumption session and just after the fork, calls `consumer.stopConsumption`, the + * consumption needs to be stopped even if the runloop is still booting up. + * + * For all the details, see discussion: https://github.com/zio/zio-kafka/pull/857#discussion_r1218434608 + */ + final case class Started(promise: Promise[Nothing, Runloop]) extends RunloopState { + @inline def runloop: UIO[Runloop] = promise.await + } + case object NotStarted extends RunloopState + case object Stopped extends RunloopState + } /** @@ -37,41 +49,28 @@ private[internal] object RunloopState { private[consumer] final class RunloopAccess private ( runloopStateRef: Ref.Synchronized[RunloopState], partitionHub: Hub[Take[Throwable, PartitionAssignment]], - makeRunloop: Task[RunloopState.Started], + makeRunloop: UIO[Runloop], diagnostics: Diagnostics ) { - private def runloop(shouldStartIfNot: Boolean): Task[RunloopState] = - runloopStateRef.updateSomeAndGetZIO { case RunloopState.NotStarted if shouldStartIfNot => makeRunloop } - private def withRunloopZIO[R, A](shouldStartIfNot: Boolean)(f: Runloop => RIO[R, A]): RIO[R, A] = + private def runloop(shouldStartIfNot: Boolean): UIO[RunloopState] = + runloopStateRef.updateSomeAndGetZIO { + case RunloopState.NotStarted if shouldStartIfNot => + for { + promise <- Promise.make[Nothing, Runloop] + _ <- makeRunloop.map(promise.succeed).fork + } yield RunloopState.Started(promise) + } + private def withRunloopZIO[A](shouldStartIfNot: Boolean)(f: Runloop => UIO[A]): UIO[A] = runloop(shouldStartIfNot).flatMap { - case RunloopState.NotStarted => ZIO.unit.asInstanceOf[RIO[R, A]] - case RunloopState.Started(runloop) => f(runloop) - case RunloopState.Stopped => ZIO.unit.asInstanceOf[RIO[R, A]] + case RunloopState.NotStarted => ZIO.unit.asInstanceOf[UIO[A]] + case RunloopState.Stopped => ZIO.unit.asInstanceOf[UIO[A]] + case s: RunloopState.Started => s.runloop.flatMap(f) } /** * No need to call `Runloop::stopConsumption` if the Runloop has not been started or has been stopped. - * - * Note: - * 1. The `.orDie` is just here for compilation. It cannot happen. - * 1. We do a 100 retries waiting 10ms between each to roughly take max 1s before to stop to retry. We want to avoid - * an infinite loop. We need this recursion because if the user calls `stopConsumption` before the Runloop is - * started, we need to wait for it to be started. Can happen if the user starts a consuming session in a forked - * fiber and immediately after forking, stops it. The Runloop will potentially not be started yet. */ - // noinspection SimplifyUnlessInspection - def stopConsumption(retry: Int = 100, initialCall: Boolean = true): UIO[Unit] = { - @inline def next: UIO[Unit] = stopConsumption(retry - 1, initialCall = false) - - runloop(shouldStartIfNot = false).orDie.flatMap { - case RunloopState.Stopped => ZIO.unit - case RunloopState.Started(runloop) => runloop.stopConsumption - case RunloopState.NotStarted => - if (retry <= 0) ZIO.unit - else if (initialCall) next - else next.delay(10.millis) - } - } + def stopConsumption: UIO[Unit] = withRunloopZIO(shouldStartIfNot = false)(_.stopConsumption) /** * We're doing all of these things in this method so that the interface of this class is as simple as possible and @@ -87,7 +86,7 @@ private[consumer] final class RunloopAccess private ( // starts the Runloop if not already started _ <- withRunloopZIO(shouldStartIfNot = true)(_.addSubscription(subscription)) _ <- ZIO.addFinalizer { - withRunloopZIO(shouldStartIfNot = false)(_.removeSubscription(subscription)).orDie <* + withRunloopZIO(shouldStartIfNot = false)(_.removeSubscription(subscription)) <* diagnostics.emit(Finalization.SubscriptionFinalized) } } yield stream @@ -124,7 +123,6 @@ private[consumer] object RunloopAccess { consumerSettings = consumerSettings ) .withFinalizer(_ => runloopStateRef.set(RunloopState.Stopped)) - .map(RunloopState.Started.apply) .provide(ZLayer.succeed(consumerScope)) } yield new RunloopAccess(runloopStateRef, partitionsHub, makeRunloop, diagnostics) } diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/RunloopExecutor.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/RunloopExecutor.scala index b833f13a70..767fe939da 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/RunloopExecutor.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/RunloopExecutor.scala @@ -1,6 +1,6 @@ package zio.kafka.consumer.internal -import zio.{ Executor, Scope, ZIO } +import zio.{ Executor, Scope, URIO, ZIO } import java.util.concurrent.Executors import java.util.concurrent.atomic.AtomicLong @@ -9,9 +9,9 @@ private[consumer] object RunloopExecutor { private val counter: AtomicLong = new AtomicLong(0) - private def newSingleThreadedExecutor(i: Long): ZIO[Scope, Throwable, Executor] = + private def newSingleThreadedExecutor(i: Long): URIO[Scope, Executor] = ZIO.acquireRelease { - ZIO.attempt { + ZIO.succeed { val javaExecutor = Executors.newSingleThreadExecutor(runnable => new Thread(runnable, s"zio-kafka-runloop-thread-$i")) @@ -19,6 +19,6 @@ private[consumer] object RunloopExecutor { } } { case (_, executor) => ZIO.attempt(executor.shutdown()).orDie }.map(_._1) - val newInstance: ZIO[Scope, Throwable, Executor] = newSingleThreadedExecutor(counter.getAndIncrement()) + val newInstance: URIO[Scope, Executor] = newSingleThreadedExecutor(counter.getAndIncrement()) }