Skip to content

Commit b4d852f

Browse files
committed
Update readme
1 parent 517dbed commit b4d852f

File tree

1 file changed

+47
-15
lines changed

1 file changed

+47
-15
lines changed

README.md

Lines changed: 47 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
# RxJavaFiberInterop
2-
Library for interoperation between RxJava 3 and Project Loom's Fibers.
2+
3+
Library for interoperation between RxJava 3 and JDK 21's Virtual Threads (Project Loom).
34

45
<a href='https://github.com/akarnokd/RxJavaFiberInterop/actions?query=workflow%3A%22Java+CI+with+Gradle%22'><img src='https://github.com/akarnokd/RxJavaFiberInterop/workflows/Java%20CI%20with%20Gradle/badge.svg'></a>
56
[![codecov.io](http://codecov.io/github/akarnokd/RxJavaFiberInterop/coverage.svg?branch=master)](http://codecov.io/github/akarnokd/RxJavaFiberInterop?branch=master)
@@ -12,22 +13,43 @@ dependencies {
1213
}
1314
```
1415

15-
Always requires the latest Loom build from [https://jdk.java.net/21/](https://jdk.java.net/21/)
16-
17-
*Note that Loom is in early access and the API, naming and usage keeps changing, a lot.*
16+
Requires a JDK that has Virtual Threads as standard feature (i.e., not preview), such as [https://jdk.java.net/21/](https://jdk.java.net/21/).
1817

1918
# Components
2019

2120
## FiberInterop
2221

22+
### Schedulers
23+
24+
Currently, the Virtual Thread API does not offer public means to specify the carrier thread(pool) thus it is not possible to use RxJava `Scheduler`s as such.
25+
26+
You can use the `Schedulers.from` though to convert the Fork-Join-pool backed standard Virtual Thread Executor into an RxJava `Scheduler`:
27+
28+
```java
29+
var vte = Executors.newVirtualThreadExecutor();
30+
Scheduler vtScheduler = Schedulers.from(vte);
31+
32+
// sometime later
33+
vte.close();
34+
```
35+
36+
You can then use `vtScheduler` from the example with `subscribeOn` and `observeOn` to let traditional functional callbacks to block virtually:
37+
38+
```
39+
Observable.fromCallable(() -> someBlockingNetworkCall())
40+
.subscribeOn(vtScheduler)
41+
.observeOn(vtScheduler)
42+
.map(v -> someOtherBlockingCall(v))
43+
.observeOn(uiThread)
44+
.subscribe(v -> label.setText(v), e -> label.setText(e.toString()));
45+
```
46+
47+
:information_source: You need the special operators below to make RxJava's non-blocking backpressure into virtually blocked backpressure.
48+
2349
### create
2450

2551
Creates a `Flowable` from a generator callback, that can emit via `FiberEmitter`, run on an `ExecutorService` provided by the user and
26-
is suspended automatically upon backpressure.
27-
28-
:warning: Since version 0.0.15 of this library and with the current latest Loom preview 18b5, there is no public way to
29-
specify a carrier thread beneath a virtual thread executor anymore. Therefore, the single-argument and `Scheduler`-argumented overloads
30-
of `create` have been removed.
52+
is suspended automatically upon backpressure. The callback is executed inside the virtual thread thus you can call the usual blocking APIs and get suspensions the same way.
3153

3254
The created `Flowable` will complete once the callback returns normally or with an error if the callback throws an exception.
3355

@@ -46,12 +68,7 @@ try (var scope = Executors.newVirtualThreadExecutor()) {
4668

4769
### transform
4870

49-
Transforms each upstream value via a callback that can emit zero or more values for each of those upstream values, run on an `ExecutorService` provided by the user and
50-
is suspended automatically upon backpressure.
51-
52-
:warning: Since version 0.0.15 of this library and with the current latest Loom preview 18b5, there is no public way to
53-
specify a carrier thread beneath a virtual thread executor anymore. Therefore, the single-argument and `Scheduler`-argumented overloads
54-
of `create` have been removed.
71+
Transforms each upstream value via a callback that can emit zero or more values for each of those upstream values, run on an `ExecutorService` provided by the user and is suspended automatically upon backpressure. The callback is executed inside the virtual thread thus you can call the usual blocking APIs and get suspensions the same way.
5572

5673
```java
5774
try (var scope = Executors.newVirtualThreadExecutor()) {
@@ -65,3 +82,18 @@ try (var scope = Executors.newVirtualThreadExecutor()) {
6582
.assertResult(1, 2, 2, 3, 3, 4, 4, 5, 5, 6);
6683
}
6784
```
85+
86+
### blockingXXX
87+
88+
RxJava uses `java.util.concurrent` locks and `CountDownLatches` via its `blockingXXX` which will automatically work within a virtual thread. Therefore, there is no need for a separate interop operator. Just block.
89+
90+
```java
91+
try (var scope = Executors.newVirtualThreadExecutor()) {
92+
scope.submit(() -> {
93+
var v = Flowable.just(1)
94+
.delay(1, TimeUnit.SECONDS)
95+
.blockingLast();
96+
System.out.println(v);
97+
});
98+
}
99+
```

0 commit comments

Comments
 (0)