Skip to content

Commit

Permalink
Use a Promise in the RunloopState.Started as suggested by @erikvano…
Browse files Browse the repository at this point in the history
  • Loading branch information
guizmaii committed Jun 8, 2023
1 parent fbf342f commit c574f48
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 30 deletions.
2 changes: 1 addition & 1 deletion zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ object Consumer {
*/
override def stopConsumption: UIO[Unit] =
ZIO.logDebug("stopConsumption called") *>
runloopAccess.stopConsumption()
runloopAccess.stopConsumption

override def listTopics(timeout: Duration = Duration.Infinity): Task[Map[String, List[PartitionInfo]]] =
consumer.withConsumer(_.listTopics(timeout.asJava).asScala.map { case (k, v) => k -> v.asScala.toList }.toMap)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,25 @@ import zio.kafka.consumer.internal.Runloop.ByteArrayCommittableRecord
import zio.kafka.consumer.internal.RunloopAccess.PartitionAssignment
import zio.kafka.consumer.{ ConsumerSettings, Subscription }
import zio.stream.{ Stream, Take, UStream, ZStream }
import zio.{ durationInt, Hub, Ref, Scope, UIO, ZIO, ZLayer }
import zio.{ Hub, Promise, Ref, Scope, UIO, ZIO, ZLayer }

private[internal] sealed trait RunloopState
private[internal] object RunloopState {
case object NotStarted extends RunloopState
final case class Started(runloop: Runloop) extends RunloopState
case object Stopped extends RunloopState

/**
* Why do we need a Promise here?
*
* If the user starts a forked consumption session and just after the fork, calls `consumer.stopConsumption`, the
* consumption needs to be stopped even if the runloop is still booting up.
*
* For all the details, see discussion: https://github.com/zio/zio-kafka/pull/857#discussion_r1218434608
*/
final case class Started(promise: Promise[Nothing, Runloop]) extends RunloopState {
@inline def runloop: UIO[Runloop] = promise.await
}
case object NotStarted extends RunloopState
case object Stopped extends RunloopState

}

/**
Expand All @@ -37,40 +49,28 @@ private[internal] object RunloopState {
private[consumer] final class RunloopAccess private (
runloopStateRef: Ref.Synchronized[RunloopState],
partitionHub: Hub[Take[Throwable, PartitionAssignment]],
makeRunloop: UIO[RunloopState.Started],
makeRunloop: UIO[Runloop],
diagnostics: Diagnostics
) {
private def runloop(shouldStartIfNot: Boolean): UIO[RunloopState] =
runloopStateRef.updateSomeAndGetZIO { case RunloopState.NotStarted if shouldStartIfNot => makeRunloop }
runloopStateRef.updateSomeAndGetZIO {
case RunloopState.NotStarted if shouldStartIfNot =>
for {
promise <- Promise.make[Nothing, Runloop]
_ <- makeRunloop.map(promise.succeed).fork
} yield RunloopState.Started(promise)
}
private def withRunloopZIO[A](shouldStartIfNot: Boolean)(f: Runloop => UIO[A]): UIO[A] =
runloop(shouldStartIfNot).flatMap {
case RunloopState.Stopped => ZIO.unit.asInstanceOf[UIO[A]]
case RunloopState.NotStarted => ZIO.unit.asInstanceOf[UIO[A]]
case RunloopState.Started(runloop) => f(runloop)
case RunloopState.NotStarted => ZIO.unit.asInstanceOf[UIO[A]]
case RunloopState.Stopped => ZIO.unit.asInstanceOf[UIO[A]]
case s: RunloopState.Started => s.runloop.flatMap(f)
}

/**
* No need to call `Runloop::stopConsumption` if the Runloop has not been started or has been stopped.
*
* Note:
* 1. We do a 100 retries waiting 10ms between each to roughly take max 1s before to stop to retry. We want to avoid
* an infinite loop. We need this recursion because if the user calls `stopConsumption` before the Runloop is
* started, we need to wait for it to be started. Can happen if the user starts a consuming session in a forked
* fiber and immediately after forking, stops it. The Runloop will potentially not be started yet.
*/
// noinspection SimplifyUnlessInspection
def stopConsumption(retry: Int = 100, initialCall: Boolean = true): UIO[Unit] = {
@inline def next: UIO[Unit] = stopConsumption(retry - 1, initialCall = false)

runloop(shouldStartIfNot = false).flatMap {
case RunloopState.Stopped => ZIO.unit
case RunloopState.Started(runloop) => runloop.stopConsumption
case RunloopState.NotStarted =>
if (retry <= 0) ZIO.unit
else if (initialCall) next
else next.delay(10.millis)
}
}
def stopConsumption: UIO[Unit] = withRunloopZIO(shouldStartIfNot = false)(_.stopConsumption)

/**
* We're doing all of these things in this method so that the interface of this class is as simple as possible and
Expand Down Expand Up @@ -123,7 +123,6 @@ private[consumer] object RunloopAccess {
consumerSettings = consumerSettings
)
.withFinalizer(_ => runloopStateRef.set(RunloopState.Stopped))
.map(RunloopState.Started.apply)
.provide(ZLayer.succeed(consumerScope))
} yield new RunloopAccess(runloopStateRef, partitionsHub, makeRunloop, diagnostics)
}

0 comments on commit c574f48

Please sign in to comment.