Skip to content

Commit 554664c

Browse files
committed
fixes
1 parent 4c5e1bf commit 554664c

File tree

4 files changed

+107
-103
lines changed

4 files changed

+107
-103
lines changed

cli/src/main/kotlin/tech/figure/kafka/cli/RetryMain.kt

Lines changed: 15 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,40 +1,34 @@
11
package tech.figure.kafka.cli
22

33
import ch.qos.logback.classic.Level
4-
import tech.figure.coroutines.retry.flow.retryFlow
5-
import tech.figure.coroutines.retry.tryOnEachProcess
6-
import tech.figure.coroutines.tryOnEach
7-
import tech.figure.kafka.records.acking
8-
import tech.figure.kafka.coroutines.channels.kafkaConsumerChannel
9-
import tech.figure.kafka.coroutines.channels.kafkaProducerChannel
10-
import tech.figure.kafka.coroutines.retry.KAFKA_RETRY_ATTEMPTS_HEADER
11-
import tech.figure.kafka.coroutines.retry.flow.KafkaFlowRetry
12-
import tech.figure.kafka.coroutines.retry.store.inMemoryConsumerRecordStore
13-
import tech.figure.kafka.coroutines.retry.toByteArray
14-
import tech.figure.kafka.coroutines.retry.toInt
15-
import tech.figure.kafka.coroutines.retry.tryOnEach
16-
import kotlin.time.Duration.Companion.days
174
import kotlin.time.Duration.Companion.seconds
5+
import kotlin.time.ExperimentalTime
186
import kotlinx.coroutines.Dispatchers
197
import kotlinx.coroutines.delay
208
import kotlinx.coroutines.flow.collect
219
import kotlinx.coroutines.flow.consumeAsFlow
22-
import kotlinx.coroutines.flow.map
2310
import kotlinx.coroutines.launch
2411
import kotlinx.coroutines.runBlocking
2512
import mu.KotlinLogging
2613
import org.apache.kafka.clients.CommonClientConfigs
27-
import org.apache.kafka.clients.admin.AdminClient
28-
import org.apache.kafka.clients.admin.NewTopic
2914
import org.apache.kafka.clients.consumer.ConsumerConfig
3015
import org.apache.kafka.clients.consumer.ConsumerRecord
3116
import org.apache.kafka.clients.producer.ProducerConfig
3217
import org.apache.kafka.clients.producer.ProducerRecord
3318
import org.apache.kafka.common.serialization.ByteArrayDeserializer
3419
import org.apache.kafka.common.serialization.ByteArraySerializer
35-
import kotlin.time.ExperimentalTime
20+
import tech.figure.coroutines.retry.flow.retryFlow
21+
import tech.figure.coroutines.tryOnEach
22+
import tech.figure.kafka.coroutines.channels.kafkaConsumerChannel
23+
import tech.figure.kafka.coroutines.channels.kafkaProducerChannel
24+
import tech.figure.kafka.coroutines.retry.KAFKA_RETRY_ATTEMPTS_HEADER
25+
import tech.figure.kafka.coroutines.retry.flow.KafkaFlowRetry
26+
import tech.figure.kafka.coroutines.retry.store.inMemoryConsumerRecordStore
27+
import tech.figure.kafka.coroutines.retry.toByteArray
28+
import tech.figure.kafka.coroutines.retry.toInt
29+
import tech.figure.kafka.coroutines.retry.tryOnEach
30+
import tech.figure.kafka.records.acking
3631

37-
@OptIn(ExperimentalTime::class)
3832
fun main() = runBlocking {
3933
log {
4034
"ROOT".level = Level.DEBUG
@@ -77,9 +71,10 @@ fun main() = runBlocking {
7771
}
7872
}
7973

80-
private fun someHandler(): suspend (ConsumerRecord<ByteArray, ByteArray>) -> Unit = fn@{
74+
private fun someHandler(): suspend (List<ConsumerRecord<ByteArray, ByteArray>>) -> Unit = fn@{
8175
val log = KotlinLogging.logger {}
82-
val retryAttempt = it.headers().lastHeader(KAFKA_RETRY_ATTEMPTS_HEADER)?.value()?.toInt()
76+
val firstRec = it.firstOrNull() ?: return@fn
77+
val retryAttempt = firstRec.headers().lastHeader(KAFKA_RETRY_ATTEMPTS_HEADER)?.value()?.toInt()
8378

8479
// Let it pass on attempt 5
8580
// val index = it.key().toInt()

ft-coroutines-kafka-retry/src/main/kotlin/tech/figure/kafka/coroutines/retry/flow/KafkaFlowRetry.kt

Lines changed: 39 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,18 @@
11
package tech.figure.kafka.coroutines.retry.flow
22

3-
import tech.figure.coroutines.retry.store.RetryRecord
4-
import tech.figure.coroutines.retry.flow.FlowRetry
5-
import tech.figure.coroutines.retry.store.RetryRecordStore
6-
import tech.figure.kafka.coroutines.retry.DEFAULT_RECORD_REPROCESS_GROUP_SIZE
7-
import tech.figure.kafka.coroutines.retry.KAFKA_RETRY_ATTEMPTS_HEADER
8-
import tech.figure.kafka.coroutines.retry.toByteArray
93
import java.time.OffsetDateTime
104
import kotlinx.coroutines.flow.asFlow
115
import mu.KotlinLogging
126
import org.apache.kafka.clients.consumer.ConsumerRecord
137
import org.apache.kafka.common.header.Header
148
import org.apache.kafka.common.header.Headers
159
import org.apache.kafka.common.header.internals.RecordHeader
10+
import tech.figure.coroutines.retry.flow.FlowRetry
11+
import tech.figure.coroutines.retry.store.RetryRecord
12+
import tech.figure.coroutines.retry.store.RetryRecordStore
13+
import tech.figure.kafka.coroutines.retry.DEFAULT_RECORD_REPROCESS_GROUP_SIZE
14+
import tech.figure.kafka.coroutines.retry.KAFKA_RETRY_ATTEMPTS_HEADER
15+
import tech.figure.kafka.coroutines.retry.toByteArray
1616

1717
/**
1818
* Retry a flow of kafka records.
@@ -22,10 +22,10 @@ import org.apache.kafka.common.header.internals.RecordHeader
2222
* @param groupSize Process a max of this many elements each poll loop.
2323
*/
2424
open class KafkaFlowRetry<K, V>(
25-
private val handlers: Map<String, suspend (ConsumerRecord<K, V>) -> Unit>,
26-
private val store: RetryRecordStore<ConsumerRecord<K, V>>,
25+
private val handlers: Map<String, suspend (List<ConsumerRecord<K, V>>) -> Unit>,
26+
private val store: RetryRecordStore<List<ConsumerRecord<K, V>>>,
2727
private val groupSize: Int = DEFAULT_RECORD_REPROCESS_GROUP_SIZE,
28-
) : FlowRetry<ConsumerRecord<K, V>> {
28+
) : FlowRetry<List<ConsumerRecord<K, V>>> {
2929
private val log = KotlinLogging.logger {}
3030

3131
override suspend fun hasNext(): Boolean = !store.isEmpty()
@@ -34,40 +34,52 @@ open class KafkaFlowRetry<K, V>(
3434
attemptRange: IntRange,
3535
olderThan: OffsetDateTime,
3636
limit: Int,
37-
) = store.select(attemptRange, olderThan, limit).sortedByDescending { it.lastAttempted }.asFlow()
37+
) =
38+
store
39+
.select(attemptRange, olderThan, limit)
40+
.sortedByDescending { it.lastAttempted }
41+
.asFlow()
3842

39-
override suspend fun send(
40-
item: ConsumerRecord<K, V>,
41-
e: Throwable
42-
) {
43-
log.debug { "adding record to retry queue key:${item.key()} source:${item.topic()}-${item.partition()}" }
43+
override suspend fun send(item: List<ConsumerRecord<K, V>>, e: Throwable) {
44+
log.debug {
45+
"adding record to retry queue count:${item.size} sources:${item.map { "${it.topic()}-${it.partition()}" } }"
46+
}
4447
store.insert(item, e)
4548
}
4649

47-
override suspend fun onSuccess(
48-
item: RetryRecord<ConsumerRecord<K, V>>
49-
) {
50-
log.debug { "successful reprocess attempt:${item.attempt} key:${item.data.key()} source:${item.data.topic()}-${item.data.partition()}" }
50+
override suspend fun onSuccess(item: RetryRecord<List<ConsumerRecord<K, V>>>) {
51+
log.debug {
52+
"successful reprocess attempt:${item.attempt} count:${item.data.size} sources:${item.data.map { "${it.topic()}-${it.partition()}" } }"
53+
}
5154
store.remove(item.data)
5255
}
5356

54-
override suspend fun onFailure(item: RetryRecord<ConsumerRecord<K, V>>, e: Throwable) {
55-
log.debug { "failed reprocess attempt:${item.attempt} Error: ${item.lastException} key:${item.data.key()} source:${item.data.topic()}-${item.data.partition()}" }
57+
override suspend fun onFailure(item: RetryRecord<List<ConsumerRecord<K, V>>>, e: Throwable) {
58+
log.debug {
59+
"failed reprocess attempt:${item.attempt} Error: ${item.lastException} count:${item.data.size} sources:${item.data.map { "${it.topic()}-${it.partition()}" } }"
60+
}
5661
store.update(item.data, e)
5762
}
5863

5964
override suspend fun process(
60-
item: ConsumerRecord<K, V>,
65+
item: List<ConsumerRecord<K, V>>,
6166
attempt: Int,
6267
) {
63-
val topic = item.topic()
64-
val handler = handlers[topic] ?: throw RuntimeException("topic '$topic' not handled by this retry handler")
68+
val topic = item.firstOrNull()?.topic() ?: return
69+
val handler =
70+
handlers[topic]
71+
?: throw RuntimeException("topic '$topic' not handled by this retry handler")
6572

66-
log.debug { "processing key:${item.key()} attempt:$attempt source:${item.topic()}-${item.partition()}" }
67-
handler(item.setHeader(KAFKA_RETRY_ATTEMPTS_HEADER, attempt.toByteArray()))
73+
log.debug {
74+
"processing count:${item.size} sources:${item.map { "${it.topic()}-${it.partition()}" } }"
75+
}
76+
handler(item.map { it.setHeader(KAFKA_RETRY_ATTEMPTS_HEADER, attempt.toByteArray()) })
6877
}
6978

70-
private fun <K, V> ConsumerRecord<K, V>.setHeader(key: String, value: ByteArray): ConsumerRecord<K, V> = apply {
79+
private fun <K, V> ConsumerRecord<K, V>.setHeader(
80+
key: String,
81+
value: ByteArray
82+
): ConsumerRecord<K, V> = apply {
7183
fun Headers.addOrUpdate(header: Header): Headers {
7284
val h = find { it.key() == header.key() }
7385
if (h == null) {

ft-coroutines-kafka-retry/src/main/kotlin/tech/figure/kafka/coroutines/retry/store/InMemoryConsumerRecordStore.kt

Lines changed: 39 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -6,56 +6,51 @@ import tech.figure.coroutines.retry.store.RetryRecord
66
import tech.figure.coroutines.retry.store.RetryRecordStore
77

88
fun <K, V> inMemoryConsumerRecordStore(
9-
data: MutableList<RetryRecord<ConsumerRecord<K, V>>> = mutableListOf()
10-
) = object : RetryRecordStore<ConsumerRecord<K, V>> {
11-
override suspend fun isEmpty(): Boolean =
12-
data.isEmpty()
13-
14-
override suspend fun select(
15-
attemptRange: IntRange,
16-
lastAttempted: OffsetDateTime,
17-
limit: Int,
18-
): List<RetryRecord<ConsumerRecord<K, V>>> {
19-
return data
20-
.filter { it.attempt in attemptRange && it.lastAttempted.isBefore(lastAttempted) }
21-
.sortedBy { it.data.timestamp() }
22-
.take(limit)
23-
}
9+
data: MutableList<RetryRecord<List<ConsumerRecord<K, V>>>> = mutableListOf()
10+
) =
11+
object : RetryRecordStore<List<ConsumerRecord<K, V>>> {
12+
override suspend fun isEmpty(): Boolean = data.isEmpty()
13+
14+
override suspend fun select(
15+
attemptRange: IntRange,
16+
lastAttempted: OffsetDateTime,
17+
limit: Int,
18+
): List<RetryRecord<List<ConsumerRecord<K, V>>>> {
19+
return data
20+
.filter { it.attempt in attemptRange && it.lastAttempted.isBefore(lastAttempted) }
21+
.sortedBy { it.data.firstOrNull()?.timestamp() }
22+
.take(limit)
23+
}
2424

25-
override suspend fun get(
26-
item: ConsumerRecord<K, V>
27-
): RetryRecord<ConsumerRecord<K, V>>? {
28-
return data.firstOrNull(recordMatches(item))
29-
}
25+
override suspend fun get(
26+
item: List<ConsumerRecord<K, V>>
27+
): RetryRecord<List<ConsumerRecord<K, V>>>? {
28+
return data.firstOrNull(recordMatches(item))
29+
}
3030

31-
override suspend fun insert(item: ConsumerRecord<K, V>, e: Throwable?) {
32-
data += RetryRecord(item, lastException = e?.localizedMessage.orEmpty())
33-
}
31+
override suspend fun insert(item: List<ConsumerRecord<K, V>>, e: Throwable?) {
32+
data += RetryRecord(item, lastException = e?.localizedMessage.orEmpty())
33+
}
3434

35-
override suspend fun update(item: ConsumerRecord<K, V>, e: Throwable?) {
36-
val record = get(item) ?: error("record not found")
35+
override suspend fun update(item: List<ConsumerRecord<K, V>>, e: Throwable?) {
36+
val record = get(item) ?: error("record not found")
3737

38-
// Find and update the record in the data set.
39-
data[data.indexOf(record)].also {
40-
it.data = item
41-
it.attempt++
42-
it.lastAttempted = OffsetDateTime.now()
43-
it.lastException = e?.localizedMessage.orEmpty()
38+
// Find and update the record in the data set.
39+
data[data.indexOf(record)].also {
40+
it.data = item
41+
it.attempt++
42+
it.lastAttempted = OffsetDateTime.now()
43+
it.lastException = e?.localizedMessage.orEmpty()
44+
}
4445
}
45-
}
4646

47-
override suspend fun remove(item: ConsumerRecord<K, V>) {
48-
data.removeAll(recordMatches(item))
49-
}
47+
override suspend fun remove(item: List<ConsumerRecord<K, V>>) {
48+
data.removeAll(recordMatches(item))
49+
}
5050

51-
private fun <K, V> recordMatches(other: ConsumerRecord<K, V>): (RetryRecord<ConsumerRecord<K, V>>) -> Boolean {
52-
return {
53-
with(it.data) {
54-
key() == other.key() &&
55-
value() == other.value() &&
56-
topic() == other.topic() &&
57-
partition() == other.partition()
58-
}
51+
private fun <K, V> recordMatches(
52+
other: List<ConsumerRecord<K, V>>
53+
): (RetryRecord<List<ConsumerRecord<K, V>>>) -> Boolean {
54+
return { it == other }
5955
}
6056
}
61-
}

ft-coroutines-kafka/src/main/kotlin/tech/figure/kafka/coroutines/channels/ConsumerSendChannel.kt

Lines changed: 14 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -148,15 +148,14 @@ fun <K, V> kafkaAckConsumerChannel(
148148
init: Consumer<K, V>.() -> Unit = { subscribe(topics, rebalanceListener) },
149149
): ReceiveChannel<List<UnAckedConsumerRecord<K, V>>> {
150150
return KafkaAckConsumerChannel(
151-
consumerProperties,
152-
topics,
153-
name,
154-
bufferCapacity,
155-
pollInterval,
156-
consumer,
157-
init
158-
)
159-
.also { Runtime.getRuntime().addShutdownHook(Thread { it.cancel() }) }
151+
consumerProperties,
152+
topics,
153+
name,
154+
bufferCapacity,
155+
pollInterval,
156+
consumer,
157+
init
158+
).also { Runtime.getRuntime().addShutdownHook(Thread { it.cancel() }) }
160159
}
161160

162161
/**
@@ -225,8 +224,8 @@ internal class KafkaAckConsumerChannel<K, V>(
225224
log.trace { "waiting for ${latch.count} more acks" }
226225
val it = ackChannel.receive()
227226
latch.countDown()
228-
229-
log.trace { "ack received: ${it.offsetAndMetadata}" }
227+
228+
log.trace { "ack received: ${it.topicPartition} => ${it.offsetAndMetadata}" }
230229
log.trace {
231230
" -> sending to broker ack(${it.duration.toMillis()}ms):${it.asCommitable()}"
232231
}
@@ -293,7 +292,10 @@ abstract class KafkaConsumerChannel<K, V, R>(
293292
}
294293

295294
protected fun commit(record: CommitConsumerRecord): OffsetAndMetadata {
296-
consumer.commitSync(record.asCommitable())
295+
val committable = record.asCommitable()
296+
log.trace { "trying commit => $committable" }
297+
consumer.commitSync(committable)
298+
log.trace { "trying commit success! $committable" }
297299
return record.offsetAndMetadata
298300
}
299301

0 commit comments

Comments
 (0)