
Support natural batching in flow #902

Open
@mtopolnik


Natural (a.k.a. smart) batching is a stream-processing technique that improves throughput without adding latency. Take a concurrent queue as an example. The consumer atomically drains all the items present at some instant and then processes them as a single batch. Ideally the queue is bounded, which caps the batch size and also applies backpressure to the sender.
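The following is a minimal JVM sketch of the consumer side described above. It assumes `java.util.concurrent.ArrayBlockingQueue` as the bounded queue, which the issue does not name. `take()` blocks until the first item arrives. `drainTo(collection, maxElements)` then removes whatever else is already enqueued, all under the queue's lock. The `takeBatch` helper is a hypothetical name chosen for illustration.

```kotlin
import java.util.concurrent.ArrayBlockingQueue
import kotlin.concurrent.thread

// Hypothetical helper: block for one item, then drain whatever is already
// enqueued (at most maxBatch items in total) without blocking again.
fun <T> ArrayBlockingQueue<T>.takeBatch(maxBatch: Int): List<T> {
    require(maxBatch > 0) { "maxBatch must be positive" }
    val batch = ArrayList<T>(maxBatch)
    batch.add(take())               // blocks while the queue is empty
    drainTo(batch, maxBatch - 1)    // non-blocking; only items already present
    return batch
}

fun main() {
    // Bounded capacity: put() blocks when the queue is full (backpressure).
    val queue = ArrayBlockingQueue<Int>(16)
    val total = 100
    val producer = thread {
        for (i in 1..total) queue.put(i)
    }
    var received = 0
    while (received < total) {
        val batch = queue.takeBatch(maxBatch = 16)
        received += batch.size
        println("Received items: $batch")
    }
    producer.join()
}
```

The batch sizes printed depend on thread timing. When the consumer keeps up, batches hold a single item. When it falls behind, they grow toward the queue's capacity.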

It's called "natural" batching because no batch size is imposed. When traffic is low, each item is processed as soon as it arrives, and there is no throughput to gain by grouping items. When traffic grows, the consumer automatically starts processing larger batches, amortizing the fixed latency of a single operation, such as a database INSERT, across many items.
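One way to see the amortization is with illustrative symbols that are not taken from the issue. Suppose a single operation such as an INSERT has a fixed cost $C$, regardless of how many rows it carries, plus a per-item cost $c$. A batch of $n$ items then costs

$$
T(n) = C + n\,c, \qquad \frac{T(n)}{n} = \frac{C}{n} + c .
$$

Under light traffic, $n = 1$ and each item pays the full $C + c$, but it is handled as soon as it arrives. Under heavy traffic, $n$ grows toward the batch limit, so the fixed cost charged to each item shrinks as $C/n$.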

I wrote this sample code that achieves the basic goal:

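import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

const val batchLimit = 20

// Suspends until at least one item is available, then takes whatever else is
// already buffered (up to batchLimit items in total) and handles them as one batch.
suspend inline fun <T: Any> ReceiveChannel<T>.consumeBatched(
        handleItems: (List<T>) -> Unit
) {
    val buf = mutableListOf<T>()
    while (true) {
        // Suspend for the first item; a closed channel ends the loop.
        receiveCatching().getOrNull()?.also { buf.add(it) } ?: break
        // Grab up to batchLimit - 1 more items that are already enqueued, without suspending.
        for (x in 2..batchLimit) {
            tryReceive().getOrNull()?.also { buf.add(it) } ?: break
        }
        // Pass a copy, so the handler may keep the list after buf is cleared.
        handleItems(buf.toList())
        buf.clear()
    }
}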

We can test it with this:

@ObsoleteCoroutinesApi
@ExperimentalCoroutinesApi
fun main() {
    val chan = generateMockTraffic()
    runBlocking {
        chan.consumeBatched { println("Received items: $it") }
    }
}

@ExperimentalCoroutinesApi
private fun generateMockTraffic(): ReceiveChannel<Int> {
    return GlobalScope.produce(capacity = batchLimit) {
        (1..100).forEach {
            send(it)
            if (it % 10 == 0) {
                delay(1)
            }
        }
    }
}

consumeBatched() polls the channel one item at a time and therefore has to impose its own batch limit. It would be more efficient if written against a concurrent queue that supports a drain operation, such as the Agrona project's OneToOneConcurrentArrayQueue.
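The issue asks about Flow. One possible way to get the same receive-then-poll behaviour there is to push upstream items into a bounded channel, then emit each drained batch as a `List`. The sketch below does this. `batchedNaturally` is a hypothetical operator name, not a kotlinx.coroutines API. The sketch assumes kotlinx.coroutines 1.5 or later, because it uses `receiveCatching()` and `tryReceive()`.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.flow.*

// Hypothetical operator: buffers upstream items in a bounded channel and emits
// whatever has accumulated (at most `limit` items) as one List.
@OptIn(ExperimentalCoroutinesApi::class)
fun <T : Any> Flow<T>.batchedNaturally(limit: Int): Flow<List<T>> {
    require(limit > 0) { "limit must be positive" }
    return flow {
        coroutineScope {
            // Upstream runs in a child coroutine; the bounded buffer provides backpressure.
            val items = produce<T>(capacity = limit) {
                this@batchedNaturally.collect { send(it) }
            }
            while (true) {
                // Suspend for the first item; a closed channel means upstream completed.
                val first = items.receiveCatching().getOrNull() ?: break
                val batch = mutableListOf(first)
                // Take only what is already buffered, without suspending.
                while (batch.size < limit) {
                    batch += items.tryReceive().getOrNull() ?: break
                }
                emit(batch)
            }
        }
    }
}

fun main() = runBlocking {
    (1..100).asFlow()
        .onEach { if (it % 10 == 0) delay(1) }
        .batchedNaturally(limit = 20)
        .collect { println("Received items: $it") }
}
```

The bounded `produce` buffer plays the role of the bounded queue. When the downstream collector is slow, up to `limit` items accumulate and are emitted together. When the collector keeps up, batches shrink to a single item.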

Could support for natural batching be considered as a feature to add?


Comment by @qwwdfsad, taken from Stack Overflow:

It depends on the desired API surface. A drain member is unlikely to fit channel semantics. It constrains the implementation, it has to expose a drain limit somehow, and it gives channels a more "collection-like" API. For example, how should drain behave with an unlimited channel? Is it possible to implement drain efficiently once (with a pre-sized buffer, but avoiding OOMs and unbounded collections) and use it with any channel implementation?

What could be improved is additional hints from the channel, such as its expected capacity and the count of enqueued elements. These hints could have relaxed semantics with a default implementation. A drain extension could then use them, together with some reasonable, configurable upper bounds.
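The suggestion above could look roughly like the sketch below. It assumes a hypothetical `EnqueuedCountHint` interface and a hypothetical `drainTo` extension, neither of which exists in kotlinx.coroutines. The hint is used only to pre-size the destination. It is capped by the caller's `maxElements`, so an unlimited channel cannot trigger an oversized allocation.

```kotlin
import kotlinx.coroutines.channels.*

// Hypothetical hint, NOT part of kotlinx.coroutines: a relaxed, possibly
// stale estimate of how many items a channel currently holds.
interface EnqueuedCountHint {
    val enqueuedCountEstimate: Int
}

// Hypothetical drain extension: moves up to maxElements already-enqueued items
// into dest without suspending and returns how many were moved.
fun <T : Any> ReceiveChannel<T>.drainTo(dest: MutableList<T>, maxElements: Int): Int {
    require(maxElements >= 0) { "maxElements must be non-negative" }
    // Pre-size from the hint (0 when absent), never beyond the caller's bound.
    val hint = (this as? EnqueuedCountHint)?.enqueuedCountEstimate ?: 0
    if (dest is ArrayList<*>) dest.ensureCapacity(dest.size + hint.coerceIn(0, maxElements))
    var moved = 0
    while (moved < maxElements) {
        val item = tryReceive().getOrNull() ?: break
        dest.add(item)
        moved++
    }
    return moved
}

fun main() {
    val channel = Channel<Int>(Channel.UNLIMITED)
    for (i in 1..10) channel.trySend(i)
    val batch = ArrayList<Int>()
    val n = channel.drainTo(batch, maxElements = 4)
    println("Drained $n: $batch")
}
```

A real channel implementation might report its buffer size through such a hint. Here no channel implements it, so the hint falls back to 0 and the list grows as usual.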
