Closed
Description
Use case
Let's say you have a list and you want to apply some function to each element asynchronously and get back a list of the results. It would be convenient to have a function that lets you map items asynchronously without having to write out all of the logic for creating a list of Deferred<T>.
You may also want to do async calls in order and then get the result of a transformed list.
// inside a coroutine
val list: List<Int> = (0..5).toList()
// deferredMap expects a CoroutineScope, so the dispatcher is wrapped in one.
val transformedMap = list.deferredMap(CoroutineScope(Dispatchers.IO)) {
    // Larger values wait for less time (5 -> 50 ms, 0 -> 100 ms), so they print first.
    delay(100L - it * 10L)
    println(it)
}
// Await the same value that was bound above.
transformedMap.await()
/* output
5
4
3
2
1
0
*/
The Shape of the API
/**
 * Map a list of items to a list of deferred items. The deferred items are then awaited and the
 * resulting list is delivered through the returned [Deferred].
 *
 * Declared `inline` because `noinline` is only permitted on parameters of an inline function.
 *
 * @param coroutineScope The coroutine scope to use for the async operations
 * @param chunkSize the number of coroutines to run at the same time asynchronously. The iterable
 * is taken and split into chunks of this amount; `null` means no chunking
 * @param throwException if true, when any exception happens in any transform, the exception will be thrown
 * @param stopChunkOnException if true, when any exception happens in any transform, the remaining
 * chunks will not be processed
 * @param transform the function to transform each item
 * @return a [Deferred] that completes with the list of transformed items
 * @throws Exception if any exception happens in any transform and throwException is true
 */
suspend inline fun <I, T> Iterable<I>.deferredMap(
    coroutineScope: CoroutineScope,
    chunkSize: Int? = null,
    throwException: Boolean = false,
    stopChunkOnException: Boolean = false,
    noinline transform: suspend CoroutineScope.(I) -> T,
): Deferred<List<T>>
/**
 * Map a list of items to a list of transformed items using coroutines, one item at a time and
 * in iteration order.
 *
 * @param coroutineScope the scope in which to run the coroutines
 * @param throwException if true, when any exception happens in any transform, the exception will
 * be thrown; if false, the exception is caught and the failing item is left out of the result
 * @param stopOnException if true, when any exception happens in any transform, the rest of the
 * remaining transforms will not be executed and the items transformed so far are returned.
 * Only consulted when [throwException] is false
 * @param transform the function to transform each item
 * @return a [Deferred] that completes with the list of transformed items
 */
suspend inline fun <I, T> Iterable<I>.toAsyncInOrderMap(
coroutineScope: CoroutineScope,
throwException: Boolean = false,
stopOnException: Boolean = false,
crossinline transform: suspend CoroutineScope.(I) -> T,
): Deferred<List<T>>
Possible Implementation
I've taken a crack at this before, so I thought sharing it could be helpful. Here is a possible implementation. There may well be things I'm missing, so I would appreciate any feedback, including whether this even makes sense to have!
/**
 * Starts [transform] for every element of the receiver with `async` on [coroutineScope] and hands
 * the per-element [Deferred]s to [toAsyncMap], which combines them into a single deferred list.
 *
 * Each successful result is wrapped in a `() -> T` thunk so that a `null` entry can only mean
 * "this item failed", even when [T] itself is a nullable type.
 *
 * @param coroutineScope scope on which the per-item `async` coroutines are started
 * @param chunkSize forwarded to [toAsyncMap]; `null` hands the whole receiver over in one go
 * @param throwException if true, exceptions from [transform] are not caught inside the per-item
 * coroutine; if false, they are caught and the item is recorded as `null`
 * @param stopChunkOnException forwarded unchanged to [toAsyncMap]
 * @param transform the suspending function applied to each item
 * @return the [Deferred] produced by [toAsyncMap]
 */
suspend inline fun <I, T> Iterable<I>.deferredMap(
    coroutineScope: CoroutineScope,
    chunkSize: Int? = null,
    throwException: Boolean = false,
    stopChunkOnException: Boolean = false,
    noinline transform: suspend CoroutineScope.(I) -> T,
): Deferred<List<T>> {
    val listTransformer: (Iterable<I>) -> List<Deferred<(() -> T)?>> =
        if (throwException) {
            { list ->
                // No try/catch here: a failing transform fails its Deferred.
                val items: List<Deferred<() -> T>> =
                    list.map { item ->
                        coroutineScope.async {
                            val transformed = transform(item)
                            val wrappedTransformed: () -> T = { transformed }
                            wrappedTransformed
                        }
                    }
                items
            }
        } else {
            { list ->
                val items: List<Deferred<(() -> T)?>> =
                    list.map { item ->
                        coroutineScope.async {
                            try {
                                val transformed = transform(item)
                                val wrappedTransformed: () -> T = { transformed }
                                wrappedTransformed
                            } catch (e: kotlin.coroutines.cancellation.CancellationException) {
                                // Cancellation is not an item failure; swallowing it would hide
                                // the cancellation from the coroutine machinery.
                                throw e
                            } catch (e: Exception) {
                                // A failed item is represented by null and filtered out later.
                                null
                            }
                        }
                    }
                items
            }
        }
    return this.toAsyncMap(
        coroutineScope = coroutineScope,
        chunkSize = chunkSize,
        // toAsyncMap forwards this flag as `throwException` of toAsyncInOrderMap, so `false`
        // means exceptions raised while processing a chunk are caught there.
        catchChunkedException = false,
        stopChunkOnException = stopChunkOnException,
        wrappedTransformer = listTransformer,
    )
}
/**
 * Combines the per-item deferred results produced by [wrappedTransformer] into one deferred list.
 *
 * Entries that [wrappedTransformer] reports as `null` are dropped; the remaining thunks are
 * invoked to obtain the final values.
 *
 * @param coroutineScope scope on which the combining `async` coroutine is started
 * @param chunkSize when `null`, the whole receiver is passed to [wrappedTransformer] at once;
 * otherwise the receiver is split with `chunked(chunkSize)` and the chunks are processed one
 * after another
 * @param catchChunkedException forwarded as `throwException` to [toAsyncInOrderMap]: `true` lets
 * an exception raised while processing a chunk propagate, `false` has it caught there.
 * NOTE(review): the parameter name reads as the opposite of this behaviour - confirm intent
 * @param stopChunkOnException forwarded as `stopOnException` to [toAsyncInOrderMap]
 * @param wrappedTransformer starts the work for a group of items and returns one [Deferred] per
 * item, resolving to a thunk holding the result or to `null` for a failed item
 * @return a [Deferred] that completes with the successfully transformed values
 */
inline fun <I, T> Iterable<I>.toAsyncMap(
    coroutineScope: CoroutineScope,
    chunkSize: Int?,
    catchChunkedException: Boolean,
    stopChunkOnException: Boolean,
    crossinline wrappedTransformer: suspend (Iterable<I>) -> List<Deferred<(() -> T)?>>,
): Deferred<List<T>> = if (chunkSize == null) {
    coroutineScope.async {
        wrappedTransformer(this@toAsyncMap).awaitAll().filterNotNull().map { it() }
    }
} else {
    coroutineScope.async {
        val transformedList = mutableListOf<(() -> T)>()
        this@toAsyncMap.chunked(chunkSize)
            .toAsyncInOrderMap(
                coroutineScope = this,
                throwException = catchChunkedException,
                stopOnException = stopChunkOnException,
            ) { portion: List<I> ->
                val transformedPortion: List<(() -> T)> =
                    wrappedTransformer(portion).awaitAll().filterNotNull()
                // Keep whatever succeeded before signalling that the chunk was incomplete.
                transformedList.addAll(transformedPortion)
                // Fewer results than inputs means at least one item in this chunk failed.
                check(portion.size <= transformedPortion.size) {
                    "failed to transform all items in list"
                }
                transformedPortion
            }
            // Wait for every chunk to be processed; without this, transformedList is read
            // before (or while) the chunk coroutine fills it.
            .await()
        transformedList.map { it() }
    }
}
/**
 * Applies [transform] to the receiver's items one at a time, in iteration order, inside a single
 * `async` coroutine started on [coroutineScope].
 *
 * @param coroutineScope scope on which the `async` coroutine is started; also used as the
 * receiver when invoking [transform]
 * @param throwException if true, an exception from [transform] is not caught and fails the
 * returned [Deferred]
 * @param stopOnException only consulted when [throwException] is false: if true, the first
 * failing item ends processing and the items transformed so far are returned; if false, the
 * failing item is skipped and processing continues
 * @param transform the suspending function applied to each item
 * @return a [Deferred] that completes with the successfully transformed items, in order
 */
suspend inline fun <I, T> Iterable<I>.toAsyncInOrderMap(
    coroutineScope: CoroutineScope,
    throwException: Boolean = false,
    stopOnException: Boolean = false,
    crossinline transform: suspend CoroutineScope.(I) -> T,
): Deferred<List<T>> {
    return coroutineScope.async {
        val destination = ArrayList<T>()
        if (throwException) {
            for (item in this@toAsyncInOrderMap) {
                destination += coroutineScope.transform(item)
            }
        } else {
            for (item in this@toAsyncInOrderMap) {
                try {
                    destination += coroutineScope.transform(item)
                } catch (e: kotlin.coroutines.cancellation.CancellationException) {
                    // Cancellation must propagate; treating it as an item failure would keep
                    // the loop running after the coroutine has been cancelled.
                    throw e
                } catch (e: Exception) {
                    // The failing item is dropped; optionally stop with what has been gathered.
                    if (stopOnException) return@async destination
                }
            }
        }
        destination
    }
}