Skip to content

Commit

Permalink
Zio1 stream bench (zio#6445)
Browse files Browse the repository at this point in the history
* stream benchmarks

* stream benchmarks

* scala 2.12 compile fix

* ReentrantLockSpec flaky test
  • Loading branch information
justcoon authored Apr 17, 2022
1 parent 684e11a commit 5a19668
Showing 1 changed file with 214 additions and 1 deletion.
215 changes: 214 additions & 1 deletion benchmarks/src/main/scala/zio/StreamBenchmarks.scala
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import zio.stream._

import java.util.concurrent.TimeUnit
import scala.concurrent.duration.Duration
import scala.concurrent.{Await, ExecutionContextExecutor}
import scala.concurrent.{Await, ExecutionContext, ExecutionContextExecutor, Future}

@State(Scope.Thread)
@BenchmarkMode(Array(Mode.Throughput))
Expand All @@ -23,6 +23,9 @@ class StreamBenchmarks {
@Param(Array("5000"))
var chunkSize: Int = _

@Param(Array("50"))
var parChunkSize: Int = _

implicit val system: ActorSystem = ActorSystem("benchmarks")
implicit val ec: ExecutionContextExecutor = system.dispatcher

Expand Down Expand Up @@ -79,6 +82,216 @@ class StreamBenchmarks {
.map(_.toLong)
.fold(0L)(_ + _)
}

/** Running-sum over a chunked fs2 stream via `mapAccumulate`; returns the last (state, output) pair. */
@Benchmark
def fs2MapAccum: Option[(Long, Long)] = {
  val inputChunks = (1 to chunkCount).map(n => FS2Chunk.array(Array.fill(chunkSize)(n)))
  val summed = FS2Stream(inputChunks: _*)
    .flatMap(FS2Stream.chunk(_))
    .mapAccumulate(0L) { (runningSum, element) =>
      val next = runningSum + element
      (next, next)
    }
  // Materialize only the final element of the accumulated stream.
  summed.covary[CatsIO].compile.last.unsafeRunSync()
}

/** Running-sum over a chunked ZStream via `mapAccum`; returns the last emitted total. */
@Benchmark
def zioMapAccum: Option[Long] = {
  val inputChunks = (1 to chunkCount).map(n => Chunk.fromArray(Array.fill(chunkSize)(n)))
  val effect = ZStream
    .fromChunks(inputChunks: _*)
    .mapAccum(0L) { (runningSum, element) =>
      val next = runningSum + element
      (next, next)
    }
    .runLast
  unsafeRun(effect)
}

/** `takeWhile` over an Akka Streams source, keeping only the first half of the value range. */
@Benchmark
def akkaTakeWhile: Option[Int] = {
  val inputChunks = (1 to chunkCount).map(n => Array.fill(chunkSize)(n))
  val graph = AkkaSource
    .fromIterator(() => inputChunks.iterator.flatten)
    .takeWhile(n => n < (chunkCount * chunkSize) / 2)
    .toMat(AkkaSink.lastOption)(Keep.right)
  // Block until the materialized value (the last surviving element) is available.
  Await.result(graph.run(), Duration.Inf)
}

/** `takeWhile` over a chunked fs2 stream, keeping only the first half of the value range. */
@Benchmark
def fs2TakeWhile: Option[Int] = {
  val inputChunks = (1 to chunkCount).map(n => FS2Chunk.array(Array.fill(chunkSize)(n)))
  val truncated = FS2Stream(inputChunks: _*)
    .flatMap(FS2Stream.chunk(_))
    .takeWhile(n => n < (chunkCount * chunkSize) / 2)
  // Return the last element that passed the predicate, if any.
  truncated.covary[CatsIO].compile.last.unsafeRunSync()
}

/** `takeWhile` over a chunked ZStream, keeping only the first half of the value range. */
@Benchmark
def zioTakeWhile: Option[Int] = {
  val inputChunks = (1 to chunkCount).map(n => Chunk.fromArray(Array.fill(chunkSize)(n)))
  val effect = ZStream
    .fromChunks(inputChunks: _*)
    .takeWhile(n => n < (chunkCount * chunkSize) / 2)
    .runLast
  unsafeRun(effect)
}

/** Groups an Akka Streams source into batches of <=100 (or 1s windows) and counts the batches. */
@Benchmark
def akkaGroupWithin: Long = {
  val inputChunks = (1 to chunkCount).map(n => Array.fill(chunkSize)(n))
  val graph = AkkaSource
    .fromIterator(() => inputChunks.iterator.flatten)
    .groupedWithin(100, Duration(1, TimeUnit.SECONDS))
    // Count groups, not elements: each fold step sees one emitted batch.
    .toMat(AkkaSink.fold(0L)((groupsSeen, _) => groupsSeen + 1L))(Keep.right)
  Await.result(graph.run(), Duration.Inf)
}

/** Groups a chunked fs2 stream into batches of <=100 (or 1s windows) and counts the batches. */
@Benchmark
def fs2GroupWithin: Long = {
  // fs2 2.x `groupWithin` requires an implicit Timer[CatsIO] for the time-based
  // window; one is built here from the global execution context.
  implicit val timer = CatsIO.timer(ExecutionContext.global)
  val chunks = (1 to chunkCount).map(i => FS2Chunk.array(Array.fill(chunkSize)(i)))
  FS2Stream(chunks: _*)
    .flatMap(FS2Stream.chunk(_))
    .groupWithin[CatsIO](100, Duration(1, TimeUnit.SECONDS))
    .covary[CatsIO]
    .compile
    // Count emitted groups rather than elements.
    .fold(0L)((c, _) => c + 1L)
    .unsafeRunSync()
}

/** Groups a chunked ZStream into batches of <=100 (or 1s windows) and counts the batches. */
@Benchmark
def zioGroupWithin: Long = {
  val inputChunks = (1 to chunkCount).map(n => Chunk.fromArray(Array.fill(chunkSize)(n)))
  val counted = ZStream
    .fromChunks(inputChunks: _*)
    .groupedWithin(100, zio.duration.Duration(1, TimeUnit.SECONDS))
    .runCount
  // The time-based grouping needs a Clock; supply the live one explicitly.
  unsafeRun(counted.provideLayer(zio.clock.Clock.live))
}

/** Partitions a chunked ZStream by parity, collects each partition, and counts the results. */
@Benchmark
def zioGroupBy: Long = {
  val inputChunks = (1 to chunkCount).map(n => Chunk.fromArray(Array.fill(chunkSize)(n)))
  val grouped = ZStream
    .fromChunks(inputChunks: _*)
    .groupByKey(_ % 2) { (key, partition) =>
      // Collect the whole partition and emit it as a single (key, values) pair.
      ZStream.fromEffect(partition.runCollect.map(values => key -> values))
    }
  unsafeRun(grouped.runCount)
}

/** Ordered parallel map (parallelism 4) over an Akka Streams source; counts processed elements. */
@Benchmark
def akkaMapPar: Long = {
  val inputChunks = (1 to chunkCount).map(n => Array.fill(parChunkSize)(n))
  val graph = AkkaSource
    .fromIterator(() => inputChunks.iterator.flatten)
    // Already-computed value: Future.successful avoids scheduling overhead.
    .mapAsync(4)(n => Future.successful(BigDecimal.valueOf(n.toLong).pow(3)))
    .toMat(AkkaSink.fold(0L)((seen, _) => seen + 1L))(Keep.right)
  Await.result(graph.run(), Duration.Inf)
}

/** Ordered parallel map (parallelism 4) over a chunked fs2 stream; counts processed elements. */
@Benchmark
def fs2MapPar: Long = {
  val inputChunks = (1 to chunkCount).map(n => FS2Chunk.array(Array.fill(parChunkSize)(n)))
  val mapped = FS2Stream(inputChunks: _*)
    .flatMap(FS2Stream.chunk(_))
    .mapAsync[CatsIO, BigDecimal](4)(n => CatsIO(BigDecimal.valueOf(n.toLong).pow(3)))
  mapped
    .covary[CatsIO]
    .compile
    .fold(0L)((seen, _) => seen + 1L)
    .unsafeRunSync()
}

/** Ordered parallel map (parallelism 4) over a chunked ZStream; counts processed elements. */
@Benchmark
def zioMapPar: Long = {
  val inputChunks = (1 to chunkCount).map(n => Chunk.fromArray(Array.fill(parChunkSize)(n)))
  val effect = ZStream
    .fromChunks(inputChunks: _*)
    .mapMPar[Any, Nothing, BigDecimal](4)(n => ZIO.succeed(BigDecimal.valueOf(n.toLong).pow(3)))
    .runCount
  unsafeRun(effect)
}

/** Unordered parallel map (parallelism 4) over an Akka Streams source; counts processed elements. */
@Benchmark
def akkaMapParUnordered: Long = {
  val inputChunks = (1 to chunkCount).map(n => Array.fill(parChunkSize)(n))
  val graph = AkkaSource
    .fromIterator(() => inputChunks.iterator.flatten)
    .mapAsyncUnordered(4)(n => Future.successful(BigDecimal.valueOf(n.toLong).pow(3)))
    .toMat(AkkaSink.fold(0L)((seen, _) => seen + 1L))(Keep.right)
  Await.result(graph.run(), Duration.Inf)
}

/** Unordered parallel map (parallelism 4) over a chunked fs2 stream; counts processed elements. */
@Benchmark
def fs2MapParUnordered: Long = {
  val inputChunks = (1 to chunkCount).map(n => FS2Chunk.array(Array.fill(parChunkSize)(n)))
  val mapped = FS2Stream(inputChunks: _*)
    .flatMap(FS2Stream.chunk(_))
    .mapAsyncUnordered[CatsIO, BigDecimal](4)(n => CatsIO(BigDecimal.valueOf(n.toLong).pow(3)))
  mapped
    .covary[CatsIO]
    .compile
    .fold(0L)((seen, _) => seen + 1L)
    .unsafeRunSync()
}

/** Unordered parallel map (parallelism 4) over a chunked ZStream; counts processed elements. */
@Benchmark
def zioMapParUnordered: Long = {
  val inputChunks = (1 to chunkCount).map(n => Chunk.fromArray(Array.fill(parChunkSize)(n)))
  val effect = ZStream
    .fromChunks(inputChunks: _*)
    .mapMParUnordered[Any, Nothing, BigDecimal](4)(n => ZIO.succeed(BigDecimal.valueOf(n.toLong).pow(3)))
    .runCount
  unsafeRun(effect)
}

/** Zips two identical Akka sources (one doubled) element-wise with `+` and sums the result. */
@Benchmark
def akkaZipWith: Long = {
  val inputChunks = (1 to chunkCount).map(n => Array.fill(chunkSize)(n))
  // Build a fresh source per use: each materialization needs its own iterator.
  def ints = AkkaSource.fromIterator(() => inputChunks.iterator.flatten)
  val graph = ints
    .zipWith(ints.map(_ * 2L))(_ + _)
    .toMat(AkkaSink.fold(0L)((sum, n) => sum + n))(Keep.right)
  Await.result(graph.run(), Duration.Inf)
}

/** Zips two identical fs2 streams (one doubled) element-wise with `+` and sums the result. */
@Benchmark
def fs2ZipWith: Long = {
  val inputChunks = (1 to chunkCount).map(n => FS2Chunk.array(Array.fill(chunkSize)(n)))
  def ints = FS2Stream(inputChunks: _*).flatMap(FS2Stream.chunk(_))
  ints
    .zipWith(ints.map(_ * 2L))(_ + _)
    .covary[CatsIO]
    .compile
    .fold(0L)((sum, n) => sum + n)
    .unsafeRunSync()
}

/** Zips two identical ZStreams (one doubled) element-wise with `+` and sums the result. */
@Benchmark
def zioZipWith: Long = {
  val inputChunks = (1 to chunkCount).map(n => Chunk.fromArray(Array.fill(chunkSize)(n)))
  val left  = ZStream.fromChunks(inputChunks: _*)
  val right = ZStream.fromChunks(inputChunks: _*).map(_ * 2L)
  unsafeRun(left.zipWith(right)(_ + _).runSum)
}

}

@State(Scope.Thread)
Expand Down

0 comments on commit 5a19668

Please sign in to comment.