- 
                Notifications
    You must be signed in to change notification settings 
- Fork 1.9k
Open
Description
It makes sense that Flow and all map operations are sequential by default, but it makes sense to allow some heavy operations to be performed in parallel on explicitly provided dispatcher.
I've implemented the feature based on existing flowOn implementation:
@FlowPreview
fun <T, R> Flow<T>.mapParallel(scope: CoroutineScope, bufferSize: Int = 16, transform: suspend (T) -> R) =
    flow {
        val currentContext = coroutineContext.minusKey(Job) // Jobs are ignored
        coroutineScope {
            val channel = produce(currentContext, capacity = bufferSize) {
                collect { value ->
                    send(scope.async { transform(value) })
                }
            }
            (channel as Job).invokeOnCompletion { if (it is CancellationException && it.cause == null) cancel() }
            for (element in channel) {
                emit(element.await())
            }
//
//            val producer = channel as Job
//            if (producer.isCancelled) {
//                producer.join()
//                throw producer.getCancellationException()
//            }
        }
    }It seems to be working fine, but I can't use internal kotlinx,coroutines methods. I think that this use-case require a separate function in the library.
imanushin, dave08, driver733, borowis, fluidsonic and 41 more