Coroutines in Okio. #531
Conversation
This makes it possible for suspending functions to call Okio, and Okio might not block when called. Initially almost everything will block anyway because we haven’t written non-blocking backends (Sockets, Files, Pipes), and we haven’t written non-blocking transforms (compression, hashing). But it shouldn’t be too difficult to make those things non-blocking. Callers must opt in to the non-blocking operations with the Async suffix. We also have an internal Suspendable suffix when the caller is a coroutine but we aren’t sure whether the implementation is. This code is bad because it copy-pastes a bunch of behavior for the async variant. We could delegate from non-coroutine to coroutine, but then I think we have to allocate. Is there a better way? This doesn’t yet offer the full complement of Async methods because I don’t think callers should use them. In particular, doing a suspending call to read each byte is likely to be extremely inefficient. It’s better to do coarse async calls like require or request and then use sync calls for the pieces.
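A minimal sketch of that coarse-async, fine-sync pattern, assuming the PR's proposed requireAsync method on BufferedSource (readInt and readByteString are existing Okio APIs; the helper itself is hypothetical):

import okio.BufferedSource
import okio.ByteString

// Hypothetical helper: one coarse suspending call buffers the bytes, then the
// fine-grained reads stay synchronous because the data is already in memory.
suspend fun readLengthPrefixedMessage(source: BufferedSource): ByteString {
  source.requireAsync(4L)                 // suspending: wait for the length prefix
  val length = source.readInt()           // sync: already buffered
  source.requireAsync(length.toLong())    // suspending: wait for the body
  return source.readByteString(length.toLong())
}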
Sorry to ask here, but since it's probably the follow-up to that PR :) Is there any plan to eventually extend this to OkHttp, to make the synchronous calls like execute suspendable, or not? The other solution is to enqueue and use suspendCancellableCoroutine, but that requires a ton more internal changes. Since a proper migration to coroutines will be a lot of work for everyone, it would be nice to know what the plans are for OkHttp/Moshi/Okio so we can plan our new internal APIs for the consumers of those libraries.
I’m hoping we can reach the point where you can have more in-flight HTTP calls than threads. I don’t yet know how this will work in practice.
This is exciting! A few general comments:

"Okio might not block when called"

This contract concerns me a bit. If I’m trying to write some non-blocking coroutine code that calls out to Okio, I’ll have to wrap all my *Async() calls in a withContext(IO) { … } anyway, since it’s perfectly valid for the function to block. That defeats the point of providing a suspend API in the first place, doesn’t it? Additionally, at KotlinConf this year, Roman Elizarov recommended following the convention that functions marked as suspend do not block (video at time). Not following that convention here is going to be surprising.
Instead, for the default implementations of the async methods that delegate to the blocking ones, we could just wrap those in withContext(IO) { … }, which is likely what consuming code would be doing manually anyway without the suspend APIs. Then, when underlying streams natively support non-blocking IO, it's basically just an optimization. (A sketch of what that default could look like follows this comment.)
"doing a suspending call to read each byte is likely to be extremely inefficient"

For Sink/Source, where a read/write of each single byte will actually context switch, definitely. However, for the buffered versions of the interfaces, individual reads/writes will only block to read an entire chunk anyway. In the fast path, where buffered data is already available, none of the functions will suspend, and the coroutines runtime is optimized for that case.
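As a rough illustration of the wrapping suggested above (a sketch only, not the PR's actual code; the interface is trimmed to a single method):

import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.withContext
import okio.Buffer

// Sketch: the suspend variant never blocks its caller. By default it shoves
// the blocking write onto Dispatchers.IO; an implementation backed by truly
// non-blocking IO can override writeAsync and skip the dispatch entirely.
interface SketchSink {
  fun write(source: Buffer, byteCount: Long)

  suspend fun writeAsync(source: Buffer, byteCount: Long) =
      withContext(Dispatchers.IO) { write(source, byteCount) }
}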
try {
  val toRead = takeAsync(byteCount)
  return super.readAsync(sink, toRead)
} catch (e: InterruptedException) {
Why is this exception wrapped? Should this handle CancellationExceptions too?
Just a thing to be consistent with our blocking APIs, which would otherwise need to throw this checked exception. And yes, we should figure out cancelation.
Ah right, it's a checked exception. I think coroutine cancellation should just work.
while (true) {
  val now = System.nanoTime()
  val byteCountOrWaitNanos: Long = synchronized(this) {
Another blocking call in a coroutine. The coroutine-friendly Mutex doesn't provide a non-suspend API, so unless we want to write our own Mutex that supports both, I think we either have to do this or convert all synchronization to use coroutines.
I see two kinds of synchronized: one where you would wait for blocking code, and one where you would wait for non-blocking code. This is the latter, and could be replaced with compareAndSet() in a loop. In practice that means it'll rarely stall here.
... and we have a project planned where we replace lots of synchronized blocks with compareAndSet() loops. The big drawback is that the latter requires an allocation on every use, so we replace a risk of blocking with a certainty of allocating.
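For illustration, a minimal sketch of that trade-off (the state type and field names are hypothetical, not Okio's actual fields): every successful compareAndSet publishes a freshly allocated snapshot.

import java.util.concurrent.atomic.AtomicReference

// Hypothetical immutable state snapshot; each update allocates a new copy.
private data class PipeState(val byteCount: Long, val sinkClosed: Boolean)

private val state = AtomicReference(PipeState(byteCount = 0L, sinkClosed = false))

fun addBytes(count: Long) {
  while (true) {
    val current = state.get()
    val updated = current.copy(byteCount = current.byteCount + count) // allocation on every attempt
    if (state.compareAndSet(current, updated)) return // lock-free: no risk of blocking
  }
}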
Yea, I'm not too worried. Even if this does block it will be for such a short period of time it's probably fine.
The only potential issue with automatically wrapping calls in withContext(IO) is Kotlin/Native: I'm not sure what the story looks like there, but I'm sure we can solve that problem when we get there.
Really good advice. I'm gonna update this PR to do the withContext(IO) wrapping.
IO is not always good: 64 threads is a lot, and for network / DB access some kind of limit is often wanted. I hope Kotlin/kotlinx.coroutines#261 lands to allow such controls, but until then I'll use my own dispatcher (and thread pool) for IO with the DB and the remote hosts I talk to, so I can control the load. If you force a switch to IO then you force costly context switching without bringing anything to the consumer.
Force-pushed from ef9d762 to afae35b (Compare)
@@ -51,10 +53,24 @@ interface Sink : Closeable, Flushable {
  @Throws(IOException::class)
  fun write(source: Buffer, byteCount: Long)

  /** Non-blocking variant of [write]. Uses the IO dispatcher if blocking is necessary. */
  @Throws(IOException::class)
  @JvmDefault
@Egorand I got this working so I don’t need separate interfaces. Yay!
And hoorah for D8 for making default methods available on all Androids!
Niice!
@Tolriq The current
As for limiting concurrency, how are you doing that without coroutines? Why won't that strategy work with them?
We could also provide an API like withAsyncContext that lets the caller pick the context explicitly. E.g.:

fun Sink.withAsyncContext(context: CoroutineContext): Sink = object : Sink by this {
  suspend fun writeAsync(source: Buffer, byteCount: Long) = withContext(context) {
    write(source, byteCount)
  }

  suspend fun flushAsync() = withContext(context) {
    flush()
  }

  suspend fun closeAsync() = withContext(context) {
    close()
  }
}

And similar for Source.
I’m not thinking in coroutines yet, but... I expect our async strategy won’t be to configure contexts on streams. Instead we attempt to get as much
Well, a real-life example is that I need to talk to a buggy server that only supports 4 blocking threads. Some of the blocking commands, like sync, can take up to 1 minute, and I also always need to have at least one thread free for commands that should always run instantly. The normal solution is to use a dedicated thread pool limited to max 3 with asCoroutineDispatcher and use that as the coroutine context for all the potentially blocking commands. Now in a coroutine world, suspend functions are non-blocking but run in the coroutine context. So with that coroutine context I could directly do things like
Since the function runs on the coroutine context, it's limited to the 3 threads and in theory there's no issue. Now if internally the function switches to IO, then despite all my configuration being OK, I can now have up to 64 active connections to the server, and that's not good.
So IMO this is an issue, especially for all the people who would not expect the switch and will have a hard time understanding it. @zach-klippenstein I gave the explanation about threads in the original post ;)
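A minimal sketch of the setup described above (names like syncLibrary and blockingSyncCall are placeholders, not a real API):

import java.util.concurrent.Executors
import kotlinx.coroutines.asCoroutineDispatcher
import kotlinx.coroutines.withContext

// A dedicated 3-thread pool exposed as a dispatcher: at most 3 potentially
// blocking commands run against the fragile server at once.
val serverDispatcher = Executors.newFixedThreadPool(3).asCoroutineDispatcher()

// Placeholder for the long-running blocking command described above.
fun blockingSyncCall(): String = TODO("talk to the server")

suspend fun syncLibrary(): String = withContext(serverDispatcher) {
  blockingSyncCall() // runs on one of the 3 pool threads, never on Dispatchers.IO
}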
@Tolriq Limiting the number of threads your dispatcher uses is only one way to limit concurrency. Another way to achieve that is to use a pool of coroutines, instead of threads, e.g. the worker pool pattern (see the sketch after this comment). This has the added advantage of sharing threads with the existing standard thread pools, as I mentioned above. It might be a bit more verbose, but given how important this behavior is to your system, I would think erring on the explicit side could be a good thing. I think, more generally, in the ideal future, all libraries, apps, etc. would only use the standard dispatcher thread pools for almost all of their work, because it reduces the total number of distinct threads in the system and reduces thread context switching, even when two bits of code that don't know anything about each other's internals are communicating. The Kotlin team has already said they plan to ship an implementation of the worker pool pattern eventually, but it's simple enough that it's already feasible to implement one-off when you need it.
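A minimal sketch of that worker-pool pattern, assuming kotlinx.coroutines channels (the CommandPool class and its names are hypothetical; results and error handling are omitted):

import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch

// Concurrency is capped by the number of worker coroutines (3 here), not by a
// dedicated thread pool, so the workers share whatever threads the standard
// dispatchers provide.
class CommandPool(scope: CoroutineScope, workers: Int = 3) {
  private val commands = Channel<suspend () -> Unit>(Channel.UNLIMITED)

  init {
    repeat(workers) {
      scope.launch {
        for (command in commands) command() // at most `workers` commands run at once
      }
    }
  }

  suspend fun submit(command: suspend () -> Unit) = commands.send(command)
}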
@swankjesse Yea. I'm also wondering if there's some utility in keeping the
I've since talked to Roman Elizarov about that, and he also suggests a worker pool to handle both blocking and non-blocking, but the talk he directed me to is really unclear about how to do that at a higher level. Channels are still experimental and Actors should be very self-contained. I currently have no idea how to apply this pattern globally to limit any X functions to at most 3 at the same time. Do you have any more details about how it could currently be applied in that way? Typically my app can talk to many different systems.
@Tolriq This is getting off-topic for this PR, but I think one solution could be fairly straightforward. Since thread pools are pervasive in your architecture, I'm assuming you're using
One more argument for keeping the dispatcher private is that the use of any dispatcher at all is an implementation detail of the default implementation of the asynchronous methods. A particular implementation might not use a dispatcher at all. Providing any API to set the dispatcher used for the default implementation leaks the implementation and gives consumers a false sense of control when there's no guarantee that dispatcher will be honored.
@zach-klippenstein Yes, I'm there and I'll open a thread there to continue when I'm further along, but just to finish: thread pools were the only way I found to handle that, and coroutines seemed a way out to much better internals. I just want to be sure that the fact that some libraries move to withContext(IO) does not break any pattern I decide to go with. This is a larger discussion about coroutines and blocking/non-blocking network IO, and I'm pretty sure the decisions you guys make for Okio/OkHttp will serve as the root for many others due to how respected you are. (That's why I'm asking here too, by raising some special needs that were maybe not seen during the design process.) Edit: And your solution just removes the thread pools but not the callbacks, so it still keeps the same messy internals :(
@swankjesse Here's one idea: #532
@Tolriq The more edge cases the better! 🙂 That solution is definitely not ideal, but I don't know your architecture. Again, happy to dig into this more on Slack. I think the contract of Okio's async methods should be simply that they won't block the current thread. It shouldn't make any guarantees about dispatchers or threads, because there are likely to be implementations that don't use dispatchers (or even threads, in the case of JavaScript) behind the scenes anyway.
But isn't using a dispatcher in Okio already a hack to try to make a blocking function non-blocking? It's just a convenience for consumers, but if a method can't be made truly non-blocking, why not let the consumer handle that as we already do? To me this is the real question.
It's only a hack inasmuch as any code offloading blocking work to a dedicated thread pool is a hack. I think the whole premise of this PR is that it is useful for the standard Okio interfaces to have suspend support, so they can be composed without blocking, wrap callback-based APIs, etc. In order for that to work though, every implementation needs to handle both cases. Nothing's stopping you from still calling the blocking methods if you really want to.
How about making the dispatcher a function parameter and defaulting it to Dispatchers.IO?

@JvmOverloads suspend fun writeAsync(
  source: Buffer,
  byteCount: Long,
  dispatcher: CoroutineDispatcher = Dispatchers.IO
) = withContext(dispatcher) {
  write(source, byteCount)
}
The difference is that every library that uses a dedicated pool, like OkHttp or Glide, gives control over that thread pool. Here we are talking about a library using a default pool of 64 threads (64 MB of memory if wrongly used, ...). Since English is not my native language, I'd just like to add that I'm not arguing for my own cause, as in all cases users will adapt to the decisions. I just want to be sure that such important decisions are fully thought through. (And actually this discussion also helps with thinking about how to architect apps in general, since we are at the beginning of coroutines.) You are shaping the future here ;) The question extends to cancelling too: how can a blocking function called on an IO thread be cancelled? Cancellation is cooperative, so for socket-based IO you usually use suspendCancellableCoroutine and close the socket at the moment of cancellation. Edit: Link to Slack about the worker pool https://kotlinlang.slack.com/archives/C1CFAFJSK/p1541415543349000
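For illustration, a minimal sketch of that cancel-by-closing-the-socket pattern (the readByteAsync name and the per-call thread are purely illustrative, not a proposed API):

import java.net.Socket
import kotlin.concurrent.thread
import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException
import kotlinx.coroutines.suspendCancellableCoroutine

// Illustrative only: the blocking read runs on its own thread, and cancelling
// the coroutine closes the socket, which unblocks the read with an exception.
suspend fun Socket.readByteAsync(): Int = suspendCancellableCoroutine { cont ->
  cont.invokeOnCancellation { runCatching { close() } }
  thread(name = "socket-read") {
    try {
      cont.resume(getInputStream().read())
    } catch (e: Exception) {
      if (cont.isActive) cont.resumeWithException(e)
    }
  }
}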
I understand the gravity of getting the API right here, and I definitely appreciate this discussion!
That's a great question, definitely needs some more exploration I think. Also, the
Again, I think it really comes down to the fact that the API can't, and shouldn't, make any guarantees about how threading is (or isn't) handled for the async functions, if for no other reason than what I've already stated above: streams might be backed by IO primitives that are callback-based and don't give you any control over threads themselves (asynchronous NIO sockets, everything in JavaScript). Also, neither Kotlin/Native nor Kotlin/JS has support for threads at all. I think as coroutines become more pervasive, we'll stop thinking in terms of "thread pools", because it doesn't make sense to keep reaching all the way down to such a low-level concept. Thread pools make sense when you're working with threads. Other patterns, such as worker pools, make sense for coroutines. It makes sense for higher-level libraries like OkHttp to provide configurable thread pools for blocking calls, because that's the only way to limit concurrency when you're working with blocking calls and threads. However, for coroutine-based async calls, thread pools are the wrong tool. I'm not sure exactly what it would look like yet, but OkHttp could, for example, give you control over some worker pool implementation.
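To make the callback-backed case concrete, here is a minimal sketch of suspending over NIO's asynchronous sockets, where completion happens on NIO's own internal threads and no dispatcher is involved (the readAsync extension name is made up for the example):

import java.nio.ByteBuffer
import java.nio.channels.AsynchronousSocketChannel
import java.nio.channels.CompletionHandler
import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException
import kotlinx.coroutines.suspendCancellableCoroutine

// The continuation is resumed from NIO's completion callback; the caller never
// gets a dispatcher to configure, which is why the API can't promise one.
suspend fun AsynchronousSocketChannel.readAsync(buffer: ByteBuffer): Int =
    suspendCancellableCoroutine { cont ->
      read(buffer, Unit, object : CompletionHandler<Int, Unit> {
        override fun completed(result: Int, attachment: Unit) = cont.resume(result)
        override fun failed(exc: Throwable, attachment: Unit) = cont.resumeWithException(exc)
      })
      cont.invokeOnCancellation { runCatching { close() } }
    }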
The thing is that, from my understanding, by using dispatchers to simulate async calls from blocking ones, you are introducing that tool and could create issues. That's where I'm still stuck in this discussion and want to be sure: I have trouble understanding why using a non-configurable dispatcher to simulate async calls is a good solution to the problem. I see that the same public API will have different implementations for JS or Android or others, but if an implementation can have issues on some OS, like no control or problems cancelling, then the implementation starts to be something important enough to impact the API definition.
Just chatted with @swankjesse offline about timeouts and cancellation, here's what we came up with:

In particular, that second point ensures that Kotlin's withTimeout just works:

try {
  val sendMoneyResponse = withTimeout(1000) { paymentService.sendMoney(sendMoneyRequest) }
} catch (e: TimeoutCancellationException) {
  // network call timed out, socket will have been closed.
}

Timeouts will already clean up after themselves. Operations are wrapped in something that looks like this, which means that even if doWork() throws or is cancelled, timeout.exit() still runs:

timeout.enter()
try {
  doWork()
} finally {
  timeout.exit()
}
Sounds nice. My only question would be whether there's anything in Okio that might need to be cancelled without closing the socket, or where closing the socket in that case could trigger an issue. A simple example, which I suppose is handled already: using any of the closeable APIs with .use {} would trigger a close after a close when the call is cancelled. I guess double closes are ignored and all is OK everywhere; just thinking out loud in case there are other cases. My Okio usage is relatively basic.
@Egorand Dispatcher is an implementation detail of the default implementation that shoves blocking calls onto a different dispatcher, so exposing it is leaking implementation specifics. Async calls implemented by callback APIs (e.g. NIO's async sockets, JavaScript) would ignore that parameter anyway, in which case it would not only be irrelevant but would probably be confusing and misleading.
Good to know @zach-klippenstein. I was thinking that dispatchers are similar to RxJava's Schedulers.
@Egorand They're very close, so it's a useful mental model in some cases. However, remember that coroutines are essentially just callbacks. So when you pass a callback to a method, it might get invoked on whatever thread that code happens to use.
The nice thing about coroutine dispatchers and contexts is that, if I care about what context my coroutine runs in, I specify that explicitly, and then if code I'm calling out to jumps onto other threads, sends continuations through 3rd-party libraries, or whatever, when my code gets around to being executed again, I am guaranteed it will be running in the context I initially asked for. So unless you're doing something very strange, it shouldn't matter how a suspend function does asynchronous work.
I still don't agree ;) On resource-limited devices those implementation details can have an impact. Say you decide to go to a special thread pool with real-time priority (OK, I push the concept a little, but since you switch context you can switch to practically anything; it's just an implementation detail, by your own argument): this would be an issue for any consumer. Similarly, on Slack #coroutines a guy did not understand why moving from AsyncTask to coroutines impacted his UI thread even though everything was in a background thread as before (due to differences in the number of threads and thread priorities). I know I repeat myself a lot and I'm sorry about that, but a lot of people do not dig into implementation details and have a hard time understanding what happens when problems arise, which are often hard to diagnose and test. To me there are blocking calls and there are non-blocking calls. Those should not switch context without the user knowing. Then there are convenience functions that can make blocking calls non-blocking via the use of contexts and dispatchers; those should be clearly identified as such, along with which dispatcher will be used. I still don't buy that the fact that in some cases there's no context switch, because the function is non-blocking, magically makes the implementation details a non-problem for the cases where it is blocking. I'm probably wrong, as you are all more experienced, but so far I still haven't seen an explanation that would make someone like me, with threading experience, believe that threading is a non-issue in this special case. I really hate not being better at English so I could be sure the message is clear, but I hope it can be summed up as:
This is a work in progress.
Force-pushed from afae35b to 321ad04 (Compare)
      channel.write(source, byteCount)
    }
  }
}
NIO!
val instance: SelectRunner by lazy {
  val result = SelectRunner()
  result.start()
  result
Nit: this could be SelectRunner().apply { start() }
I'd written that and then used undo ’cause I’d line wrapped it and it seemed fancier than necessary.
Is this the best way to have a worker thread standing by? I was wondering if there's a more coroutines-idiomatic way to kick this off.
You could turn the thread into a coroutine on the IO dispatcher. Saves you from spinning up another thread, especially when there aren't any pending operations – if the queue is empty, you could suspend the coroutine instead of blocking the thread.
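A rough sketch of that idea, assuming a hypothetical Task type and a channel as the pending-work queue (not the PR's actual SelectRunner):

import java.nio.channels.Selector
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch

// Stand-in for the PR's pending-operation type.
class Task

// Runs the select loop as a coroutine on Dispatchers.IO instead of a dedicated
// thread: while the queue is empty it suspends rather than blocking a thread.
class CoroutineSelectRunner(scope: CoroutineScope, private val selector: Selector) {
  private val newTasks = Channel<Task>(Channel.UNLIMITED)

  init {
    scope.launch(Dispatchers.IO) {
      while (true) {
        register(newTasks.receive()) // suspends while there is nothing to do
        selector.select()            // occupies an IO thread only while work is pending
        // ... dispatch ready keys to their tasks here ...
      }
    }
  }

  suspend fun enqueue(task: Task) = newTasks.send(task)

  private fun register(task: Task) {
    // Attach the task to its channel's selection key (omitted in this sketch).
  }
}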
// If the channel has been closed, fail the task.
if (!key.isValid) {
  continuation.resumeWithException(IOException("canceled"))
key.isValid will also be false if the key has been canceled, right? I know you haven't implemented it yet but you have a todo above – if the key was canceled due to coroutine cancelation, the coroutine will already be in the canceled state and this resumeWithException will just report the IOException to the uncaught exception handler. That's probably not what we want in this case; instead we should just not create the IOException at all and return true immediately.
If the key is invalid for some other reason (e.g. channel was closed, as you've commented), then we should still resume with exception.
There are at least two approaches to this:
- Rely on the continuation itself as the source of truth for whether the key is invalid due to coroutine cancelation or not, or
- Keep track explicitly ourselves.
The naïve way to implement 1 would be just checking the active flag:
if (!key.isValid) {
  if (continuation.isActive) continuation.resumeWithException(IOException("canceled"))
  return true
}
But that creates a race where the IOException might get reported if the coroutine was canceled after the channel was closed. That might be ok since the key wasn't canceled due to coroutine cancelation, so technically we should still throw the exception. Another solution would be to use the atomic resume API:
if (!key.isValid) {
  continuation.tryResumeWithException(IOException("canceled"))?.let(continuation::completeResume)
  return true
}
But that API is marked internal so that's just asking for trouble.
The safest solution is to do something like add an atomic boolean to Task that we can set to false in invokeOnCancellation before we cancel the key, and then check here instead of the isActive flag.
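One possible shape for that (a sketch with illustrative names; the flag here means "canceled by the coroutine", so it is set rather than cleared in the cancellation handler):

import java.io.IOException
import java.util.concurrent.atomic.AtomicBoolean
import kotlin.coroutines.resumeWithException
import kotlinx.coroutines.CancellableContinuation

// Illustrative Task: records coroutine cancelation in its own flag so the
// select loop never resumes a continuation that was already canceled.
class Task(
  val continuation: CancellableContinuation<Unit>,
  val canceledByCoroutine: AtomicBoolean = AtomicBoolean(false)
)

fun failInvalidKey(task: Task): Boolean {
  if (task.canceledByCoroutine.get()) return true // already canceled; don't report anything
  task.continuation.resumeWithException(IOException("canceled"))
  return true
}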
Yep. We'll need to handle both coroutine cancellation and socket close.
suspend fun <T> SelectableChannel.selectAsync(ops: Int, block: () -> T): T {
  val selectRunner = SelectRunner.instance
  // TODO: cancel the key when the coroutine is canceled?
Did you have any ideas for how to implement this? Task could probably vend a CompletionHandler that we could directly pass to invokeOnCancellation.
I don't know the NIO select APIs at all, but according to the docs canceling a key can potentially block, so we probably shouldn't cancel the key directly from that handler (CompletionHandlers are invoked synchronously). Maybe we could enqueue a special "cancelation task" that would actually cancel the key in the future? I think it's fine if the key/channel doesn't get canceled immediately as long as it doesn't try to do any wasted IO in the future (which we could ensure using our own flag as discussed above).
No cancelation task required: if we just set the flag on the existing task and then wake up the selector, the task can cancel itself.
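Continuing the sketch above, the cancellation handler could look something like this (illustrative names; it only marks the task and wakes the selector, and the select loop cancels the key on its own thread):

import java.nio.channels.Selector

// Never touches the key from the cancellation handler, since cancelling a key
// can block; the select loop notices the flag and cancels the key itself.
fun installCancelHandler(task: Task, selector: Selector) {
  task.continuation.invokeOnCancellation {
    task.canceledByCoroutine.set(true)
    selector.wakeup() // select() returns promptly so the loop can clean up
  }
}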
// The channel is ready! Call the block.
if (key.readyOps() != 0) {
  try {
    val result = block()
I think we might also need to double-check for coroutine cancellation here, depending on how we implement that.
Not sure if these are commonly accepted definitions, but: Asynchronous: doesn't block the current thread, but might block some other thread. If these definitions make sense, would it make sense to have the methods named accordingly?
Does it make sense to break off the async capabilities into their own async classes? Since this is specifically for coroutines, maybe offer a separate artifact with those classes? That being said, it might also make sense to look into how Ktor implements IO with coroutines.
Abandoning this for now. Loom is a nicer solution.
@swankjesse Why is Loom nicer?
My understanding is that it's "nicer" because it's a drop-in solution – you don't have to rewrite all your functions as suspend functions.
Yes! In particular, the compiled bytecode doesn’t need to be dramatically larger and slower to support being suspended. Instead, the VM does it internally using the existing data structures it already keeps.