Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 19 additions & 4 deletions core/shared/src/main/scala/fs2/Pull.scala
Original file line number Diff line number Diff line change
Expand Up @@ -113,23 +113,38 @@ object Pull {

private sealed trait AlgebraF[F[_],O] { type f[x] = Algebra[F,O,x] }

/** Result of `acquireCancellable`. */
trait Cancellable[+F[_],+R] {
val cancel: Pull[F,Nothing,Unit]
val resource: R

def map[R2](f: R => R2): Cancellable[F,R2]
}
object Cancellable {
def apply[F[_],R](cancel0: Pull[F,Nothing,Unit], r: R): Cancellable[F,R] = new Cancellable[F,R] {
val cancel = cancel0
val resource = r
def map[R2](f: R => R2): Cancellable[F,R2] = apply(cancel, f(r))
}
}

/**
* Acquire a resource within a `Pull`. The cleanup action will be run at the end
* of the `.close` scope which executes the returned `Pull`. The acquired
* resource is returned as the result value of the pull.
*/
def acquire[F[_],R](r: F[R])(cleanup: R => F[Unit]): Pull[F,Nothing,R] =
acquireCancellable(r)(cleanup).map(_._2)
acquireCancellable(r)(cleanup).map(_.resource)

/**
* Like [[acquire]] but the result value is a tuple consisting of a cancellation
* Like [[acquire]] but the result value consists of a cancellation
* pull and the acquired resource. Running the cancellation pull frees the resource.
* This allows the acquired resource to be released earlier than at the end of the
* containing pull scope.
*/
def acquireCancellable[F[_],R](r: F[R])(cleanup: R => F[Unit]): Pull[F,Nothing,(Pull[F,Nothing,Unit],R)] =
def acquireCancellable[F[_],R](r: F[R])(cleanup: R => F[Unit]): Pull[F,Nothing,Cancellable[F,R]] =
Stream.bracketWithToken(r)(Stream.emit, cleanup).open.flatMap { h => h.await1.flatMap {
case ((token, r), _) => Pull.pure((Pull.release(List(token)), r))
case ((token, r), _) => Pull.pure(Cancellable(Pull.release(List(token)), r))
}}

/**
Expand Down
2 changes: 1 addition & 1 deletion core/shared/src/main/scala/fs2/concurrent.scala
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ object concurrent {
)
} yield gate
}
Pull.acquireCancellable(startInnerStream) { gate => gate.get }.flatMap { case (release, _) => Pull.eval(earlyReleaseRef.setPure(release)) }
Pull.acquireCancellable(startInnerStream) { gate => gate.get }.flatMap { c => Pull.eval(earlyReleaseRef.setPure(c.cancel)) }
}
}

Expand Down
11 changes: 0 additions & 11 deletions io/src/main/scala/fs2/io/file/FileHandle.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,6 @@ trait FileHandle[F[_]] {
/** Opaque type representing an exclusive lock on a file. */
type Lock

/**
* Close the `FileHandle`.
*/
def close(): F[Unit]

/**
* Force any updates for the underlying file to storage.
* @param metaData If true, also attempts to force file metadata updates to storage.
Expand Down Expand Up @@ -104,9 +99,6 @@ private[file] object FileHandle {
new FileHandle[F] {
type Lock = FileLock

override def close(): F[Unit] =
F.delay(chan.close())

override def force(metaData: Boolean): F[Unit] =
F.delay(chan.force(metaData))

Expand Down Expand Up @@ -153,9 +145,6 @@ private[file] object FileHandle {
new FileHandle[F] {
type Lock = FileLock

override def close(): F[Unit] =
F.delay(chan.close())

override def force(metaData: Boolean): F[Unit] =
F.delay(chan.force(metaData))

Expand Down
8 changes: 4 additions & 4 deletions io/src/main/scala/fs2/io/file/file.scala
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,13 @@ package object file {
* Reads all data synchronously from the file at the specified `java.nio.file.Path`.
*/
def readAll[F[_]: Suspendable](path: Path, chunkSize: Int): Stream[F, Byte] =
pulls.fromPath(path, List(StandardOpenOption.READ)).flatMap(pulls.readAllFromFileHandle(chunkSize)).close
pulls.fromPath(path, List(StandardOpenOption.READ)).flatMap(c => pulls.readAllFromFileHandle(chunkSize)(c.resource)).close

/**
* Reads all data asynchronously from the file at the specified `java.nio.file.Path`.
*/
def readAllAsync[F[_]](path: Path, chunkSize: Int, executorService: Option[ExecutorService] = None)(implicit F: Async[F]): Stream[F, Byte] =
pulls.fromPathAsync(path, List(StandardOpenOption.READ), executorService).flatMap(pulls.readAllFromFileHandle(chunkSize)).close
pulls.fromPathAsync(path, List(StandardOpenOption.READ), executorService).flatMap(c => pulls.readAllFromFileHandle(chunkSize)(c.resource)).close

/**
* Writes all data synchronously to the file at the specified `java.nio.file.Path`.
Expand All @@ -47,7 +47,7 @@ package object file {
s => (for {
in <- s.open
out <- pulls.fromPath(path, StandardOpenOption.WRITE :: flags.toList)
_ <- pulls.writeAllToFileHandle(in, out)
_ <- pulls.writeAllToFileHandle(in, out.resource)
} yield ()).close

/**
Expand All @@ -59,7 +59,7 @@ package object file {
s => (for {
in <- s.open
out <- pulls.fromPathAsync(path, StandardOpenOption.WRITE :: flags.toList, executorService)
_ <- _writeAll0(in, out, 0)
_ <- _writeAll0(in, out.resource, 0)
} yield ()).close

private def _writeAll0[F[_]](in: Handle[F, Byte], out: FileHandle[F], offset: Long): Pull[F, Nothing, Unit] = for {
Expand Down
13 changes: 6 additions & 7 deletions io/src/main/scala/fs2/io/file/pulls.scala
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import java.nio.file._
import java.util.concurrent.ExecutorService

import fs2.util.{Async,Suspendable}
import fs2.util.syntax._

/** Provides various `Pull`s for working with files. */
object pulls {
Expand Down Expand Up @@ -49,15 +48,15 @@ object pulls {
*
* The `Pull` closes the acquired `java.nio.channels.FileChannel` when it is done.
*/
def fromPath[F[_]](path: Path, flags: Seq[OpenOption])(implicit F: Suspendable[F]): Pull[F, Nothing, FileHandle[F]] =
def fromPath[F[_]](path: Path, flags: Seq[OpenOption])(implicit F: Suspendable[F]): Pull[F, Nothing, Pull.Cancellable[F, FileHandle[F]]] =
fromFileChannel(F.delay(FileChannel.open(path, flags: _*)))

/**
* Creates a `Pull` which allows asynchronous file operations against the file at the specified `java.nio.file.Path`.
*
* The `Pull` closes the acquired `java.nio.channels.AsynchronousFileChannel` when it is done.
*/
def fromPathAsync[F[_]](path: Path, flags: Seq[OpenOption], executorService: Option[ExecutorService] = None)(implicit F: Async[F]): Pull[F, Nothing, FileHandle[F]] = {
def fromPathAsync[F[_]](path: Path, flags: Seq[OpenOption], executorService: Option[ExecutorService] = None)(implicit F: Async[F]): Pull[F, Nothing, Pull.Cancellable[F, FileHandle[F]]] = {
import collection.JavaConverters._
fromAsynchronousFileChannel(F.delay(AsynchronousFileChannel.open(path, flags.toSet.asJava, executorService.orNull)))
}
Expand All @@ -67,14 +66,14 @@ object pulls {
*
* The `Pull` closes the provided `java.nio.channels.FileChannel` when it is done.
*/
def fromFileChannel[F[_]: Suspendable](channel: F[FileChannel]): Pull[F, Nothing, FileHandle[F]] =
Pull.acquire(channel.map(FileHandle.fromFileChannel[F]))(_.close())
def fromFileChannel[F[_]](channel: F[FileChannel])(implicit F: Suspendable[F]): Pull[F, Nothing, Pull.Cancellable[F, FileHandle[F]]] =
Pull.acquireCancellable(channel)(ch => F.delay(ch.close())).map(_.map(FileHandle.fromFileChannel[F]))

/**
* Given a `java.nio.channels.AsynchronousFileChannel`, will create a `Pull` which allows asynchronous operations against the underlying file.
*
* The `Pull` closes the provided `java.nio.channels.AsynchronousFileChannel` when it is done.
*/
def fromAsynchronousFileChannel[F[_]: Async](channel: F[AsynchronousFileChannel]): Pull[F, Nothing, FileHandle[F]] =
Pull.acquire(channel.map(FileHandle.fromAsynchronousFileChannel[F]))(_.close())
def fromAsynchronousFileChannel[F[_]](channel: F[AsynchronousFileChannel])(implicit F: Async[F]): Pull[F, Nothing, Pull.Cancellable[F, FileHandle[F]]] =
Pull.acquireCancellable(channel)(ch => F.delay(ch.close())).map(_.map(FileHandle.fromAsynchronousFileChannel[F]))
}