Some streams merged with merge stop emitting when downstream does certain actions #3598

@lukestephenson


Hello, I came across a bug in one of my applications: when multiple streams are merged with mergeHaltBoth, one of them stops producing updates. I've come up with a fairly minimal reproduction of the issue. Our application sometimes inspects the first element of a stream to decide how to process the remaining elements, using the splitHead function defined in this example.

Here is the code which demonstrates this issue:

package bug

import cats.effect.{IO, IOApp}
import fs2.{Chunk, Stream}

import scala.concurrent.duration.DurationInt

object FS2Bug extends IOApp.Simple {

  sealed trait Data

  case class Item(value: Int) extends Data
  case object Tick1 extends Data
  case object Tick2 extends Data

  override def run: IO[Unit] = {
    val source = Stream.emits(1 to 100).evalMap(i => IO(Item(i)).delayBy(2.seconds))

    val timer = fs2.Stream.awakeEvery[IO](1.second).evalMap(_ => IO(()).as(Tick1))
    val timer2 = fs2.Stream.awakeEvery[IO](1.second).evalMap(_ => IO(()).as(Tick2))

    val sources = timer2.mergeHaltBoth(source.mergeHaltBoth(timer))

    val writer = splitHead(sources)
      .flatMap { case (head, tail) =>
        splitHead(tail).flatMap { case (head2, tail) =>
          Stream.emit(head) ++ Stream.emit(head2) ++ tail
        }.parEvalMap(3) { i =>
          IO(println(s"Writing ${i.toString}"))
        }
      }

    writer.compile.drain
  }

  def splitHead[F[_], O](in: fs2.Stream[F, O]): fs2.Stream[F, (O, fs2.Stream[F, O])] =
    in.pull.uncons1
      .flatMap {
        case Some((head, tail)) => fs2.Pull.output(Chunk((head, tail)))
        case None => fs2.Pull.done
      }
      .stream
}

The program merges three streams. It then splits the head element off the merged stream twice and re-emits both heads followed by the remaining elements. The source stream terminates after emitting 100 elements; the timer streams never terminate.
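For reference, here is a minimal sketch of what splitHead is meant to do, using a pure stream with no concurrency involved. The object name SplitHeadDemo and the sample values are illustrative assumptions, and Pull.output1 is used in place of Pull.output(Chunk(...)) for brevity. Splitting the head off and appending the tail again should give back the same elements in the same order.

```scala
import fs2.{Pull, Pure, Stream}

// Illustrative only: SplitHeadDemo and the sample values are not part of the report.
object SplitHeadDemo {

  // Same shape as the helper in the reproduction: emit the first element
  // together with a stream of everything that follows it.
  def splitHead[F[_], O](in: Stream[F, O]): Stream[F, (O, Stream[F, O])] =
    in.pull.uncons1.flatMap {
      case Some((head, tail)) => Pull.output1((head, tail))
      case None               => Pull.done
    }.stream

  // Split the head off a pure stream, then put it back in front of the tail.
  val rejoined: Stream[Pure, Int] =
    splitHead[Pure, Int](Stream(1, 2, 3, 4)).flatMap { case (head, tail) =>
      Stream.emit(head) ++ tail
    }

  def main(args: Array[String]): Unit =
    println(rejoined.toList) // prints List(1, 2, 3, 4)
}
```

With a pure stream the round trip is lossless, so the missing Tick2 elements in the reproduction appear to depend on the merged, effectful streams rather than on splitHead alone.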

When run, this program generates output like:

Writing Tick2
Writing Tick1
Writing Item(1)
Writing Tick1
Writing Tick1
Writing Tick1
Writing Item(2)
Writing Tick1
Writing Tick1
Writing Item(3)
Writing Tick1
Writing Tick1
Writing Item(4)
Writing Tick1
Writing Tick1
Writing Item(5)
Writing Tick1
Writing Tick1
Writing Item(6)
Writing Tick1
Writing Tick1

Note how Tick2 from the timer2 stream is printed only once. It should appear at the same frequency as Tick1, since both timers tick every second.
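One way to turn this observation into something checkable is to collect the output for a fixed window and count how often each kind of element appears. The following is only a sketch under assumptions: the object name TickCount, the summarize helper, the 10 second window, and the identity parEvalMap stage are all hypothetical and not part of the original reproduction, and whether this variant triggers the same behaviour has not been confirmed.

```scala
import cats.effect.{IO, IOApp}
import fs2.{Pull, Stream}

import scala.concurrent.duration.DurationInt

// Hypothetical checker: names and the 10 second window are assumptions.
object TickCount extends IOApp.Simple {

  sealed trait Data
  case class Item(value: Int) extends Data
  case object Tick1 extends Data
  case object Tick2 extends Data

  // Same helper shape as in the reproduction.
  def splitHead[F[_], O](in: Stream[F, O]): Stream[F, (O, Stream[F, O])] =
    in.pull.uncons1.flatMap {
      case Some((head, tail)) => Pull.output1((head, tail))
      case None               => Pull.done
    }.stream

  // Pure helper: (number of Tick1, number of Tick2, number of Item).
  def summarize(seen: List[Data]): (Int, Int, Int) =
    (seen.count(_ == Tick1), seen.count(_ == Tick2), seen.count(_.isInstanceOf[Item]))

  val source: Stream[IO, Data] =
    Stream.emits(1 to 100).covary[IO].evalMap(i => IO.sleep(2.seconds).as[Data](Item(i)))
  val timer: Stream[IO, Data] =
    Stream.awakeEvery[IO](1.second).evalMap(_ => IO(()).as[Data](Tick1))
  val timer2: Stream[IO, Data] =
    Stream.awakeEvery[IO](1.second).evalMap(_ => IO(()).as[Data](Tick2))

  val sources: Stream[IO, Data] = timer2.mergeHaltBoth(source.mergeHaltBoth(timer))

  // Mirrors the reproduction: two head splits, re-emit, then parEvalMap(3).
  val rejoined: Stream[IO, Data] =
    splitHead(sources).flatMap { case (head, tail) =>
      splitHead(tail)
        .flatMap { case (head2, rest) => Stream.emit(head) ++ Stream.emit(head2) ++ rest }
        .parEvalMap(3)(d => IO.pure(d))
    }

  override def run: IO[Unit] =
    rejoined.interruptAfter(10.seconds).compile.toList.flatMap { seen =>
      val (tick1, tick2, items) = summarize(seen)
      IO.println(s"Tick1=$tick1 Tick2=$tick2 Item=$items")
    }
}
```

If both timers were being pulled fairly, the Tick1 and Tick2 counts would be expected to stay close to each other over the window; a Tick2 count that stays at 1 would match the output shown above.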

This affects fs2 versions:

  • 3.12.0
  • 2.5.10

I'll take a look at getting a PR up with a test to highlight the issue. Thanks
