@@ -15,6 +15,7 @@ import kotlinx.coroutines.runBlocking
1515import kotlinx.coroutines.selects.SelectClause1
1616import mu.KotlinLogging
1717import org.apache.kafka.clients.consumer.Consumer
18+ import org.apache.kafka.clients.consumer.ConsumerConfig
1819import org.apache.kafka.clients.consumer.ConsumerRebalanceListener
1920import org.apache.kafka.clients.consumer.ConsumerRecord
2021import org.apache.kafka.clients.consumer.ConsumerRecords
@@ -29,6 +30,21 @@ import tech.figure.kafka.records.UnAckedConsumerRecords
2930internal fun <K , V > List <ConsumerRecord <K , V >>.toConsumerRecords () =
3031 groupBy { TopicPartition (it.topic(), it.partition()) }.let (::ConsumerRecords )
3132
33+ private const val DEFAULT_MAX_POLL_RECORDS = 500
34+ private const val DEFAULT_BUFFER_FACTOR = 3
35+
36+ private val Map <String , Any >.maxPollBufferCapacity get(): Int {
37+ // consumerConfig defined max poll records.
38+ val fromConfig = this [ConsumerConfig .MAX_POLL_RECORDS_CONFIG ] as ? Int
39+ // fallback to kafka default.
40+ val fromKafkaDefault = ConsumerConfig .configDef().defaultValues()[ConsumerConfig .MAX_POLL_RECORDS_CONFIG ] as ? Int
41+ // when all else fails...
42+ val failsafe = DEFAULT_MAX_POLL_RECORDS
43+
44+ val maxPollRecords = fromConfig ? : fromKafkaDefault ? : failsafe
45+ return maxPollRecords * DEFAULT_BUFFER_FACTOR
46+ }
47+
3248/* *
3349 * Default is to create a committable consumer channel for unacknowledged record processing.
3450 *
@@ -38,11 +54,20 @@ fun <K, V> kafkaConsumerChannel(
3854 consumerProperties : Map <String , Any >,
3955 topics : Set <String >,
4056 name : String = "kafka-channel",
57+ bufferCapacity : Int = consumerProperties.maxPollBufferCapacity,
4158 pollInterval : Duration = DEFAULT_POLL_INTERVAL ,
4259 consumer : Consumer <K , V > = KafkaConsumer (consumerProperties),
4360 rebalanceListener : ConsumerRebalanceListener = loggingConsumerRebalanceListener(),
4461 init : Consumer <K , V >.() -> Unit = { subscribe(topics, rebalanceListener) },
45- ): ReceiveChannel <UnAckedConsumerRecords <K , V >> = kafkaAckConsumerChannel(consumerProperties, topics, name, pollInterval, consumer, rebalanceListener, init )
62+ ): ReceiveChannel <UnAckedConsumerRecords <K , V >> = kafkaAckConsumerChannel(consumerProperties, topics, name, bufferCapacity, pollInterval, consumer, rebalanceListener, init )
63+
64+ private fun <K , V > noAckConsumerInit (topics : Set <String >, seekTopicPartitions : Consumer <K , V >.(List <TopicPartition >) -> Unit ): (Consumer <K , V >) -> Unit = { consumer ->
65+ val tps = topics.flatMap {
66+ consumer.partitionsFor(it).map { TopicPartition (it.topic(), it.partition()) }
67+ }
68+ consumer.assign(tps)
69+ consumer.seekTopicPartitions(tps)
70+ }
4671
4772/* *
4873 * Create a [ReceiveChannel] for [ConsumerRecords] from kafka.
@@ -55,25 +80,25 @@ fun <K, V> kafkaConsumerChannel(
5580 * @param init Callback for initializing the [Consumer].
5681 * @return A non-running [KafkaConsumerChannel] instance that must be started via
5782 * [KafkaConsumerChannel.start].
58-
5983 */
6084fun <K , V > kafkaNoAckConsumerChannel (
6185 consumerProperties : Map <String , Any >,
6286 topics : Set <String >,
6387 name : String = "kafka-channel",
88+ bufferCapacity : Int = consumerProperties.maxPollBufferCapacity,
6489 pollInterval : Duration = DEFAULT_POLL_INTERVAL ,
6590 consumer : Consumer <K , V > = KafkaConsumer (consumerProperties),
66- rebalanceListener : ConsumerRebalanceListener = loggingConsumerRebalanceListener(),
67- init : Consumer <K , V >.() -> Unit = { subscribe(topics, rebalanceListener) },
91+ seekTopicPartitions : Consumer <K , V >.(List <TopicPartition >) -> Unit = {},
6892): ReceiveChannel <ConsumerRecords <K , V >> {
6993 return object :
7094 KafkaConsumerChannel <K , V , ConsumerRecords <K , V >>(
7195 consumerProperties,
7296 topics,
7397 name,
98+ bufferCapacity,
7499 pollInterval,
75100 consumer,
76- init
101+ noAckConsumerInit(topics, seekTopicPartitions),
77102 ) {
78103 override suspend fun preProcessPollSet (
79104 records : ConsumerRecords <K , V >,
@@ -100,6 +125,7 @@ fun <K, V> kafkaAckConsumerChannel(
100125 consumerProperties : Map <String , Any >,
101126 topics : Set <String >,
102127 name : String = "kafka-channel",
128+ bufferCapacity : Int = consumerProperties.maxPollBufferCapacity,
103129 pollInterval : Duration = DEFAULT_POLL_INTERVAL ,
104130 consumer : Consumer <K , V > = KafkaConsumer (consumerProperties),
105131 rebalanceListener : ConsumerRebalanceListener = loggingConsumerRebalanceListener(),
@@ -109,6 +135,7 @@ fun <K, V> kafkaAckConsumerChannel(
109135 consumerProperties,
110136 topics,
111137 name,
138+ bufferCapacity,
112139 pollInterval,
113140 consumer,
114141 init
@@ -132,6 +159,7 @@ internal class KafkaAckConsumerChannel<K, V>(
132159 consumerProperties : Map <String , Any >,
133160 topics : Set <String >,
134161 name : String ,
162+ bufferCapacity : Int ,
135163 pollInterval : Duration ,
136164 consumer : Consumer <K , V >,
137165 init : Consumer <K , V >.() -> Unit
@@ -140,6 +168,7 @@ internal class KafkaAckConsumerChannel<K, V>(
140168 consumerProperties,
141169 topics,
142170 name,
171+ bufferCapacity,
143172 pollInterval,
144173 consumer,
145174 init
@@ -206,6 +235,7 @@ abstract class KafkaConsumerChannel<K, V, R>(
206235 consumerProperties : Map <String , Any >,
207236 topics : Set <String > = emptySet(),
208237 name : String = " kafka-channel" ,
238+ bufferCapacity : Int = consumerProperties.maxPollBufferCapacity,
209239 private val pollInterval : Duration = DEFAULT_POLL_INTERVAL ,
210240 private val consumer : Consumer <K , V > = KafkaConsumer (consumerProperties),
211241 private val init : Consumer <K , V >.() -> Unit = { subscribe(topics) },
@@ -222,8 +252,7 @@ abstract class KafkaConsumerChannel<K, V, R>(
222252 isDaemon = true ,
223253 start = false
224254 )
225- val sendChannel = Channel <R >(Channel .UNLIMITED )
226-
255+ private val sendChannel = Channel <R >(capacity = bufferCapacity)
227256 private fun <K , V > Consumer <K , V >.poll (duration : Duration ) = poll(duration.toJavaDuration())
228257
229258 private fun <T , L : Iterable <T >> L.ifEmpty (block : () -> L ): L =
0 commit comments