-
-
Notifications
You must be signed in to change notification settings - Fork 622
Description
Hello, I came across a bug in an application of mine where we stop seeing updates on one stream when multiple streams are merged with mergeHaltBoth. I've come up with a fairly minimal reproduction of the issue. Our application will sometimes inspect the first element of the stream to determine how to process the remaining elements of the stream. We make use of the splitHead function used in this example.
Here is the code which demonstrates this issue:
package bug
import cats.effect.{IO, IOApp}
import fs2.{Chunk, Stream}
import scala.concurrent.duration.DurationInt
object FS2Bug extends IOApp.Simple {
sealed trait Data
case class Item(value: Int) extends Data
case object Tick1 extends Data
case object Tick2 extends Data
override def run: IO[Unit] = {
val source = Stream.emits(1 to 100).evalMap(i => IO(Item(i)).delayBy(2.seconds))
val timer = fs2.Stream.awakeEvery[IO](1.second).evalMap(_ => IO(()).as(Tick1))
val timer2 = fs2.Stream.awakeEvery[IO](1.second).evalMap(_ => IO(()).as(Tick2))
val sources = timer2.mergeHaltBoth(source.mergeHaltBoth(timer))
val writer = splitHead(sources)
.flatMap { case (head, tail) =>
splitHead(tail).flatMap { case (head2, tail) =>
Stream.emit(head) ++ Stream.emit(head2) ++ tail
}.parEvalMap(3) { i =>
IO(println(s"Writing ${i.toString}"))
}
}
writer.compile.drain
}
def splitHead[F[_], O](in: fs2.Stream[F, O]): fs2.Stream[F, (O, fs2.Stream[F, O])] =
in.pull.uncons1
.flatMap {
case Some((head, tail)) => fs2.Pull.output(Chunk((head, tail)))
case None => fs2.Pull.done
}
.stream
}
The program merges 3 streams together. It splits the head of the stream from the remaining elements, before merging it back together. The source stream will terminate after 100 elements are emitted, the timer streams never terminate.
When run, this program generates output like:
Writing Tick2
Writing Tick1
Writing Item(1)
Writing Tick1
Writing Tick1
Writing Tick1
Writing Item(2)
Writing Tick1
Writing Tick1
Writing Item(3)
Writing Tick1
Writing Tick1
Writing Item(4)
Writing Tick1
Writing Tick1
Writing Item(5)
Writing Tick1
Writing Tick1
Writing Item(6)
Writing Tick1
Writing Tick1
Note how Tick2 from the timer2 stream is only printed once. This should happen at the same frequency as the Tick1 outputs.
This affects fs2 versions:
- 3.12.0
- 2.5.10
I'll take a look at getting a PR up with a test to highlight the issue. Thanks