Skip to content

Commit

Permalink
Fix typos
Browse files Browse the repository at this point in the history
  • Loading branch information
nktpro committed Nov 7, 2019
1 parent 310929e commit 7b6a019
Show file tree
Hide file tree
Showing 9 changed files with 39 additions and 40 deletions.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -141,8 +141,8 @@ Akka Streams married to ZIO, which allows utilization of ZIO's effect instead of

- `effectMapAsync`: equivalent to [mapAsync](https://doc.akka.io/docs/akka/current/stream/operators/Source-or-Flow/mapAsync.html#mapasync)
- `effectMapAsyncUnordered`: equivalent to [mapAsyncUnordered](https://doc.akka.io/docs/akka/current/stream/operators/Source-or-Flow/mapAsyncUnordered.html#mapasyncunordered)
- `interruptableEffectMapAsync`: similar to `effectMapAsync` but will interrupt the pending effects when the stream completes. This guarantees no leaks, which is a common problem with mid-stream Futures.
- `interruptableEffectMapAsyncUnordered`
- `interruptibleEffectMapAsync`: similar to `effectMapAsync` but will interrupt the pending effects when the stream completes. This guarantees no leaks, which is a common problem with mid-stream Futures.
- `interruptibleEffectMapAsyncUnordered`
- `switchFlatMapConcat`: Akka Streams does not have an equivalence of Rx's [switchMap](https://www.learnrxjs.io/operators/transformation/switchmap.html), which enables cancellation of the outstanding sub-stream and switching to the new one. This fills in the gap nicely.

To use those, simply import:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,13 +132,13 @@ object Dstreams extends LoggingContext {
requestBuilder
.invoke(Source.fromFutureSource(promise.future).mapMaterializedValue(_ => NotUsed))
.viaMat(KillSwitches.single)(Keep.right)
.via(ZAkkaStreams.interruptableMapAsync(1) { assignment: Req =>
.via(ZAkkaStreams.interruptibleMapAsync(1) { assignment: Req =>
makeSource(assignment).map(s => promise.success(s)) *> Task.fromFuture(_ => promise.future)
})
.toMat(Sink.ignore)(Keep.both)
}

ZAkkaStreams.interruptableGraph(graph, graceful = true)
ZAkkaStreams.interruptibleGraph(graph, graceful = true)
}

def workPool[Req, Res, R <: AkkaEnv](
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ object AkkaStreamInterruptionSampleApp extends AkkaApp {
protected def createEnv(untypedConfig: Config) = ZManaged.environment[AkkaApp.Env]

def run: ZIO[Env, Throwable, Unit] = {
val stream = ZAkkaStreams.interruptableGraph(
val stream = ZAkkaStreams.interruptibleGraph(
ZIO.access[LogEnv] { env =>
Source(1 to 10)
.throttle(1, 1.second)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -224,10 +224,9 @@ object DeleteIntensiveDbBenchApp extends AkkaApp {
ret <- dbApi
.columnFamily(dbMat.fact)
.batchGetByKeysTask(keys)
} yield
ret.iterator.collect {
case Some(pair) => pair
}.toList
} yield ret.iterator.collect {
case Some(pair) => pair
}.toList

val par = measure(deleteTask, metrics.purgeDuration) zipParRight measure(batchGetTask, metrics.getDuration)

Expand Down Expand Up @@ -319,7 +318,7 @@ object DeleteIntensiveDbBenchApp extends AkkaApp {
)

populateThenPurgeFib <- ZAkkaStreams
.interruptableGraph(
.interruptibleGraph(
ZIO.succeed {
populateSource
.via(dequeueFlow)
Expand All @@ -334,7 +333,7 @@ object DeleteIntensiveDbBenchApp extends AkkaApp {
.fork

responseFib <- ZAkkaStreams
.interruptableGraph(
.interruptibleGraph(
ZIO.succeed(
responseSource
.viaMat(KillSwitches.single)(Keep.right)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ object DstreamSampleApp extends AkkaApp {

Source(1 to Int.MaxValue)
.map(Assignment(_))
.via(ZAkkaStreams.interruptableMapAsyncUnordered(12) { assignment: Assignment =>
.via(ZAkkaStreams.interruptibleMapAsyncUnordered(12) { assignment: Assignment =>
Dstreams
.distribute(assignment) { result: WorkResult[Result] =>
Task.fromFuture { _ =>
Expand All @@ -70,7 +70,7 @@ object DstreamSampleApp extends AkkaApp {
.mapMaterializedValue(f => (ks, f))
}

ZAkkaStreams.interruptableGraph(graphTask, graceful = true)
ZAkkaStreams.interruptibleGraph(graphTask, graceful = true)
}

protected def runWorker(client: DstreamSampleAppClient, id: Int) = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ object KvdbTestSampleApp extends AkkaApp {
)
defaultCf = dbApi.columnFamily(sampleDb.default)
tailFiber <- ZAkkaStreams
.interruptableGraph(
.interruptibleGraph(
ZIO.access[AkkaEnv with LogEnv] { env =>
val log = env.logger

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ object ZAkkaStreams {
}
}

def interruptableGraph[R, A](
def interruptibleGraph[R, A](
make: RIO[R, RunnableGraph[(KillSwitch, Future[A])]],
graceful: Boolean
)(implicit ctx: LogCtx): RIO[AkkaEnv with LogEnv with R, A] = {
Expand All @@ -90,7 +90,7 @@ object ZAkkaStreams {
task.onInterrupt(
UIO {
if (graceful) ks.shutdown()
else ks.abort(new InterruptedException("Stream (interruptableGraph) was interrupted"))
else ks.abort(new InterruptedException("Stream (interruptibleGraph) was interrupted"))
} *> task.fold(
e =>
env.logger
Expand All @@ -102,7 +102,7 @@ object ZAkkaStreams {
}
}

def interruptableMapAsyncM[R, A, B](
def interruptibleMapAsyncM[R, A, B](
parallelism: Int
)(runTask: A => RIO[R, B]): ZIO[AkkaEnv with R, Nothing, Flow[A, B, Future[NotUsed]]] = {
ZIO.runtime[AkkaEnv with R].map { rt =>
Expand All @@ -117,14 +117,14 @@ object ZAkkaStreams {
Future.successful(
Flow[A]
.mapAsync(parallelism) { a =>
val interruptableTask = for {
val interruptibleTask = for {
fib <- runTask(a).fork
c <- (completionPromise.await *> fib.interrupt).fork
ret <- fib.join
_ <- c.interrupt
} yield ret

rt.unsafeRunToFuture(interruptableTask.provide(env))
rt.unsafeRunToFuture(interruptibleTask.provide(env))
}
.watchTermination() { (_, f) =>
f.onComplete { _ =>
Expand All @@ -138,13 +138,13 @@ object ZAkkaStreams {
}
}

def interruptableMapAsync[R, A, B](
def interruptibleMapAsync[R, A, B](
parallelism: Int
)(runTask: A => RIO[R, B])(implicit rt: zio.Runtime[AkkaEnv with R]): Flow[A, B, Future[NotUsed]] = {
rt.unsafeRun(interruptableMapAsyncM(parallelism)(runTask).provide(rt.Environment))
rt.unsafeRun(interruptibleMapAsyncM(parallelism)(runTask).provide(rt.Environment))
}

def interruptableMapAsyncUnorderedM[R, A, B](
def interruptibleMapAsyncUnorderedM[R, A, B](
parallelism: Int,
attributes: Option[Attributes] = None
)(runTask: A => RIO[R, B]): ZIO[AkkaEnv with R, Nothing, Flow[A, B, Future[NotUsed]]] = {
Expand All @@ -160,14 +160,14 @@ object ZAkkaStreams {

val flow = Flow[A]
.mapAsyncUnordered(parallelism) { a =>
val interruptableTask = for {
val interruptibleTask = for {
fib <- runTask(a).fork
c <- (interruption *> fib.interrupt).fork
ret <- fib.join
_ <- c.interrupt
} yield ret

rt.unsafeRunToFuture(interruptableTask.provide(env))
rt.unsafeRunToFuture(interruptibleTask.provide(env))
}

val flowWithAttrs = attributes.fold(flow)(attrs => flow.withAttributes(attrs))
Expand All @@ -186,7 +186,7 @@ object ZAkkaStreams {
}
}

def interruptableLazySource[R, A, B](effect: RIO[R, B]): ZIO[AkkaEnv with R, Nothing, Source[B, Future[NotUsed]]] = {
def interruptibleLazySource[R, A, B](effect: RIO[R, B]): ZIO[AkkaEnv with R, Nothing, Source[B, Future[NotUsed]]] = {
ZIO.runtime[AkkaEnv with R].map { rt =>
val env = rt.Environment
val akkaService = env.akkaService
Expand All @@ -196,15 +196,15 @@ object ZAkkaStreams {
.lazily(() => {
val completionPromise = rt.unsafeRun(zio.Promise.make[Nothing, Unit])

val interruptableTask = for {
val interruptibleTask = for {
fib <- effect.fork
c <- (completionPromise.await *> fib.interrupt).fork
ret <- fib.join
_ <- c.interrupt
} yield ret

Source
.fromFuture(rt.unsafeRunToFuture(interruptableTask.provide(env)))
.fromFuture(rt.unsafeRunToFuture(interruptibleTask.provide(env)))
.watchTermination() { (_, f) =>
f.onComplete { _ =>
val _ = rt.unsafeRun(completionPromise.succeed(()))
Expand Down Expand Up @@ -250,11 +250,11 @@ object ZAkkaStreams {
rt.unsafeRun(switchFlatMapConcatM[R, In, Out](f).provide(rt.Environment))
}

def interruptableMapAsyncUnordered[R, A, B](
def interruptibleMapAsyncUnordered[R, A, B](
parallelism: Int,
attributes: Option[Attributes] = None
)(runTask: A => RIO[R, B])(implicit rt: zio.Runtime[AkkaEnv with R]): Flow[A, B, Future[NotUsed]] = {
rt.unsafeRun(interruptableMapAsyncUnorderedM(parallelism, attributes)(runTask).provide(rt.Environment))
rt.unsafeRun(interruptibleMapAsyncUnorderedM(parallelism, attributes)(runTask).provide(rt.Environment))
}

def mapAsyncM[R, A, B](
Expand Down Expand Up @@ -313,18 +313,18 @@ object ZAkkaStreams {
.via(mapAsyncUnordered[R, Out, Next](parallelism)(runTask))
}

def interruptableEffectMapAsync[R, Next](
def interruptibleEffectMapAsync[R, Next](
parallelism: Int
)(runTask: Out => RIO[R, Next])(implicit rt: zio.Runtime[AkkaEnv with R]): Flow[In, Next, Mat] = {
flow
.via(interruptableMapAsync[R, Out, Next](parallelism)(runTask))
.via(interruptibleMapAsync[R, Out, Next](parallelism)(runTask))
}

def interruptableEffectMapAsyncUnordered[R, Next](
def interruptibleEffectMapAsyncUnordered[R, Next](
parallelism: Int
)(runTask: Out => RIO[R, Next])(implicit rt: zio.Runtime[AkkaEnv with R]): Flow[In, Next, Mat] = {
flow
.via(interruptableMapAsyncUnordered[R, Out, Next](parallelism)(runTask))
.via(interruptibleMapAsyncUnordered[R, Out, Next](parallelism)(runTask))
}

def switchFlatMapConcat[R, Next](
Expand All @@ -350,18 +350,18 @@ object ZAkkaStreams {
.via(mapAsyncUnordered[R, Out, Next](parallelism)(runTask))
}

def interruptableEffectMapAsync[R, Next](
def interruptibleEffectMapAsync[R, Next](
parallelism: Int
)(runTask: Out => RIO[R, Next])(implicit rt: zio.Runtime[AkkaEnv with R]): Source[Next, Mat] = {
source
.via(interruptableMapAsync[R, Out, Next](parallelism)(runTask))
.via(interruptibleMapAsync[R, Out, Next](parallelism)(runTask))
}

def interruptableEffectMapAsyncUnordered[R, Next](
def interruptibleEffectMapAsyncUnordered[R, Next](
parallelism: Int
)(runTask: Out => RIO[R, Next])(implicit rt: zio.Runtime[AkkaEnv with R]): Source[Next, Mat] = {
source
.via(interruptableMapAsyncUnordered[R, Out, Next](parallelism)(runTask))
.via(interruptibleMapAsyncUnordered[R, Out, Next](parallelism)(runTask))
}

def switchFlatMapConcat[R, Next](
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,15 +52,15 @@ final class ZAkkaStreamsTest
rt.unsafeRunToFuture(test.provide(rt.Environment))
}

"interruptableLazySource" should {
"interruptibleLazySource" should {
"interrupt effect" in withEffect {
for {
clock <- ZIO.access[TestClock](c => new ManualClock(Some(c)))
startP <- zio.Promise.make[Nothing, Unit]
startFuture <- startP.await.toFuture
interruptedP <- zio.Promise.make[Nothing, Unit]
interruptedFuture <- interruptedP.await.toFuture
source <- ZAkkaStreams.interruptableLazySource {
source <- ZAkkaStreams.interruptibleLazySource {
startP.succeed(()) *> ZIO
.succeed(1)
.delay(zio.duration.Duration(3, TimeUnit.SECONDS))
Expand Down
2 changes: 1 addition & 1 deletion project/Build.scala
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import sbt.Keys._

//noinspection TypeAnnotation
object Build {
val buildVersion = "1.34.10"
val buildVersion = "1.35.0"

lazy val ITest = config("it") extend Test

Expand Down

0 comments on commit 7b6a019

Please sign in to comment.