-
Notifications
You must be signed in to change notification settings - Fork 1.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add Subscribable interfaces #274
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should provide builders for SubscribableValue
and SubscribableVariable
.
} | ||
} | ||
} | ||
set(value) { | ||
sendBlocking(value) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
offer
should be enough.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
offer
would never throw any exception. But I could do if (!offer(value)) throw IllegalStateException()
.
Or do we prefer to silently fail if the broadcast channel is closed ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Are you sure?
Line 201 in fc6e31e
override fun offer(element: E): Boolean { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes sorry, I was wrong. I'll use offer
. At least I learned something :-)
public interface SubscribableVariable<T> : SubscribableValue<T>, ReadWriteProperty<Any, T> { | ||
public override var value: T | ||
|
||
override fun getValue(thisRef: Any, property: KProperty<*>): T = value |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is it already defined in SubscribableValue
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, but Kotlin compiler enforce to redeclare it, because it is inherited from two interfaces (SubscribableValue
and ReadWriteProperty
)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok
*/ | ||
public val value: T | ||
|
||
public val valueOrNull: T? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should support lateinit
?
Is Deferred<SubscribableValue>
a good replacement?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think yes, we should support it, because this is the actual current behavior of ConflatedBroadcastChannel
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok, discard this point.
We should provide also
However this implementation should be lazy and GC friendly. |
Yes, I'll propose something. |
Definitely yes for the If a fun <T1, T2, R> SubscribableValue<T1>.combineWith(other: SubscribableValue<T2>, combine: (T1, T2) -> R): SusbcribableValue<R> Should I also provide an adapter for JavaFx: |
Here is a wild API suggestion. Is it looking like what you imagined @fvasco ? interface SubscribableValueBuilderScope<in T> : CoroutineScope {
suspend fun yield(value: T)
}
interface SubscribableValueJob<T> : SubscribableValue<T>, Job
fun <T> SubscribableVariable(initialValue: T): SubscribableVariable<T> = ConflatedBroadcastChannel(initialValue)
fun <T> buildSubscribableValue(
context: CoroutineContext = DefaultDispatcher,
builderAction: suspend SubscribableValueBuilderScope<T>.() -> Unit
): SubscribableValueJob<T> = TODO()
fun usageExample() {
val variable = SubscribableVariable<Int>(0)
variable.value = 42
val currentTime: SubscribableValueJob<Instant> = buildSubscribableValue {
while (isActive) {
delay(1, TimeUnit.SECONDS)
yield(Instant.now())
}
}
currentTime.cancel()
} |
Instead I am considering another kind of actor fun <E> SubscribableValue<E>.launchObserver(
context: CoroutineContext = DefaultDispatcher,
parent: Job? = null,
block: suspend ActorScope<E>.() -> Unit
): Job This requires explicit |
Actually more like
For my use case too. So I won't add About the /**
* Start a job that consume each elements of the channel and execute [onEach] of each of them.
*/
fun <E> ReceiveChannel<E>.launchConsumer(context: CoroutineContext = DefaultDispatcher, onEach: suspend (E) -> Unit): Job =
launch(context) { consumeEach { onEach(it) } } I feel the same Idea than behind your I am not particulary found of the And anyway, I do not need it personally. Because I use JavaFx, I would most of the time simply bind a JavaFx component to the result of And for other usages, it is not really an hassle to do: launch {
observableValue.openSubscription().consumeEach {
// do something usefull
}
} But this is purely personal opinion. I will add it, if you guys want it. |
Well, |
Ok, I added some comments and a test for What should we do for Let me know if I should correct or do something. I'll open new PRs for the |
Mark it for removal using a |
@@ -47,7 +47,7 @@ public interface BroadcastChannel<E> : SendChannel<E> { | |||
* The resulting channel shall be [closed][SubscriptionReceiveChannel.close] to unsubscribe from this | |||
* broadcast channel. | |||
*/ | |||
public fun openSubscription(): SubscriptionReceiveChannel<E> | |||
override fun openSubscription(): SubscriptionReceiveChannel<E> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Resume public
modifier.
is State<*> -> { | ||
if (state.value === UNDEFINED) throw IllegalStateException("No value") | ||
return state.value as E | ||
override var value: E |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Resume public
modifier.
@Suppress("UNCHECKED_CAST") | ||
public val valueOrNull: E? get() { | ||
override val valueOrNull: E? get() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Resume public
modifier.
* | ||
* @throws IllegalStateException If no value has been set yet | ||
*/ | ||
override fun getValue(thisRef: Any, property: KProperty<*>): T = value |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add public
modifier.
*/ | ||
public override var value: T | ||
|
||
override fun getValue(thisRef: Any, property: KProperty<*>): T = value |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add public
modifier.
* | ||
* @throws ClosedSendChannelException If the implementation has been terminated | ||
*/ | ||
override fun setValue(thisRef: Any, property: KProperty<*>, value: T) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add public
modifier.
* | ||
* Reading this property may throw the cause exception if the implementation has been terminated with an error. | ||
* | ||
* Use [valueOrNull] to get `null` when the value is not set or `openSubscription().consume { receive() }` to suspend until a value is available. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
openSubscription().first()
?
* | ||
* Use [valueOrNull] to get `null` when the value is not set or `openSubscription().consume { receive() }` to suspend until a value is available. | ||
* | ||
* @throws IllegalStateException When reading the property and no value has been set yet |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What happen if channel is closed?
Can you test some negative paths? |
Yes, good Idea, I'll add some for |
Ok, I added some test for negative paths (channel with no value set yet, channel closed, channel closed with a cause) |
Just wanted to say really great work to you guys.
|
Should we support better lazy initialization? public interface SubscribableValue<out T> : Subscribable<T>, ReadOnlyProperty<Any?, T> {
public fun isInitialized(): Boolean
public val value: T
...
}
public suspend fun <T> SubscribableValue<T>.awaitValue() =
when {
isInitialized() -> value
else -> openSubscription().first()
} |
@fvasco Maybe So if value can't be lazy, maybe enforce providing an initial value? I guess that making a |
@bohsen and @dave08: thank you. It's good to hear that it'll be helpful.
As it is already supported in the way that it is possible create a But how should it behave if the channel is closed? Should it fail? Or should it return false? |
The idea is to provide a better support for the lazy initialization adding (sorry for unglish :) |
Yes I got it. Sorry if I wasn't clear ;-) I added EDIT: Note, that I choose to make |
It might be acceptable. Tip: remember to change |
Yes good catch. I did it.
I'm not especially found of it, but returning |
And what if we rename |
Sorry @jcornaz, Can you consider this design instead? public interface SubscribableValue<out T> : Subscribable<T>, ReadOnlyProperty<Any?, T> {
public val value: T
public override fun getValue(thisRef: Any?, property: KProperty<*>): T = value
}
public interface SubscribableVariable<T> : SubscribableValue<T>, ReadWriteProperty<Any?, T> {
public override var value: T
public override fun getValue(thisRef: Any?, property: KProperty<*>): T = value
public override fun setValue(thisRef: Any?, property: KProperty<*>, value: T) {
this.value = value
}
}
fun <T> SubscribableValue(value: T) = SubscribableVariableImpl(value)
fun <T> SubscribableVariable(value: T) = SubscribableVariableImpl(value)
internal class SubscribableVariableImpl<T>(value: T) : SubscribableVariable<T> {
private val channel = ConflatedBroadcastChannel(value)
public override var value: T
get() = channel.value
set(value) {
channel.offer(value)
}
public override fun openSubscription() = channel.openSubscription()
} No errors, no checks, no corner cases. Take your time. |
I think you make a very good point.
That's appealing indeed... It would be simpler and safer. Sounds very good. In the other hand, I do need lazy initialization in practice. But it doesn't have to be in |
Here is the API I'd propose: (implementation code of public interface Subscribable<out T> {
fun openSubscription(): SubscribtionReceiveChannel<T>
}
public interface SubscribableValue<out T> : Subscribable<T>, ReadOnlyProperty<Any?, T> {
// Never throw, always succeed
public val value: T
}
public interface SubscribableVariable<T> : SubscribableValue<T>, ReadWriteProperty<Any?, T> {
// Never throw, always succeed
public override var value: T
}
public interface LateinitSubscribableValue<out T> : Subscribable<T>, ReadOnlyProperty<Any?, T> {
// never throw
public val isInitialized: Boolean
// throw only if no value set yet
public val value: T
// never throw
public val valueOrNull: T?
}
// Never throw. May suspend until a value is available
suspend fun <T> LateinitSubscribableValue<T>.awaitValue(): T = openSubscription().first()
public interface LateinitSubscribableVariable<T> : LateinitSubscribableValue<T>, ReadWriteProperty<Any?, T> {
// only the getter throw if no value set yet
public override var value: T
} Edit: I renamed |
It is possible to replace |
So I had this crazy idea toady. Instead of The key observation is that you try to write |
This is very interesting an sounds promising. However, But I still think it is a good Idea, and it is going in a very good direction. So here is an alternative proposition: What if we make Here is a quick draft of the idea: master...jcornaz:spike/consumable |
Iterators are inherently slow. My idea for |
Sounds like something we are planning to add in reactor-core as an extension. fromAsyncIterable/toAsyncIterable (in addition to fromCoroutine/toCoroutine). We might be able to use that new contract instead of creating a new AsyncIterable of our own. |
I wrote a little example using interface Consumable<E> {
fun consumeEach(consumer: suspend ConsumerScope<E>.(E) -> Unit): E?
}
interface ConsumerScope<E> {
suspend fun exit(exitValue: E? = null): Nothing
}
fun Consumable<Int>.sum(): Int {
var sum = 0
consumeEach { sum += it }
return sum
}
fun <E> Consumable<E>.first() = consumeEach { exit(it) } |
Just to clarify, should I update the PR to provide and use Or is this PR only waiting for design decision? |
We clearly need a prototype to make this decision. It is tightly bound to the decision on cold streams abstraction in #254, since it is basically the same "subscribable" abstraction that we are talking about here. It does not look that we have place for two different abstractions here. We already have too many of them. The proof of concept should demonstrate that all the interesting operators like |
* Add `Subscribable`, `SubscribableValue` and `SubscribableVariable` interfaces * Make `BroadcastChannel` implement `Subscribable` * Add a simple implementation of `SubscribableVariable`
Are you going to support Subscribable as dataflow concurrency in Oz ?https://www.info.ucl.ac.be/~pvr/GeneralOverview.pdf I like it , Data-Driven Concurrency is cool ! see also: Dataflow Concurrency |
@SolomonSun2010 data-flow concurrency is already possible with channels. And there is active design discussion about how to provide cold stream in #254. |
69dc390
to
eaf9b7c
Compare
bc68d63
to
3179683
Compare
When #254 is almost here, I think it is time to close this PR. During our design phase, we have a lot of discussions on whether There is also a natural desire to use channels as subjects and for this, we have |
No problem, all these consideration can be cherry picked when needed. |
Resolve #245
This PR add the following:
Subscribable
:Represent anything from which it is possible to open a subscription
SubscribableValue
:Represent a
Subscriblable
from which it is always possible to get a current value.SubscriblableVariable
:Mutable
SubscribableValue
allowing to set the current valueSubscribableValue(value)
andSubscribableVariable(intialValue)
:Which create instances for the corresponding interfaces.
This PR also make
BroadcastChannel
a sub interface ofSubscribable
.Decisions made:
ConflatedBroadacstChannel
do not implementSusbcribableVariable
. This makes the overall usage simpler and safer to use.map
andcombine
or adapter for JavaFXObservableValue
won't be part of this PR, but may be made and discussed in another issue/PR