Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ lazy val commonSettings = Seq(
"org.scalatest" %%% "scalatest" % "3.2.17" % Test,
),
/* scalacOptions --= Seq("-Xfatal-warnings"), // overwrite option from https://github.com/DavidGregory084/sbt-tpolecat */
Test / scalacOptions --= Seq("-Wnonunit-statement"), // otherwise there is a warning for every test-assertion
)

lazy val colibri = project
Expand Down
12 changes: 5 additions & 7 deletions colibri/src/main/scala/colibri/Cancelable.scala
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ object Cancelable {
def unsafeAdd(subscription: () => Cancelable): Unit = if (buffer != null) {
val cancelable = subscription()
if (buffer == null) cancelable.unsafeCancel()
else buffer.push(cancelable)
else buffer.push(cancelable): Unit
()
}

Expand Down Expand Up @@ -106,7 +106,7 @@ object Cancelable {
val nextCancelable = subscriptions(0)
val variable = Cancelable.variable()
latest = variable
subscriptions.splice(0, deleteCount = 1)
subscriptions.splice(0, deleteCount = 1): Unit
variable.unsafeAdd(nextCancelable)
variable.unsafeFreeze()
()
Expand All @@ -124,8 +124,7 @@ object Cancelable {
variable.unsafeAdd(subscription)
variable.unsafeFreeze()
} else {
subscriptions.push(subscription)
()
subscriptions.push(subscription): Unit
}
}

Expand Down Expand Up @@ -231,11 +230,10 @@ object Cancelable {
}

def unsafeAdd(subscription: () => Cancelable): Unit = if (buffer != null) {
buffer.push(subscription)
buffer.push(subscription): Unit
if (currentCancelables != null) {
currentCancelables.push(subscription())
currentCancelables.push(subscription()): Unit
}
()
}

def unsafeFreeze(): Unit = {
Expand Down
6 changes: 3 additions & 3 deletions colibri/src/main/scala/colibri/Observable.scala
Original file line number Diff line number Diff line change
Expand Up @@ -417,7 +417,7 @@ object Observable {
Observer.create(
{ input =>
val index = tasks.length
tasks.push(null)
tasks.push(null): Unit

val effect = f(input)

Expand Down Expand Up @@ -943,7 +943,7 @@ object Observable {
source.unsafeSubscribe(
Observer.create[A](
{ value =>
seqA.push(value)
seqA.push(value): Unit
send()
},
sink.unsafeOnError,
Expand All @@ -952,7 +952,7 @@ object Observable {
sourceB.unsafeSubscribe(
Observer.create[B](
{ value =>
seqB.push(value)
seqB.push(value): Unit
send()
},
sink.unsafeOnError,
Expand Down
6 changes: 3 additions & 3 deletions colibri/src/main/scala/colibri/Observer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,8 @@ object Observer {

def createFromPromise[A](promise: Promise[A]): Observer[A] =
new Observer[A] {
def unsafeOnNext(value: A): Unit = { promise.trySuccess(value); () }
def unsafeOnError(error: Throwable): Unit = { promise.tryFailure(error); () }
def unsafeOnNext(value: A): Unit = promise.trySuccess(value): Unit
def unsafeOnError(error: Throwable): Unit = promise.tryFailure(error): Unit
}

def product[A, B](fa: Observer[A], fb: Observer[B]): Observer[(A, B)] = new Observer[(A, B)] {
Expand Down Expand Up @@ -133,7 +133,7 @@ object Observer {
}

def contracollect[B](f: PartialFunction[B, A]): Observer[B] = new Observer[B] {
def unsafeOnNext(value: B): Unit = recovered({ f.runWith(sink.unsafeOnNext)(value); () }, unsafeOnError)
def unsafeOnNext(value: B): Unit = recovered(f.runWith(sink.unsafeOnNext)(value): Unit, unsafeOnError)
def unsafeOnError(error: Throwable): Unit = sink.unsafeOnError(error)
}

Expand Down
2 changes: 1 addition & 1 deletion colibri/src/main/scala/colibri/Subject.scala
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ final class PublishSubject[A] extends Observer[A] with Observable[A] {

def unsafeSubscribe(sink: Observer[A]): Cancelable = {
val observer = Observer.lift(sink)
subscribers.push(observer)
subscribers.push(observer): Unit
Cancelable { () =>
if (isRunning) subscribers = JSArrayHelper.removeElementCopied(subscribers)(observer)
else JSArrayHelper.removeElement(subscribers)(observer)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,7 @@ private[colibri] object RunEffectExecution {

Cancelable.withIsEmpty(isCancel) { () =>
isCancel = true
cancelRun()
()
cancelRun(): Unit
}
}
}
5 changes: 2 additions & 3 deletions colibri/src/main/scala/colibri/helpers/JSArrayHelper.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,7 @@ import scala.scalajs.js
private[colibri] object JSArrayHelper {
def removeElement[T](array: js.Array[T])(element: T): Unit = {
val index = array.indexOf(element)
if (index != -1) array.splice(index, deleteCount = 1)
()
if (index != -1) array.splice(index, deleteCount = 1): Unit
}

def removeElementCopied[T](array: js.Array[T])(element: T): js.Array[T] = {
Expand All @@ -15,7 +14,7 @@ private[colibri] object JSArrayHelper {
val newArray = js.Array[T]()
var i = 0
while (i < array.length) {
if (i != index) newArray.push(array(i))
if (i != index) newArray.push(array(i)): Unit
i += 1
}
newArray
Expand Down
2 changes: 1 addition & 1 deletion project/plugins.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,6 @@ addSbtPlugin("ch.epfl.scala" % "sbt-scalajs-bundler" % "0.21.1")
addSbtPlugin("com.github.sbt" % "sbt-ci-release" % "1.5.12")

// sane scalac options
addSbtPlugin("io.github.davidgregory084" % "sbt-tpolecat" % "0.3.1")
addSbtPlugin("io.github.davidgregory084" % "sbt-tpolecat" % "0.4.4")

addSbtPlugin("com.thoughtworks.sbt-scala-js-map" % "sbt-scala-js-map" % "4.1.1")
25 changes: 10 additions & 15 deletions zio/src/main/scala/colibri/ext/zio/package.scala
Original file line number Diff line number Diff line change
Expand Up @@ -47,30 +47,25 @@ package object zio extends ZioLowPrio {
}

private final class SinkZIOWithRuntime[Env](runtime: Runtime[Env]) extends Sink[zio.RSink[Env, *]] {
override def unsafeOnNext[A](sink: zio.RSink[Env, A])(value: A): Unit = {
Unsafe.unsafe(implicit u => runtime.unsafe.run(ZStream.succeed(value).run(sink)))
()
}
override def unsafeOnNext[A](sink: zio.RSink[Env, A])(value: A): Unit =
Unsafe.unsafe(implicit u => runtime.unsafe.run(ZStream.succeed(value).run(sink)).getOrThrow())

override def unsafeOnError[A](sink: zio.RSink[Env, A])(error: Throwable): Unit = {
Unsafe.unsafe(implicit u => runtime.unsafe.run(ZStream.fail(error).run(sink)))
()
}
override def unsafeOnError[A](sink: zio.RSink[Env, A])(error: Throwable): Unit =
Unsafe.unsafe(implicit u => runtime.unsafe.run(ZStream.fail(error).run(sink)).getOrThrow())
}

private final class SourceZIOWithRuntime[Env](runtime: Runtime[Env]) extends Source[zio.RStream[Env, *]] {
override def unsafeSubscribe[A](source: zio.RStream[Env, A])(sink: Observer[A]): Cancelable = {
val canceler = Unsafe.unsafe(implicit u =>
runtime.unsafe.runToFuture(
Unsafe.unsafe { implicit u =>
val canceler = runtime.unsafe.runToFuture(
source
.onError(cause => ZIO.succeed(sink.unsafeOnError(cause.squash)))
.foreach(value => ZIO.succeed(sink.unsafeOnNext(value))),
),
)
)

Cancelable.withIsEmpty(canceler.isCompleted) { () =>
canceler.cancel()
()
Cancelable.withIsEmpty(canceler.isCompleted) { () =>
canceler.cancel(): Unit
}
}
}
}
Expand Down