Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Try to use zio-profiling with our benchmarks #883

Closed
wants to merge 11 commits into the base branch from the contributor's branch (branch names lost during page extraction)
13 changes: 11 additions & 2 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,11 @@ def stdSettings(prjName: String) = Seq(
case Some((2, _)) => Seq(compilerPlugin("com.olegpy" %% "better-monadic-for" % "0.3.1"))
case _ => List.empty
}
}
},
libraryDependencies ++= Seq(
compilerPlugin("dev.zio" %% "zio-profiling-tagging-plugin" % "0.2.0"),
"dev.zio" %% "zio-profiling" % "0.2.0"
)
) ++ scalafixSettings

lazy val zioKafka =
Expand Down Expand Up @@ -153,7 +157,12 @@ lazy val zioKafkaBench =
.enablePlugins(JmhPlugin)
.settings(stdSettings("zio-kafka-bench"))
.settings(publish / skip := true)
.settings(libraryDependencies += logback)
.settings(
libraryDependencies ++= Seq(
logback,
"dev.zio" %% "zio-profiling-jmh" % "0.2.0"
)
)
.dependsOn(zioKafka, zioKafkaTestkit)

lazy val zioKafkaExample =
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package zio.kafka.bench

import org.openjdk.jmh.annotations.{ Setup, TearDown }
import zio.kafka.bench.ZioBenchmark.logger
import zio.profiling.sampling.{ SamplingProfiler, SamplingProfilerSupervisor }
import zio.{ ZLayer, _ }

import java.util.UUID
Expand All @@ -12,14 +14,24 @@ trait ZioBenchmark[Environment] {

protected val timeout: Duration = 180.seconds

// Sampling interval for the zio-profiling sampling profiler used below.
protected val samplingPeriod: Duration = 10.millis

/**
 * Builds a `SamplingProfilerSupervisor` and registers it on the runtime so that fibers run during
 * the benchmark are sampled.
 *
 * Inspired by
 * https://github.com/zio/zio-profiling/blob/v0.2.0/zio-profiling/src/main/scala/zio/profiling/sampling/SamplingProfiler.scala#L35-L37
 */
private val supervisorLayer: ULayer[SamplingProfilerSupervisor] =
  // `makeSupervisor` is scoped, hence the `ZLayer.scoped`; after construction the supervisor is
  // attached via `Runtime.addSupervisor`, and `.map(_ => env)` keeps exposing the supervisor itself
  // as the layer's output so callers can query it later.
  ZLayer.scoped[Any](SamplingProfiler(samplingPeriod).makeSupervisor).flatMap(env => Runtime.addSupervisor(env.get).map(_ => env))

/**
 * JMH `@Setup` hook: builds the ZIO `Runtime` the benchmark methods run on.
 *
 * Layer composition order (left to right with `>+>`) determines initialization order: logging is
 * configured before `initialize` runs, and the profiling supervisor is added last so it is in place
 * once the runtime is handed to the benchmark.
 */
@Setup
def setup(): Unit =
  runtime = Unsafe.unsafe(implicit unsafe =>
    zio.Runtime.unsafe.fromLayer(
      bootstrap >+>
        (Runtime.removeDefaultLoggers >+>
          // Logging is opt-in per benchmark via `enableLogging`; default loggers are always removed.
          (if (enableLogging) Runtime.addLogger(logger) else ZLayer.empty)) >+>
        ZLayer.fromZIO(initialize) >+>
        supervisorLayer
    )
  )

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import zio.kafka.consumer.{ Consumer, ConsumerSettings }
import zio.kafka.producer.Producer
import zio.kafka.testkit.Kafka
import zio.kafka.testkit.KafkaTestUtils.{ consumerSettings, minimalConsumer, produceMany, producer }
import zio.{ durationInt, ULayer, ZIO, ZLayer }
import zio.{ ULayer, ZIO, ZLayer }

import scala.jdk.CollectionConverters._

Expand Down Expand Up @@ -44,9 +44,7 @@ trait ComparisonBenchmark extends ZioBenchmark[Env] {
consumerSettings(
clientId = randomThing("client"),
groupId = Some(randomThing("client")),
`max.poll.records` = 1000, // A more production worthy value
runloopTimeout =
1.hour // Absurdly high timeout to avoid the runloop from being interrupted while we're benchmarking other stuff
`max.poll.records` = 1000 // A more production worthy value
)
)

Expand Down
284 changes: 281 additions & 3 deletions zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,14 @@ import org.apache.kafka.common.TopicPartition
import zio._
import zio.kafka.ZIOSpecDefaultSlf4j
import zio.kafka.consumer.Consumer.{ AutoOffsetStrategy, OffsetRetrieval }
import zio.kafka.consumer.diagnostics.DiagnosticEvent.Finalization
import zio.kafka.consumer.diagnostics.DiagnosticEvent.Finalization.{
ConsumerFinalized,
RunloopFinalized,
SubscriptionFinalized
}
import zio.kafka.consumer.diagnostics.{ DiagnosticEvent, Diagnostics }
import zio.kafka.producer.TransactionalProducer
import zio.kafka.producer.{ Producer, TransactionalProducer }
import zio.kafka.serde.Serde
import zio.kafka.testkit.KafkaTestUtils._
import zio.kafka.testkit.{ Kafka, KafkaRandom }
Expand All @@ -22,8 +28,10 @@ import zio.test.Assertion._
import zio.test.TestAspect._
import zio.test._

import java.util.concurrent.atomic.AtomicInteger
import scala.reflect.ClassTag

//noinspection SimplifyAssertInspection
object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
override val kafkaPrefix: String = "consumespec"

Expand Down Expand Up @@ -1052,11 +1060,281 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
_ <- produceOne(topic, "key2", "message2")
_ <- recordsOut.take
} yield assertCompletes
}
},
suite("issue #856")(
test(
  "Booting a Consumer to do something else than consuming should not fail with `RunloopTimeout` exception"
) {
  // Creates a consumer and merely keeps it alive for a second, without ever subscribing.
  // Before the fix for #856 this tripped the runloop timeout.
  def test(diagnostics: Diagnostics) =
    for {
      clientId <- randomClient
      settings <- consumerSettings(clientId = clientId)
      _ <- Consumer.make(settings, diagnostics = diagnostics)
      _ <- ZIO.sleep(1.second)
    } yield assertCompletes

  for {
    diagnostics <- Diagnostics.SlidingQueue.make(1000)
    // Scope the consumer so its finalizers have run before the diagnostics queue is inspected.
    testResult <- ZIO.scoped {
                    test(diagnostics)
                  }
    finalizationEvents <- diagnostics.queue.takeAll.map(_.filter(_.isInstanceOf[Finalization]))
    // No subscription ever existed, so the only finalization event is the consumer's own.
  } yield testResult && assert(finalizationEvents)(hasSameElements(Chunk(ConsumerFinalized)))
},
suite(
"Ordering of finalizers matters. If subscriptions are finalized after the runloop, it creates a deadlock"
)(
test("When not consuming, the Runloop is not started so only the Consumer is finalized") {
  // Makes a consumer but never starts a consumption session, so no Runloop should exist.
  def test(diagnostics: Diagnostics): ZIO[Scope & Kafka, Throwable, TestResult] =
    for {
      clientId <- randomClient
      settings <- consumerSettings(clientId = clientId)
      _ <- Consumer.make(settings, diagnostics = diagnostics)
    } yield assertCompletes

  for {
    diagnostics <- Diagnostics.SlidingQueue.make(1000)
    // Scope the consumer so its finalizers run before we read the diagnostics queue.
    testResult <- ZIO.scoped {
                    test(diagnostics)
                  }
    finalizationEvents <- diagnostics.queue.takeAll.map(_.filter(_.isInstanceOf[Finalization]))
    // Expect exactly one event: the consumer's finalizer; neither Runloop nor subscription ran.
  } yield testResult && assert(finalizationEvents)(hasSameElements(Chunk(ConsumerFinalized)))
},
test("When consuming, the Runloop is started. The finalization orders matters to avoid a deadlock") {
  // This test ensures that we're not inadvertently introducing a deadlock by changing the order of finalizers.

  def test(diagnostics: Diagnostics): ZIO[Producer & Scope & Kafka, Throwable, TestResult] =
    for {
      clientId <- randomClient
      topic <- randomTopic
      settings <- consumerSettings(clientId = clientId)
      consumer <- Consumer.make(settings, diagnostics = diagnostics)
      _ <- produceOne(topic, "key1", "message1")
      // Starting a consumption session to start the Runloop.
      consumed_0 <- consumer
                      .plainStream(Subscription.manual(topic -> 0), Serde.string, Serde.string)
                      .take(1)
                      .runCount
      // Second session on the same consumer: it registers its own subscription.
      consumed_1 <- consumer
                      .plainStream(Subscription.manual(topic -> 0), Serde.string, Serde.string)
                      .take(1)
                      .runCount
    } yield assert(consumed_0)(equalTo(1L)) && assert(consumed_1)(equalTo(1L))

  for {
    diagnostics <- Diagnostics.SlidingQueue.make(1000)
    testResult <- ZIO.scoped {
                    test(diagnostics)
                  }
    finalizationEvents <- diagnostics.queue.takeAll.map(_.filter(_.isInstanceOf[Finalization]))
  } yield testResult && assert(finalizationEvents)(
    // The order is very important.
    // The subscription must be finalized before the runloop, otherwise it creates a deadlock.
    // Two sessions were started, hence two SubscriptionFinalized events.
    equalTo(
      Chunk(
        SubscriptionFinalized,
        SubscriptionFinalized,
        RunloopFinalized,
        ConsumerFinalized
      )
    )
  )
},
// NOTE(review): a test with this exact name appears a second time further down in this suite,
// with a slightly different body (`fork` + AtomicInteger logging here vs `forkScoped` there).
// Consider deduplicating or renaming one of them — duplicate test names make reports ambiguous.
test(
  "Calling `Consumer::stopConsumption` just after starting a forked consumption session should stop the consumption"
) {
  val numberOfMessages: Int = 100000
  val kvs: Iterable[(String, String)] = Iterable.tabulate(numberOfMessages)(i => (s"key-$i", s"msg-$i"))

  def test(diagnostics: Diagnostics): ZIO[Producer & Scope & Kafka, Throwable, TestResult] =
    for {
      clientId <- randomClient
      topic <- randomTopic
      settings <- consumerSettings(clientId = clientId)
      consumer <- Consumer.make(settings, diagnostics = diagnostics)
      _ <- produceMany(topic, kvs)
      // Plain JVM counter, only used to enrich the debug log emitted from inside the stream.
      ref = new AtomicInteger(0)
      // Starting a consumption session to start the Runloop.
      fiber <-
        consumer
          .plainStream(Subscription.manual(topic -> 0), Serde.string, Serde.string)
          .mapChunksZIO(chunks => ZIO.logDebug(s"Consumed ${ref.getAndAdd(chunks.size)} messages").as(chunks))
          .take(numberOfMessages.toLong)
          .runCount
          .fork
      // Stop immediately: the session should terminate before all messages are consumed.
      _ <- consumer.stopConsumption
      consumed_0 <- fiber.join
    } yield assert(consumed_0)(isLessThan(numberOfMessages.toLong))

  for {
    diagnostics <- Diagnostics.SlidingQueue.make(1000)
    testResult <- ZIO.scoped {
                    test(diagnostics)
                  }
    finalizationEvents <- diagnostics.queue.takeAll.map(_.filter(_.isInstanceOf[Finalization]))
  } yield testResult && assert(finalizationEvents)(
    // The order is very important.
    // The subscription must be finalized before the runloop, otherwise it creates a deadlock.
    equalTo(
      Chunk(
        SubscriptionFinalized,
        RunloopFinalized,
        ConsumerFinalized
      )
    )
  )
} @@ nonFlaky(5),
// NOTE(review): this test appears verbatim a second time further down in this suite —
// consider removing one copy; duplicate test names make reports ambiguous.
test(
  "it's possible to start a new consumption session from a Consumer that had a consumption session stopped previously"
) {
  val numberOfMessages: Int = 100000
  val kvs: Iterable[(String, String)] = Iterable.tabulate(numberOfMessages)(i => (s"key-$i", s"msg-$i"))

  def test(diagnostics: Diagnostics): ZIO[Producer & Scope & Kafka, Throwable, TestResult] =
    for {
      clientId <- randomClient
      topic <- randomTopic
      settings <- consumerSettings(clientId = clientId)
      consumer <- Consumer.make(settings, diagnostics = diagnostics)
      _ <- produceMany(topic, kvs)
      // Starting a consumption session to start the Runloop.
      fiber <- consumer
                 .plainStream(Subscription.manual(topic -> 0), Serde.string, Serde.string)
                 .take(numberOfMessages.toLong)
                 .runCount
                 .forkScoped
      // Let the session consume some (but, per the assertion below, not all) messages first.
      _ <- ZIO.sleep(200.millis)
      _ <- consumer.stopConsumption
      consumed_0 <- fiber.join
      _ <- ZIO.logDebug(s"consumed_0: $consumed_0")

      _ <- ZIO.logDebug("About to sleep 5 seconds")
      _ <- ZIO.sleep(5.seconds)
      _ <- ZIO.logDebug("Slept 5 seconds")
      // Second session on the same consumer; this one is expected to run to completion.
      consumed_1 <- consumer
                      .plainStream(Subscription.manual(topic -> 0), Serde.string, Serde.string)
                      .take(numberOfMessages.toLong)
                      .runCount
    } yield assert(consumed_0)(isGreaterThan(0L) && isLessThan(numberOfMessages.toLong)) &&
      assert(consumed_1)(equalTo(numberOfMessages.toLong))

  for {
    diagnostics <- Diagnostics.SlidingQueue.make(1000)
    testResult <- ZIO.scoped {
                    test(diagnostics)
                  }
    finalizationEvents <- diagnostics.queue.takeAll.map(_.filter(_.isInstanceOf[Finalization]))
  } yield testResult && assert(finalizationEvents)(
    // The order is very important.
    // The subscription must be finalized before the runloop, otherwise it creates a deadlock.
    equalTo(
      Chunk(
        SubscriptionFinalized,
        RunloopFinalized,
        ConsumerFinalized
      )
    )
  )
},
// NOTE(review): near-duplicate of the earlier test with the same name (that one uses `fork` and an
// AtomicInteger debug counter; this one uses `forkScoped`). Keep one, or rename to distinguish them.
test(
  "Calling `Consumer::stopConsumption` just after starting a forked consumption session should stop the consumption"
) {
  val numberOfMessages: Int = 100000
  val kvs: Iterable[(String, String)] = Iterable.tabulate(numberOfMessages)(i => (s"key-$i", s"msg-$i"))

  def test(diagnostics: Diagnostics): ZIO[Producer & Scope & Kafka, Throwable, TestResult] =
    for {
      clientId <- randomClient
      topic <- randomTopic
      settings <- consumerSettings(clientId = clientId)
      consumer <- Consumer.make(settings, diagnostics = diagnostics)
      _ <- produceMany(topic, kvs)
      // Starting a consumption session to start the Runloop.
      fiber <- consumer
                 .plainStream(Subscription.manual(topic -> 0), Serde.string, Serde.string)
                 .take(numberOfMessages.toLong)
                 .runCount
                 .forkScoped
      // Stop immediately: the session should terminate before all messages are consumed.
      _ <- consumer.stopConsumption
      consumed_0 <- fiber.join
    } yield assert(consumed_0)(isLessThan(numberOfMessages.toLong))

  for {
    diagnostics <- Diagnostics.SlidingQueue.make(1000)
    testResult <- ZIO.scoped {
                    test(diagnostics)
                  }
    finalizationEvents <- diagnostics.queue.takeAll.map(_.filter(_.isInstanceOf[Finalization]))
  } yield testResult && assert(finalizationEvents)(
    // The order is very important.
    // The subscription must be finalized before the runloop, otherwise it creates a deadlock.
    equalTo(
      Chunk(
        SubscriptionFinalized,
        RunloopFinalized,
        ConsumerFinalized
      )
    )
  )
} @@ nonFlaky(5),
// NOTE(review): verbatim duplicate of the earlier test with the same name in this suite —
// one of the two copies should be removed.
test(
  "it's possible to start a new consumption session from a Consumer that had a consumption session stopped previously"
) {
  val numberOfMessages: Int = 100000
  val kvs: Iterable[(String, String)] = Iterable.tabulate(numberOfMessages)(i => (s"key-$i", s"msg-$i"))

  def test(diagnostics: Diagnostics): ZIO[Producer & Scope & Kafka, Throwable, TestResult] =
    for {
      clientId <- randomClient
      topic <- randomTopic
      settings <- consumerSettings(clientId = clientId)
      consumer <- Consumer.make(settings, diagnostics = diagnostics)
      _ <- produceMany(topic, kvs)
      // Starting a consumption session to start the Runloop.
      fiber <- consumer
                 .plainStream(Subscription.manual(topic -> 0), Serde.string, Serde.string)
                 .take(numberOfMessages.toLong)
                 .runCount
                 .forkScoped
      // Let the session consume some (but not all) messages before stopping it.
      _ <- ZIO.sleep(200.millis)
      _ <- consumer.stopConsumption
      consumed_0 <- fiber.join
      _ <- ZIO.logDebug(s"consumed_0: $consumed_0")

      _ <- ZIO.logDebug("About to sleep 5 seconds")
      _ <- ZIO.sleep(5.seconds)
      _ <- ZIO.logDebug("Slept 5 seconds")
      // Second session on the same consumer; this one is expected to run to completion.
      consumed_1 <- consumer
                      .plainStream(Subscription.manual(topic -> 0), Serde.string, Serde.string)
                      .take(numberOfMessages.toLong)
                      .runCount
    } yield assert(consumed_0)(isGreaterThan(0L) && isLessThan(numberOfMessages.toLong)) &&
      assert(consumed_1)(equalTo(numberOfMessages.toLong))

  for {
    diagnostics <- Diagnostics.SlidingQueue.make(1000)
    testResult <- ZIO.scoped {
                    test(diagnostics)
                  }
    finalizationEvents <- diagnostics.queue.takeAll.map(_.filter(_.isInstanceOf[Finalization]))
  } yield testResult && assert(finalizationEvents)(
    // The order is very important.
    // The subscription must be finalized before the runloop, otherwise it creates a deadlock.
    equalTo(
      Chunk(
        SubscriptionFinalized,
        RunloopFinalized,
        ConsumerFinalized
      )
    )
  )
}
)
)
)
.provideSome[Scope & Kafka](producer)
.provideSomeShared[Scope](
Kafka.embedded
) @@ withLiveClock @@ TestAspect.sequential @@ timeout(2.minutes)
) @@ withLiveClock @@ sequential @@ timeout(2.minutes)

}
Loading