Skip to content

Commit 4e7ddbe

Browse files
authored
Fix issue with multiple partitions halting processing (#52)
* Fix issue with multiple partitions halting processing
1 parent f713859 commit 4e7ddbe

File tree

3 files changed

+11
-10
lines changed

3 files changed

+11
-10
lines changed

ft-coroutines-kafka/src/main/kotlin/tech/figure/kafka/coroutines/channels/ConsumerSendChannel.kt

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -197,7 +197,7 @@ internal class KafkaAckConsumerChannel<K, V>(
197197
log.trace { "preProcessPollSet(tp:$topicPartition count:${records.size})" }
198198
val ackChannel =
199199
Channel<CommitConsumerRecord>(capacity = records.size).also {
200-
context["ack-channel"] = it
200+
context["ack-channel-$topicPartition"] = it
201201
}
202202
return records.map {
203203
val timestamp = System.currentTimeMillis()
@@ -212,7 +212,7 @@ internal class KafkaAckConsumerChannel<K, V>(
212212
context: Map<String, Any>
213213
) {
214214
log.trace { "postProcessPollSet(tp:$topicPartition count:${records.size})" }
215-
val ackChannel = context["ack-channel"]!! as ReceiveChannel<CommitConsumerRecord>
215+
val ackChannel = context["ack-channel-$topicPartition"]!! as ReceiveChannel<CommitConsumerRecord>
216216
if (records.isEmpty()) {
217217
log.trace { "empty record set, not waiting for acks" }
218218
return
@@ -303,20 +303,20 @@ abstract class KafkaConsumerChannel<K, V, R>(
303303
fun run() {
304304
consumer.init()
305305

306-
log.info("starting thread for ${consumer.subscription()}")
306+
log.info { "starting thread for ${consumer.subscription()}" }
307307
runBlocking {
308-
log.info("${coroutineContext.job} running consumer ${consumer.subscription()}")
308+
log.info { "${coroutineContext.job} running consumer ${consumer.subscription()}" }
309309
try {
310310
while (!sendChannel.isClosedForSend) {
311-
log.trace("poll(topics:${consumer.subscription()}) ...")
311+
log.trace { "poll(topics:${consumer.subscription()}) ..." }
312312
val polled =
313313
consumer.poll(Duration.ZERO).ifEmpty { consumer.poll(pollInterval) }
314314
val polledCount = polled.count()
315315
if (polledCount == 0) {
316316
continue
317317
}
318318

319-
log.trace("poll(topics:${consumer.subscription()}) got $polledCount records.")
319+
log.trace { "poll(topics:${consumer.subscription()}) got $polledCount records." }
320320

321321
// Convert to internal types.
322322
val context = mutableMapOf<String, Any>()
@@ -334,7 +334,7 @@ abstract class KafkaConsumerChannel<K, V, R>(
334334
}
335335
}
336336
} finally {
337-
log.info("${coroutineContext.job} shutting down consumer thread")
337+
log.info { "${coroutineContext.job} shutting down consumer thread" }
338338
try {
339339
sendChannel.cancel(CancellationException("consumer shut down"))
340340
consumer.unsubscribe()
@@ -352,7 +352,7 @@ abstract class KafkaConsumerChannel<K, V, R>(
352352
if (!thread.isAlive) {
353353
synchronized(thread) {
354354
if (!thread.isAlive) {
355-
log.info("starting consumer thread")
355+
log.info { "starting consumer thread" }
356356
thread.start()
357357
}
358358
}

ft-coroutines-kafka/src/main/kotlin/tech/figure/kafka/records/AckConsumerRecord.kt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,3 +11,5 @@ class AckedConsumerRecordImpl<K, V>(
1111
record: ConsumerRecord<K, V>,
1212
override val metadata: OffsetAndMetadata
1313
) : AckedConsumerRecord<K, V>, KafkaRecord<K, V> by wrapping(record)
14+
15+
typealias AckedConsumerRecords<K, V> = List<AckedConsumerRecord<K, V>>

ft-coroutines-kafka/src/main/kotlin/tech/figure/kafka/records/UnAckedConsumerRecord.kt

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -59,5 +59,4 @@ class UnAckedConsumerRecordImpl<K, V>(
5959
""".trimIndent().split('\n').joinToString { it.trim() }
6060
}
6161

62-
@JvmInline
63-
value class UnAckedConsumerRecords<K, V>(val records: List<UnAckedConsumerRecord<K, V>>)
62+
typealias UnAckedConsumerRecords<K, V> = List<UnAckedConsumerRecord<K, V>>

0 commit comments

Comments
 (0)