diff --git a/zio-kafka-bench/src/main/scala/zio/kafka/bench/comparison/ComparisonBenchmark.scala b/zio-kafka-bench/src/main/scala/zio/kafka/bench/comparison/ComparisonBenchmark.scala
index 28673b33d1..35e9471933 100644
--- a/zio-kafka-bench/src/main/scala/zio/kafka/bench/comparison/ComparisonBenchmark.scala
+++ b/zio-kafka-bench/src/main/scala/zio/kafka/bench/comparison/ComparisonBenchmark.scala
@@ -45,8 +45,8 @@ trait ComparisonBenchmark extends ZioBenchmark[Env] {
         clientId = randomThing("client"),
         groupId = Some(randomThing("client")),
         `max.poll.records` = 1000, // A more production worthy value
-        runloopTimeout =
-          1.hour // Absurdly high timeout to avoid the runloop from being interrupted while we're benchmarking other stuff
+        // Absurdly high timeout to avoid the runloop being interrupted while we're benchmarking other stuff
+        runloopTimeout = 1.hour
       )
     )
diff --git a/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala b/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala
index 4155c7c68b..4d1cad5d8f 100644
--- a/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala
+++ b/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala
@@ -12,8 +12,14 @@ import org.apache.kafka.common.TopicPartition
 import zio._
 import zio.kafka.ZIOSpecDefaultSlf4j
 import zio.kafka.consumer.Consumer.{ AutoOffsetStrategy, OffsetRetrieval }
+import zio.kafka.consumer.diagnostics.DiagnosticEvent.Finalization
+import zio.kafka.consumer.diagnostics.DiagnosticEvent.Finalization.{
+  ConsumerFinalized,
+  RunloopFinalized,
+  SubscriptionFinalized
+}
 import zio.kafka.consumer.diagnostics.{ DiagnosticEvent, Diagnostics }
-import zio.kafka.producer.TransactionalProducer
+import zio.kafka.producer.{ Producer, TransactionalProducer }
 import zio.kafka.serde.Serde
 import zio.kafka.testkit.KafkaTestUtils._
 import zio.kafka.testkit.{ Kafka, KafkaRandom }
@@ -22,8 +28,10 @@ import zio.test.Assertion._
 import zio.test.TestAspect._
 import zio.test._
 
+import java.util.concurrent.atomic.AtomicInteger
 import scala.reflect.ClassTag
 
+//noinspection SimplifyAssertInspection
 object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
   override val kafkaPrefix: String = "consumespec"
@@ -1052,11 +1060,188 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
             _ <- produceOne(topic, "key2", "message2")
             _ <- recordsOut.take
           } yield assertCompletes
-      }
+      },
+      suite("issue #856")(
+        test(
+          "Booting a Consumer to do something other than consuming should not fail with `RunloopTimeout` exception"
+        ) {
+          def test(diagnostics: Diagnostics) =
+            for {
+              clientId <- randomClient
+              settings <- consumerSettings(clientId = clientId, runloopTimeout = 500.millis)
+              _        <- Consumer.make(settings, diagnostics = diagnostics)
+              _        <- ZIO.sleep(1.second)
+            } yield assertCompletes
+
+          for {
+            diagnostics <- Diagnostics.SlidingQueue.make(1000)
+            testResult <- ZIO.scoped {
+                            test(diagnostics)
+                          }
+            finalizationEvents <- diagnostics.queue.takeAll.map(_.filter(_.isInstanceOf[Finalization]))
+          } yield testResult && assert(finalizationEvents)(hasSameElements(Chunk(ConsumerFinalized)))
+        },
+        suite(
+          "Ordering of finalizers matters. If subscriptions are finalized after the runloop, it creates a deadlock"
+        )(
+          test("When not consuming, the Runloop is not started so only the Consumer is finalized") {
+            def test(diagnostics: Diagnostics): ZIO[Scope & Kafka, Throwable, TestResult] =
+              for {
+                clientId <- randomClient
+                settings <- consumerSettings(clientId = clientId)
+                _        <- Consumer.make(settings, diagnostics = diagnostics)
+              } yield assertCompletes
+
+            for {
+              diagnostics <- Diagnostics.SlidingQueue.make(1000)
+              testResult <- ZIO.scoped {
+                              test(diagnostics)
+                            }
+              finalizationEvents <- diagnostics.queue.takeAll.map(_.filter(_.isInstanceOf[Finalization]))
+            } yield testResult && assert(finalizationEvents)(hasSameElements(Chunk(ConsumerFinalized)))
+          },
+          test("When consuming, the Runloop is started. The finalization order matters to avoid a deadlock") {
+            // This test ensures that we're not inadvertently introducing a deadlock by changing the order of finalizers.
+
+            def test(diagnostics: Diagnostics): ZIO[Producer & Scope & Kafka, Throwable, TestResult] =
+              for {
+                clientId <- randomClient
+                topic    <- randomTopic
+                settings <- consumerSettings(clientId = clientId)
+                consumer <- Consumer.make(settings, diagnostics = diagnostics)
+                _        <- produceOne(topic, "key1", "message1")
+                // Starting a consumption session to start the Runloop.
+                consumed_0 <- consumer
+                                .plainStream(Subscription.manual(topic -> 0), Serde.string, Serde.string)
+                                .take(1)
+                                .runCount
+                consumed_1 <- consumer
+                                .plainStream(Subscription.manual(topic -> 0), Serde.string, Serde.string)
+                                .take(1)
+                                .runCount
+              } yield assert(consumed_0)(equalTo(1L)) && assert(consumed_1)(equalTo(1L))
+
+            for {
+              diagnostics <- Diagnostics.SlidingQueue.make(1000)
+              testResult <- ZIO.scoped {
+                              test(diagnostics)
+                            }
+              finalizationEvents <- diagnostics.queue.takeAll.map(_.filter(_.isInstanceOf[Finalization]))
+            } yield testResult && assert(finalizationEvents)(
+              // The order is very important.
+              // The subscription must be finalized before the runloop, otherwise it creates a deadlock.
+              equalTo(
+                Chunk(
+                  SubscriptionFinalized,
+                  SubscriptionFinalized,
+                  RunloopFinalized,
+                  ConsumerFinalized
+                )
+              )
+            )
+          },
+          test(
+            "Calling `Consumer::stopConsumption` just after starting a forked consumption session should stop the consumption"
+          ) {
+            val numberOfMessages: Int           = 100000
+            val kvs: Iterable[(String, String)] = Iterable.tabulate(numberOfMessages)(i => (s"key-$i", s"msg-$i"))
+
+            def test(diagnostics: Diagnostics): ZIO[Producer & Scope & Kafka, Throwable, TestResult] =
+              for {
+                clientId <- randomClient
+                topic    <- randomTopic
+                settings <- consumerSettings(clientId = clientId)
+                consumer <- Consumer.make(settings, diagnostics = diagnostics)
+                _        <- produceMany(topic, kvs)
+                ref = new AtomicInteger(0)
+                // Starting a consumption session to start the Runloop.
+                fiber <-
+                  consumer
+                    .plainStream(Subscription.manual(topic -> 0), Serde.string, Serde.string)
+                    .mapChunksZIO(chunks => ZIO.logDebug(s"Consumed ${ref.getAndAdd(chunks.size)} messages").as(chunks))
+                    .take(numberOfMessages.toLong)
+                    .runCount
+                    .fork
+                _          <- consumer.stopConsumption
+                consumed_0 <- fiber.join
+              } yield assert(consumed_0)(isLessThan(numberOfMessages.toLong))
+
+            for {
+              diagnostics <- Diagnostics.SlidingQueue.make(1000)
+              testResult <- ZIO.scoped {
+                              test(diagnostics)
+                            }
+              finalizationEvents <- diagnostics.queue.takeAll.map(_.filter(_.isInstanceOf[Finalization]))
+            } yield testResult && assert(finalizationEvents)(
+              // The order is very important.
+              // The subscription must be finalized before the runloop, otherwise it creates a deadlock.
+              equalTo(
+                Chunk(
+                  SubscriptionFinalized,
+                  RunloopFinalized,
+                  ConsumerFinalized
+                )
+              )
+            )
+          } @@ nonFlaky(5),
+          test(
+            "it's possible to start a new consumption session from a Consumer that had a consumption session stopped previously"
+          ) {
+            val numberOfMessages: Int           = 100000
+            val kvs: Iterable[(String, String)] = Iterable.tabulate(numberOfMessages)(i => (s"key-$i", s"msg-$i"))
+
+            def test(diagnostics: Diagnostics): ZIO[Producer & Scope & Kafka, Throwable, TestResult] =
+              for {
+                clientId <- randomClient
+                topic    <- randomTopic
+                settings <- consumerSettings(clientId = clientId)
+                consumer <- Consumer.make(settings, diagnostics = diagnostics)
+                _        <- produceMany(topic, kvs)
+                // Starting a consumption session to start the Runloop.
+                fiber <- consumer
+                           .plainStream(Subscription.manual(topic -> 0), Serde.string, Serde.string)
+                           .take(numberOfMessages.toLong)
+                           .runCount
+                           .forkScoped
+                _          <- ZIO.sleep(200.millis)
+                _          <- consumer.stopConsumption
+                consumed_0 <- fiber.join
+                _          <- ZIO.logDebug(s"consumed_0: $consumed_0")
+
+                _ <- ZIO.logDebug("About to sleep 5 seconds")
+                _ <- ZIO.sleep(5.seconds)
+                _ <- ZIO.logDebug("Slept 5 seconds")
+                consumed_1 <- consumer
+                                .plainStream(Subscription.manual(topic -> 0), Serde.string, Serde.string)
+                                .take(numberOfMessages.toLong)
+                                .runCount
+              } yield assert(consumed_0)(isGreaterThan(0L) && isLessThan(numberOfMessages.toLong)) &&
+                assert(consumed_1)(equalTo(numberOfMessages.toLong))
+
+            for {
+              diagnostics <- Diagnostics.SlidingQueue.make(1000)
+              testResult <- ZIO.scoped {
+                              test(diagnostics)
+                            }
+              finalizationEvents <- diagnostics.queue.takeAll.map(_.filter(_.isInstanceOf[Finalization]))
+            } yield testResult && assert(finalizationEvents)(
+              // The order is very important.
+              // The subscription must be finalized before the runloop, otherwise it creates a deadlock.
+              equalTo(
+                Chunk(
+                  SubscriptionFinalized,
+                  RunloopFinalized,
+                  ConsumerFinalized
+                )
+              )
+            )
+          }
+        )
+      )
     )
       .provideSome[Scope & Kafka](producer)
       .provideSomeShared[Scope](
         Kafka.embedded
-      ) @@ withLiveClock @@ TestAspect.sequential @@ timeout(2.minutes)
+      ) @@ withLiveClock @@ sequential @@ timeout(2.minutes)
 }
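The tests above read the new Finalization events back through Diagnostics.SlidingQueue. Outside of tests, the same events can be observed with a custom Diagnostics implementation; the sketch below is illustrative only and assumes the single `emit` method that Consumer.make and Runloop.make call further down in this patch.

// Illustrative only (not part of the patch): log the new finalization events.
// Assumes `Diagnostics` exposes the `emit(event: => DiagnosticEvent): UIO[Unit]` method used by this patch.
import zio._
import zio.kafka.consumer.diagnostics.{ DiagnosticEvent, Diagnostics }
import zio.kafka.consumer.diagnostics.DiagnosticEvent.Finalization

val loggingDiagnostics: Diagnostics =
  new Diagnostics {
    override def emit(event: => DiagnosticEvent): UIO[Unit] =
      event match {
        case f: Finalization => ZIO.logInfo(s"Finalization event: $f")
        case _               => ZIO.unit // ignore rebalance/commit events in this sketch
      }
  }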
diff --git a/zio-kafka-test/src/test/scala/zio/kafka/consumer/SubscriptionsSpec.scala b/zio-kafka-test/src/test/scala/zio/kafka/consumer/SubscriptionsSpec.scala
index 7fdf5ea187..14dc6352c5 100644
--- a/zio-kafka-test/src/test/scala/zio/kafka/consumer/SubscriptionsSpec.scala
+++ b/zio-kafka-test/src/test/scala/zio/kafka/consumer/SubscriptionsSpec.scala
@@ -13,6 +13,9 @@ import zio.test.Assertion._
 import zio.test.TestAspect._
 import zio.test._
 
+import java.util.concurrent.atomic.AtomicInteger
+
+//noinspection SimplifyAssertInspection
 object SubscriptionsSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
   override val kafkaPrefix: String = "subscriptionsspec"
@@ -88,22 +91,94 @@ object SubscriptionsSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
           _ <- produceMany(topic1, kvs)
           _ <- produceMany(topic2, kvs)
 
-          result <-
-            (Consumer
+          consumer_0 =
+            Consumer
               .plainStream(Subscription.topics(topic1), Serde.string, Serde.string)
-              .runCollect zipPar
-              Consumer
-                .plainStream(
-                  Subscription.manual(topic2, 1),
-                  Serde.string,
-                  Serde.string
-                )
-                .runCollect)
-              .provideSomeLayer[Kafka & Scope](consumer(client, Some(group)))
-              .unit
-              .exit
+              .runCollect
+
+          consumer_1 =
+            Consumer
+              .plainStream(
+                Subscription.manual(topic2, 1), // invalid with the previous subscription
+                Serde.string,
+                Serde.string
+              )
+              .runCollect
+
+          result <- (consumer_0 zipPar consumer_1)
+                      .provideSomeLayer[Kafka & Scope](consumer(client, Some(group)))
+                      .unit
+                      .exit
         } yield assert(result)(fails(isSubtype[InvalidSubscriptionUnion](anything)))
       },
+      test(
+        "gives an error when attempting to subscribe using a manual subscription when there is already a topic subscription and doesn't fail the already running consuming session"
+      ) {
+        val numberOfMessages = 20
+        val kvs              = (0 to numberOfMessages).toList.map(i => (s"key$i", s"msg$i"))
+        for {
+          topic1 <- randomTopic
+          client <- randomClient
+          group  <- randomGroup
+
+          _ <- produceMany(topic1, kvs)
+
+          counter = new AtomicInteger(1)
+
+          firstMessagesRef <- Ref.make(("", ""))
+          finalizersRef    <- Ref.make(Chunk.empty[String])
+
+          consumer_0 =
+            Consumer
+              .plainStream(Subscription.topics(topic1), Serde.string, Serde.string)
+              // Here we delay each message to be sure that `consumer_1` will fail while `consumer_0` is still running
+              .mapZIO { r =>
+                firstMessagesRef.updateSome { case ("", v) =>
+                  ("First consumer_0 message", v)
+                } *>
+                  ZIO
+                    .logDebug(s"Consumed ${counter.getAndIncrement()} records")
+                    .delay(10.millis)
+                    .as(r)
+              }
+              .take(numberOfMessages.toLong)
+              .runCollect
+              .exit
+              .zipLeft(finalizersRef.update(_ :+ "consumer_0 finalized"))
+
+          consumer_1 =
+            Consumer
+              .plainStream(
+                Subscription.manual(topic1, 1), // invalid with the previous subscription
+                Serde.string,
+                Serde.string
+              )
+              .tapError { _ =>
+                firstMessagesRef.updateSome { case (v, "") =>
+                  (v, "consumer_1 error")
+                }
+              }
+              .runCollect
+              .exit
+              .zipLeft(finalizersRef.update(_ :+ "consumer_1 finalized"))
+
+          consumerInstance <- consumer(client, Some(group)).build
+
+          fiber_0 <- consumer_0.provideEnvironment(consumerInstance).fork
+          _       <- ZIO.unit.delay(100.millis) // Wait to be sure that `consumer_0` is running
+          fiber_1 <- consumer_1.provideEnvironment(consumerInstance).fork
+
+          result_0 <- fiber_0.join
+          result_1 <- fiber_1.join
+
+          finalizingOrder <- finalizersRef.get
+          firstMessages   <- firstMessagesRef.get
+        } yield assert(result_0)(succeeds(hasSize(equalTo(numberOfMessages)))) &&
+          assert(result_1)(fails(isSubtype[InvalidSubscriptionUnion](anything))) &&
+          // Here we check that `consumer_0` was running when `consumer_1` failed
+          assert(firstMessages)(equalTo(("First consumer_0 message", "consumer_1 error"))) &&
+          assert(finalizingOrder)(equalTo(Chunk("consumer_1 finalized", "consumer_0 finalized")))
+      } @@ nonFlaky(5),
       test("distributes records (randomly) from overlapping subscriptions over all subscribers") {
         val kvs = (1 to 500).toList.map(i => (s"key$i", s"msg$i"))
         for {
diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala
index 45754c572a..6c1951b81f 100644
--- a/zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala
+++ b/zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala
@@ -3,9 +3,9 @@ package zio.kafka.consumer
 import org.apache.kafka.clients.consumer.{ ConsumerRecord, OffsetAndMetadata, OffsetAndTimestamp }
 import org.apache.kafka.common._
 import zio._
+import zio.kafka.consumer.diagnostics.DiagnosticEvent.Finalization
 import zio.kafka.consumer.diagnostics.Diagnostics
-import zio.kafka.consumer.internal.Runloop.ByteArrayCommittableRecord
-import zio.kafka.consumer.internal.{ ConsumerAccess, Runloop }
+import zio.kafka.consumer.internal.{ ConsumerAccess, RunloopAccess }
 import zio.kafka.serde.{ Deserializer, Serde }
 import zio.kafka.utils.SslHelper
 import zio.stream._
@@ -155,16 +155,11 @@ trait Consumer {
 }
 
 object Consumer {
-  case object RunloopTimeout extends RuntimeException("Timeout in Runloop") with NoStackTrace
 
   private final class Live private[Consumer] (
-    private val consumer: ConsumerAccess,
-    private val runloop: Runloop,
-    private val subscriptions: Ref.Synchronized[Set[Subscription]],
-    private val partitionAssignments: Hub[
-      Take[Throwable, Chunk[(TopicPartition, ZStream[Any, Throwable, ByteArrayCommittableRecord])]]
-    ]
+    consumer: ConsumerAccess,
+    runloopAccess: RunloopAccess
   ) extends Consumer {
 
     override def assignment: Task[Set[TopicPartition]] =
@@ -201,7 +196,9 @@ object Consumer {
      * Stops consumption of data, drains buffered records, and ends the attached streams while still serving commit
      * requests.
      */
-    override def stopConsumption: UIO[Unit] = runloop.stopConsumption
+    override def stopConsumption: UIO[Unit] =
+      ZIO.logDebug("stopConsumption called") *>
+        runloopAccess.stopConsumption()
 
     override def listTopics(timeout: Duration = Duration.Infinity): Task[Map[String, List[PartitionInfo]]] =
       consumer.withConsumer(_.listTopics(timeout.asJava).asScala.map { case (k, v) => k -> v.asScala.toList }.toMap)
@@ -222,43 +219,20 @@ object Consumer {
       keyDeserializer: Deserializer[R, K],
       valueDeserializer: Deserializer[R, V]
     ): Stream[Throwable, Chunk[(TopicPartition, ZStream[R, Throwable, CommittableRecord[K, V]])]] = {
-      def extendSubscriptions = subscriptions.updateZIO { existingSubscriptions =>
-        val newSubscriptions = NonEmptyChunk.fromIterable(subscription, existingSubscriptions)
-        Subscription.unionAll(newSubscriptions) match {
-          case None => ZIO.fail(InvalidSubscriptionUnion(newSubscriptions))
-          case Some(union) =>
-            ZIO.logDebug(s"Changing kafka subscription to $union") *>
-              subscribe(union).as(newSubscriptions.toSet)
-        }
-      }.uninterruptible
-
-      def reduceSubscriptions = subscriptions.updateZIO { existingSubscriptions =>
-        val newSubscriptions = NonEmptyChunk.fromIterableOption(existingSubscriptions - subscription)
-        val newUnion         = newSubscriptions.flatMap(Subscription.unionAll)
-
-        (newUnion match {
-          case Some(union) =>
-            ZIO.logDebug(s"Reducing kafka subscription to $union") *> subscribe(union)
-          case None =>
-            ZIO.logDebug(s"Unsubscribing kafka consumer") *> unsubscribe
-        }).as(newSubscriptions.fold(Set.empty[Subscription])(_.toSet))
-      }.uninterruptible
-
       val onlyByteArraySerdes: Boolean = (keyDeserializer eq Serde.byteArray) && (valueDeserializer eq Serde.byteArray)
 
       ZStream.unwrapScoped {
         for {
-          stream <- ZStream.fromHubScoped(partitionAssignments)
-          _      <- extendSubscriptions.withFinalizer(_ => reduceSubscriptions.orDie)
+          stream <- runloopAccess.subscribe(subscription)
         } yield stream
           .map(_.exit)
          .flattenExitOption
-          .flattenChunks
          .map {
            _.collect {
              case (tp, partitionStream) if Subscription.subscriptionMatches(subscription, tp) =>
                val stream: ZStream[R, Throwable, CommittableRecord[K, V]] =
-                  if (onlyByteArraySerdes) partitionStream.asInstanceOf[ZStream[R, Throwable, CommittableRecord[K, V]]]
+                  if (onlyByteArraySerdes)
+                    partitionStream.asInstanceOf[ZStream[R, Throwable, CommittableRecord[K, V]]]
                  else partitionStream.mapChunksZIO(_.mapZIO(_.deserializeWith(keyDeserializer, valueDeserializer)))
 
                tp -> stream
@@ -320,12 +294,6 @@ object Consumer {
           .runDrain
       } yield ()
 
-    private def subscribe(subscription: Subscription): Task[Unit] =
-      runloop.changeSubscription(Some(subscription))
-
-    private def unsubscribe: Task[Unit] =
-      runloop.changeSubscription(None)
-
     override def metrics: Task[Map[MetricName, Metric]] =
       consumer.withConsumer(_.metrics().asScala.toMap)
   }
@@ -345,40 +313,13 @@ object Consumer {
   def make(
     settings: ConsumerSettings,
     diagnostics: Diagnostics = Diagnostics.NoOp
-  ): ZIO[Scope, Throwable, Consumer] = {
-    /*
-    We must supply a queue size for the partitionAssignments hub below. Under most circumstances,
-    a value of 1 should be sufficient, as runloop.partitions is already an unbounded queue. But if
-    there is a large skew in speed of consuming partition assignments (not the speed of consuming kafka messages)
-    between the subscriptions, there may arise a situation where the faster stream is 'blocked' from
-    getting new partition assignments by the faster stream. A value of 32 should be more than sufficient to cover
-    this situation.
-     */
-    val hubCapacity = 32
-
+  ): ZIO[Scope, Throwable, Consumer] =
     for {
-      _       <- SslHelper.validateEndpoint(settings.bootstrapServers, settings.properties)
-      wrapper <- ConsumerAccess.make(settings)
-      runloop <- Runloop.make(
-                   hasGroupId = settings.hasGroupId,
-                   consumer = wrapper,
-                   pollTimeout = settings.pollTimeout,
-                   diagnostics = diagnostics,
-                   offsetRetrieval = settings.offsetRetrieval,
-                   userRebalanceListener = settings.rebalanceListener,
-                   restartStreamsOnRebalancing = settings.restartStreamOnRebalancing,
-                   runloopTimeout = settings.runloopTimeout,
-                   consumerSettings = settings
-                 )
-      subscriptions <- Ref.Synchronized.make(Set.empty[Subscription])
-
-      partitionAssignments <- ZStream
-                                .fromQueue(runloop.partitionsQueue)
-                                .map(_.exit)
-                                .flattenExitOption
-                                .toHub(hubCapacity)
-    } yield new Live(wrapper, runloop, subscriptions, partitionAssignments)
-  }
+      _              <- ZIO.addFinalizer(diagnostics.emit(Finalization.ConsumerFinalized))
+      _              <- SslHelper.validateEndpoint(settings.bootstrapServers, settings.properties)
+      consumerAccess <- ConsumerAccess.make(settings)
+      runloopAccess  <- RunloopAccess.make(settings, diagnostics, consumerAccess, settings)
+    } yield new Live(consumerAccess, runloopAccess)
 
   /**
    * Accessor method for [[Consumer.assignment]]
diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/diagnostics/DiagnosticEvent.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/diagnostics/DiagnosticEvent.scala
index 2a2da75e07..f22f2d7d5b 100644
--- a/zio-kafka/src/main/scala/zio/kafka/consumer/diagnostics/DiagnosticEvent.scala
+++ b/zio-kafka/src/main/scala/zio/kafka/consumer/diagnostics/DiagnosticEvent.scala
@@ -27,4 +27,11 @@ object DiagnosticEvent {
     final case class Lost(partitions: Set[TopicPartition]) extends Rebalance
   }
 
+  sealed trait Finalization extends DiagnosticEvent
+  object Finalization {
+    case object SubscriptionFinalized extends Finalization
+    case object RunloopFinalized      extends Finalization
+    case object ConsumerFinalized     extends Finalization
+  }
+
 }
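In the new Consumer.make above, the ConsumerFinalized finalizer is deliberately registered before RunloopAccess is built: ZIO scopes run finalizers in reverse order of registration, so the consumer-level event is emitted only after the subscription and runloop finalizers have run, matching the order asserted in ConsumerSpec. A standalone sketch of that ordering rule:

// Standalone sketch (not part of the patch): finalizers added to a Scope run in reverse order of registration.
import zio._

val finalizerOrdering: UIO[Unit] =
  ZIO.scoped {
    for {
      _ <- ZIO.addFinalizer(ZIO.logInfo("registered first, runs last"))
      _ <- ZIO.addFinalizer(ZIO.logInfo("registered last, runs first"))
    } yield ()
  }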
diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala
index 2776e11491..ba7a10552d 100644
--- a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala
+++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala
@@ -6,9 +6,10 @@ import org.apache.kafka.common.errors.RebalanceInProgressException
 import zio._
 import zio.kafka.consumer.Consumer.{ OffsetRetrieval, RunloopTimeout }
 import zio.kafka.consumer._
+import zio.kafka.consumer.diagnostics.DiagnosticEvent.Finalization
 import zio.kafka.consumer.diagnostics.{ DiagnosticEvent, Diagnostics }
 import zio.kafka.consumer.internal.ConsumerAccess.ByteArrayKafkaConsumer
-import zio.kafka.consumer.internal.Runloop._
+import zio.kafka.consumer.internal.RunloopAccess.PartitionAssignment
 import zio.stream._
 
 import java.util
@@ -20,10 +21,9 @@ private[consumer] final class Runloop private (
   hasGroupId: Boolean,
   consumer: ConsumerAccess,
   pollTimeout: Duration,
-  runloopTimeout: Duration,
   commandQueue: Queue[RunloopCommand],
   lastRebalanceEvent: Ref.Synchronized[Option[Runloop.RebalanceEvent]],
-  val partitionsQueue: Queue[Take[Throwable, (TopicPartition, Stream[Throwable, ByteArrayCommittableRecord])]],
+  partitionsHub: Hub[Take[Throwable, PartitionAssignment]],
   diagnostics: Diagnostics,
   offsetRetrieval: OffsetRetrieval,
   userRebalanceListener: RebalanceListener,
@@ -36,19 +36,33 @@
     PartitionStreamControl.newPartitionStream(tp, commandQueue, diagnostics)
 
   def stopConsumption: UIO[Unit] =
-    commandQueue.offer(RunloopCommand.StopAllStreams).unit
-
-  def changeSubscription(
-    subscription: Option[Subscription]
-  ): Task[Unit] =
-    Promise
-      .make[Throwable, Unit]
-      .flatMap { cont =>
-        commandQueue.offer(RunloopCommand.ChangeSubscription(subscription, cont)) *>
-          cont.await
-      }
-      .unit
-      .uninterruptible
+    ZIO.logDebug("stopConsumption called") *>
+      commandQueue.offer(RunloopCommand.StopAllStreams).unit
+
+  private[consumer] def shutdown: UIO[Unit] =
+    ZIO.logDebug(s"Shutting down runloop initiated") *>
+      commandQueue
+        .offerAll(
+          Chunk(
+            RunloopCommand.RemoveAllSubscriptions,
+            RunloopCommand.StopAllStreams,
+            RunloopCommand.StopRunloop
+          )
+        )
+        .unit
+
+  private[internal] def addSubscription(subscription: Subscription): IO[InvalidSubscriptionUnion, Unit] =
+    for {
+      _       <- ZIO.logDebug(s"Add subscription $subscription")
+      promise <- Promise.make[InvalidSubscriptionUnion, Unit]
+      _       <- commandQueue.offer(RunloopCommand.AddSubscription(subscription, promise))
+      _       <- ZIO.logDebug(s"Waiting for subscription $subscription")
+      _       <- promise.await
+      _       <- ZIO.logDebug(s"Done for subscription $subscription")
+    } yield ()
+
+  private[internal] def removeSubscription(subscription: Subscription): UIO[Unit] =
+    commandQueue.offer(RunloopCommand.RemoveSubscription(subscription)).unit
 
   private val rebalanceListener: RebalanceListener = {
     val emitDiagnostics = RebalanceListener(
@@ -351,7 +365,7 @@ private[consumer] final class Runloop private (
                      .foreach(Chunk.fromIterable(pollResult.startingTps))(newPartitionStream)
                      .tap { newStreams =>
                        ZIO.logDebug(s"Offering partition assignment ${pollResult.startingTps}") *>
-                         partitionsQueue.offer(Take.chunk(Chunk.fromIterable(newStreams.map(_.tpStream))))
+                         partitionsHub.publish(Take.chunk(Chunk.fromIterable(newStreams.map(_.tpStream))))
                      }
                  }
       runningStreams <- ZIO.filter(pollResult.assignedStreams)(_.isRunning)
@@ -369,68 +383,107 @@ private[consumer] final class Runloop private (
       assignedStreams = updatedStreams
     )
 
-  private def handleCommand(state: State, cmd: RunloopCommand.StreamCommand): Task[State] =
+  private def handleCommand(state: State, cmd: RunloopCommand.StreamCommand): Task[State] = {
+    def doChangeSubscription(newSubscriptionState: SubscriptionState): Task[State] =
+      applyNewSubscriptionState(newSubscriptionState).flatMap { newAssignedStreams =>
+        val newState = state.copy(
+          assignedStreams = state.assignedStreams ++ newAssignedStreams,
+          subscriptionState = newSubscriptionState
+        )
+        if (newSubscriptionState.isSubscribed) ZIO.succeed(newState)
+        else
+          // End all streams and pending requests
+          endRevokedPartitions(
+            newState.pendingRequests,
+            newState.assignedStreams,
+            isRevoked = _ => true
+          ).map { revokeResult =>
+            newState.copy(
+              pendingRequests = revokeResult.pendingRequests,
+              assignedStreams = revokeResult.assignedStreams
+            )
+          }
+      }
+
     cmd match {
       case req: RunloopCommand.Request => ZIO.succeed(state.addRequest(req))
       case cmd: RunloopCommand.Commit  => doCommit(cmd).as(state.addCommit(cmd))
-      case cmd @ RunloopCommand.ChangeSubscription(subscription, _) =>
-        handleChangeSubscription(subscription).flatMap { newAssignedStreams =>
-          val newState = state.copy(
-            assignedStreams = state.assignedStreams ++ newAssignedStreams,
-            subscription = subscription
-          )
-          if (subscription.isDefined) ZIO.succeed(newState)
-          else {
-            // End all streams and pending requests
-            endRevokedPartitions(
-              newState.pendingRequests,
-              newState.assignedStreams,
-              isRevoked = _ => true
-            ).map { revokeResult =>
-              newState.copy(
-                pendingRequests = revokeResult.pendingRequests,
-                assignedStreams = revokeResult.assignedStreams
-              )
+      case cmd @ RunloopCommand.AddSubscription(newSubscription, _) =>
+        state.subscriptionState match {
+          case SubscriptionState.NotSubscribed =>
+            val newSubState =
+              SubscriptionState.Subscribed(subscriptions = Set(newSubscription), union = newSubscription)
+            cmd.succeed *> doChangeSubscription(newSubState)
+          case SubscriptionState.Subscribed(existingSubscriptions, _) =>
+            val subs = NonEmptyChunk.fromIterable(newSubscription, existingSubscriptions)
+
+            Subscription.unionAll(subs) match {
+              case None => cmd.fail(InvalidSubscriptionUnion(subs)).as(state)
+              case Some(union) =>
+                val newSubState =
+                  SubscriptionState.Subscribed(
+                    subscriptions = existingSubscriptions + newSubscription,
+                    union = union
+                  )
+                cmd.succeed *> doChangeSubscription(newSubState)
+            }
+        }
+      case RunloopCommand.RemoveSubscription(subscription) =>
+        state.subscriptionState match {
+          case SubscriptionState.NotSubscribed => ZIO.succeed(state)
+          case SubscriptionState.Subscribed(existingSubscriptions, _) =>
+            val newUnion: Option[(Subscription, NonEmptyChunk[Subscription])] =
+              NonEmptyChunk
+                .fromIterableOption(existingSubscriptions - subscription)
+                .flatMap(subs => Subscription.unionAll(subs).map(_ -> subs))
+
+            newUnion match {
+              case Some((union, newSubscriptions)) =>
+                val newSubState =
+                  SubscriptionState.Subscribed(subscriptions = newSubscriptions.toSet, union = union)
+                doChangeSubscription(newSubState)
+              case None =>
+                ZIO.logDebug(s"Unsubscribing kafka consumer") *>
+                  doChangeSubscription(SubscriptionState.NotSubscribed)
             }
-          }
         }
-          .tapBoth(e => cmd.fail(e), _ => cmd.succeed)
-          .uninterruptible
+      case RunloopCommand.RemoveAllSubscriptions => doChangeSubscription(SubscriptionState.NotSubscribed)
       case RunloopCommand.StopAllStreams =>
         for {
           _ <- ZIO.logDebug("Stop all streams initiated")
           _ <- ZIO.foreachDiscard(state.assignedStreams)(_.end())
-          _ <- partitionsQueue.offer(Take.end)
+          _ <- partitionsHub.publish(Take.end)
           _ <- ZIO.logDebug("Stop all streams done")
         } yield state.copy(pendingRequests = Chunk.empty)
     }
+  }
 
-  private def handleChangeSubscription(
-    newSubscription: Option[Subscription]
+  private def applyNewSubscriptionState(
+    newSubscriptionState: SubscriptionState
   ): Task[Chunk[PartitionStreamControl]] =
     consumer.runloopAccess { c =>
-      newSubscription match {
-        case None =>
+      newSubscriptionState match {
+        case SubscriptionState.NotSubscribed =>
          ZIO
            .attempt(c.unsubscribe())
            .as(Chunk.empty)
-        case Some(Subscription.Pattern(pattern)) =>
+        case SubscriptionState.Subscribed(_, Subscription.Pattern(pattern)) =>
          val rc = RebalanceConsumer.Live(c)
          ZIO
            .attempt(c.subscribe(pattern.pattern, rebalanceListener.toKafka(runtime, rc)))
            .as(Chunk.empty)
-        case Some(Subscription.Topics(topics)) =>
+        case SubscriptionState.Subscribed(_, Subscription.Topics(topics)) =>
          val rc = RebalanceConsumer.Live(c)
          ZIO
            .attempt(c.subscribe(topics.asJava, rebalanceListener.toKafka(runtime, rc)))
            .as(Chunk.empty)
-        case Some(Subscription.Manual(topicPartitions)) =>
+        case SubscriptionState.Subscribed(_, Subscription.Manual(topicPartitions)) =>
          // For manual subscriptions we have to do some manual work before starting the run loop
          for {
            _                <- ZIO.attempt(c.assign(topicPartitions.asJava))
            _                <- doSeekForNewPartitions(c, topicPartitions)
            partitionStreams <- ZIO.foreach(Chunk.fromIterable(topicPartitions))(newPartitionStream)
-            _                <- partitionsQueue.offer(Take.chunk(partitionStreams.map(_.tpStream)))
+            _                <- partitionsHub.publish(Take.chunk(partitionStreams.map(_.tpStream)))
          } yield partitionStreams
      }
    }
@@ -452,7 +505,7 @@
     ZStream
       .fromQueue(commandQueue)
-      .timeoutFail[Throwable](RunloopTimeout)(runloopTimeout)
+      .timeoutFail[Throwable](RunloopTimeout)(consumerSettings.runloopTimeout)
       .takeWhile(_ != RunloopCommand.StopRunloop)
       .runFoldChunksDiscardZIO(State.initial) { (state, commands) =>
         for {
@@ -467,7 +520,7 @@
         } yield updatedStateAfterPoll
       }
       .tapErrorCause(cause => ZIO.logErrorCause("Error in Runloop", cause))
-      .onError(cause => partitionsQueue.offer(Take.failCause(cause)))
+      .onError(cause => partitionsHub.offer(Take.failCause(cause)))
   }
 }
@@ -494,9 +547,6 @@ private[consumer] object Runloop {
   type ByteArrayCommittableRecord = CommittableRecord[Array[Byte], Array[Byte]]
 
-  // Internal parameters, should not be necessary to tune
-  private val commandQueueSize = 1024
-
   private final case class PollResult(
     startingTps: Set[TopicPartition],
     pendingRequests: Chunk[RunloopCommand.Request],
@@ -522,6 +572,9 @@ private[consumer] object Runloop {
     ) extends RebalanceEvent
   }
 
+  // Internal parameters, should not be necessary to tune
+  private final val commandQueueSize: Int = 1024
+
   def make(
     hasGroupId: Boolean,
     consumer: ConsumerAccess,
@@ -530,29 +583,23 @@
     offsetRetrieval: OffsetRetrieval,
     userRebalanceListener: RebalanceListener,
     restartStreamsOnRebalancing: Boolean,
-    runloopTimeout: Duration,
+    partitionsHub: Hub[Take[Throwable, PartitionAssignment]],
     consumerSettings: ConsumerSettings
-  ): ZIO[Scope, Throwable, Runloop] =
+  ): URIO[Scope, Runloop] =
     for {
+      _                  <- ZIO.addFinalizer(diagnostics.emit(Finalization.RunloopFinalized))
       commandQueue       <- ZIO.acquireRelease(Queue.bounded[RunloopCommand](commandQueueSize))(_.shutdown)
       lastRebalanceEvent <- Ref.Synchronized.make[Option[Runloop.RebalanceEvent]](None)
-      partitionsQueue <- ZIO.acquireRelease(
-                           Queue
-                             .unbounded[
-                               Take[Throwable, (TopicPartition, Stream[Throwable, ByteArrayCommittableRecord])]
-                             ]
-                         )(_.shutdown)
-      currentStateRef <- Ref.make(State.initial)
-      runtime         <- ZIO.runtime[Any]
+      currentStateRef    <- Ref.make(State.initial)
+      runtime            <- ZIO.runtime[Any]
       runloop = new Runloop(
                   runtime = runtime,
                   hasGroupId = hasGroupId,
                   consumer = consumer,
                   pollTimeout = pollTimeout,
-                  runloopTimeout = runloopTimeout,
                   commandQueue = commandQueue,
                   lastRebalanceEvent = lastRebalanceEvent,
-                  partitionsQueue = partitionsQueue,
+                  partitionsHub = partitionsHub,
                   diagnostics = diagnostics,
                   offsetRetrieval = offsetRetrieval,
                  userRebalanceListener = userRebalanceListener,
@@ -568,9 +615,8 @@
       waitForRunloopStop = fiber.join.orDie
 
       _ <- ZIO.addFinalizer(
-             ZIO.logTrace("Shutting down Runloop") *>
-               commandQueue.offer(RunloopCommand.StopAllStreams) *>
-               commandQueue.offer(RunloopCommand.StopRunloop) *>
+             ZIO.logDebug("Shutting down Runloop") *>
+               runloop.shutdown *>
               waitForRunloopStop <*
               ZIO.logDebug("Shut down Runloop")
           )
@@ -581,15 +627,13 @@
 private[internal] final case class State(
   pendingRequests: Chunk[RunloopCommand.Request],
   pendingCommits: Chunk[RunloopCommand.Commit],
   assignedStreams: Chunk[PartitionStreamControl],
-  subscription: Option[Subscription]
+  subscriptionState: SubscriptionState
 ) {
   def addCommit(c: RunloopCommand.Commit): State   = copy(pendingCommits = pendingCommits :+ c)
   def addRequest(r: RunloopCommand.Request): State = copy(pendingRequests = pendingRequests :+ r)
 
-  def isSubscribed: Boolean = subscription.isDefined
-
   def shouldPoll: Boolean =
-    isSubscribed && (pendingRequests.nonEmpty || pendingCommits.nonEmpty || assignedStreams.isEmpty)
+    subscriptionState.isSubscribed && (pendingRequests.nonEmpty || pendingCommits.nonEmpty || assignedStreams.isEmpty)
 }
 
 object State {
@@ -597,6 +641,6 @@ object State {
     pendingRequests = Chunk.empty,
     pendingCommits = Chunk.empty,
     assignedStreams = Chunk.empty,
-    subscription = None
+    subscriptionState = SubscriptionState.NotSubscribed
   )
 }
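The new AddSubscription handling above leans entirely on Subscription.unionAll to decide whether a subscription can be added to the ones already active. A rough sketch of the behaviour the tests rely on (the outcomes below are inferred from SubscriptionsSpec, not re-verified here; the union rules themselves live in Subscription and are unchanged by this patch):

// Sketch only: expected outcomes are taken from SubscriptionsSpec above.
import zio.NonEmptyChunk
import zio.kafka.consumer.Subscription

val topicsOnly = NonEmptyChunk(Subscription.topics("topic-a"), Subscription.topics("topic-b"))
val mixed      = NonEmptyChunk(Subscription.topics("topic-a"), Subscription.manual("topic-a", 1))

// Two topic subscriptions combine into one union subscription.
val combined: Option[Subscription] = Subscription.unionAll(topicsOnly) // expected: Some(...)
// A topic subscription plus a manual one has no union; the Runloop turns that into InvalidSubscriptionUnion.
val invalid: Option[Subscription] = Subscription.unionAll(mixed) // expected: None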
+ */ + // noinspection SimplifyUnlessInspection + def stopConsumption(retry: Int = 100, initialCall: Boolean = true): UIO[Unit] = { + @inline def next: UIO[Unit] = stopConsumption(retry - 1, initialCall = false) + + runloop(shouldStartIfNot = false).flatMap { + case RunloopState.Stopped => ZIO.unit + case RunloopState.Started(runloop) => runloop.stopConsumption + case RunloopState.NotStarted => + if (retry <= 0) ZIO.unit + else if (initialCall) next + else next.delay(10.millis) + } + } + + /** + * We're doing all of these things in this method so that the interface of this class is as simple as possible and + * there's no mistake possible for the caller. + * + * The external world (Consumer) doesn't need to know how we "subscribe", "unsubscribe", etc. internally. + */ + def subscribe( + subscription: Subscription + ): ZIO[Scope, InvalidSubscriptionUnion, UStream[Take[Throwable, PartitionAssignment]]] = + for { + stream <- ZStream.fromHubScoped(partitionHub) + // starts the Runloop if not already started + _ <- withRunloopZIO(shouldStartIfNot = true)(_.addSubscription(subscription)) + _ <- ZIO.addFinalizer { + withRunloopZIO(shouldStartIfNot = false)(_.removeSubscription(subscription)) <* + diagnostics.emit(Finalization.SubscriptionFinalized) + } + } yield stream + +} + +private[consumer] object RunloopAccess { + type PartitionAssignment = (TopicPartition, Stream[Throwable, ByteArrayCommittableRecord]) + + def make( + settings: ConsumerSettings, + diagnostics: Diagnostics = Diagnostics.NoOp, + consumerAccess: ConsumerAccess, + consumerSettings: ConsumerSettings + ): ZIO[Scope, Throwable, RunloopAccess] = + for { + // This scope allows us to link the lifecycle of the Runloop and of the Hub to the lifecycle of the Consumer + // When the Consumer is shutdown, the Runloop and the Hub will be shutdown too (before the consumer) + consumerScope <- ZIO.scope + partitionsHub <- ZIO + .acquireRelease(Hub.unbounded[Take[Throwable, PartitionAssignment]])(_.shutdown) + .provide(ZLayer.succeed(consumerScope)) + runloopStateRef <- Ref.Synchronized.make[RunloopState](RunloopState.NotStarted) + makeRunloop = Runloop + .make( + hasGroupId = settings.hasGroupId, + consumer = consumerAccess, + pollTimeout = settings.pollTimeout, + diagnostics = diagnostics, + offsetRetrieval = settings.offsetRetrieval, + userRebalanceListener = settings.rebalanceListener, + restartStreamsOnRebalancing = settings.restartStreamOnRebalancing, + partitionsHub = partitionsHub, + consumerSettings = consumerSettings + ) + .withFinalizer(_ => runloopStateRef.set(RunloopState.Stopped)) + .provide(ZLayer.succeed(consumerScope)) + } yield new RunloopAccess(runloopStateRef, partitionsHub, makeRunloop, diagnostics) +} diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/RunloopCommand.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/RunloopCommand.scala index 2d226f4a92..f737c58641 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/RunloopCommand.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/RunloopCommand.scala @@ -2,7 +2,7 @@ package zio.kafka.consumer.internal import org.apache.kafka.common.TopicPartition import zio._ -import zio.kafka.consumer.Subscription +import zio.kafka.consumer.{ InvalidSubscriptionUnion, Subscription } sealed trait RunloopCommand object RunloopCommand { @@ -20,20 +20,18 @@ object RunloopCommand { case object StopAllStreams extends StreamCommand final case class Commit(offsets: Map[TopicPartition, Long], cont: Promise[Throwable, Unit]) extends 
diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/RunloopCommand.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/RunloopCommand.scala
index 2d226f4a92..f737c58641 100644
--- a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/RunloopCommand.scala
+++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/RunloopCommand.scala
@@ -2,7 +2,7 @@ package zio.kafka.consumer.internal
 
 import org.apache.kafka.common.TopicPartition
 import zio._
-import zio.kafka.consumer.Subscription
+import zio.kafka.consumer.{ InvalidSubscriptionUnion, Subscription }
 
 sealed trait RunloopCommand
 object RunloopCommand {
@@ -20,20 +20,18 @@ object RunloopCommand {
   case object StopAllStreams extends StreamCommand
 
   final case class Commit(offsets: Map[TopicPartition, Long], cont: Promise[Throwable, Unit]) extends StreamCommand {
-    @inline def isDone: UIO[Boolean] = cont.isDone
-
+    @inline def isDone: UIO[Boolean]    = cont.isDone
     @inline def isPending: UIO[Boolean] = isDone.negate
   }
 
   /** Used by a stream to request more records. */
   final case class Request(tp: TopicPartition) extends StreamCommand
 
-  final case class ChangeSubscription(
-    subscription: Option[Subscription],
-    cont: Promise[Throwable, Unit]
-  ) extends StreamCommand {
-    @inline def succeed: UIO[Boolean] = cont.succeed(())
-
-    @inline def fail(throwable: Throwable): UIO[Boolean] = cont.fail(throwable)
+  final case class AddSubscription(subscription: Subscription, cont: Promise[InvalidSubscriptionUnion, Unit])
+      extends StreamCommand {
+    @inline def succeed: UIO[Unit]                           = cont.succeed(()).unit
+    @inline def fail(e: InvalidSubscriptionUnion): UIO[Unit] = cont.fail(e).unit
   }
+  final case class RemoveSubscription(subscription: Subscription) extends StreamCommand
+  case object RemoveAllSubscriptions extends StreamCommand
 }
diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/RunloopExecutor.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/RunloopExecutor.scala
index 2e901e3c7e..bc3d6d60a4 100644
--- a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/RunloopExecutor.scala
+++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/RunloopExecutor.scala
@@ -1,6 +1,6 @@
 package zio.kafka.consumer.internal
 
-import zio.{ Executor, Scope, ZIO }
+import zio.{ Executor, Scope, URIO, ZIO }
 
 import java.util.concurrent.Executors
 import java.util.concurrent.atomic.AtomicLong
@@ -9,9 +9,9 @@ private[consumer] object RunloopExecutor {
 
   private val counter: AtomicLong = new AtomicLong(0)
 
-  private val newSingleThreadedExecutor: ZIO[Scope, Throwable, Executor] =
+  private val newSingleThreadedExecutor: URIO[Scope, Executor] =
     ZIO.acquireRelease {
-      ZIO.attempt {
+      ZIO.succeed {
        val javaExecutor =
          Executors.newSingleThreadExecutor { runnable =>
            new Thread(runnable, s"zio-kafka-runloop-thread-${counter.getAndIncrement()}")
@@ -21,6 +21,6 @@
       }
     } { case (_, executor) => ZIO.attempt(executor.shutdown()).orDie }.map(_._1)
 
-  val newInstance: ZIO[Scope, Throwable, Executor] = newSingleThreadedExecutor
+  val newInstance: URIO[Scope, Executor] = newSingleThreadedExecutor
 
 }
diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/SubscriptionState.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/SubscriptionState.scala
new file mode 100644
index 0000000000..09113dbdda
--- /dev/null
+++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/SubscriptionState.scala
@@ -0,0 +1,15 @@
+package zio.kafka.consumer.internal
+
+import zio.kafka.consumer.Subscription
+
+private[internal] sealed trait SubscriptionState {
+  def isSubscribed: Boolean =
+    this match {
+      case _: SubscriptionState.Subscribed => true
+      case SubscriptionState.NotSubscribed => false
+    }
+}
+private[internal] object SubscriptionState {
+  case object NotSubscribed extends SubscriptionState
+  final case class Subscribed(subscriptions: Set[Subscription], union: Subscription) extends SubscriptionState
+}
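For completeness, the intended reading of the two members of the new SubscriptionState (it is private[internal], so the snippet below is purely illustrative): `subscriptions` tracks every Subscription added through the stream API so that individual ones can be removed later, while `union` is the combined subscription that is actually handed to the Kafka client.

// Purely illustrative: SubscriptionState is private[internal] and not accessible from user code.
import zio.kafka.consumer.Subscription
import zio.kafka.consumer.internal.SubscriptionState

def describe(state: SubscriptionState): String =
  state match {
    case SubscriptionState.NotSubscribed =>
      "no active subscriptions, so the Runloop does not poll"
    case SubscriptionState.Subscribed(subscriptions, union) =>
      s"${subscriptions.size} tracked subscription(s), Kafka client subscribed to: $union"
  }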