Skip to content

Commit

Permalink
Use a Promise in the RunloopState.Started as suggested by @erikvano…
Browse files Browse the repository at this point in the history
  • Loading branch information
guizmaii committed Jun 7, 2023
1 parent 12c2dd9 commit a6ed3d2
Show file tree
Hide file tree
Showing 4 changed files with 37 additions and 39 deletions.
2 changes: 1 addition & 1 deletion zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ object Consumer {
*/
override def stopConsumption: UIO[Unit] =
ZIO.logDebug("stopConsumption called") *>
runloopAccess.stopConsumption()
runloopAccess.stopConsumption

override def listTopics(timeout: Duration = Duration.Infinity): Task[Map[String, List[PartitionInfo]]] =
consumer.withConsumer(_.listTopics(timeout.asJava).asScala.map { case (k, v) => k -> v.asScala.toList }.toMap)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -577,7 +577,7 @@ private[consumer] object Runloop {
restartStreamsOnRebalancing: Boolean,
partitionsHub: Hub[Take[Throwable, PartitionAssignment]],
consumerSettings: ConsumerSettings
): ZIO[Scope, Throwable, Runloop] =
): URIO[Scope, Runloop] =
for {
_ <- ZIO.addFinalizer(diagnostics.emit(Finalization.RunloopFinalized))
commandQueue <- ZIO.acquireRelease(Queue.bounded[RunloopCommand](commandQueueSize))(_.shutdown)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,25 @@ import zio.kafka.consumer.internal.Runloop.ByteArrayCommittableRecord
import zio.kafka.consumer.internal.RunloopAccess.PartitionAssignment
import zio.kafka.consumer.{ ConsumerSettings, Subscription }
import zio.stream.{ Stream, Take, UStream, ZStream }
import zio.{ durationInt, Hub, RIO, Ref, Scope, Task, UIO, ZIO, ZLayer }
import zio.{ Hub, Promise, Ref, Scope, UIO, ZIO, ZLayer }

private[internal] sealed trait RunloopState
private[internal] object RunloopState {
case object NotStarted extends RunloopState
final case class Started(runloop: Runloop) extends RunloopState
case object Stopped extends RunloopState

/**
* Why do we need a Promise here?
*
* If the user starts a forked consumption session and just after the fork, calls `consumer.stopConsumption`, the
* consumption needs to be stopped even if the runloop is still booting up.
*
* For all the details, see discussion: https://github.com/zio/zio-kafka/pull/857#discussion_r1218434608
*/
final case class Started(promise: Promise[Nothing, Runloop]) extends RunloopState {
@inline def runloop: UIO[Runloop] = promise.await
}
case object NotStarted extends RunloopState
case object Stopped extends RunloopState

}

/**
Expand All @@ -37,41 +49,28 @@ private[internal] object RunloopState {
private[consumer] final class RunloopAccess private (
runloopStateRef: Ref.Synchronized[RunloopState],
partitionHub: Hub[Take[Throwable, PartitionAssignment]],
makeRunloop: Task[RunloopState.Started],
makeRunloop: UIO[Runloop],
diagnostics: Diagnostics
) {
private def runloop(shouldStartIfNot: Boolean): Task[RunloopState] =
runloopStateRef.updateSomeAndGetZIO { case RunloopState.NotStarted if shouldStartIfNot => makeRunloop }
private def withRunloopZIO[R, A](shouldStartIfNot: Boolean)(f: Runloop => RIO[R, A]): RIO[R, A] =
private def runloop(shouldStartIfNot: Boolean): UIO[RunloopState] =
runloopStateRef.updateSomeAndGetZIO {
case RunloopState.NotStarted if shouldStartIfNot =>
for {
promise <- Promise.make[Nothing, Runloop]
_ <- makeRunloop.map(promise.succeed).fork
} yield RunloopState.Started(promise)
}
private def withRunloopZIO[A](shouldStartIfNot: Boolean)(f: Runloop => UIO[A]): UIO[A] =
runloop(shouldStartIfNot).flatMap {
case RunloopState.NotStarted => ZIO.unit.asInstanceOf[RIO[R, A]]
case RunloopState.Started(runloop) => f(runloop)
case RunloopState.Stopped => ZIO.unit.asInstanceOf[RIO[R, A]]
case RunloopState.NotStarted => ZIO.unit.asInstanceOf[UIO[A]]
case RunloopState.Stopped => ZIO.unit.asInstanceOf[UIO[A]]
case s: RunloopState.Started => s.runloop.flatMap(f)
}

/**
* No need to call `Runloop::stopConsumption` if the Runloop has not been started or has been stopped.
*
* Note:
* 1. The `.orDie` is just here for compilation. It cannot happen.
* 1. We do a 100 retries waiting 10ms between each to roughly take max 1s before to stop to retry. We want to avoid
* an infinite loop. We need this recursion because if the user calls `stopConsumption` before the Runloop is
* started, we need to wait for it to be started. Can happen if the user starts a consuming session in a forked
* fiber and immediately after forking, stops it. The Runloop will potentially not be started yet.
*/
// noinspection SimplifyUnlessInspection
def stopConsumption(retry: Int = 100, initialCall: Boolean = true): UIO[Unit] = {
@inline def next: UIO[Unit] = stopConsumption(retry - 1, initialCall = false)

runloop(shouldStartIfNot = false).orDie.flatMap {
case RunloopState.Stopped => ZIO.unit
case RunloopState.Started(runloop) => runloop.stopConsumption
case RunloopState.NotStarted =>
if (retry <= 0) ZIO.unit
else if (initialCall) next
else next.delay(10.millis)
}
}
def stopConsumption: UIO[Unit] = withRunloopZIO(shouldStartIfNot = false)(_.stopConsumption)

/**
* We're doing all of these things in this method so that the interface of this class is as simple as possible and
Expand All @@ -87,7 +86,7 @@ private[consumer] final class RunloopAccess private (
// starts the Runloop if not already started
_ <- withRunloopZIO(shouldStartIfNot = true)(_.addSubscription(subscription))
_ <- ZIO.addFinalizer {
withRunloopZIO(shouldStartIfNot = false)(_.removeSubscription(subscription)).orDie <*
withRunloopZIO(shouldStartIfNot = false)(_.removeSubscription(subscription)) <*
diagnostics.emit(Finalization.SubscriptionFinalized)
}
} yield stream
Expand Down Expand Up @@ -124,7 +123,6 @@ private[consumer] object RunloopAccess {
consumerSettings = consumerSettings
)
.withFinalizer(_ => runloopStateRef.set(RunloopState.Stopped))
.map(RunloopState.Started.apply)
.provide(ZLayer.succeed(consumerScope))
} yield new RunloopAccess(runloopStateRef, partitionsHub, makeRunloop, diagnostics)
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package zio.kafka.consumer.internal

import zio.{ Executor, Scope, ZIO }
import zio.{ Executor, Scope, URIO, ZIO }

import java.util.concurrent.Executors
import java.util.concurrent.atomic.AtomicLong
Expand All @@ -9,16 +9,16 @@ private[consumer] object RunloopExecutor {

private val counter: AtomicLong = new AtomicLong(0)

private def newSingleThreadedExecutor(i: Long): ZIO[Scope, Throwable, Executor] =
private def newSingleThreadedExecutor(i: Long): URIO[Scope, Executor] =
ZIO.acquireRelease {
ZIO.attempt {
ZIO.succeed {
val javaExecutor =
Executors.newSingleThreadExecutor(runnable => new Thread(runnable, s"zio-kafka-runloop-thread-$i"))

Executor.fromJavaExecutor(javaExecutor) -> javaExecutor
}
} { case (_, executor) => ZIO.attempt(executor.shutdown()).orDie }.map(_._1)

val newInstance: ZIO[Scope, Throwable, Executor] = newSingleThreadedExecutor(counter.getAndIncrement())
val newInstance: URIO[Scope, Executor] = newSingleThreadedExecutor(counter.getAndIncrement())

}

0 comments on commit a6ed3d2

Please sign in to comment.