Skip to content

Commit 64ef511

Browse files
authored
Merge pull request #811 from mpilquist/topic/cancellable-files
Changed io.file.pulls API to return cancellable file handles
2 parents f258c2b + ac13c53 commit 64ef511

File tree

5 files changed

+30
-27
lines changed

5 files changed

+30
-27
lines changed

core/shared/src/main/scala/fs2/Pull.scala

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -113,23 +113,38 @@ object Pull {
113113

114114
private sealed trait AlgebraF[F[_],O] { type f[x] = Algebra[F,O,x] }
115115

116+
/** Result of `acquireCancellable`. */
117+
trait Cancellable[+F[_],+R] {
118+
val cancel: Pull[F,Nothing,Unit]
119+
val resource: R
120+
121+
def map[R2](f: R => R2): Cancellable[F,R2]
122+
}
123+
object Cancellable {
124+
def apply[F[_],R](cancel0: Pull[F,Nothing,Unit], r: R): Cancellable[F,R] = new Cancellable[F,R] {
125+
val cancel = cancel0
126+
val resource = r
127+
def map[R2](f: R => R2): Cancellable[F,R2] = apply(cancel, f(r))
128+
}
129+
}
130+
116131
/**
117132
* Acquire a resource within a `Pull`. The cleanup action will be run at the end
118133
* of the `.close` scope which executes the returned `Pull`. The acquired
119134
* resource is returned as the result value of the pull.
120135
*/
121136
def acquire[F[_],R](r: F[R])(cleanup: R => F[Unit]): Pull[F,Nothing,R] =
122-
acquireCancellable(r)(cleanup).map(_._2)
137+
acquireCancellable(r)(cleanup).map(_.resource)
123138

124139
/**
125-
* Like [[acquire]] but the result value is a tuple consisting of a cancellation
140+
* Like [[acquire]] but the result value consists of a cancellation
126141
* pull and the acquired resource. Running the cancellation pull frees the resource.
127142
* This allows the acquired resource to be released earlier than at the end of the
128143
* containing pull scope.
129144
*/
130-
def acquireCancellable[F[_],R](r: F[R])(cleanup: R => F[Unit]): Pull[F,Nothing,(Pull[F,Nothing,Unit],R)] =
145+
def acquireCancellable[F[_],R](r: F[R])(cleanup: R => F[Unit]): Pull[F,Nothing,Cancellable[F,R]] =
131146
Stream.bracketWithToken(r)(Stream.emit, cleanup).open.flatMap { h => h.await1.flatMap {
132-
case ((token, r), _) => Pull.pure((Pull.release(List(token)), r))
147+
case ((token, r), _) => Pull.pure(Cancellable(Pull.release(List(token)), r))
133148
}}
134149

135150
/**

core/shared/src/main/scala/fs2/concurrent.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ object concurrent {
6262
)
6363
} yield gate
6464
}
65-
Pull.acquireCancellable(startInnerStream) { gate => gate.get }.flatMap { case (release, _) => Pull.eval(earlyReleaseRef.setPure(release)) }
65+
Pull.acquireCancellable(startInnerStream) { gate => gate.get }.flatMap { c => Pull.eval(earlyReleaseRef.setPure(c.cancel)) }
6666
}
6767
}
6868

io/src/main/scala/fs2/io/file/FileHandle.scala

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,6 @@ trait FileHandle[F[_]] {
1818
/** Opaque type representing an exclusive lock on a file. */
1919
type Lock
2020

21-
/**
22-
* Close the `FileHandle`.
23-
*/
24-
def close(): F[Unit]
25-
2621
/**
2722
* Force any updates for the underlying file to storage.
2823
* @param metaData If true, also attempts to force file metadata updates to storage.
@@ -104,9 +99,6 @@ private[file] object FileHandle {
10499
new FileHandle[F] {
105100
type Lock = FileLock
106101

107-
override def close(): F[Unit] =
108-
F.delay(chan.close())
109-
110102
override def force(metaData: Boolean): F[Unit] =
111103
F.delay(chan.force(metaData))
112104

@@ -153,9 +145,6 @@ private[file] object FileHandle {
153145
new FileHandle[F] {
154146
type Lock = FileLock
155147

156-
override def close(): F[Unit] =
157-
F.delay(chan.close())
158-
159148
override def force(metaData: Boolean): F[Unit] =
160149
F.delay(chan.force(metaData))
161150

io/src/main/scala/fs2/io/file/file.scala

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,13 +30,13 @@ package object file {
3030
* Reads all data synchronously from the file at the specified `java.nio.file.Path`.
3131
*/
3232
def readAll[F[_]: Suspendable](path: Path, chunkSize: Int): Stream[F, Byte] =
33-
pulls.fromPath(path, List(StandardOpenOption.READ)).flatMap(pulls.readAllFromFileHandle(chunkSize)).close
33+
pulls.fromPath(path, List(StandardOpenOption.READ)).flatMap(c => pulls.readAllFromFileHandle(chunkSize)(c.resource)).close
3434

3535
/**
3636
* Reads all data asynchronously from the file at the specified `java.nio.file.Path`.
3737
*/
3838
def readAllAsync[F[_]](path: Path, chunkSize: Int, executorService: Option[ExecutorService] = None)(implicit F: Async[F]): Stream[F, Byte] =
39-
pulls.fromPathAsync(path, List(StandardOpenOption.READ), executorService).flatMap(pulls.readAllFromFileHandle(chunkSize)).close
39+
pulls.fromPathAsync(path, List(StandardOpenOption.READ), executorService).flatMap(c => pulls.readAllFromFileHandle(chunkSize)(c.resource)).close
4040

4141
/**
4242
* Writes all data synchronously to the file at the specified `java.nio.file.Path`.
@@ -47,7 +47,7 @@ package object file {
4747
s => (for {
4848
in <- s.open
4949
out <- pulls.fromPath(path, StandardOpenOption.WRITE :: flags.toList)
50-
_ <- pulls.writeAllToFileHandle(in, out)
50+
_ <- pulls.writeAllToFileHandle(in, out.resource)
5151
} yield ()).close
5252

5353
/**
@@ -59,7 +59,7 @@ package object file {
5959
s => (for {
6060
in <- s.open
6161
out <- pulls.fromPathAsync(path, StandardOpenOption.WRITE :: flags.toList, executorService)
62-
_ <- _writeAll0(in, out, 0)
62+
_ <- _writeAll0(in, out.resource, 0)
6363
} yield ()).close
6464

6565
private def _writeAll0[F[_]](in: Handle[F, Byte], out: FileHandle[F], offset: Long): Pull[F, Nothing, Unit] = for {

io/src/main/scala/fs2/io/file/pulls.scala

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ import java.nio.file._
77
import java.util.concurrent.ExecutorService
88

99
import fs2.util.{Async,Suspendable}
10-
import fs2.util.syntax._
1110

1211
/** Provides various `Pull`s for working with files. */
1312
object pulls {
@@ -49,15 +48,15 @@ object pulls {
4948
*
5049
* The `Pull` closes the acquired `java.nio.channels.FileChannel` when it is done.
5150
*/
52-
def fromPath[F[_]](path: Path, flags: Seq[OpenOption])(implicit F: Suspendable[F]): Pull[F, Nothing, FileHandle[F]] =
51+
def fromPath[F[_]](path: Path, flags: Seq[OpenOption])(implicit F: Suspendable[F]): Pull[F, Nothing, Pull.Cancellable[F, FileHandle[F]]] =
5352
fromFileChannel(F.delay(FileChannel.open(path, flags: _*)))
5453

5554
/**
5655
* Creates a `Pull` which allows asynchronous file operations against the file at the specified `java.nio.file.Path`.
5756
*
5857
* The `Pull` closes the acquired `java.nio.channels.AsynchronousFileChannel` when it is done.
5958
*/
60-
def fromPathAsync[F[_]](path: Path, flags: Seq[OpenOption], executorService: Option[ExecutorService] = None)(implicit F: Async[F]): Pull[F, Nothing, FileHandle[F]] = {
59+
def fromPathAsync[F[_]](path: Path, flags: Seq[OpenOption], executorService: Option[ExecutorService] = None)(implicit F: Async[F]): Pull[F, Nothing, Pull.Cancellable[F, FileHandle[F]]] = {
6160
import collection.JavaConverters._
6261
fromAsynchronousFileChannel(F.delay(AsynchronousFileChannel.open(path, flags.toSet.asJava, executorService.orNull)))
6362
}
@@ -67,14 +66,14 @@ object pulls {
6766
*
6867
* The `Pull` closes the provided `java.nio.channels.FileChannel` when it is done.
6968
*/
70-
def fromFileChannel[F[_]: Suspendable](channel: F[FileChannel]): Pull[F, Nothing, FileHandle[F]] =
71-
Pull.acquire(channel.map(FileHandle.fromFileChannel[F]))(_.close())
69+
def fromFileChannel[F[_]](channel: F[FileChannel])(implicit F: Suspendable[F]): Pull[F, Nothing, Pull.Cancellable[F, FileHandle[F]]] =
70+
Pull.acquireCancellable(channel)(ch => F.delay(ch.close())).map(_.map(FileHandle.fromFileChannel[F]))
7271

7372
/**
7473
* Given a `java.nio.channels.AsynchronousFileChannel`, will create a `Pull` which allows asynchronous operations against the underlying file.
7574
*
7675
* The `Pull` closes the provided `java.nio.channels.AsynchronousFileChannel` when it is done.
7776
*/
78-
def fromAsynchronousFileChannel[F[_]: Async](channel: F[AsynchronousFileChannel]): Pull[F, Nothing, FileHandle[F]] =
79-
Pull.acquire(channel.map(FileHandle.fromAsynchronousFileChannel[F]))(_.close())
77+
def fromAsynchronousFileChannel[F[_]](channel: F[AsynchronousFileChannel])(implicit F: Async[F]): Pull[F, Nothing, Pull.Cancellable[F, FileHandle[F]]] =
78+
Pull.acquireCancellable(channel)(ch => F.delay(ch.close())).map(_.map(FileHandle.fromAsynchronousFileChannel[F]))
8079
}

0 commit comments

Comments
 (0)