Provide abstraction for cold streams #254
Comments
Let me add here that we are looking at an asynchronous analogue of a standard Kotlin's
and this code is perfectly lazy in the sense that |
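The inline code of the comment above did not survive, so the following is only a hedged illustration of what "lazy" means for the standard library `Sequence` that the comment uses as the synchronous analogue. The helper name `lazySequenceDemo` and the log messages are made up for this sketch; only `sequence`, `yield`, `take` and `toList` are standard library calls.

```kotlin
// Builds an infinite sequence and records when each element is computed.
fun lazySequenceDemo(): Pair<List<Int>, List<String>> {
    val log = mutableListOf<String>()
    val numbers = sequence {
        var i = 0
        while (true) {
            i++
            log += "compute $i" // runs only when an element is requested
            yield(i)
        }
    }
    log += "built" // nothing has been computed at this point
    val firstTwo = numbers.take(2).toList()
    return firstTwo to log
}

fun main() {
    val (values, log) = lazySequenceDemo()
    println(values) // prints [1, 2]
    println(log)    // prints [built, compute 1, compute 2]
}
```

Building the sequence computes nothing, and only the two requested elements are ever produced. The limitation that motivates this issue is that the sequence builder body cannot call arbitrary suspending functions.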
Hi, It provides 2 cold producers (Reactor inspiration) : As they are reactive objects, they start producing items only when a client calls a Terminal (final/consuming) operation function. Reactivity is a multiplatform project, with common and platform specific tests and extensions. Providing for example platform specific Solo.toPromise in JS, Solo.toCompletableFuture or Stream.toMulti in JVM (in JDK8 project). I would be really happy if Reactivity can save time, or serve as source of inspiration for this issue |
The biggest issue with It shipped with Implementing custom operators is easy, just add an extension function. Unfortunately,
Ultimately, my use case is that I need a lightweight RxJava for scenarios where I can't use RxJava but want a reactive-state based architecture, and I am hoping co-routines can solve this problem. |
@ScottPierce We can open a separate issue about |
@pull-vert I'm not a big fan of
They are all different ways, but do we really need an abstraction for that last one? Consider this definition:
Isn't this functional type the Let me also quote Eric Meijer's tweet here: https://twitter.com/headinthebox/status/971492821151580160 |
I'd prefer a real type for a few reasons. The These types can and will leak into Java and I'd much prefer a semantically named type than an arbitrary complex function type that only the Kotlin metadata can disambiguate. Ignoring the fact that there's a strong possibility these types will be partially usable from Java, the ability to do simple things like A Eric's tweet does not make sense. Single is a factory of a coroutine/async+awaitable. Your typealias is evidence of this. And since the context of the tweet was Java, when we rasterize the typealias into a proper type (as the Java language requires) and desugar coroutines into their underlying form of callbacks (as the Java language requires) we're left with the exact shape of Rx's single so they're actually exactly the same thing. |
I question the very need of Single/Solo. The typealias digression was just to demonstrate what this concept actually does. I question the very use-cases for Single/Solo. Why would you ever use it? Why would you need a dedicated type to denote a reference to a computation that reruns every time you ask for it? Why would you need this level of indirection? Your code will be much easier to understand if you use suspending functions directly.
To illustrate. Instead of:
interface A { fun doSomething(): Single<T> }
write
interface A { suspend fun doSomething(): T }
Instead of:
fun foo() = // return Single<R>
    doSomething().map { it.transform() }
do
suspend fun foo() = // return R
    doSomething().transform()
you can continue this example with flatMap, etc, etc. The code that does not use Single/Solo will be invariably more direct and easier to understand.
On the other hand, we need a dedicated abstraction for asynchronous streams of values simply because there is no other way to represent it in Kotlin, but via a separate type. |
Fair point. I tend to think in a world where strong interop is a must. For the core coroutines library I'd be happy with a single abstraction on a multi-value suspending source. |
Isn't this suspend fun foo() = // return R
doSomething().transform() eagerly executing I'd think that given a |
Yes, but you wouldn't invoke the method until you needed the value (i.e., when you'd otherwise subscribe). When it's suspend functions all the way down you don't need the abstraction because you can just call the suspending function over and over. |
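A minimal sketch of that point, using only the standard library: a plain suspending function already behaves like a "cold single", because nothing runs until it is called and every call reruns the computation. The names `runNow`, `doSomething` and `callTwice` are hypothetical, and `runNow` only works for blocks that never actually suspend.

```kotlin
import kotlin.coroutines.Continuation
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.startCoroutine

// Hypothetical stdlib-only runner; valid only for blocks that complete
// without suspending, which is all this sketch needs.
fun <T> runNow(block: suspend () -> T): T {
    val box = mutableListOf<T>()
    block.startCoroutine(Continuation(EmptyCoroutineContext) { box += it.getOrThrow() })
    return box.single()
}

// Counts how many times the "cold" computation has been executed.
var calls = 0
suspend fun doSomething(): Int = ++calls

fun callTwice(): Pair<Int, Int> {
    calls = 0
    val first = runNow { doSomething() }  // first execution
    val second = runNow { doSomething() } // reruns the computation
    return first to second
}

fun main() {
    println(callTwice()) // prints (1, 2)
}
```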
I like the Solo = single-value cold emitter because it can provide specific operators, and some intermediate (= cold/not consuming) operators on Multi can return a Solo:
fun <E> Solo<E>.merge(vararg others: Solo<E>): Multi<E>
inline fun <E, R> Multi<E>.reduce(initial: R, crossinline operation: (acc: R, E) -> R): Solo<R>
fun <E> Multi<E>.first(): Solo<E>
See https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html to see some operators of Flux that return Mono. After that, some terminal (final/consuming) Solo-specific extensions I mentioned before are useful:
fun <T> Solo<T>.toPromise(): Promise<T>
fun <E> Solo<E>.toCompletableFuture(
    coroutineContext: CoroutineContext = DefaultDispatcher
): CompletableFuture<E> |
The point above is that operators on a single async value and operators
which narrow a multi-async stream into a single value can be simple,
imperative suspending functions.
Conversion to a Promise or CF is straightforward when all you have is
suspending functions.
The most interesting case is multiple singles to a multi. It's conceptually
easy (we already do concatMap all the time often without realizing it) but
I'm curious how terse it can be made syntactically.
|
Hi, I played with Solo; using a type alias can lead to some interesting collateral effects.
typealias AsyncCallable<T> = suspend () -> T
fun <E> AsyncCallable<E>.toCompletableFuture(
    coroutineContext: CoroutineContext = DefaultDispatcher
): CompletableFuture<E> = future(coroutineContext) { invoke() }
fun main(vararg args: String) = runBlocking {
    val deferred = async { "Hello" }
    val deferredFuture = deferred::await.toCompletableFuture()
}
Then I tried to implement the
interface AsyncSource<E> {
    suspend fun consumeEach(consumer: suspend (E) -> Unit)
}
fun <E> buildAsyncSource(builder: suspend AsyncSourceBuilder<E>.() -> Unit): AsyncSource<E> = TODO()
interface AsyncSourceBuilder<E> {
    suspend fun yield(item: E)
    suspend fun yieldAll(items: AsyncSource<E>) = items.consumeEach { yield(it) }
}
suspend fun <E> AsyncSource<E>.concatMap(block: suspend (E) -> AsyncSource<E>): AsyncSource<E> =
    buildAsyncSource {
        this@concatMap.consumeEach { item ->
            yieldAll(block(item))
        }
    } |
@pull-vert In
@JakeWharton I don't think we need any kind of "multiple singles to a multi" (
Here we want to get a list of users, then get a profile for each user. In reactive Java world we'd define
Note, that sometimes you'd want to perform this |
@elizarov I understand and agree with your example for terminal/consuming operations. I would prefer the resulting Solo/Single value to be cold and not consuming, to be able to chain operators until the first terminal/consuming operator (can have multiple consumers, each will receive all elements). For example, a stupid reactive operator chain:
Multi
    .fromValues("one", "two", "three")
    .filter { it.startsWith("t") } // Intermediate cold = returns Multi with "two" and "three"
    .first() // Intermediate cold = returns a Mono with "two"
    .concatMap { it.splitAndDelay(15) } // Intermediate cold = returns a Multi with "t", "w" and "o" (adding a 15ms delay between each send)
    .consumeEach { println(it) } // terminal/consuming operation printing "t", "w" then "o" |
@pull-vert In a world of channels and cold streams we might have the following function defined in some API:
It performs an operation to subscribe to the data stream (establishing network connection, etc) and returns a channel of strings flowing from the server. It is hot. There is not much reason to defer the actual subscription until the terminal operation. It is better to "fail-fast" (return an error immediately) if we have a problem establishing a communication channel to the server. The actual processing of this hot data stream would use cold channels in intermediate steps, like this:
There is not much need for special "glue" operators that work on And that is the whole beauty. You can take a whole synchronous data processing pipeline with intermediate operators like |
@elizarov Thanks for this clarification I now understand your vision for introducing cold Intermediate Operators to Channels, not using new Coroutine for each Operator with a lot better performance than actual. I am still a bit confused by this chaining of mixed hot and cold Operators, it will require good documentation to be sure in what state (hot instantly usable or cold that will require terminal consuming operator) we are at every step of the chain. If I understand your example delayEach or map cold intermediate Operator will have to be declared as Extension on kotlinx ReceiveChannel, java 8 Stream, kotlin Sequence, kotlin Collection, kotlin Array, maybe more. |
@pull-vert The mental model is simple. There are no cold operators. You just "setup your pipeline" (that is what cold operators do) that is later executed by the terminal operator. Since every chain of operators ultimately ends in a terminal operator, you don't really care what happens in between. It just works. With respect to the kind of operators we need to provide, we have huge advantage over reactive libraries because we only need to define basic ones like |
@elizarov Thanks again now I see and understand exactly the direction for this cold stream operators topic for kotlinx-coroutines. I will continue enrich my pure cold reactive library for fun (strongly based on kotlinx-coroutines), and of course follow what is going on here! |
As an alternative we are developing Reaktive library: https://github.com/badoo/Reaktive |
That's really neat! Do you plan to add support for suspend functions and the concurrent operators? I need my |
Thanks for feedback 😊 suspend functions are not planned and there are already a lot of operators like combineLatest, zip, merge, sample, debounce, throttle. I'll put takeUntill into the queue. You can also open an issue for that to keep track 😉 |
@fvasco If you remove the delayEach, it crashes: val firstFlow = flowOf(1, 2, 3, 4).delayEach(10)
val secondFlow = flowOf("a", "b", "c").delayEach(15) You don't end up with the latest values:
|
Good catch @PaulWoitaschek! Don't use it, I'm not interested to engage myself on this issue. |
I think it would be nice if Then one could always safely start a coroutine from the
|
@jcornaz Making or not making So far we are leaning to a performance-conscious choice -- keep simple operators faster, and it is Ok if more complex operators have to use |
@jcornaz I don't agree with you, it is possible to build a This is why I am reluctant of the |
@elizarov. It makes sense, thanks for the explanation.
I agree.
I think it is as easy/hard to build for both, and with the same benefits/problems (like the order of the output not being guaranteed). The only difference is that On a completely different topic: Here is what I'd expect: @FlowPreview
public interface FlowCollector<in T> {
public suspend fun emit(value: T)
} But here is what we have: @FlowPreview
public interface FlowCollector<T> {
public suspend fun emit(value: T)
} |
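A hedged sketch of why the `in` variance matters, using only the standard library. `Collector` is a hypothetical stand-in with the same shape as the interface quoted above, not the library type, and `emitInts` and `collectIntsIntoAnyCollector` are made-up names for illustration.

```kotlin
import kotlin.coroutines.Continuation
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.startCoroutine

// Hypothetical stand-in for the collector interface discussed above,
// declared contravariant (`in T`) because T only appears as an input.
interface Collector<in T> {
    suspend fun emit(value: T)
}

// A producer that only knows how to emit Ints.
suspend fun emitInts(collector: Collector<Int>) {
    collector.emit(1)
    collector.emit(2)
}

// Runs the producer against a Collector<Any> and returns what it received.
fun collectIntsIntoAnyCollector(): List<Any> {
    val seen = mutableListOf<Any>()
    val anyCollector = object : Collector<Any> {
        override suspend fun emit(value: Any) { seen += value }
    }
    // Passing Collector<Any> where Collector<Int> is expected compiles
    // only because of the `in` variance; without it this is a type error.
    val block: suspend () -> Unit = { emitInts(anyCollector) }
    // Stdlib-only launch; nothing suspends, so it completes synchronously.
    block.startCoroutine(Continuation(EmptyCoroutineContext) { it.getOrThrow() })
    return seen
}

fun main() {
    println(collectIntsIntoAnyCollector()) // prints [1, 2]
}
```

With an invariant `Collector<T>`, the call `emitInts(anyCollector)` would be rejected, which is the practical cost of the missing modifier.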
It's an oversight, I will fix it in What about No new operators will be introduced to channels, existing ones will be eventually deprecated and hidden. |
Ok for the
EDIT: Actually, in the case of a |
@jcornaz I will answer instead of Roman. This is actually a good idea, thanks for pointing it out. It is hard to track ideas and feature requests under this issue, we will close it right after 1.2.0. I've created #1081 to track progress on that |
Some personal consideration regarding
count is reached there is no reason to increment skipped .
Should Lastly in |
Closing this one as fixed in
A good example of how things are different in |
Say then that you renamed |
In some non-suspending reactive libraries,
In this library, the two operators still have the first distinction, but not the second. RxJava, for example, has a I think this naming is a good choice for the library because the cardinality difference is the more important one – operators like |
I'm not sure I follow, you can easily come up with examples that jump threads, for example In other libraries, assuming you're following the spec, you cannot fail or jump threads on
My small concern here is that by making Maybe I'm being too picky about it 🤪but there are precedents where this distinction mattered in other languages. |
That's why he said it has first but not second. It's one-to-one but not
synchronous.
Anyway the suspending map is more like concatMap, not flatMap. You rarely
actually want one-to-many flatMap (which is like mergeMap).
On Sat, Apr 13, 2019, 12:53 PM Paco wrote:
In this library, the two operators still have the first distinction, but not the second.
map { async(CommonPool) { 1 }.await() } is asynchronous
|
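A small sketch of the "one-to-one but not synchronous" point, assuming the `Flow` API as it exists in current kotlinx.coroutines (`flowOf`, `map`, `toList`). `slowDouble` and `mapSequentially` are hypothetical helpers; the delays are chosen so that earlier elements take longer to transform.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical suspending transformation: earlier items take longer.
suspend fun slowDouble(x: Int): Int {
    delay((4 - x) * 10L)
    return x * 2
}

// The lambda passed to map may suspend, yet each element is fully
// transformed before the next one is requested, so order is preserved.
fun mapSequentially(): List<Int> = runBlocking {
    flowOf(1, 2, 3)
        .map { slowDouble(it) }
        .toList()
}

fun main() {
    println(mapSequentially()) // prints [2, 4, 6]
}
```

Even though the first element is the slowest, the output order matches the input order, which is the concatMap-like behavior described above rather than a merging flatMap.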
|
I realize this response is a bit late, but just wanted to get some clarification on @elizarov's comment above:
I don't see how |
@zach-klippenstein |
This commit introduces Coroutines support for `DatabaseClient` functional API via Kotlin extensions that provide suspendable functions prefixed by `await` for `Mono` based APIs. Extensions for `Flux` will be added when Kotlin/kotlinx.coroutines#254 will be fixed. It also provides `asType<Foo>()` extensions useful for Reactive API as well. Original pull request: #63.
All the currently provided channel abstractions in kotlinx.coroutines are hot. The data is being produced regardless of the presence of a subscriber. This is good for data sources and applications that are inherently hot, like incoming network and UI events.
However, hot streams are not an ideal solution for cases where the data stream is produced on demand. Consider, for example, the following simple code that produces ReceiveChannel<Int>:
One obvious downside is that computeNextValue() is invoked before send, so even when the receiver is not ready, the next value gets computed. Of course, it gets suspended in send if there is no receiver, but it is not as lazy as you get with cold reactive Publisher/Observable/Flowable/Flux/Flow.
We need the abstraction for cold streams in kotlinx.coroutines that is going to be just as lazy, computing data in "push" mode versus "pull" mode of hot channels that we have now.
There are the following related discussions:
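The code referenced in the issue text did not survive extraction. As a hedged sketch of the behavior it describes, assuming the `produce` builder from kotlinx.coroutines with its default rendezvous channel: `computeNextValue()` here is a hypothetical stand-in that just counts its own invocations, and `computedWithoutReceiver` is a made-up name for the demo.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

// Hypothetical stand-in for an expensive computation; counts its own calls.
var computedCount = 0
fun computeNextValue(): Int = ++computedCount

// Returns how many values were computed before any receiver showed up.
@OptIn(ExperimentalCoroutinesApi::class)
fun computedWithoutReceiver(): Int = runBlocking {
    computedCount = 0
    val channel: ReceiveChannel<Int> = produce {
        while (true) {
            val x = computeNextValue() // runs before anyone asks for a value
            send(x)                    // suspends here until a receiver arrives
        }
    }
    delay(100)       // give the producer time to run; nobody receives
    val eager = computedCount
    channel.cancel() // stop the producer so runBlocking can complete
    eager
}

fun main() {
    println(computedWithoutReceiver()) // prints 1
}
```

One value is computed even though no receiver ever appears, which is the laziness gap that a cold stream abstraction is meant to close.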