Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 16 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@

inThisBuild(Seq(
version := "0.1.0-SNAPSHOT",

Expand Down Expand Up @@ -101,11 +102,25 @@ lazy val rx = project
)
)

lazy val zio = project
.enablePlugins(ScalaJSPlugin)
.dependsOn(colibri)
.in(file("zio"))
.settings(commonSettings, jsSettings)
.settings(
name := "colibri-zio",

libraryDependencies ++= Seq(
"dev.zio" %%% "zio" % "1.0.11",
"dev.zio" %%% "zio-streams" % "1.0.11"
)
)

lazy val root = project
.in(file("."))
.settings(
name := "colibri-root",

skip in publish := true,
)
.aggregate(colibri, monix, rx, router)
.aggregate(colibri, monix, rx, zio, router)
20 changes: 20 additions & 0 deletions zio/src/main/scala/colibri/ext/zio/ops.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package colibri.ext.zio

import _root_.zio.stream.{Stream, UStream, ZSink, ZStream}
import _root_.zio.{Fiber, Chunk, Ref, Runtime, UIO, ZEnv, ZIO, URIO, IO, Task}
import colibri._

object ops {
implicit class RichZStreamCompanion(val unused: ZStream.type) {
def fromSink[R, E, A](fn: ZSink[R, E, A, A, Unit] => IO[E, Unit]): ZStream[R, E, A] = ZStream.effectAsyncM { emit =>
fn(
ZSink.foreach[Any, E, A](value => Task.fromFuture(_ => emit.single(value)).ignore)
.foldM(error => ZSink.fromEffect(Task.fromFuture(_ => emit.fail(error)).ignore), _ => ZSink.drain)
)
}
}

implicit class RichSink[-R, +E, -A, +L, +Z](val stream: ZSink[R, E, A, L, Z]) extends AnyVal {
def redirect
}
}
50 changes: 50 additions & 0 deletions zio/src/main/scala/colibri/ext/zio/package.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package colibri.ext

import _root_.zio.stream.{Stream, UStream, ZSink, ZStream}
import _root_.zio.{Ref, Runtime, UIO, ZEnv, URIO}
import colibri._

package object zio {
type TSink[A] = ZSink[Any, Throwable, A, A, Unit]

// Sink

implicit def zioSinkSink(implicit runtime: Runtime[ZEnv]): Sink[TSink] = new Sink[TSink] {
def onNext[A](sink: TSink[A])(value: A): Unit =
runtime.unsafeRun(UStream(value).run(sink))

def onError[A](sink: TSink[A])(error: Throwable): Unit =
runtime.unsafeRun(ZStream.fail(error).run(sink))
}

implicit object zioSinkLiftSink extends LiftSink[TSink] {
def lift[G[_]: Sink, A](sink: G[A]): TSink[A] = ZSink
.foreach[Any, Throwable, A](elem => UIO(Sink[G].onNext(sink)(elem)))
.foldM(error => ZSink.fromEffect(UIO(Sink[G].onError(sink)(error))), _ => ZSink.drain)
}

// Source

type TStream[A] = ZStream[Any, Throwable, A]

implicit def zioStreamSource(implicit runtime: Runtime[ZEnv]): Source[TStream] = new Source[TStream] {
def subscribe[G[_] : Sink, A](source: TStream[A])(sink: G[_ >: A]): Cancelable = {
val canceler = runtime.unsafeRunToFuture(
source
.onError(cause => UIO(Sink[G].onError(sink)(cause.squash)))
.foreach(value => UIO(Sink[G].onNext(sink)(value)))
)
Cancelable(() => canceler.cancel())
}
}

implicit object zioStreamLiftSource extends LiftSource[TStream] {
override def lift[G[_] : Source, A](source: G[A]): TStream[A] = Stream.effectAsyncInterrupt { emit =>
val cancelable = Source[G].subscribe(source)(new Observer[A] {
override def onNext(value: A): Unit = { emit.single(value); () }
override def onError(error: Throwable): Unit = { emit.fail(error); () }
})
Left(URIO(cancelable.cancel()))
}
}
}
13 changes: 13 additions & 0 deletions zio/src/test/scala/colibri/ZIOOpsSpec.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package colibri

import colibri.ext.zio._
// import colibri.ext.zio.ops._

import zio._
import zio.stream._
import org.scalatest.matchers.should.Matchers
import org.scalatest.flatspec.AsyncFlatSpec

class ZIOOpsSpec extends AsyncFlatSpec with Matchers {

}