Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 8 additions & 11 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,22 +7,22 @@ Colibri is an implementation of the `Observable`, `Observer` and `Subject` conce

Usage:
```scala
libraryDependencies += "com.github.cornerman" %%% "colibri" % "0.1.2"
libraryDependencies += "com.github.cornerman" %%% "colibri" % "0.3.0"
```

For monix support:
For scala.rx support (only Scala 2.x):
```scala
libraryDependencies += "com.github.cornerman" %%% "colibri-monix" % "0.1.2"
libraryDependencies += "com.github.cornerman" %%% "colibri-rx" % "0.3.0"
```

For scala.rx support (only Scala 2.x):
For airstream support:
```scala
libraryDependencies += "com.github.cornerman" %%% "colibri-rx" % "0.1.2"
libraryDependencies += "com.github.cornerman" %%% "colibri-airstream" % "0.3.0"
```

For airstream support:
For zio support (effects only):
```scala
libraryDependencies += "com.github.cornerman" %%% "colibri-airstream" % "0.1.2"
libraryDependencies += "com.github.cornerman" %%% "colibri-zio" % "0.3.0"
```

This library includes a minimal frp library and typeclasses for streaming.
Expand All @@ -34,8 +34,6 @@ We have prepared typeclasses for integrating other streaming libaries:
- `SubscriptionOwner[T]` can let type `T` own a subscription
- `LiftSink[G[_]]` can lift a `Sink` into type `G`
- `LiftSource[H[_]]` can lift a `Source` into type `H`
- `CreateSubject[GH[_]]` how to create subject in `GH`
- `CreateProHandler[GH[_,_]]` how to create subject in `GH` which has differnt input/output types.

Most important here are `Sink` and `Source`. `Source` is a typeclass for Observables, `Sink` is a typeclass for Observers.

Expand All @@ -46,9 +44,8 @@ Throughout the library the type parameters for the `Sink` and `Source` typeclass

Source Code: [Source.scala](colibri/src/main/scala/colibri/Source.scala), [Sink.scala](colibri/src/main/scala/colibri/Sink.scala)

[Implementation for Monix](monix/src/main/scala/colibri/ext/monix/package.scala)

[Implementation for Rx](rx/src/main/scala/colibri/ext/rx/package.scala)

[Implementation for Airstream](airstream/src/main/scala/colibri/ext/airstream/package.scala)

[Implementation for ZIO](zio/src/main/scala/colibri/ext/zio/package.scala)
10 changes: 5 additions & 5 deletions airstream/src/main/scala/colibri/ext/airstream/package.scala
Original file line number Diff line number Diff line change
Expand Up @@ -8,20 +8,20 @@ package object airstream {

// Sink
implicit object airstreamObserverSink extends colibri.Sink[Observer] {
def onNext[A](sink: Observer[A])(value: A): Unit = sink.onNext(value)
def onError[A](sink: Observer[A])(error: Throwable): Unit = sink.onError(error)
def unsafeOnNext[A](sink: Observer[A])(value: A): Unit = sink.onNext(value)
def unsafeOnError[A](sink: Observer[A])(error: Throwable): Unit = sink.onError(error)
}

// Source
implicit object airstreamObservableSource extends colibri.Source[Observable] {
def subscribe[A](stream: Observable[A])(sink: colibri.Observer[A]): colibri.Cancelable = {
val sub = stream.addObserver(Observer.withRecover(sink.onNext, { case t => sink.onError(t) }))(NoopOwner)
def unsafeSubscribe[A](stream: Observable[A])(sink: colibri.Observer[A]): colibri.Cancelable = {
val sub = stream.addObserver(Observer.withRecover(sink.unsafeOnNext, { case t => sink.unsafeOnError(t) }))(NoopOwner)
colibri.Cancelable(sub.kill)
}
}

// Cancelable
implicit object airstreamSubscriptionCanCancel extends colibri.CanCancel[Subscription] {
def cancel(subscription: Subscription) = subscription.kill()
def unsafeCancel(subscription: Subscription) = subscription.kill()
}
}
32 changes: 17 additions & 15 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,14 @@ inThisBuild(
),
)


lazy val commonSettings = Seq(
crossScalaVersions := Seq("2.12.15", "2.13.8", "3.1.1"),
scalaVersion := "2.13.8",
libraryDependencies ++= Seq(
"org.scalatest" %%% "scalatest" % "3.2.11" % Test,
),
/* scalacOptions --= Seq("-Xfatal-warnings"), // overwrite option from https://github.com/DavidGregory084/sbt-tpolecat */
)

lazy val colibri = project
Expand All @@ -39,9 +41,8 @@ lazy val colibri = project
.settings(
name := "colibri",
libraryDependencies ++= Seq(
"org.typelevel" %%% "cats-core" % "2.7.0",
"org.typelevel" %%% "cats-effect" % "2.5.4",
),
"org.typelevel" %%% "cats-effect" % "3.3.8",
)
)

lazy val jsdom = project
Expand All @@ -67,18 +68,6 @@ lazy val router = project
),
)

lazy val monix = project
.enablePlugins(ScalaJSPlugin)
.dependsOn(colibri)
.in(file("monix"))
.settings(commonSettings)
.settings(
name := "colibri-monix",
libraryDependencies ++= Seq(
"io.monix" %%% "monix" % "3.4.0",
),
)

lazy val rx = project
.enablePlugins(ScalaJSPlugin)
.dependsOn(colibri)
Expand All @@ -103,3 +92,16 @@ lazy val airstream = project
"com.raquo" %%% "airstream" % "0.14.2"
),
)

lazy val zio = project
.enablePlugins(ScalaJSPlugin)
.dependsOn(colibri)
.in(file("zio"))
.settings(commonSettings)
.settings(
name := "colibri-zio",
libraryDependencies ++= Seq(
"dev.zio" %%% "zio" % "1.0.12",
"io.github.cquiroz" %%% "scala-java-time" % "2.3.0"
)
)
7 changes: 5 additions & 2 deletions colibri/src/main/scala/colibri/CanCancel.scala
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
package colibri

trait CanCancel[-T] {
def cancel(cancelable: T): Unit
def unsafeCancel(cancelable: T): Unit

@deprecated("Use unsafeCancel instead", "0.2.7")
@inline final def cancel(cancelable: T): Unit = unsafeCancel(cancelable)
}
object CanCancel {
@inline def apply[T](implicit cancel: CanCancel[T]): CanCancel[T] = cancel
@inline def apply[T](implicit unsafeCancel: CanCancel[T]): CanCancel[T] = unsafeCancel
}
85 changes: 48 additions & 37 deletions colibri/src/main/scala/colibri/Cancelable.scala
Original file line number Diff line number Diff line change
@@ -1,12 +1,20 @@
package colibri

import cats.Monoid
import cats.effect.{IO, Sync, SyncIO}
import cats.implicits._

import scala.scalajs.js

trait Cancelable {
def cancel(): Unit
def unsafeCancel(): Unit

@deprecated("Use unsafeCancel() instead", "0.2.7")
@inline final def cancel(): Unit = unsafeCancel()

final def cancelF[F[_] : Sync]: F[Unit] = Sync[F].delay(unsafeCancel())
final def cancelIO: IO[Unit] = cancelF[IO]
final def cancelSyncIO: SyncIO[Unit] = cancelF[SyncIO]
}
object Cancelable {

Expand All @@ -15,42 +23,45 @@ object Cancelable {
@inline def combine(a: Cancelable, b: Cancelable) = Cancelable.composite(a, b)
}

implicit object cancelCancelable extends CanCancel[Cancelable] {
@inline def cancel(subscription: Cancelable): Unit = subscription.cancel()
implicit object unsafeCancelCancelable extends CanCancel[Cancelable] {
@inline def unsafeCancel(subscription: Cancelable): Unit = subscription.unsafeCancel()
}

class Builder extends Cancelable {
private var buffer = new js.Array[Cancelable]()

def +=(subscription: Cancelable): Unit =
if (buffer == null) {
subscription.cancel()
} else {
buffer.push(subscription)
()
}
def +=(subscription: () => Cancelable): Unit = if (buffer != null) {
val cancelable = subscription()
buffer.push(cancelable)
()
}

def cancel(): Unit =
def unsafeCancel(): Unit =
if (buffer != null) {
buffer.foreach(_.cancel())
buffer.foreach(_.unsafeCancel())
buffer = null
}
}

class Variable extends Cancelable {
private var current: Cancelable = Cancelable.empty

def update(subscription: Cancelable): Unit =
if (current == null) {
subscription.cancel()
} else {
current.cancel()
current = subscription
def update(subscription: () => Cancelable): Unit = if (current != null) {
current.unsafeCancel()

var isCancel = false
current = Cancelable { () =>
isCancel = true
}

def cancel(): Unit =
val cancelable = subscription()
if (isCancel) cancelable.unsafeCancel()
else current = cancelable
}

def unsafeCancel(): Unit =
if (current != null) {
current.cancel()
current.unsafeCancel()
current = null
}
}
Expand All @@ -60,14 +71,14 @@ object Cancelable {
private var subscriptions: js.Array[() => Cancelable] = new js.Array[() => Cancelable]

def switch(): Unit = if (latest != null) {
latest.cancel()
latest.unsafeCancel()
latest = null
if (subscriptions != null && subscriptions.nonEmpty) {
val nextCancelable = subscriptions(0)
val variable = Cancelable.variable()
latest = variable
subscriptions.splice(0, deleteCount = 1)
variable() = nextCancelable()
variable() = nextCancelable
()
}
}
Expand All @@ -76,17 +87,17 @@ object Cancelable {
if (latest == null) {
val variable = Cancelable.variable()
latest = variable
variable() = subscription()
variable() = subscription
} else {
subscriptions.push(subscription)
()
}
}

def cancel(): Unit = if (subscriptions != null) {
def unsafeCancel(): Unit = if (subscriptions != null) {
subscriptions = null
if (latest != null) {
latest.cancel()
latest.unsafeCancel()
latest = null
}
}
Expand All @@ -97,20 +108,20 @@ object Cancelable {
private var isCancel = false

def done(): Unit = if (latest != null) {
latest.cancel()
latest.unsafeCancel()
latest = null
}

def update(subscription: () => Cancelable): Unit = if (latest == null) {
val variable = Cancelable.variable()
latest = variable
variable() = subscription()
variable() = subscription
}

def cancel(): Unit = if (!isCancel) {
def unsafeCancel(): Unit = if (!isCancel) {
isCancel = true
if (latest != null) {
latest.cancel()
latest.unsafeCancel()
latest = null
}
}
Expand All @@ -130,46 +141,46 @@ object Cancelable {
Cancelable({ () =>
counter -= 1
if (counter == 0) {
currentCancelable.cancel()
currentCancelable.unsafeCancel()
currentCancelable = null
}
})
}

def cancel(): Unit = {
def unsafeCancel(): Unit = {
counter = -1
if (currentCancelable != null) {
currentCancelable.cancel()
currentCancelable.unsafeCancel()
currentCancelable = null
}
}
}

object Empty extends Cancelable {
@inline def cancel(): Unit = ()
@inline def unsafeCancel(): Unit = ()
}

@inline def empty = Empty

@inline def apply(f: () => Unit): Cancelable = new Cancelable {
private var isCanceled = false
@inline def cancel() = if (!isCanceled) {
@inline def unsafeCancel() = if (!isCanceled) {
isCanceled = true
f()
}
}

@inline def lift[T: CanCancel](subscription: T): Cancelable = subscription match {
case cancelable: Cancelable => cancelable
case _ => apply(() => CanCancel[T].cancel(subscription))
case _ => apply(() => CanCancel[T].unsafeCancel(subscription))
}

@inline def composite(subscriptions: Cancelable*): Cancelable = compositeFromIterable(subscriptions)
@inline def compositeFromIterable(subscriptions: Iterable[Cancelable]): Cancelable = {
def compositeFromIterable(subscriptions: Iterable[Cancelable]): Cancelable = {
val nonEmptySubscriptions = subscriptions.filter(_ != Cancelable.empty)
if (nonEmptySubscriptions.isEmpty) Cancelable.empty
else new Cancelable {
def cancel() = nonEmptySubscriptions.foreach(_.cancel())
def unsafeCancel() = nonEmptySubscriptions.foreach(_.unsafeCancel())
}
}

Expand Down
Loading