1
1
/*
2
- * Copyright 2019 David Karnok
2
+ * Copyright 2019-Present David Karnok
3
3
*
4
4
* Licensed under the Apache License, Version 2.0 (the "License");
5
5
* you may not use this file except in compliance with the License.
19
19
import java .util .Objects ;
20
20
import java .util .concurrent .*;
21
21
22
+ import io .reactivex .rxjava3 .annotations .BackpressureKind ;
23
+ import io .reactivex .rxjava3 .annotations .BackpressureSupport ;
24
+ import io .reactivex .rxjava3 .annotations .SchedulerSupport ;
22
25
import io .reactivex .rxjava3 .core .*;
23
26
import io .reactivex .rxjava3 .internal .functions .ObjectHelper ;
24
27
import io .reactivex .rxjava3 .plugins .RxJavaPlugins ;
25
28
26
29
/**
27
- * Sources, transformers and consumers working with fiber-based suspendable methods.
28
- *
30
+ * Sources and transformers working with
31
+ * virtual thread-based executors that allow
32
+ * efficient blocking via suspensions and resumptions.
33
+ * <p>
34
+ * Examples:
35
+ * <pre><code>
36
+ * try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
37
+ * FiberInterop.<Integer>create(emitter -> {
38
+ * for (int i = 0; i < 10; i++) {
39
+ * Thread.sleep(1000);
40
+ * emitter.emit(i);
41
+ * }
42
+ * }, executor)
43
+ * .subscribe(
44
+ * System.out::println,
45
+ * Throwable::printStackTrace,
46
+ * () -> System.out.println("Done")
47
+ * );
48
+ * }
49
+ * </code></pre>
29
50
* @since 0.0.1
30
51
*/
31
52
public final class FiberInterop {
@@ -35,20 +56,70 @@ private FiberInterop() {
35
56
throw new IllegalStateException ("No instances!" );
36
57
}
37
58
59
+ /**
60
+ * Construct a {@link Flowable} and use the given {@code generator}
61
+ * to generate items on demand while running on the given {@link ExecutorService}.
62
+ * <p>
63
+ * Note that backpressure is handled via blocking so it is recommended the provided
64
+ * {@code ExecutorService} uses virtual threads, such as the one returned by
65
+ * {@link Executors#newVirtualThreadPerTaskExecutor()}.
66
+ * @param <T> the element type to emit
67
+ * @param generator the callback used to generate items on demand by the downstream
68
+ * @param executor the target {@code ExecutorService} to use for running the callback
69
+ * @return the new {@code Flowable} instance
70
+ */
71
+ @ BackpressureSupport (BackpressureKind .FULL )
72
+ @ SchedulerSupport (SchedulerSupport .CUSTOM )
73
+ @ SuppressWarnings ("preview" )
38
74
public static <T > Flowable <T > create (FiberGenerator <T > generator , ExecutorService executor ) {
39
75
Objects .requireNonNull (generator , "generator is null" );
40
76
Objects .requireNonNull (executor , "executor is null" );
41
77
return RxJavaPlugins .onAssembly (new FlowableCreateFiberExecutor <>(generator , executor ));
42
78
}
43
79
44
- public static <T , R > FlowableTransformer <T , R > transform (FiberTransformer <T , R > transformer , ExecutorService scheduler ) {
45
- return transform (transformer , scheduler , Flowable .bufferSize ());
80
+ /**
81
+ * Construct a transformer to be used via {@link Flowable#compose(FlowableTransformer)}
82
+ * which can turn an upstream item into zero or more downstream values by running
83
+ * on the given {@link ExecutorService}.
84
+ * <p>
85
+ * Note that backpressure is handled via blocking so it is recommended the provided
86
+ * {@code ExecutorService} uses virtual threads, such as the one returned by
87
+ * {@link Executors#newVirtualThreadPerTaskExecutor()}.
88
+ * @param <T> the upstream element type
89
+ * @param <R> the downstream element type
90
+ * @param transformer the callback whose {@link FiberTransformer#transform(Object, FiberEmitter)} is invoked for each upstream item
91
+ * @param executor the target {@code ExecutorService} to use for running the callback
92
+ * @return the new {@code FlowableTransformer} instance
93
+ */
94
+ @ BackpressureSupport (BackpressureKind .FULL )
95
+ @ SchedulerSupport (SchedulerSupport .CUSTOM )
96
+ @ SuppressWarnings ("preview" )
97
+ public static <T , R > FlowableTransformer <T , R > transform (FiberTransformer <T , R > transformer , ExecutorService executor ) {
98
+ return transform (transformer , executor , Flowable .bufferSize ());
46
99
}
47
100
48
- public static <T , R > FlowableTransformer <T , R > transform (FiberTransformer <T , R > transformer , ExecutorService scheduler , int prefetch ) {
101
+ /**
102
+ * Construct a transformer to be used via {@link Flowable#compose(FlowableTransformer)}
103
+ * which can turn an upstream item into zero or more downstream values by running
104
+ * on the given {@link ExecutorService}.
105
+ * <p>
106
+ * Note that backpressure is handled via blocking so it is recommended the provided
107
+ * {@code ExecutorService} uses virtual threads, such as the one returned by
108
+ * {@link Executors#newVirtualThreadPerTaskExecutor()}.
109
+ * @param <T> the upstream element type
110
+ * @param <R> the downstream element type
111
+ * @param transformer the callback whose {@link FiberTransformer#transform(Object, FiberEmitter)} is invoked for each upstream item
112
+ * @param executor the target {@code ExecutorService} to use for running the callback
113
+ * @param prefetch the number of items to fetch from the upstream.
114
+ * @return the new {@code FlowableTransformer} instance
115
+ */
116
+ @ BackpressureSupport (BackpressureKind .FULL )
117
+ @ SchedulerSupport (SchedulerSupport .CUSTOM )
118
+ @ SuppressWarnings ("preview" )
119
+ public static <T , R > FlowableTransformer <T , R > transform (FiberTransformer <T , R > transformer , ExecutorService executor , int prefetch ) {
49
120
Objects .requireNonNull (transformer , "transformer is null" );
50
- Objects .requireNonNull (scheduler , "scheduler is null" );
121
+ Objects .requireNonNull (executor , "executor is null" );
51
122
ObjectHelper .verifyPositive (prefetch , "prefetch" );
52
- return new FlowableTransformFiberExecutor <>(null , transformer , scheduler , prefetch );
123
+ return new FlowableTransformFiberExecutor <>(null , transformer , executor , prefetch );
53
124
}
54
125
}
0 commit comments