Skip to content
6 changes: 6 additions & 0 deletions io/js/src/main/scala/fs2/io/NodeStream.scala
Original file line number Diff line number Diff line change
Expand Up @@ -34,3 +34,9 @@ sealed trait Readable extends js.Object
*/
@js.native
sealed trait Writable extends js.Object

/** A facade for Node.js `stream.Duplex`. Cast to/from your own bindings.
* @see [[https://nodejs.org/api/stream.html]]
*/
@js.native
sealed trait Duplex extends js.Object with Readable with Writable
6 changes: 3 additions & 3 deletions io/js/src/main/scala/fs2/io/file/PosixFiles.scala
Original file line number Diff line number Diff line change
Expand Up @@ -328,7 +328,7 @@ object PosixFiles {
readAll(path, Flags.r)

override def readAll(path: Path, flags: Flags): Stream[F, Byte] =
fromReadable(
readReadable(
F.delay(
fsMod
.createReadStream(
Expand All @@ -340,7 +340,7 @@ object PosixFiles {
)

override def readRange(path: Path, chunkSize: Int, start: Long, end: Long): Stream[F, Byte] =
fromReadable(
readReadable(
F.delay(
fsMod
.createReadStream(
Expand Down Expand Up @@ -418,7 +418,7 @@ object PosixFiles {
open(path, flags, mode).map(WriteCursor(_, 0L))

override def writeAll(path: Path, flags: Flags, mode: FileAccessMode): Pipe[F, Byte, INothing] =
fromWritable(
writeWritable(
F.delay(
fsMod
.createWriteStream(
Expand Down
182 changes: 101 additions & 81 deletions io/js/src/main/scala/fs2/io/ioplatform.scala
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import cats.effect.kernel.Async
import cats.effect.kernel.Resource
import cats.effect.std.Dispatcher
import cats.effect.std.Queue
import cats.effect.std.Semaphore
import cats.effect.syntax.all._
import cats.syntax.all._
import fs2.internal.jsdeps.node.bufferMod
Expand All @@ -41,34 +40,42 @@ import scala.scalajs.js.|

private[fs2] trait ioplatform {

def fromReadable[F[_]](readable: F[Readable])(implicit F: Async[F]): Stream[F, Byte] =
def readReadable[F[_]](readable: F[Readable], destroyIfNotEnded: Boolean = true)(implicit
F: Async[F]
): Stream[F, Byte] =
Stream
.resource(for {
readable <- Resource.makeCase(readable.map(_.asInstanceOf[streamMod.Readable])) {
case (readable, Resource.ExitCase.Succeeded) =>
if (!readable.readableEnded)
F.delay(readable.destroy())
else
F.unit
F.delay {
if (!readable.readableEnded & destroyIfNotEnded)
readable.destroy()
}
case (readable, Resource.ExitCase.Errored(ex)) =>
F.delay(readable.destroy(js.Error(ex.getMessage())))
case (readable, Resource.ExitCase.Canceled) => F.delay(readable.destroy())
case (readable, Resource.ExitCase.Canceled) =>
F.delay(readable.destroy())
}
dispatcher <- Dispatcher[F]
queue <- Queue.synchronous[F, Unit].toResource
ended <- F.deferred[Either[Throwable, Unit]].toResource
queue <- Queue.synchronous[F, Option[Unit]].toResource
error <- F.deferred[Throwable].toResource
_ <- registerListener0(readable, nodeStrings.readable)(_.on_readable(_, _)) { () =>
dispatcher.unsafeRunAndForget(queue.offer(()))
dispatcher.unsafeRunAndForget(queue.offer(Some(())))
}
_ <- registerListener0(readable, nodeStrings.end)(_.on_end(_, _)) { () =>
dispatcher.unsafeRunAndForget(ended.complete(Right(())))
dispatcher.unsafeRunAndForget(queue.offer(None))
}
_ <- registerListener0(readable, nodeStrings.close)(_.on_close(_, _)) { () =>
dispatcher.unsafeRunAndForget(queue.offer(None))
}
_ <- registerListener[js.Error](readable, nodeStrings.error)(_.on_error(_, _)) { e =>
dispatcher.unsafeRunAndForget(ended.complete(Left(js.JavaScriptException(e))))
dispatcher.unsafeRunAndForget(error.complete(js.JavaScriptException(e)))
}
} yield (readable, queue, ended))
.flatMap { case (readable, queue, ended) =>
Stream.fromQueueUnterminated(queue).interruptWhen(ended) >>
} yield (readable, queue, error))
.flatMap { case (readable, queue, error) =>
Stream
.fromQueueNoneTerminated(queue)
.concurrently(Stream.eval(error.get.flatMap(F.raiseError[Unit]))) >>
Stream.evalUnChunk(
F.delay(
Option(readable.read().asInstanceOf[bufferMod.global.Buffer])
Expand All @@ -77,44 +84,25 @@ private[fs2] trait ioplatform {
)
}

def toReadable[F[_]](s: Stream[F, Byte])(implicit F: Async[F]): Resource[F, Readable] =
for {
dispatcher <- Dispatcher[F]
semaphore <- Semaphore[F](1).toResource
ref <- F.ref(s).toResource
read = semaphore.permit.use { _ =>
Pull
.eval(ref.get)
.flatMap(_.pull.uncons)
.flatMap {
case Some((head, tail)) =>
Pull.eval(ref.set(tail)) >> Pull.output(head)
case None =>
Pull.done
}
.stream
.chunks
.compile
.last
}
readable <- Resource.make {
F.pure {
new streamMod.Readable(streamMod.ReadableOptions().setRead { (readable, size) =>
dispatcher.unsafeRunAndForget(
read.attempt.flatMap {
case Left(ex) => F.delay(readable.destroy(js.Error(ex.getMessage)))
case Right(chunk) => F.delay(readable.push(chunk.map(_.toUint8Array).orNull)).void
}
def toReadable[F[_]](implicit F: Async[F]): Pipe[F, Byte, Readable] =
in =>
Stream.resource(mkDuplex(in)).flatMap { case (duplex, out) =>
Stream
.emit(duplex)
.merge(out.drain)
.concurrently(
Stream.eval(
F.async_[Unit](cb => duplex.asInstanceOf[streamMod.Writable].end(() => cb(Right(()))))
)
})
}
} { readable =>
F.delay(if (!readable.readableEnded) readable.destroy())
)
}
} yield readable.asInstanceOf[Readable]

def fromWritable[F[_]](
writable: F[Writable]
def toReadableResource[F[_]: Async](s: Stream[F, Byte]): Resource[F, Readable] =
s.through(toReadable).compile.resource.lastOrError

def writeWritable[F[_]](
writable: F[Writable],
endAfterUse: Boolean = true
)(implicit F: Async[F]): Pipe[F, Byte, INothing] =
in =>
Stream.eval(writable.map(_.asInstanceOf[streamMod.Writable])).flatMap { writable =>
Expand All @@ -131,77 +119,109 @@ private[fs2] trait ioplatform {
}
} >> go(tail)
case None =>
Pull.eval(F.async_[Unit](cb => writable.end(() => cb(Right(())))))
if (endAfterUse)
Pull.eval(F.async_[Unit](cb => writable.end(() => cb(Right(())))))
else
Pull.done
}

go(in).stream.handleErrorWith { ex =>
Stream.eval(F.delay(writable.destroy(js.Error(ex.getMessage))))
}.drain
}

def mkWritable[F[_]](implicit F: Async[F]): Resource[F, (Writable, Stream[F, Byte])] =
def readWritable[F[_]: Async](f: Writable => F[Unit]): Stream[F, Byte] =
Stream.empty.through(toDuplexAndRead(f))

def toDuplexAndRead[F[_]: Async](f: Duplex => F[Unit]): Pipe[F, Byte, Byte] =
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

An even funkier name! Thoughts? 😅

in =>
Stream.resource(mkDuplex(in)).flatMap { case (duplex, out) =>
Stream.eval(f(duplex)).drain.merge(out)
}

private def mkDuplex[F[_]](
in: Stream[F, Byte]
)(implicit F: Async[F]): Resource[F, (Duplex, Stream[F, Byte])] =
for {
dispatcher <- Dispatcher[F]
queue <- Queue.synchronous[F, Option[Chunk[Byte]]].toResource
readQueue <- Queue.bounded[F, Option[Chunk[Byte]]](1).toResource
writeQueue <- Queue.synchronous[F, Option[Chunk[Byte]]].toResource
error <- F.deferred[Throwable].toResource
writable <- Resource.make {
F.pure {
new streamMod.Writable(
duplex <- Resource.make {
F.delay {
new streamMod.Duplex(
streamMod
.WritableOptions()
.setWrite { (writable, chunk, encoding, cb) =>
.DuplexOptions()
.setRead { (duplex, size) =>
val readable = duplex.asInstanceOf[streamMod.Readable]
dispatcher.unsafeRunAndForget(
readQueue.take.attempt.flatMap {
case Left(ex) =>
F.delay(readable.destroy(js.Error(ex.getMessage)))
case Right(chunk) =>
F.delay(readable.push(chunk.map(_.toUint8Array).orNull)).void
}
)
}
.setWrite { (duplex, chunk, encoding, cb) =>
dispatcher.unsafeRunAndForget(
queue
writeQueue
.offer(Some(Chunk.uint8Array(chunk.asInstanceOf[Uint8Array])))
.attempt
.flatMap(e =>
F.delay(
cb(
e.left.toOption.fold[js.Error | Null](null)(e => js.Error(e.getMessage()))
e.left.toOption
.fold[js.Error | Null](null)(e => js.Error(e.getMessage()))
)
)
)
)
}
.setFinal { (writable, cb) =>
.setFinal { (duplex, cb) =>
dispatcher.unsafeRunAndForget(
queue
writeQueue
.offer(None)
.attempt
.flatMap(e =>
F.delay(
cb(
e.left.toOption.fold[js.Error | Null](null)(e => js.Error(e.getMessage()))
e.left.toOption
.fold[js.Error | Null](null)(e => js.Error(e.getMessage()))
)
)
)
)
}
.setDestroy { (writable, err, cb) =>
.setDestroy { (duplex, err, cb) =>
dispatcher.unsafeRunAndForget {
Option(err).fold(F.unit) { err =>
error
.complete(js.JavaScriptException(err))
.attempt
.flatMap(e =>
F.delay(
cb(
e.left.toOption
.fold[js.Error | Null](null)(e => js.Error(e.getMessage()))
)
error
.complete(js.JavaScriptException(err))
.attempt
.flatMap(e =>
F.delay(
cb(
e.left.toOption
.fold[js.Error | Null](null)(e => js.Error(e.getMessage()))
)
)
}
)

}
}
)
}
} { writable =>
F.delay(if (!writable.writableEnded) writable.destroy())
} { duplex =>
F.delay {
val readable = duplex.asInstanceOf[streamMod.Readable]
val writable = duplex.asInstanceOf[streamMod.Writable]
if (!readable.readableEnded | !writable.writableEnded)
readable.destroy()
}
}
stream = Stream
.fromQueueNoneTerminatedChunk(queue)
drainIn = in.enqueueNoneTerminatedChunks(readQueue).drain
out = Stream
.fromQueueNoneTerminatedChunk(writeQueue)
.concurrently(Stream.eval(error.get.flatMap(F.raiseError[Unit])))
} yield (writable.asInstanceOf[Writable], stream)

} yield (duplex.asInstanceOf[Duplex], drainIn.merge(out))
}
43 changes: 31 additions & 12 deletions io/js/src/test/scala/fs2/io/IoPlatformSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -28,23 +28,42 @@ import org.scalacheck.effect.PropF.forAllF

class IoPlatformSuite extends Fs2Suite {

test("to/from Readable") {
test("to/read Readable") {
forAllF { bytes: Stream[Pure, Byte] =>
toReadable(bytes.covary[IO]).use { readable =>
fromReadable(IO.pure(readable)).compile.toVector.assertEquals(bytes.compile.toVector)
}
bytes
.through(toReadable[IO])
.flatMap { readable =>
readReadable(IO.pure(readable))
}
.compile
.toVector
.assertEquals(bytes.compile.toVector)
}
}

test("mk/from Writable") {
test("read/write Writable") {
forAllF { bytes: Stream[Pure, Byte] =>
mkWritable[IO].use { case (writable, stream) =>
stream
.concurrently(bytes.covary[IO].through(fromWritable(IO.pure(writable))))
.compile
.toVector
.assertEquals(bytes.compile.toVector)
}
readWritable[IO] { writable =>
bytes.covary[IO].through(writeWritable(IO.pure(writable))).compile.drain
}.compile.toVector.assertEquals(bytes.compile.toVector)
}
}

test("toDuplexAndRead") {
forAllF { (bytes1: Stream[Pure, Byte], bytes2: Stream[Pure, Byte]) =>
bytes1
.through {
toDuplexAndRead[IO] { duplex =>
readReadable[IO](IO.pure(duplex))
.merge(bytes2.covary[IO].through(writeWritable[IO](IO.pure(duplex))))
.compile
.toVector
.assertEquals(bytes1.compile.toVector)
}
}
.compile
.toVector
.assertEquals(bytes2.compile.toVector)
}
}

Expand Down