Skip to content

Commit

Permalink
Added summarized combinator to ZSink which returns start time and s…
Browse files Browse the repository at this point in the history
…top time. (zio#6818)

* Added `stopWatched` combinator to return start time and stop time.

* Implemented `summarized` in ZSink.

* Fixed error type

* Match the comment in v2.

* Tweak

* More tweak

* Cleaned up type signature.
  • Loading branch information
ithinkicancode authored May 26, 2022
1 parent a187097 commit ba94542
Showing 1 changed file with 39 additions and 6 deletions.
45 changes: 39 additions & 6 deletions streams/shared/src/main/scala/zio/stream/ZSink.scala
Original file line number Diff line number Diff line change
Expand Up @@ -303,18 +303,40 @@ abstract class ZSink[-R, +E, -I, +L, +Z] private (
} yield push)

/**
* Returns the sink that executes this one and times its execution.
* Summarize a sink by running an effect when the sink starts and again when
* it completes
*/
final def timed: ZSink[R with Clock, E, I, L, (Z, Duration)] =
final def summarized[R1 <: R, E1 >: E, B, C](
summary: ZIO[R1, E1, B]
)(f: (B, B) => C): ZSink[R1, E1, I, L, (Z, C)] =
ZSink {
self.push.zipWith(clock.nanoTime.toManaged_) { (push, start) =>
self.push.zipWith(summary.either.toManaged_) { (push, start) =>
push(_).catchAll {
case (Left(e), leftover) => Push.fail(e, leftover)
case (Right(z), leftover) => clock.nanoTime.flatMap(stop => Push.emit(z -> (stop - start).nanos, leftover))
case (Left(e), leftover) =>
Push.fail(start.fold(identity, _ => e), leftover)

case (Right(z), leftover) =>
start match {
case Left(e) =>
Push.fail(e, leftover)
case Right(start) =>
summary.foldM(
e => Push.fail(e, leftover),
end => Push.emit((z, f(start, end)), leftover)
)
}
}
}
}

/**
* Returns the sink that executes this one and times its execution.
*/
final def timed: ZSink[R with Clock, E, I, L, (Z, Duration)] =
summarized(
clock.nanoTime
)((start, stop) => (stop - start).nanos)

/**
* Converts this sink to a transducer that feeds incoming elements to the sink
* and emits the sink's results as outputs. The sink will be restarted when it
Expand Down Expand Up @@ -958,10 +980,21 @@ object ZSink extends ZSinkPlatformSpecificConstructors {
} yield push
}

/**
* A generalized version of `timed`.
*/
def summarized[R, E, B, C](summary: ZIO[R, E, B])(f: (B, B) => C): ZSink[R, E, Any, Nothing, C] =
ZSink.drain
.summarized(summary)(f)
.map { case (_, c) => c }

/**
* A sink with timed execution.
*/
def timed: ZSink[Clock, Nothing, Any, Nothing, Duration] = ZSink.drain.timed.map(_._2)
def timed: ZSink[Clock, Nothing, Any, Nothing, Duration] =
summarized(
clock.nanoTime
)((start, stop) => (stop - start).nanos)

final class AccessSinkPartiallyApplied[R](private val dummy: Boolean = true) extends AnyVal {
def apply[E, I, L, Z](f: R => ZSink[R, E, I, L, Z]): ZSink[R, E, I, L, Z] =
Expand Down

0 comments on commit ba94542

Please sign in to comment.