Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.selects.SelectClause1
import mu.KotlinLogging
import org.apache.kafka.clients.consumer.Consumer
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.clients.consumer.ConsumerRecords
Expand All @@ -29,6 +30,21 @@ import tech.figure.kafka.records.UnAckedConsumerRecords
internal fun <K, V> List<ConsumerRecord<K, V>>.toConsumerRecords() =
groupBy { TopicPartition(it.topic(), it.partition()) }.let(::ConsumerRecords)

private const val DEFAULT_MAX_POLL_RECORDS = 500
private const val DEFAULT_BUFFER_FACTOR = 3

private val Map<String, Any>.maxPollBufferCapacity get(): Int {
// consumerConfig defined max poll records.
val fromConfig = this[ConsumerConfig.MAX_POLL_RECORDS_CONFIG] as? Int
// fallback to kafka default.
val fromKafkaDefault = ConsumerConfig.configDef().defaultValues()[ConsumerConfig.MAX_POLL_RECORDS_CONFIG] as? Int
// when all else fails...
val failsafe = DEFAULT_MAX_POLL_RECORDS

val maxPollRecords = fromConfig ?: fromKafkaDefault ?: failsafe
return maxPollRecords * DEFAULT_BUFFER_FACTOR
}

/**
* Default is to create a committable consumer channel for unacknowledged record processing.
*
Expand All @@ -38,11 +54,20 @@ fun <K, V> kafkaConsumerChannel(
consumerProperties: Map<String, Any>,
topics: Set<String>,
name: String = "kafka-channel",
bufferCapacity: Int = consumerProperties.maxPollBufferCapacity,
pollInterval: Duration = DEFAULT_POLL_INTERVAL,
consumer: Consumer<K, V> = KafkaConsumer(consumerProperties),
rebalanceListener: ConsumerRebalanceListener = loggingConsumerRebalanceListener(),
init: Consumer<K, V>.() -> Unit = { subscribe(topics, rebalanceListener) },
): ReceiveChannel<UnAckedConsumerRecords<K, V>> = kafkaAckConsumerChannel(consumerProperties, topics, name, pollInterval, consumer, rebalanceListener, init)
): ReceiveChannel<UnAckedConsumerRecords<K, V>> = kafkaAckConsumerChannel(consumerProperties, topics, name, bufferCapacity, pollInterval, consumer, rebalanceListener, init)

private fun <K, V> noAckConsumerInit(topics: Set<String>, seekTopicPartitions: Consumer<K, V>.(List<TopicPartition>) -> Unit): (Consumer<K, V>) -> Unit = { consumer ->
val tps = topics.flatMap {
consumer.partitionsFor(it).map { TopicPartition(it.topic(), it.partition()) }
}
consumer.assign(tps)
consumer.seekTopicPartitions(tps)
}

/**
* Create a [ReceiveChannel] for [ConsumerRecords] from kafka.
Expand All @@ -55,25 +80,25 @@ fun <K, V> kafkaConsumerChannel(
* @param init Callback for initializing the [Consumer].
* @return A non-running [KafkaConsumerChannel] instance that must be started via
* [KafkaConsumerChannel.start].

*/
fun <K, V> kafkaNoAckConsumerChannel(
consumerProperties: Map<String, Any>,
topics: Set<String>,
name: String = "kafka-channel",
bufferCapacity: Int = consumerProperties.maxPollBufferCapacity,
pollInterval: Duration = DEFAULT_POLL_INTERVAL,
consumer: Consumer<K, V> = KafkaConsumer(consumerProperties),
rebalanceListener: ConsumerRebalanceListener = loggingConsumerRebalanceListener(),
init: Consumer<K, V>.() -> Unit = { subscribe(topics, rebalanceListener) },
seekTopicPartitions: Consumer<K, V>.(List<TopicPartition>) -> Unit = {},
): ReceiveChannel<ConsumerRecords<K, V>> {
return object :
KafkaConsumerChannel<K, V, ConsumerRecords<K, V>>(
consumerProperties,
topics,
name,
bufferCapacity,
pollInterval,
consumer,
init
noAckConsumerInit(topics, seekTopicPartitions),
) {
override suspend fun preProcessPollSet(
records: ConsumerRecords<K, V>,
Expand All @@ -100,6 +125,7 @@ fun <K, V> kafkaAckConsumerChannel(
consumerProperties: Map<String, Any>,
topics: Set<String>,
name: String = "kafka-channel",
bufferCapacity: Int = consumerProperties.maxPollBufferCapacity,
pollInterval: Duration = DEFAULT_POLL_INTERVAL,
consumer: Consumer<K, V> = KafkaConsumer(consumerProperties),
rebalanceListener: ConsumerRebalanceListener = loggingConsumerRebalanceListener(),
Expand All @@ -109,6 +135,7 @@ fun <K, V> kafkaAckConsumerChannel(
consumerProperties,
topics,
name,
bufferCapacity,
pollInterval,
consumer,
init
Expand All @@ -132,6 +159,7 @@ internal class KafkaAckConsumerChannel<K, V>(
consumerProperties: Map<String, Any>,
topics: Set<String>,
name: String,
bufferCapacity: Int,
pollInterval: Duration,
consumer: Consumer<K, V>,
init: Consumer<K, V>.() -> Unit
Expand All @@ -140,6 +168,7 @@ internal class KafkaAckConsumerChannel<K, V>(
consumerProperties,
topics,
name,
bufferCapacity,
pollInterval,
consumer,
init
Expand Down Expand Up @@ -206,6 +235,7 @@ abstract class KafkaConsumerChannel<K, V, R>(
consumerProperties: Map<String, Any>,
topics: Set<String> = emptySet(),
name: String = "kafka-channel",
bufferCapacity: Int = consumerProperties.maxPollBufferCapacity,
private val pollInterval: Duration = DEFAULT_POLL_INTERVAL,
private val consumer: Consumer<K, V> = KafkaConsumer(consumerProperties),
private val init: Consumer<K, V>.() -> Unit = { subscribe(topics) },
Expand All @@ -222,8 +252,7 @@ abstract class KafkaConsumerChannel<K, V, R>(
isDaemon = true,
start = false
)
val sendChannel = Channel<R>(Channel.UNLIMITED)

private val sendChannel = Channel<R>(capacity = bufferCapacity)
private fun <K, V> Consumer<K, V>.poll(duration: Duration) = poll(duration.toJavaDuration())

private fun <T, L : Iterable<T>> L.ifEmpty(block: () -> L): L =
Expand Down