Skip to content

Commit

Permalink
Using a Consumer without consuming shouldn't throw a RunloopTimeout
Browse files Browse the repository at this point in the history
… exception
  • Loading branch information
guizmaii committed Jun 24, 2023
1 parent 7472bad commit 074bddf
Show file tree
Hide file tree
Showing 10 changed files with 564 additions and 180 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,8 @@ trait ComparisonBenchmark extends ZioBenchmark[Env] {
clientId = randomThing("client"),
groupId = Some(randomThing("client")),
`max.poll.records` = 1000, // A more production worthy value
runloopTimeout =
1.hour // Absurdly high timeout to avoid the runloop from being interrupted while we're benchmarking other stuff
// Absurdly high timeout to prevent the runloop from being interrupted while we're benchmarking other stuff
runloopTimeout = 1.hour
)
)

Expand Down
191 changes: 188 additions & 3 deletions zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,14 @@ import org.apache.kafka.common.TopicPartition
import zio._
import zio.kafka.ZIOSpecDefaultSlf4j
import zio.kafka.consumer.Consumer.{ AutoOffsetStrategy, OffsetRetrieval }
import zio.kafka.consumer.diagnostics.DiagnosticEvent.Finalization
import zio.kafka.consumer.diagnostics.DiagnosticEvent.Finalization.{
ConsumerFinalized,
RunloopFinalized,
SubscriptionFinalized
}
import zio.kafka.consumer.diagnostics.{ DiagnosticEvent, Diagnostics }
import zio.kafka.producer.TransactionalProducer
import zio.kafka.producer.{ Producer, TransactionalProducer }
import zio.kafka.serde.Serde
import zio.kafka.testkit.KafkaTestUtils._
import zio.kafka.testkit.{ Kafka, KafkaRandom }
Expand All @@ -22,8 +28,10 @@ import zio.test.Assertion._
import zio.test.TestAspect._
import zio.test._

import java.util.concurrent.atomic.AtomicInteger
import scala.reflect.ClassTag

//noinspection SimplifyAssertInspection
object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
override val kafkaPrefix: String = "consumespec"

Expand Down Expand Up @@ -1052,11 +1060,188 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
_ <- produceOne(topic, "key2", "message2")
_ <- recordsOut.take
} yield assertCompletes
}
},
suite("issue #856")(
test(
  "Booting a Consumer to do something else than consuming should not fail with `RunloopTimeout` exception"
) {
  // Creates a consumer whose runloop timeout (500 ms) is shorter than the time it is kept idle (1 s),
  // without ever opening a stream on it.
  def idleConsumer(diag: Diagnostics) =
    for {
      id   <- randomClient
      conf <- consumerSettings(clientId = id, runloopTimeout = 500.millis)
      _    <- Consumer.make(conf, diagnostics = diag)
      _    <- ZIO.sleep(1.second)
    } yield assertCompletes

  for {
    diag       <- Diagnostics.SlidingQueue.make(1000)
    testResult <- ZIO.scoped(idleConsumer(diag))
    // Everything the consumer reported; only the finalization events are checked below.
    recorded          <- diag.queue.takeAll
    finalizationEvents = recorded.filter(event => event.isInstanceOf[Finalization])
  } yield testResult && assert(finalizationEvents)(hasSameElements(Chunk(ConsumerFinalized)))
},
suite(
"Ordering of finalizers matters. If subscriptions are finalized after the runloop, it creates a deadlock"
)(
test("When not consuming, the Runloop is not started so only the Consumer is finalized") {
  // Only builds the consumer; no stream is ever opened on it.
  def makeOnly(diag: Diagnostics): ZIO[Scope & Kafka, Throwable, TestResult] =
    for {
      id   <- randomClient
      conf <- consumerSettings(clientId = id)
      _    <- Consumer.make(conf, diagnostics = diag)
    } yield assertCompletes

  for {
    diag       <- Diagnostics.SlidingQueue.make(1000)
    testResult <- ZIO.scoped(makeOnly(diag))
    // Keep only the finalization events out of everything that was recorded.
    recorded          <- diag.queue.takeAll
    finalizationEvents = recorded.filter(event => event.isInstanceOf[Finalization])
  } yield testResult && assert(finalizationEvents)(hasSameElements(Chunk(ConsumerFinalized)))
},
// Checks the exact order of finalization events after two sequential consumption sessions
// have been run on the same consumer.
test("When consuming, the Runloop is started. The finalization orders matters to avoid a deadlock") {
// This test ensures that we're not inadvertently introducing a deadlock by changing the order of finalizers.

// Scenario: produce a single record, then run two one-record streams, one after the other,
// on the same consumer and the same manual subscription (partition 0 of `topic`).
def test(diagnostics: Diagnostics): ZIO[Producer & Scope & Kafka, Throwable, TestResult] =
for {
clientId <- randomClient
topic <- randomTopic
settings <- consumerSettings(clientId = clientId)
consumer <- Consumer.make(settings, diagnostics = diagnostics)
_ <- produceOne(topic, "key1", "message1")
// Starting a consumption session to start the Runloop.
consumed_0 <- consumer
.plainStream(Subscription.manual(topic -> 0), Serde.string, Serde.string)
.take(1)
.runCount
// Second session on the same consumer. Only one record was produced and nothing is committed here,
// so this presumably re-reads that same record — TODO confirm against the offset-retrieval defaults.
consumed_1 <- consumer
.plainStream(Subscription.manual(topic -> 0), Serde.string, Serde.string)
.take(1)
.runCount
} yield assert(consumed_0)(equalTo(1L)) && assert(consumed_1)(equalTo(1L))

// The scenario runs inside its own `ZIO.scoped`, and the diagnostics queue is only read afterwards.
for {
diagnostics <- Diagnostics.SlidingQueue.make(1000)
testResult <- ZIO.scoped {
test(diagnostics)
}
// Keep only the finalization events; other diagnostic events are irrelevant to this test.
finalizationEvents <- diagnostics.queue.takeAll.map(_.filter(_.isInstanceOf[Finalization]))
} yield testResult && assert(finalizationEvents)(
// The order is very important.
// The subscription must be finalized before the runloop, otherwise it creates a deadlock.
// Two `SubscriptionFinalized` events are expected, matching the two sessions run above.
equalTo(
Chunk(
SubscriptionFinalized,
SubscriptionFinalized,
RunloopFinalized,
ConsumerFinalized
)
)
)
},
// Forks a stream that would read `numberOfMessages` records, calls `stopConsumption` immediately,
// and checks that (a) the stream ended before reading everything and (b) finalizers ran in the
// expected order. Repeated 5 times via `nonFlaky` because the outcome depends on a race.
test(
  "Calling `Consumer::stopConsumption` just after starting a forked consumption session should stop the consumption"
) {
  val numberOfMessages: Int           = 100000
  val kvs: Iterable[(String, String)] = Iterable.tabulate(numberOfMessages)(i => (s"key-$i", s"msg-$i"))

  def test(diagnostics: Diagnostics): ZIO[Producer & Scope & Kafka, Throwable, TestResult] =
    for {
      clientId <- randomClient
      topic    <- randomTopic
      settings <- consumerSettings(clientId = clientId)
      consumer <- Consumer.make(settings, diagnostics = diagnostics)
      _        <- produceMany(topic, kvs)
      // Running total of consumed records; only used for the debug log below.
      consumedSoFar = new AtomicInteger(0)
      // Starting a consumption session to start the Runloop.
      fiber <-
        consumer
          .plainStream(Subscription.manual(topic -> 0), Serde.string, Serde.string)
          // `addAndGet` returns the total *including* the current chunk. The previous `getAndAdd`
          // returned the total before the chunk, so the first chunk was logged as "Consumed 0 messages".
          .mapChunksZIO(chunks =>
            ZIO.logDebug(s"Consumed ${consumedSoFar.addAndGet(chunks.size)} messages").as(chunks)
          )
          .take(numberOfMessages.toLong)
          .runCount
          .fork
      // Stop right away, without waiting for the forked stream to make progress.
      _          <- consumer.stopConsumption
      consumed_0 <- fiber.join
    } yield assert(consumed_0)(isLessThan(numberOfMessages.toLong))

  // The scenario runs inside its own `ZIO.scoped`, and the diagnostics queue is only read afterwards.
  for {
    diagnostics <- Diagnostics.SlidingQueue.make(1000)
    testResult <- ZIO.scoped {
                    test(diagnostics)
                  }
    finalizationEvents <- diagnostics.queue.takeAll.map(_.filter(_.isInstanceOf[Finalization]))
  } yield testResult && assert(finalizationEvents)(
    // The order is very important.
    // The subscription must be finalized before the runloop, otherwise it creates a deadlock.
    equalTo(
      Chunk(
        SubscriptionFinalized,
        RunloopFinalized,
        ConsumerFinalized
      )
    )
  )
} @@ nonFlaky(5),
// Stops a running consumption session part-way through, then starts a second session on the
// same consumer and expects it to read all `numberOfMessages` records.
test(
"it's possible to start a new consumption session from a Consumer that had a consumption session stopped previously"
) {
val numberOfMessages: Int = 100000
val kvs: Iterable[(String, String)] = Iterable.tabulate(numberOfMessages)(i => (s"key-$i", s"msg-$i"))

def test(diagnostics: Diagnostics): ZIO[Producer & Scope & Kafka, Throwable, TestResult] =
for {
clientId <- randomClient
topic <- randomTopic
settings <- consumerSettings(clientId = clientId)
consumer <- Consumer.make(settings, diagnostics = diagnostics)
_ <- produceMany(topic, kvs)
// Starting a consumption session to start the Runloop.
fiber <- consumer
.plainStream(Subscription.manual(topic -> 0), Serde.string, Serde.string)
.take(numberOfMessages.toLong)
.runCount
.forkScoped
// Give the forked stream some time to consume before stopping it; the assertion below
// requires that at least one, but not every, record was read by then.
_ <- ZIO.sleep(200.millis)
_ <- consumer.stopConsumption
consumed_0 <- fiber.join
_ <- ZIO.logDebug(s"consumed_0: $consumed_0")

// Pause between the stopped session and the new one.
// NOTE(review): the reason for exactly 5 seconds is not visible here — confirm whether it can be shortened.
_ <- ZIO.logDebug("About to sleep 5 seconds")
_ <- ZIO.sleep(5.seconds)
_ <- ZIO.logDebug("Slept 5 seconds")
// New session on the same consumer; it is expected to read the full topic content.
consumed_1 <- consumer
.plainStream(Subscription.manual(topic -> 0), Serde.string, Serde.string)
.take(numberOfMessages.toLong)
.runCount
} yield assert(consumed_0)(isGreaterThan(0L) && isLessThan(numberOfMessages.toLong)) &&
assert(consumed_1)(equalTo(numberOfMessages.toLong))

// The scenario runs inside its own `ZIO.scoped`, and the diagnostics queue is only read afterwards.
for {
diagnostics <- Diagnostics.SlidingQueue.make(1000)
testResult <- ZIO.scoped {
test(diagnostics)
}
// Keep only the finalization events; other diagnostic events are irrelevant to this test.
finalizationEvents <- diagnostics.queue.takeAll.map(_.filter(_.isInstanceOf[Finalization]))
} yield testResult && assert(finalizationEvents)(
// The order is very important.
// The subscription must be finalized before the runloop, otherwise it creates a deadlock.
equalTo(
Chunk(
SubscriptionFinalized,
RunloopFinalized,
ConsumerFinalized
)
)
)
}
)
)
)
.provideSome[Scope & Kafka](producer)
.provideSomeShared[Scope](
Kafka.embedded
) @@ withLiveClock @@ TestAspect.sequential @@ timeout(2.minutes)
) @@ withLiveClock @@ sequential @@ timeout(2.minutes)

}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ import zio.test.Assertion._
import zio.test.TestAspect._
import zio.test._

import java.util.concurrent.atomic.AtomicInteger

//noinspection SimplifyAssertInspection
object SubscriptionsSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
override val kafkaPrefix: String = "subscriptionsspec"

Expand Down Expand Up @@ -88,22 +91,94 @@ object SubscriptionsSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
_ <- produceMany(topic1, kvs)
_ <- produceMany(topic2, kvs)

result <-
(Consumer
consumer_0 =
Consumer
.plainStream(Subscription.topics(topic1), Serde.string, Serde.string)
.runCollect zipPar
Consumer
.plainStream(
Subscription.manual(topic2, 1),
Serde.string,
Serde.string
)
.runCollect)
.provideSomeLayer[Kafka & Scope](consumer(client, Some(group)))
.unit
.exit
.runCollect

consumer_1 =
Consumer
.plainStream(
Subscription.manual(topic2, 1), // invalid with the previous subscription
Serde.string,
Serde.string
)
.runCollect

result <- (consumer_0 zipPar consumer_1)
.provideSomeLayer[Kafka & Scope](consumer(client, Some(group)))
.unit
.exit
} yield assert(result)(fails(isSubtype[InvalidSubscriptionUnion](anything)))
},
// Runs a topic subscription (`consumer_0`) and, while it is still consuming, starts a manual
// subscription (`consumer_1`) on the same consumer instance. Expects `consumer_1` to fail with
// `InvalidSubscriptionUnion` and `consumer_0` to still complete successfully.
// Repeated 5 times via `nonFlaky` because the scenario depends on timing between two fibers.
test(
"gives an error when attempting to subscribe using a manual subscription when there is already a topic subscription and doesn't fail the already running consuming session"
) {
val numberOfMessages = 20
// NOTE(review): `0 to numberOfMessages` is inclusive, so 21 records are produced while the stream
// below takes 20 — confirm this is intended.
val kvs = (0 to numberOfMessages).toList.map(i => (s"key$i", s"msg$i"))
for {
topic1 <- randomTopic
client <- randomClient
group <- randomGroup

_ <- produceMany(topic1, kvs)

// Only used to number the debug log lines emitted by `consumer_0`.
counter = new AtomicInteger(1)

// Pair of markers: the first slot is set when `consumer_0` processes a record, the second when
// `consumer_1` errors. Each slot is only written while it is still empty.
firstMessagesRef <- Ref.make(("", ""))
// Records the order in which the two consumer effects complete.
finalizersRef <- Ref.make(Chunk.empty[String])

consumer_0 =
Consumer
.plainStream(Subscription.topics(topic1), Serde.string, Serde.string)
// Here we delay each message to be sure that `consumer_1` will fail while `consumer_0` is still running
.mapZIO { r =>
firstMessagesRef.updateSome { case ("", v) =>
("First consumer_0 message", v)
} *>
ZIO
.logDebug(s"Consumed ${counter.getAndIncrement()} records")
.delay(10.millis)
.as(r)
}
.take(numberOfMessages.toLong)
.runCollect
// `exit` captures success or failure as a value, so the completion marker below is always appended.
.exit
.zipLeft(finalizersRef.update(_ :+ "consumer_0 finalized"))

consumer_1 =
Consumer
.plainStream(
Subscription.manual(topic1, 1), // invalid with the previous subscription
Serde.string,
Serde.string
)
.tapError { _ =>
firstMessagesRef.updateSome { case (v, "") =>
(v, "consumer_1 error")
}
}
.runCollect
// Same pattern as `consumer_0`: keep the outcome as a value and record completion.
.exit
.zipLeft(finalizersRef.update(_ :+ "consumer_1 finalized"))

// Build one consumer environment and share it between both streams.
consumerInstance <- consumer(client, Some(group)).build

fiber_0 <- consumer_0.provideEnvironment(consumerInstance).fork
_ <- ZIO.unit.delay(100.millis) // Wait to be sure that `consumer_0` is running
fiber_1 <- consumer_1.provideEnvironment(consumerInstance).fork

result_0 <- fiber_0.join
result_1 <- fiber_1.join

finalizingOrder <- finalizersRef.get
firstMessages <- firstMessagesRef.get
} yield assert(result_0)(succeeds(hasSize(equalTo(numberOfMessages)))) &&
assert(result_1)(fails(isSubtype[InvalidSubscriptionUnion](anything))) &&
// Here we check that `consumer_0` was running when `consumer_1` failed
assert(firstMessages)(equalTo(("First consumer_0 message", "consumer_1 error"))) &&
// `consumer_1` must have completed (with its failure) before `consumer_0` completed.
assert(finalizingOrder)(equalTo(Chunk("consumer_1 finalized", "consumer_0 finalized")))
} @@ nonFlaky(5),
test("distributes records (randomly) from overlapping subscriptions over all subscribers") {
val kvs = (1 to 500).toList.map(i => (s"key$i", s"msg$i"))
for {
Expand Down
Loading

0 comments on commit 074bddf

Please sign in to comment.