Library for interoperation between RxJava 3 and Project Loom's Fibers.
dependencies {
    implementation "com.github.akarnokd:rxjava3-fiber-interop:0.0.16"
}
Always requires the latest Loom early-access build from https://jdk.java.net/loom/
Note that Loom is in early access, so its API, naming and usage keep changing frequently.
FiberInterop.create creates a Flowable from a generator callback that can emit items via a FiberEmitter. The callback runs on an ExecutorService provided by the user and is suspended automatically upon backpressure.
Scheduler-argumented overloads of create have been removed.
The created Flowable completes once the callback returns normally, or signals an error if the callback throws an exception.
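// The executor is closed by try-with-resources once the test has finished
try (var scope = Executors.newVirtualThreadExecutor()) {
    FiberInterop.create(emitter -> {
        for (int i = 1; i <= 5; i++) {
            // emit the loop counter; suspends while downstream has no demand
            emitter.emit(i);
        }
    }, scope)
    .test()
    .awaitDone(5, TimeUnit.SECONDS)
    .assertResult(1, 2, 3, 4, 5);
}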
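To make "suspended automatically upon backpressure" and the completion rules more concrete, here is a minimal plain-JDK sketch of the idea. It is not the library's implementation: `BlockingEmitter`, `Generator`, `run` and `demo` are hypothetical names invented for this illustration, and a `Semaphore` stands in for downstream demand. It also uses an ordinary single-thread executor so that it runs on any recent JDK; with Loom, the blocked `emit` call would park a virtual thread instead of a platform thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class BackpressureSketch {

    /** Hypothetical stand-in for FiberEmitter; not the library's class. */
    static final class BlockingEmitter<T> {
        private final Semaphore demand = new Semaphore(0);
        private final List<T> received = new ArrayList<>();

        /** Downstream side: allow n more items through. */
        void request(int n) {
            demand.release(n);
        }

        /** Generator side: blocks the calling thread until demand is available. */
        void emit(T item) throws InterruptedException {
            demand.acquire();
            synchronized (received) {
                received.add(item);
            }
        }

        List<T> snapshot() {
            synchronized (received) {
                return new ArrayList<>(received);
            }
        }
    }

    /** Hypothetical callback shape, analogous to the lambda passed to create. */
    interface Generator<T> {
        void generate(BlockingEmitter<T> emitter) throws Exception;
    }

    /** Runs the generator on the executor; returns the items and the terminal signal. */
    static <T> String run(Generator<T> generator, ExecutorService executor) {
        BlockingEmitter<T> emitter = new BlockingEmitter<>();
        Future<?> task = executor.submit(() -> {
            generator.generate(emitter);
            return null;
        });
        try {
            emitter.request(2); // only two items may pass for now
            while (emitter.snapshot().size() < 2 && !task.isDone()) {
                Thread.sleep(1); // the generator is blocked in emit() beyond item 2
            }
            emitter.request(100); // release the remaining demand
            task.get(5, TimeUnit.SECONDS);
            return emitter.snapshot() + " complete"; // callback returned normally
        } catch (ExecutionException ex) {
            // callback threw: report it as the error signal
            return emitter.snapshot() + " error: " + ex.getCause().getMessage();
        } catch (InterruptedException | TimeoutException ex) {
            throw new IllegalStateException(ex);
        }
    }

    /** Emits 1..5, or fails right after the first item when fail is true. */
    static String demo(boolean fail) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            return BackpressureSketch.<Integer>run(emitter -> {
                for (int i = 1; i <= 5; i++) {
                    emitter.emit(i);
                    if (fail) {
                        throw new IllegalStateException("boom");
                    }
                }
            }, executor);
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo(false)); // [1, 2, 3, 4, 5] complete
        System.out.println(demo(true));  // [1] error: boom
    }
}
```

The point of the sketch is that the generator is written as straight-line blocking code, and flow control happens inside `emit`. The real library presumably ties this to Reactive Streams `request(n)` calls rather than a semaphore.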
FiberInterop.transform transforms each upstream value via a callback that can emit zero or more values for each of those upstream values. The callback runs on an ExecutorService provided by the user and is suspended automatically upon backpressure.
Scheduler-argumented overloads of create have been removed.
try (var scope = Executors.newVirtualThreadExecutor()) {
    Flowable.range(1, 5)
    .compose(FiberInterop.transform((value, emitter) -> {
        emitter.emit(value);
        emitter.emit(value + 1);
    }, scope))
    .test()
    .awaitDone(5, TimeUnit.SECONDS)
    .assertResult(1, 2, 2, 3, 3, 4, 4, 5, 5, 6);
}
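The "zero or more values per upstream value" contract can be illustrated without the library. The following is a small synchronous sketch using only the JDK: `TransformSketch` and its `transform` method are hypothetical names for this illustration, a `Consumer` plays the role of the emitter, and backpressure and the ExecutorService are deliberately left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

public class TransformSketch {

    /** Calls the callback once per upstream value; it may emit zero or more results. */
    static <T, R> List<R> transform(List<T> upstream, BiConsumer<T, Consumer<R>> callback) {
        List<R> downstream = new ArrayList<>();
        for (T value : upstream) {
            callback.accept(value, downstream::add);
        }
        return downstream;
    }

    public static void main(String[] args) {
        List<Integer> source = List.of(1, 2, 3, 4, 5);

        // Two emissions per value, mirroring the example above
        List<Integer> pairs = TransformSketch.<Integer, Integer>transform(source, (value, emitter) -> {
            emitter.accept(value);
            emitter.accept(value + 1);
        });
        System.out.println(pairs); // [1, 2, 2, 3, 3, 4, 4, 5, 5, 6]

        // Zero emissions for odd values, so the callback acts as a filter
        List<Integer> evens = TransformSketch.<Integer, Integer>transform(source, (value, emitter) -> {
            if (value % 2 == 0) {
                emitter.accept(value);
            }
        });
        System.out.println(evens); // [2, 4]
    }
}
```

Because the callback decides how many times to emit, a single operator of this shape can cover mapping, filtering and expansion.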