Skip to content

Mapping a list asynchronously #4221

Closed
Closed
@singhmanu

Description

Use case

Let's say you have a list and you want to perform some function asynchronously on and return a list from that. It would be convenient to have a function that lets you map things asynchronously without having to right out all of the logic for creating a list of Deferred<T>.

You may also want to do async calls in order and then get the result of a transformed list.

// inside a coroutine
val list: List<Int> = (0..5).toList()
val transformedMap = list.deferredMap(Dispatchers.IO) {
  delay(100L - it * 10L)
  println(it)
}
transformMap.await()
/* output
5
4
3
2
1
0
*/

The Shape of the API

/**
 * Map a list of items to a list of deferred items. The deferred items are then awaited and the resulting list is returned.
 * @param coroutineScope The coroutine scope to use for the async operations
 * @param chunkSize the number of coroutines to run at the same time asynchronously. The iterable is taken and split into chunks of this amount
 * @param throwException if true, when any exception happens in any transform, the exception will be thrown
 * @param stopChunkOnException if true, when any exception happens in any transform, the remaining chunks will not be processed
 * @param transform the function to transform each item
 * @return the list of deferred transformed items
 * @throws Exception if any exception happens in any transform and throwException is true
 */
suspend fun <I, T> Iterable<I>.deferredMap(
    coroutineScope: CoroutineScope,
    chunkSize: Int? = null,
    throwException: Boolean = false,
    stopChunkOnException: Boolean = false,
    noinline transform: suspend CoroutineScope.(I) -> T,
): Deferred<List<T>>

/**
 * Map a list of items to a list of transformed items using coroutines in order
 * @param coroutineScope the scope in which to run the coroutines
 * @param throwException if true, when any exception happens in any transform, the exception will be thrown
 * @param stopOnException if true, when any exception happens in any transform, the rest of the remaining transforms will not be executed
 * @param transform the function to transform each item
 * @return the list of transformed items
 */
suspend inline fun <I, T> Iterable<I>.toAsyncInOrderMap(
    coroutineScope: CoroutineScope,
    throwException: Boolean = false,
    stopOnException: Boolean = false,
    crossinline transform: suspend CoroutineScope.(I) -> T,
): Deferred<List<T>>

Possible Implementation

I've taken a crack at this before so I thought it could be helpful. Here is a possible implementation. Obviously, there might be things I may be missing so I would appreciate the feedback or let me know if this even makes sense to have!

suspend inline fun <I, T> Iterable<I>.deferredMap(
    coroutineScope: CoroutineScope,
    chunkSize: Int? = null,
    throwException: Boolean = false,
    stopChunkOnException: Boolean = false,
    noinline transform: suspend CoroutineScope.(I) -> T,
): Deferred<List<T>> {
    val listTransformer: (Iterable<I>) -> List<Deferred<(() -> T)?>> =
        if (throwException) {
            { list ->
                val items: List<Deferred<() -> T>> =
                    list.map {
                        coroutineScope.async {
                            val transformed = transform(it)
                            val wrappedTransformed: () -> T = { transformed }
                            wrappedTransformed
                        }
                    }
                items
            }
        } else {
            { list ->
                val items: List<Deferred<(() -> T)?>> =
                    list.map {
                        coroutineScope.async {
                            try {
                                val transformed = transform(it)
                                val wrappedTransformed: () -> T = { transformed }
                                wrappedTransformed
                            } catch (e: Exception) {
                                null
                            }
                        }
                    }
                items
            }
        }
    return this.toAsyncMap(
        coroutineScope = coroutineScope,
        chunkSize = chunkSize,
        catchChunkedException = false,
        stopChunkOnException = stopChunkOnException,
        wrappedTransformer = listTransformer,
    )
}

inline fun <I, T> Iterable<I>.toAsyncMap(
    coroutineScope: CoroutineScope,
    chunkSize: Int?,
    catchChunkedException: Boolean,
    stopChunkOnException: Boolean,
    crossinline wrappedTransformer: suspend (Iterable<I>) -> List<Deferred<(() -> T)?>>,
): Deferred<List<T>> = if (chunkSize == null) {
    coroutineScope.async {
        wrappedTransformer(this@toAsyncMap).awaitAll().mapNotNull { it }.map { it() }
    }
} else {
    coroutineScope.async {
        val transformedList = mutableListOf<(() -> T)>()
        this@toAsyncMap.chunked(chunkSize)
            .toAsyncInOrderMap(
                coroutineScope = this,
                throwException = catchChunkedException,
                stopOnException = stopChunkOnException,
            ) { portion: List<I> ->
                val transformedPortion: List<(() -> T)> = wrappedTransformer(portion).awaitAll().mapNotNull { it }
                transformedList.addAll(transformedPortion)
                if (portion.size > transformedPortion.size) {
                    throw IllegalStateException("failed to transform all items in list")
                }
                transformedPortion
            }
        transformedList.map { it() }
    }
}

suspend inline fun <I, T> Iterable<I>.toAsyncInOrderMap(
    coroutineScope: CoroutineScope,
    throwException: Boolean = false,
    stopOnException: Boolean = false,
    crossinline transform: suspend CoroutineScope.(I) -> T,
): Deferred<List<T>> {
    return coroutineScope.async {
        val destination = ArrayList<T>()
        if (throwException) {
            for (item in this@toAsyncInOrderMap) {
                destination += coroutineScope.transform(item)
            }
        } else {
            for (item in this@toAsyncInOrderMap) {
                try {
                    destination += coroutineScope.transform(item)
                } catch (e: Exception) {
                    if (stopOnException) return@async destination
                }
            }
        }
        destination
    }
}

Metadata

Assignees

No one assigned

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions