Closed
Description
I'm running into an issue when merging two fs2.Streams and processing the resulting stream further. Consider the following example:
Ref.of[IO, Int](0).map(ref => {
  fs2.Stream.never[IO].merge(fs2.Stream.repeatEval(ref.get)).evalMap(value => {
    IO(println(s"Got value $value")) >> IO.sleep(1.second) >> ref.set(value + 1)
  })
}).flatMap(_.compile.drain).unsafeRunSync()
Expected output:
Got value 0
Got value 1
Got value 2
Got value 3
...
Actual output:
Got value 0
Got value 0
Got value 1
Got value 1
Got value 2
Got value 2
Got value 3
Got value 3
...
As far as I can tell from digging into the implementation of merge, the issue is with the Semaphore(1): instead of letting one chunk through at a time, it lets two chunks through immediately, which is not what I expected.
Increasing the number of permits to n causes n+1 chunks to be emitted at once.
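One possible explanation for the n+1 pattern, sketched as a plain-JVM model rather than fs2's actual code: if the producing side evaluates the next element first and only then waits for a permit, then n permits let n elements through and one more is already evaluated while it waits. Everything in the sketch below (the object name `PrefetchModel`, the method `readsBeforeFirstConsume`, and the use of `java.util.concurrent` in place of fs2's Semaphore and queue) is hypothetical and is only meant to illustrate that ordering.

```scala
import java.util.concurrent.{LinkedBlockingQueue, Semaphore}
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical model, not fs2 code: a producer that reads a value BEFORE
// acquiring a permit, guarded by a semaphore with `permits` permits.
object PrefetchModel {

  // Counts how many values the producer has read by the time it is
  // blocked on the semaphore, with no consumer taking anything yet.
  def readsBeforeFirstConsume(permits: Int): Int = {
    val guard = new Semaphore(permits)
    val queue = new LinkedBlockingQueue[Int]()
    val reads = new AtomicInteger(0)

    val producer = new Thread(() => {
      try {
        while (true) {
          val value = reads.getAndIncrement() // read first (stands in for ref.get)
          guard.acquire()                     // only then wait for a permit
          queue.put(value)                    // hand the value to the consumer side
        }
      } catch {
        case _: InterruptedException => () // stop quietly when interrupted
      }
    })
    producer.setDaemon(true)
    producer.start()

    // Wait until the producer is parked inside acquire().
    while (!guard.hasQueuedThreads()) Thread.sleep(1)

    val observed = reads.get()
    producer.interrupt()
    observed
  }

  def main(args: Array[String]): Unit = {
    println(readsBeforeFirstConsume(1))
    println(readsBeforeFirstConsume(3))
  }
}
```

Under this assumption, one permit yields two reads and three permits yield four, which matches the duplicated "Got value 0" lines above: the second read happens before the first value has been processed and written back to the Ref.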