
Merging streams causes 2 chunks to be emitted instead of 1 #1987

@ghost

Description

I'm running into an issue when merging two fs2.Streams and processing the resulting Stream further. Consider the following example:

  Ref.of[IO, Int](0).map(ref => {
    fs2.Stream.never[IO].merge(fs2.Stream.repeatEval(ref.get)).evalMap(value => {
      IO(println(s"Got value $value")) >> IO.sleep(1.second) >> ref.set(value + 1)
    })
  }).flatMap(_.compile.drain).unsafeRunSync()

Expected output:

Got value 0
Got value 1
Got value 2
Got value 3
...

Actual output:

Got value 0
Got value 0
Got value 1
Got value 1
Got value 2
Got value 2
Got value 3
Got value 3
...

As far as I can tell from digging into the implementation of merge, the issue is with the Semaphore(1) that guards each side. Instead of letting one chunk through at a time, it lets two chunks through immediately, which is not what I expected.

Increasing the number of permits to n causes n+1 chunks to be emitted at once.
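
To make the n versus n+1 observation concrete, here is a small deterministic model of one way such an off-by-one can arise. It is a sketch, not fs2's code: it assumes (the issue does not confirm this) that the producer side evaluates the next chunk before it acquires the permit, so one value is always read ahead while the producer waits. The names MergeGuardModel, simulate, permits, pullBeforeAcquire and take are all hypothetical.

```scala
import scala.collection.mutable

// Hypothetical single-threaded model of a producer guarded by a semaphore.
// It does not use fs2 or cats-effect; it only mimics the ordering of steps.
object MergeGuardModel {

  // permits: number of semaphore permits guarding the producer
  // pullBeforeAcquire: true  = read the next value, then wait for a permit (assumed buggy order)
  //                    false = wait for a permit, then read the next value
  // take: how many values the consumer should process
  def simulate(permits: Int, pullBeforeAcquire: Boolean, take: Int): List[Int] = {
    require(permits >= 1, "need at least one permit")
    var ref = 0                        // stands in for the Ref[IO, Int]
    var free = permits                 // permits currently available
    var pending: Option[Int] = None    // value read ahead while waiting for a permit
    val queue = mutable.Queue.empty[Int]
    val seen = mutable.ListBuffer.empty[Int]

    // Let the producer run until it blocks on the semaphore.
    def runProducer(): Unit = {
      var blocked = false
      while (!blocked) {
        // Assumed buggy order: evaluate ref.get before holding a permit.
        if (pullBeforeAcquire && pending.isEmpty) pending = Some(ref)
        if (free > 0) {
          free -= 1
          // With the other order, ref is only read once a permit is held.
          queue.enqueue(pending.getOrElse(ref))
          pending = None
        } else blocked = true
      }
    }

    while (seen.size < take) {
      runProducer()
      val value = queue.dequeue()      // consumer receives one value
      seen += value                    // corresponds to "Got value $value"
      ref = value + 1                  // corresponds to ref.set(value + 1)
      free += 1                        // permit released after processing
    }
    seen.toList
  }
}
```

Under these assumptions, MergeGuardModel.simulate(1, pullBeforeAcquire = true, take = 6) yields 0, 0, 1, 1, 2, 2, which matches the actual output above, while passing pullBeforeAcquire = false yields 0, 1, 2, 3, ... as expected. With 2 permits and the read-ahead order, each value shows up 3 times, in line with the n+1 observation.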
