package tech.figure.kafka.coroutines.channels

- import tech.figure.kafka.loggingConsumerRebalanceListener
- import tech.figure.kafka.records.CommitConsumerRecord
- import tech.figure.kafka.records.UnAckedConsumerRecordImpl
- import tech.figure.kafka.records.UnAckedConsumerRecords
import java.util.concurrent.atomic.AtomicInteger
import kotlin.concurrent.thread
import kotlin.time.Duration
- import kotlin.time.ExperimentalTime
import kotlin.time.toJavaDuration
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.ExperimentalCoroutinesApi
@@ -21,7 +16,73 @@ import kotlinx.coroutines.selects.SelectClause1
import mu.KotlinLogging
import org.apache.kafka.clients.consumer.Consumer
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener
+ import org.apache.kafka.clients.consumer.ConsumerRecord
+ import org.apache.kafka.clients.consumer.ConsumerRecords
import org.apache.kafka.clients.consumer.KafkaConsumer
+ import org.apache.kafka.clients.consumer.OffsetAndMetadata
+ import org.apache.kafka.common.TopicPartition
+ import tech.figure.kafka.loggingConsumerRebalanceListener
+ import tech.figure.kafka.records.CommitConsumerRecord
+ import tech.figure.kafka.records.UnAckedConsumerRecordImpl
+ import tech.figure.kafka.records.UnAckedConsumerRecords
+
+ internal fun <K, V> List<ConsumerRecord<K, V>>.toConsumerRecords() =
+     groupBy { TopicPartition(it.topic(), it.partition()) }.let(::ConsumerRecords)
+
+ /**
+  * Default is to create a committable consumer channel for unacknowledged record processing.
+  *
+  * @see [kafkaAckConsumerChannel]
+  */
+ fun <K, V> kafkaConsumerChannel(
+     consumerProperties: Map<String, Any>,
+     topics: Set<String>,
+     name: String = "kafka-channel",
+     pollInterval: Duration = DEFAULT_POLL_INTERVAL,
+     consumer: Consumer<K, V> = KafkaConsumer(consumerProperties),
+     rebalanceListener: ConsumerRebalanceListener = loggingConsumerRebalanceListener(),
+     init: Consumer<K, V>.() -> Unit = { subscribe(topics, rebalanceListener) },
+ ): ReceiveChannel<UnAckedConsumerRecords<K, V>> =
+     kafkaAckConsumerChannel(consumerProperties, topics, name, pollInterval, consumer, rebalanceListener, init)
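
// A minimal usage sketch for the committable channel: `consumerProps` and `process` are
// hypothetical, and `record.ack()` assumes the suspend ack helper on the unacked record
// wrapper from tech.figure.kafka.records (only `records` is referenced in this file).
//
//   val channel = kafkaConsumerChannel<String, ByteArray>(
//       consumerProperties = consumerProps,
//       topics = setOf("example-topic"),
//   )
//   runBlocking {
//       for (batch in channel) {
//           batch.records.forEach { record ->
//               process(record) // hypothetical handler
//               record.ack()    // assumed ack helper; the commit flows back via the ack channel
//           }
//       }
//   }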
+
+ /**
+  * Create a [ReceiveChannel] for [ConsumerRecords] from kafka.
+  *
+  * @param consumerProperties Kafka consumer settings for this channel.
+  * @param topics Topics to subscribe to. Can be overridden via custom `init` parameter.
+  * @param name The thread pool's base name for this consumer.
+  * @param pollInterval Interval for kafka consumer [Consumer.poll] method calls.
+  * @param consumer The instantiated [Consumer] to use to receive from kafka.
+  * @param rebalanceListener Listener invoked as topic-partition assignments change.
+  * @param init Callback for initializing the [Consumer].
+  * @return A non-running [KafkaConsumerChannel] instance that must be started via
+  *   [KafkaConsumerChannel.start].
+  */
+ fun <K, V> kafkaNoAckConsumerChannel(
+     consumerProperties: Map<String, Any>,
+     topics: Set<String>,
+     name: String = "kafka-channel",
+     pollInterval: Duration = DEFAULT_POLL_INTERVAL,
+     consumer: Consumer<K, V> = KafkaConsumer(consumerProperties),
+     rebalanceListener: ConsumerRebalanceListener = loggingConsumerRebalanceListener(),
+     init: Consumer<K, V>.() -> Unit = { subscribe(topics, rebalanceListener) },
+ ): ReceiveChannel<ConsumerRecords<K, V>> {
+     return object :
+         KafkaConsumerChannel<K, V, ConsumerRecords<K, V>>(
+             consumerProperties,
+             topics,
+             name,
+             pollInterval,
+             consumer,
+             init
+         ) {
+         override suspend fun preProcessPollSet(
+             records: ConsumerRecords<K, V>,
+             context: MutableMap<String, Any>
+         ): List<ConsumerRecords<K, V>> {
+             return listOf(records)
+         }
+     }
+ }
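
// A minimal sketch of the no-ack variant: batches arrive as plain ConsumerRecords and this
// channel never commits offsets itself, so a setting like kafka's `enable.auto.commit` would
// typically take over commits (`consumerProps` and `process` are hypothetical).
//
//   val raw = kafkaNoAckConsumerChannel<String, ByteArray>(
//       consumerProperties = consumerProps + ("enable.auto.commit" to "true"),
//       topics = setOf("example-topic"),
//   )
//   runBlocking {
//       for (records in raw) {
//           records.forEach { process(it) } // hypothetical handler
//       }
//   }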

/**
 * Create a [ReceiveChannel] for unacknowledged consumer records from kafka.
@@ -32,9 +93,10 @@ import org.apache.kafka.clients.consumer.KafkaConsumer
 * @param pollInterval Interval for kafka consumer [Consumer.poll] method calls.
 * @param consumer The instantiated [Consumer] to use to receive from kafka.
 * @param init Callback for initializing the [Consumer].
- * @return A non-running [KafkaConsumerChannel] instance that must be started via [KafkaConsumerChannel.start].
+ * @return A non-running [KafkaConsumerChannel] instance that must be started via
+ *   [KafkaConsumerChannel.start].
 */
- fun <K, V> kafkaConsumerChannel(
+ fun <K, V> kafkaAckConsumerChannel(
    consumerProperties: Map<String, Any>,
    topics: Set<String>,
    name: String = "kafka-channel",
@@ -43,20 +105,95 @@ fun <K, V> kafkaConsumerChannel(
    rebalanceListener: ConsumerRebalanceListener = loggingConsumerRebalanceListener(),
    init: Consumer<K, V>.() -> Unit = { subscribe(topics, rebalanceListener) },
): ReceiveChannel<UnAckedConsumerRecords<K, V>> {
-     return KafkaConsumerChannel(consumerProperties, topics, name, pollInterval, consumer, init).also {
-         Runtime.getRuntime().addShutdownHook(
-             Thread {
-                 it.cancel()
+     return KafkaAckConsumerChannel(
+         consumerProperties,
+         topics,
+         name,
+         pollInterval,
+         consumer,
+         init
+     ).also { Runtime.getRuntime().addShutdownHook(Thread { it.cancel() }) }
+ }
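
// Because the result is a standard ReceiveChannel, it also composes with coroutine `select`;
// a small sketch, assuming a hypothetical `ticker` channel for idle ticks and the same
// assumed `ack()` helper as above.
//
//   runBlocking {
//       val channel = kafkaAckConsumerChannel<String, ByteArray>(consumerProps, setOf("example-topic"))
//       select<Unit> {
//           channel.onReceive { batch -> batch.records.forEach { it.ack() } }
//           ticker.onReceive { println("no records yet") }
//       }
//   }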
+
+ /**
+  * Acking kafka [Consumer] object implementing the [ReceiveChannel] methods.
+  *
+  * Note: Must operate in a bound thread context regardless of coroutine assignment due to internal
+  * kafka threading limitations for poll fetches, acknowledgements, and sends.
+  *
+  * @param consumerProperties Kafka consumer settings for this channel.
+  * @param topics Topics to subscribe to. Can be overridden via custom `init` parameter.
+  * @param name The thread pool's base name for this consumer.
+  * @param pollInterval Interval for kafka consumer [Consumer.poll] method calls.
+  * @param consumer The instantiated [Consumer] to use to receive from kafka.
+  * @param init Callback for initializing the [Consumer].
+  */
+ internal class KafkaAckConsumerChannel<K, V>(
+     consumerProperties: Map<String, Any>,
+     topics: Set<String>,
+     name: String,
+     pollInterval: Duration,
+     consumer: Consumer<K, V>,
+     init: Consumer<K, V>.() -> Unit
+ ) :
+     KafkaConsumerChannel<K, V, UnAckedConsumerRecords<K, V>>(
+         consumerProperties,
+         topics,
+         name,
+         pollInterval,
+         consumer,
+         init
+     ) {
+     override suspend fun preProcessPollSet(
+         records: ConsumerRecords<K, V>,
+         context: MutableMap<String, Any>,
+     ): List<UnAckedConsumerRecords<K, V>> {
+         log.trace { "preProcessPollSet(${records.count()})" }
+         val ackChannel =
+             Channel<CommitConsumerRecord>(capacity = records.count()).also {
+                 context["ack-channel"] = it
            }
-         )
+         val unackedRecords =
+             records
+                 .groupBy { "${it.topic()}-${it.partition()}" }
+                 .map {
+                     val timestamp = System.currentTimeMillis()
+                     val records =
+                         it.value.map { UnAckedConsumerRecordImpl(it, ackChannel, timestamp) }
+                     UnAckedConsumerRecords(records)
+                 }
+         return unackedRecords
+     }
+
+     @Suppress("unchecked_cast")
+     override suspend fun postProcessPollSet(
+         records: List<UnAckedConsumerRecords<K, V>>,
+         context: Map<String, Any>
+     ) {
+         log.trace { "postProcessPollSet(records:${records.sumOf { it.count() }})" }
+         val ackChannel = context["ack-channel"]!! as Channel<CommitConsumerRecord>
+         for (rs in records) {
+             if (rs.records.isNotEmpty()) {
+                 val count = AtomicInteger(rs.records.size)
+                 while (count.getAndDecrement() > 0) {
+                     log.trace { "waiting for ${count.get()} commits" }
+                     val it = ackChannel.receive()
+                     log.trace { "sending to broker ack(${it.duration.toMillis()}ms):${it.asCommitable()}" }
+                     commit(it)
+                     log.trace { "acking the commit back to flow" }
+                     it.commitAck.send(Unit)
+                 }
+             }
+         }
+         ackChannel.close()
52189 }
53190}
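
// The ack round trip above, end to end: preProcessPollSet threads a shared "ack-channel"
// through every wrapped record; a downstream consumer acks a record, which sends a
// CommitConsumerRecord back on that channel; postProcessPollSet receives each one, commits
// it to the broker via commit() (commitSync underneath), then signals completion on
// commitAck before closing the channel for the next poll set.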

/**
- * Kafka [Consumer] object implementing the [ReceiveChannel] methods.
+ * Base kafka [Consumer] object implementing the [ReceiveChannel] methods.
 *
- * Note: Must operate in a bound thread context regardless of coroutine assignment due to internal kafka threading
- * limitations for poll fetches, acknowledgements, and sends.
+ * Note: Must operate in a bound thread context regardless of coroutine assignment due to internal
+ * kafka threading limitations for poll fetches, acknowledgements, and sends.
 *
 * @param consumerProperties Kafka consumer settings for this channel.
 * @param topics Topics to subscribe to. Can be overridden via custom `init` parameter.
@@ -65,40 +202,46 @@ fun <K, V> kafkaConsumerChannel(
65202 * @param consumer The instantiated [Consumer] to use to receive from kafka.
66203 * @param init Callback for initializing the [Consumer].
67204 */
68- open class KafkaConsumerChannel <K , V >(
205+ abstract class KafkaConsumerChannel <K , V , R >(
69206 consumerProperties : Map <String , Any >,
70207 topics : Set <String > = emptySet(),
71208 name : String = " kafka-channel" ,
72209 private val pollInterval : Duration = DEFAULT_POLL_INTERVAL ,
73210 private val consumer : Consumer <K , V > = KafkaConsumer (consumerProperties),
74211 private val init : Consumer <K , V >.() -> Unit = { subscribe(topics) },
75- ) : ReceiveChannel<UnAckedConsumerRecords<K, V> > {
212+ ) : ReceiveChannel<R > {
76213 companion object {
77214 private val threadCounter = AtomicInteger (0 )
78215 }

-     private val log = KotlinLogging.logger {}
+     protected val log = KotlinLogging.logger {}
    private val thread =
-         thread(name = "$name-${threadCounter.getAndIncrement()}", block = { run() }, isDaemon = true, start = false)
-     private val sendChannel = Channel<UnAckedConsumerRecords<K, V>>(Channel.UNLIMITED)
-
-     private inline fun <T> Channel<T>.use(block: (Channel<T>) -> Unit) {
-         try {
-             block(this)
-             close()
-         } catch (e: Throwable) {
-             close(e)
-         }
-     }
+         thread(
+             name = "$name-${threadCounter.getAndIncrement()}",
+             block = { run() },
+             isDaemon = true,
+             start = false
+         )
+     val sendChannel = Channel<R>(Channel.UNLIMITED)

-     @OptIn(ExperimentalTime::class)
-     private fun <K, V> Consumer<K, V>.poll(duration: Duration) =
-         poll(duration.toJavaDuration())
+     private fun <K, V> Consumer<K, V>.poll(duration: Duration) = poll(duration.toJavaDuration())

    private fun <T, L : Iterable<T>> L.ifEmpty(block: () -> L): L =
        if (count() == 0) block() else this

-     @OptIn(ExperimentalCoroutinesApi::class, ExperimentalTime::class)
+     protected abstract suspend fun preProcessPollSet(
+         records: ConsumerRecords<K, V>,
+         context: MutableMap<String, Any>
+     ): List<R>
+
+     protected open suspend fun postProcessPollSet(records: List<R>, context: Map<String, Any>) {}
+
+     protected fun commit(record: CommitConsumerRecord): OffsetAndMetadata {
+         consumer.commitSync(record.asCommitable())
+         return record.offsetAndMetadata
+     }
+
+     @OptIn(ExperimentalCoroutinesApi::class)
    fun run() {
        consumer.init()

@@ -108,32 +251,31 @@ open class KafkaConsumerChannel<K, V>(
        try {
            while (!sendChannel.isClosedForSend) {
                log.trace("poll(topics:${consumer.subscription()}) ...")
-                 val polled = consumer.poll(Duration.ZERO).ifEmpty { consumer.poll(pollInterval) }
+                 val polled =
+                     consumer.poll(Duration.ZERO).ifEmpty { consumer.poll(pollInterval) }
                val polledCount = polled.count()
                if (polledCount == 0) {
                    continue
                }

                log.trace("poll(topics:${consumer.subscription()}) got $polledCount records.")
-                 Channel<CommitConsumerRecord>(capacity = polled.count()).use { ackChannel ->
-                     for (it in polled.groupBy { "${it.topic()}-${it.partition()}" }) {
-                         val timestamp = System.currentTimeMillis()
-                         val records = it.value.map {
-                             UnAckedConsumerRecordImpl(it, ackChannel, timestamp)
-                         }
-                         sendChannel.send(UnAckedConsumerRecords(records))
-                     }
-
-                     if (polledCount > 0) {
-                         val count = AtomicInteger(polledCount)
-                         while (count.getAndDecrement() > 0) {
-                             val it = ackChannel.receive()
-                             log.debug { "ack(${it.duration.toMillis()}ms):${it.asCommitable()}" }
-                             consumer.commitSync(it.asCommitable())
-                             it.commitAck.send(Unit)
-                         }
-                     }
-                 }
+
+                 // Group by topic-partition to guarantee ordering.
+                 val records =
+                     polled
+                         .groupBy { "${it.topic()}-${it.partition()}" }
+                         .values
+                         .map { it.toConsumerRecords() }
+
+                 // Convert to internal types.
+                 val context = mutableMapOf<String, Any>()
+                 val processSet = records.map { preProcessPollSet(it, context) }
+
+                 // Send down the pipeline for processing.
+                 processSet
+                     .onEach { it.map { sendChannel.send(it) } }
+                     // Clean up any processing.
+                     .map { postProcessPollSet(it, context) }
            }
        } finally {
            log.info("${coroutineContext.job} shutting down consumer thread")
@@ -142,7 +284,9 @@ open class KafkaConsumerChannel<K, V>(
                consumer.unsubscribe()
                consumer.close()
            } catch (ex: Exception) {
-                 log.debug { "Consumer failed to be closed. It may have been closed from somewhere else." }
+                 log.debug {
+                     "Consumer failed to be closed. It may have been closed from somewhere else."
+                 }
            }
        }
    }
@@ -162,19 +306,23 @@ open class KafkaConsumerChannel<K, V>(
    @ExperimentalCoroutinesApi
    override val isClosedForReceive: Boolean = sendChannel.isClosedForReceive

-     @ExperimentalCoroutinesApi
-     override val isEmpty: Boolean = sendChannel.isEmpty
-     override val onReceive: SelectClause1<UnAckedConsumerRecords<K, V>> get() {
-         start()
-         return sendChannel.onReceive
-     }
+     @ExperimentalCoroutinesApi override val isEmpty: Boolean = sendChannel.isEmpty
+     override val onReceive: SelectClause1<R>
+         get() {
+             start()
+             return sendChannel.onReceive
+         }

-     override val onReceiveCatching: SelectClause1<ChannelResult<UnAckedConsumerRecords<K, V>>> get() {
-         start()
-         return sendChannel.onReceiveCatching
-     }
+     override val onReceiveCatching: SelectClause1<ChannelResult<R>>
+         get() {
+             start()
+             return sendChannel.onReceiveCatching
+         }

-     @Deprecated("Since 1.2.0, binary compatibility with versions <= 1.1.x", level = DeprecationLevel.HIDDEN)
+     @Deprecated(
+         "Since 1.2.0, binary compatibility with versions <= 1.1.x",
+         level = DeprecationLevel.HIDDEN
+     )
    override fun cancel(cause: Throwable?): Boolean {
        cancel(CancellationException("cancel", cause))
        return true
@@ -185,22 +333,22 @@ open class KafkaConsumerChannel<K, V>(
        sendChannel.cancel(cause)
    }

-     override fun iterator(): ChannelIterator<UnAckedConsumerRecords<K, V>> {
+     override fun iterator(): ChannelIterator<R> {
        start()
        return sendChannel.iterator()
    }

-     override suspend fun receive(): UnAckedConsumerRecords<K, V> {
+     override suspend fun receive(): R {
        start()
        return sendChannel.receive()
    }

-     override suspend fun receiveCatching(): ChannelResult<UnAckedConsumerRecords<K, V>> {
+     override suspend fun receiveCatching(): ChannelResult<R> {
        start()
        return sendChannel.receiveCatching()
    }

-     override fun tryReceive(): ChannelResult<UnAckedConsumerRecords<K, V>> {
+     override fun tryReceive(): ChannelResult<R> {
        start()
        return sendChannel.tryReceive()
    }
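
// A sketch of a custom channel over the abstract base, mirroring the anonymous object used by
// kafkaNoAckConsumerChannel above; the class name is hypothetical and it simply passes each
// poll set through untouched.
//
//   class PassThroughConsumerChannel<K, V>(
//       props: Map<String, Any>,
//       topics: Set<String>,
//   ) : KafkaConsumerChannel<K, V, ConsumerRecords<K, V>>(props, topics) {
//       override suspend fun preProcessPollSet(
//           records: ConsumerRecords<K, V>,
//           context: MutableMap<String, Any>
//       ): List<ConsumerRecords<K, V>> = listOf(records)
//   }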