Skip to content

Commit

Permalink
Using a Consumer without consuming shouldn't throw a RunloopTimeout
Browse files Browse the repository at this point in the history
… exception
  • Loading branch information
guizmaii committed Jun 3, 2023
1 parent 51fa26a commit 626029d
Show file tree
Hide file tree
Showing 14 changed files with 508 additions and 235 deletions.
14 changes: 7 additions & 7 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ jobs:
- name: Install libuv
run: sudo apt-get update && sudo apt-get install -y libuv1-dev
- name: Setup Scala
uses: actions/setup-java@v3.10.0
uses: actions/setup-java@v3.11.0
with:
distribution: temurin
java-version: '8'
Expand All @@ -54,7 +54,7 @@ jobs:
- name: Install libuv
run: sudo apt-get update && sudo apt-get install -y libuv1-dev
- name: Setup Scala
uses: actions/setup-java@v3.10.0
uses: actions/setup-java@v3.11.0
with:
distribution: temurin
java-version: '8'
Expand All @@ -80,7 +80,7 @@ jobs:
- name: Install libuv
run: sudo apt-get update && sudo apt-get install -y libuv1-dev
- name: Setup Scala
uses: actions/setup-java@v3.10.0
uses: actions/setup-java@v3.11.0
with:
distribution: temurin
java-version: ${{ matrix.java }}
Expand All @@ -92,7 +92,7 @@ jobs:
with:
fetch-depth: '0'
- name: Test
run: sbt ++test
run: sbt +test
update-readme:
name: Update README
runs-on: ubuntu-latest
Expand All @@ -106,7 +106,7 @@ jobs:
- name: Install libuv
run: sudo apt-get update && sudo apt-get install -y libuv1-dev
- name: Setup Scala
uses: actions/setup-java@v3.10.0
uses: actions/setup-java@v3.11.0
with:
distribution: temurin
java-version: '8'
Expand Down Expand Up @@ -180,7 +180,7 @@ jobs:
- name: Install libuv
run: sudo apt-get update && sudo apt-get install -y libuv1-dev
- name: Setup Scala
uses: actions/setup-java@v3.10.0
uses: actions/setup-java@v3.11.0
with:
distribution: temurin
java-version: '8'
Expand Down Expand Up @@ -209,7 +209,7 @@ jobs:
- name: Install libuv
run: sudo apt-get update && sudo apt-get install -y libuv1-dev
- name: Setup Scala
uses: actions/setup-java@v3.10.0
uses: actions/setup-java@v3.11.0
with:
distribution: temurin
java-version: '8'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import zio.kafka.producer.Producer
import zio.kafka.serde.Serde
import zio.kafka.testkit.Kafka
import zio.kafka.testkit.KafkaTestUtils.{ consumerSettings, minimalConsumer, produceMany, producer }
import zio.{ durationInt, ULayer, ZIO, ZLayer }
import zio.{ ULayer, ZIO, ZLayer }

import java.util.concurrent.TimeUnit
import scala.jdk.CollectionConverters._
Expand Down Expand Up @@ -55,9 +55,7 @@ class ConsumersComparisonBenchmark extends ZioBenchmark[Env] {
consumerSettings(
clientId = randomThing("client"),
groupId = Some(randomThing("client")),
`max.poll.records` = 1000, // A more production worthy value
runloopTimeout =
1.hour // Absurdly high timeout to avoid the runloop from being interrupted while we're benchmarking other stuff
`max.poll.records` = 1000 // A more production worthy value
)
)

Expand Down
2 changes: 1 addition & 1 deletion zio-kafka-test/src/test/scala/zio/kafka/ProducerSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -470,5 +470,5 @@ object ProducerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
)
.provideSomeShared[Scope](
Kafka.embedded
) @@ withLiveClock @@ timeout(2.minutes) @@ sequential
) @@ withLiveClock @@ timeout(3.minutes) @@ sequential
}
189 changes: 184 additions & 5 deletions zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,14 @@ import org.apache.kafka.common.TopicPartition
import zio._
import zio.kafka.ZIOSpecDefaultSlf4j
import zio.kafka.consumer.Consumer.{ AutoOffsetStrategy, OffsetRetrieval }
import zio.kafka.consumer.diagnostics.DiagnosticEvent.Finalization
import zio.kafka.consumer.diagnostics.DiagnosticEvent.Finalization.{
ConsumerFinalized,
RunloopFinalized,
SubscriptionFinalized
}
import zio.kafka.consumer.diagnostics.{ DiagnosticEvent, Diagnostics }
import zio.kafka.producer.TransactionalProducer
import zio.kafka.producer.{ Producer, TransactionalProducer }
import zio.kafka.serde.Serde
import zio.kafka.testkit.KafkaTestUtils._
import zio.kafka.testkit.{ Kafka, KafkaRandom }
Expand All @@ -24,6 +30,7 @@ import zio.test._

import scala.reflect.ClassTag

//noinspection SimplifyAssertInspection
object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
override val kafkaPrefix: String = "consumespec"

Expand Down Expand Up @@ -287,8 +294,8 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
.tap(_ => ZIO.logDebug("Stream completed"))
.provideSomeLayer[Kafka](
consumer(client, Some(group))
) *> keepProducing
.set(false)
)
_ <- keepProducing.set(false)
} yield assertCompletes
},
test("process outstanding commits after a graceful shutdown") {
Expand Down Expand Up @@ -1052,11 +1059,183 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
_ <- produceOne(topic, "key2", "message2")
_ <- recordsOut.take
} yield assertCompletes
}
},
suite("issue #856")(
test(
"Booting a Consumer to do something else than consuming should not fail with `RunloopTimeout` exception"
) {
def test(diagnostics: Diagnostics) =
for {
clientId <- randomClient
settings <- consumerSettings(clientId = clientId)
_ <- Consumer.make(settings, diagnostics = diagnostics)
_ <- ZIO.sleep(1.second)
} yield assertCompletes

for {
diagnostics <- Diagnostics.SlidingQueue.make(1000)
testResult <- ZIO.scoped {
test(diagnostics)
}
finalizationEvents <- diagnostics.queue.takeAll.map(_.filter(_.isInstanceOf[Finalization]))
} yield testResult && assert(finalizationEvents)(hasSameElements(Chunk(ConsumerFinalized)))
},
suite(
"Ordering of finalizers matters. If subscriptions are finalized after the runloop, it creates a deadlock"
)(
test("When not consuming, the Runloop is not started so only the Consumer is finalized") {
def test(diagnostics: Diagnostics): ZIO[Scope & Kafka, Throwable, TestResult] =
for {
clientId <- randomClient
settings <- consumerSettings(clientId = clientId)
_ <- Consumer.make(settings, diagnostics = diagnostics)
} yield assertCompletes

for {
diagnostics <- Diagnostics.SlidingQueue.make(1000)
testResult <- ZIO.scoped {
test(diagnostics)
}
finalizationEvents <- diagnostics.queue.takeAll.map(_.filter(_.isInstanceOf[Finalization]))
} yield testResult && assert(finalizationEvents)(hasSameElements(Chunk(ConsumerFinalized)))
},
test("When consuming, the Runloop is started. The finalization orders matters to avoid a deadlock") {
// This test ensures that we're not inadvertently introducing a deadlock by changing the order of finalizers.

def test(diagnostics: Diagnostics): ZIO[Producer & Scope & Kafka, Throwable, TestResult] =
for {
clientId <- randomClient
topic <- randomTopic
settings <- consumerSettings(clientId = clientId)
consumer <- Consumer.make(settings, diagnostics = diagnostics)
_ <- produceOne(topic, "key1", "message1")
// Starting a consumption session to start the Runloop.
consumed_0 <- consumer
.plainStream(Subscription.manual(topic -> 0), Serde.string, Serde.string)
.take(1)
.runCount
consumed_1 <- consumer
.plainStream(Subscription.manual(topic -> 0), Serde.string, Serde.string)
.take(1)
.runCount
} yield assert(consumed_0)(equalTo(1L)) && assert(consumed_1)(equalTo(1L))

for {
diagnostics <- Diagnostics.SlidingQueue.make(1000)
testResult <- ZIO.scoped {
test(diagnostics)
}
finalizationEvents <- diagnostics.queue.takeAll.map(_.filter(_.isInstanceOf[Finalization]))
} yield testResult && assert(finalizationEvents)(
// The order is very important.
// The subscription must be finalized before the runloop, otherwise it creates a deadlock.
equalTo(
Chunk(
SubscriptionFinalized,
SubscriptionFinalized,
RunloopFinalized,
ConsumerFinalized
)
)
)
},
test(
"Calling `Consumer::stopConsumption` just after starting a forked consumption session should stop the consumption"
) {
val numberOfMessages: Int = 100000
val kvs: Iterable[(String, String)] = Iterable.tabulate(numberOfMessages)(i => (s"key-$i", s"msg-$i"))

def test(diagnostics: Diagnostics): ZIO[Producer & Scope & Kafka, Throwable, TestResult] =
for {
clientId <- randomClient
topic <- randomTopic
settings <- consumerSettings(clientId = clientId)
consumer <- Consumer.make(settings, diagnostics = diagnostics)
_ <- produceMany(topic, kvs)
// Starting a consumption session to start the Runloop.
fiber <- consumer
.plainStream(Subscription.manual(topic -> 0), Serde.string, Serde.string)
.take(numberOfMessages.toLong)
.runCount
.forkScoped
_ <- consumer.stopConsumption
consumed_0 <- fiber.join
} yield assert(consumed_0)(isLessThan(numberOfMessages.toLong))

for {
diagnostics <- Diagnostics.SlidingQueue.make(1000)
testResult <- ZIO.scoped {
test(diagnostics)
}
finalizationEvents <- diagnostics.queue.takeAll.map(_.filter(_.isInstanceOf[Finalization]))
} yield testResult && assert(finalizationEvents)(
// The order is very important.
// The subscription must be finalized before the runloop, otherwise it creates a deadlock.
equalTo(
Chunk(
SubscriptionFinalized,
RunloopFinalized,
ConsumerFinalized
)
)
)
} @@ nonFlaky(5),
test("it's possible to start a new consumption session from a Consumer that had a consumption session stopped previously") {
val numberOfMessages: Int = 100000
val kvs: Iterable[(String, String)] = Iterable.tabulate(numberOfMessages)(i => (s"key-$i", s"msg-$i"))

def test(diagnostics: Diagnostics): ZIO[Producer & Scope & Kafka, Throwable, TestResult] =
for {
clientId <- randomClient
topic <- randomTopic
settings <- consumerSettings(clientId = clientId)
consumer <- Consumer.make(settings, diagnostics = diagnostics)
_ <- produceMany(topic, kvs)
// Starting a consumption session to start the Runloop.
fiber <- consumer
.plainStream(Subscription.manual(topic -> 0), Serde.string, Serde.string)
.take(numberOfMessages.toLong)
.runCount
.forkScoped
_ <- ZIO.sleep(200.millis)
_ <- consumer.stopConsumption
consumed_0 <- fiber.join
_ <- ZIO.logDebug(s"consumed_0: $consumed_0")

_ <- ZIO.logDebug("About to sleep 5 seconds")
_ <- ZIO.sleep(5.seconds)
_ <- ZIO.logDebug("Slept 5 seconds")
consumed_1 <- consumer
.plainStream(Subscription.manual(topic -> 0), Serde.string, Serde.string)
.take(numberOfMessages.toLong)
.runCount
} yield assert(consumed_0)(isGreaterThan(0L) && isLessThan(numberOfMessages.toLong)) &&
assert(consumed_1)(equalTo(numberOfMessages.toLong))

for {
diagnostics <- Diagnostics.SlidingQueue.make(1000)
testResult <- ZIO.scoped {
test(diagnostics)
}
finalizationEvents <- diagnostics.queue.takeAll.map(_.filter(_.isInstanceOf[Finalization]))
} yield testResult && assert(finalizationEvents)(
// The order is very important.
// The subscription must be finalized before the runloop, otherwise it creates a deadlock.
equalTo(
Chunk(
SubscriptionFinalized,
RunloopFinalized,
ConsumerFinalized
)
)
)
}
)
)
)
.provideSome[Scope & Kafka](producer)
.provideSomeShared[Scope](
Kafka.embedded
) @@ withLiveClock @@ TestAspect.sequential @@ timeout(2.minutes)
) @@ withLiveClock @@ sequential @@ timeout(2.minutes)

}
Original file line number Diff line number Diff line change
Expand Up @@ -103,15 +103,13 @@ object KafkaTestUtils {
offsetRetrieval: OffsetRetrieval = OffsetRetrieval.Auto(),
restartStreamOnRebalancing: Boolean = false,
`max.poll.records`: Int = 100, // settings this higher can cause concurrency bugs to go unnoticed
runloopTimeout: Duration = ConsumerSettings.defaultRunloopTimeout,
properties: Map[String, String] = Map.empty
): URIO[Kafka, ConsumerSettings] =
ZIO.serviceWith[Kafka] { (kafka: Kafka) =>
val settings = ConsumerSettings(kafka.bootstrapServers)
.withClientId(clientId)
.withCloseTimeout(5.seconds)
.withPollTimeout(100.millis)
.withRunloopTimeout(runloopTimeout)
.withProperties(
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "earliest",
ConsumerConfig.METADATA_MAX_AGE_CONFIG -> "100",
Expand Down
Loading

0 comments on commit 626029d

Please sign in to comment.