Skip to content

Commit f713859

Browse files
authored
Merge pull request #51 from FigureTechnologies/mtps/kafka-fixes
Fix kafka polling processes
2 parents d221e0f + 554664c commit f713859

File tree

9 files changed

+238
-197
lines changed

9 files changed

+238
-197
lines changed

README.md

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -29,14 +29,19 @@ A collection of kotlin coroutines flow based projects to create channels and flo
2929
<artifactId>ft-coroutines-core</artifactId>
3030
<version>${version}</version>
3131
</dependency>
32+
<dependency>
33+
<groupId>tech.figure.coroutines</groupId>
34+
<artifactId>ft-coroutines-retry</artifactId>
35+
<version>${version}</version>
36+
</dependency>
3237
<dependency>
3338
<groupId>tech.figure.coroutines</groupId>
3439
<artifactId>ft-coroutines-kafka</artifactId>
3540
<version>${version}</version>
3641
</dependency>
3742
<dependency>
3843
<groupId>tech.figure.coroutines</groupId>
39-
<artifactId>ft-coroutines-retry</artifactId>
44+
<artifactId>ft-coroutines-kafka-retry</artifactId>
4045
<version>${version}</version>
4146
</dependency>
4247
</dependencies>
@@ -50,8 +55,10 @@ In `build.gradle`:
5055

5156
```groovy
5257
implementation 'tech.figure.coroutines:ft-coroutines-core:${version}'
53-
implementation 'tech.figure.coroutines:ft-coroutines-kafka:${version}'
5458
implementation 'tech.figure.coroutines:ft-coroutines-retry:${version}'
59+
60+
implementation 'tech.figure.coroutines:ft-coroutines-kafka:${version}'
61+
implementation 'tech.figure.coroutines:ft-coroutines-kafka-retry:${version}'
5562
```
5663

5764
#### Kotlin
@@ -60,9 +67,18 @@ In `build.gradle.kts`:
6067

6168
```kotlin
6269
implementation("tech.figure.coroutines", "ft-coroutines-core", version)
63-
implementation("tech.figure.coroutines", "ft-coroutines-kafka", version)
6470
implementation("tech.figure.coroutines", "ft-coroutines-retry", version)
71+
72+
implementation("tech.figure.coroutines", "ft-coroutines-kafka", version)
73+
implementation("tech.figure.coroutines", "ft-coroutines-kafka-retry", version)
6574
```
6675

67-
## Usage
76+
## Libraries
77+
78+
[Coroutines-Core](ft-coroutines-core) Contains common coroutines helper functions, such as `tryMap`, `tryOnEach` and `chunked`.
79+
80+
[Coroutines-Retry](ft-coroutines-retry) Contains core retry logic.
81+
82+
[Coroutines-Kafka](ft-coroutines-kafka) Contains kafka methods and helpers.
6883

84+
[Coroutines-Kafka-Retry](ft-coroutines-kafka-retry) Contains retry logic for kafka.

cli/src/main/kotlin/tech/figure/kafka/cli/RetryMain.kt

Lines changed: 15 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,40 +1,34 @@
11
package tech.figure.kafka.cli
22

33
import ch.qos.logback.classic.Level
4-
import tech.figure.coroutines.retry.flow.retryFlow
5-
import tech.figure.coroutines.retry.tryOnEachProcess
6-
import tech.figure.coroutines.tryOnEach
7-
import tech.figure.kafka.records.acking
8-
import tech.figure.kafka.coroutines.channels.kafkaConsumerChannel
9-
import tech.figure.kafka.coroutines.channels.kafkaProducerChannel
10-
import tech.figure.kafka.coroutines.retry.KAFKA_RETRY_ATTEMPTS_HEADER
11-
import tech.figure.kafka.coroutines.retry.flow.KafkaFlowRetry
12-
import tech.figure.kafka.coroutines.retry.store.inMemoryConsumerRecordStore
13-
import tech.figure.kafka.coroutines.retry.toByteArray
14-
import tech.figure.kafka.coroutines.retry.toInt
15-
import tech.figure.kafka.coroutines.retry.tryOnEach
16-
import kotlin.time.Duration.Companion.days
174
import kotlin.time.Duration.Companion.seconds
5+
import kotlin.time.ExperimentalTime
186
import kotlinx.coroutines.Dispatchers
197
import kotlinx.coroutines.delay
208
import kotlinx.coroutines.flow.collect
219
import kotlinx.coroutines.flow.consumeAsFlow
22-
import kotlinx.coroutines.flow.map
2310
import kotlinx.coroutines.launch
2411
import kotlinx.coroutines.runBlocking
2512
import mu.KotlinLogging
2613
import org.apache.kafka.clients.CommonClientConfigs
27-
import org.apache.kafka.clients.admin.AdminClient
28-
import org.apache.kafka.clients.admin.NewTopic
2914
import org.apache.kafka.clients.consumer.ConsumerConfig
3015
import org.apache.kafka.clients.consumer.ConsumerRecord
3116
import org.apache.kafka.clients.producer.ProducerConfig
3217
import org.apache.kafka.clients.producer.ProducerRecord
3318
import org.apache.kafka.common.serialization.ByteArrayDeserializer
3419
import org.apache.kafka.common.serialization.ByteArraySerializer
35-
import kotlin.time.ExperimentalTime
20+
import tech.figure.coroutines.retry.flow.retryFlow
21+
import tech.figure.coroutines.tryOnEach
22+
import tech.figure.kafka.coroutines.channels.kafkaConsumerChannel
23+
import tech.figure.kafka.coroutines.channels.kafkaProducerChannel
24+
import tech.figure.kafka.coroutines.retry.KAFKA_RETRY_ATTEMPTS_HEADER
25+
import tech.figure.kafka.coroutines.retry.flow.KafkaFlowRetry
26+
import tech.figure.kafka.coroutines.retry.store.inMemoryConsumerRecordStore
27+
import tech.figure.kafka.coroutines.retry.toByteArray
28+
import tech.figure.kafka.coroutines.retry.toInt
29+
import tech.figure.kafka.coroutines.retry.tryOnEach
30+
import tech.figure.kafka.records.acking
3631

37-
@OptIn(ExperimentalTime::class)
3832
fun main() = runBlocking {
3933
log {
4034
"ROOT".level = Level.DEBUG
@@ -77,9 +71,10 @@ fun main() = runBlocking {
7771
}
7872
}
7973

80-
private fun someHandler(): suspend (ConsumerRecord<ByteArray, ByteArray>) -> Unit = fn@{
74+
private fun someHandler(): suspend (List<ConsumerRecord<ByteArray, ByteArray>>) -> Unit = fn@{
8175
val log = KotlinLogging.logger {}
82-
val retryAttempt = it.headers().lastHeader(KAFKA_RETRY_ATTEMPTS_HEADER)?.value()?.toInt()
76+
val firstRec = it.firstOrNull() ?: return@fn
77+
val retryAttempt = firstRec.headers().lastHeader(KAFKA_RETRY_ATTEMPTS_HEADER)?.value()?.toInt()
8378

8479
// Let it pass on attempt 5
8580
// val index = it.key().toInt()
Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,19 @@
11
package tech.figure.kafka.coroutines.retry
22

3+
import kotlinx.coroutines.flow.Flow
4+
import org.apache.kafka.clients.consumer.ConsumerRecord
35
import tech.figure.coroutines.retry.flow.FlowProcessor
46
import tech.figure.coroutines.retry.tryOnEachProcess
57
import tech.figure.coroutines.tryOnEach
68
import tech.figure.kafka.coroutines.retry.flow.lifted
79
import tech.figure.kafka.records.UnAckedConsumerRecord
8-
import tech.figure.kafka.records.UnAckedConsumerRecords
9-
import kotlinx.coroutines.flow.Flow
10-
import org.apache.kafka.clients.consumer.ConsumerRecord
1110

1211
/**
1312
* Kafka specific implementation of [tryOnEach] to lift the [UnAckedConsumerRecord] into a basic [ConsumerRecord] for processing.
1413
*
1514
* @param flowProcessor The [FlowProcessor] to use for processing the stream of records.
1615
* @return The original flow.
1716
*/
18-
fun <K, V> Flow<UnAckedConsumerRecords<K, V>>.tryOnEach(
19-
flowProcessor: FlowProcessor<ConsumerRecord<K, V>>
20-
): Flow<UnAckedConsumerRecords<K, V>> = tryOnEachProcess(flowProcessor.lifted())
17+
fun <K, V> Flow<List<UnAckedConsumerRecord<K, V>>>.tryOnEach(
18+
flowProcessor: FlowProcessor<List<ConsumerRecord<K, V>>>
19+
): Flow<List<UnAckedConsumerRecord<K, V>>> = tryOnEachProcess(flowProcessor.lifted())

ft-coroutines-kafka-retry/src/main/kotlin/tech/figure/kafka/coroutines/retry/flow/FlowExtensions.kt

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,23 @@
11
package tech.figure.kafka.coroutines.retry.flow
22

3+
import org.apache.kafka.clients.consumer.ConsumerRecord
34
import tech.figure.coroutines.retry.flow.FlowProcessor
45
import tech.figure.kafka.coroutines.retry.tryOnEach
56
import tech.figure.kafka.records.UnAckedConsumerRecord
6-
import tech.figure.kafka.records.UnAckedConsumerRecords
7-
import org.apache.kafka.clients.consumer.ConsumerRecord
87

98
/**
109
* Lift the [FlowProcessor] from [UnAckedConsumerRecord] to [ConsumerRecord] type.
1110
*
1211
* This is needed to match the flow type of [tryOnEach] when processing with [KafkaFlowRetry].
1312
*/
14-
fun <K, V> FlowProcessor<ConsumerRecord<K, V>>.lifted(): FlowProcessor<UnAckedConsumerRecords<K, V>> {
15-
return object : FlowProcessor<UnAckedConsumerRecords<K, V>> {
16-
override suspend fun send(item: UnAckedConsumerRecords<K, V>, e: Throwable) {
17-
item.forEach { this@lifted.send(it.toConsumerRecord(), e) }
13+
fun <K, V> FlowProcessor<List<ConsumerRecord<K, V>>>.lifted(): FlowProcessor<List<UnAckedConsumerRecord<K, V>>> {
14+
return object : FlowProcessor<List<UnAckedConsumerRecord<K, V>>> {
15+
override suspend fun send(item: List<UnAckedConsumerRecord<K, V>>, e: Throwable) {
16+
this@lifted.send(item.map { it.toConsumerRecord() }, e)
1817
}
1918

20-
override suspend fun process(item: UnAckedConsumerRecords<K, V>, attempt: Int) {
21-
item.forEach { this@lifted.process(it.toConsumerRecord()) }
19+
override suspend fun process(item: List<UnAckedConsumerRecord<K, V>>, attempt: Int) {
20+
this@lifted.process(item.map { it.toConsumerRecord() })
2221
}
2322
}
2423
}

ft-coroutines-kafka-retry/src/main/kotlin/tech/figure/kafka/coroutines/retry/flow/KafkaFlowRetry.kt

Lines changed: 39 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,18 @@
11
package tech.figure.kafka.coroutines.retry.flow
22

3-
import tech.figure.coroutines.retry.store.RetryRecord
4-
import tech.figure.coroutines.retry.flow.FlowRetry
5-
import tech.figure.coroutines.retry.store.RetryRecordStore
6-
import tech.figure.kafka.coroutines.retry.DEFAULT_RECORD_REPROCESS_GROUP_SIZE
7-
import tech.figure.kafka.coroutines.retry.KAFKA_RETRY_ATTEMPTS_HEADER
8-
import tech.figure.kafka.coroutines.retry.toByteArray
93
import java.time.OffsetDateTime
104
import kotlinx.coroutines.flow.asFlow
115
import mu.KotlinLogging
126
import org.apache.kafka.clients.consumer.ConsumerRecord
137
import org.apache.kafka.common.header.Header
148
import org.apache.kafka.common.header.Headers
159
import org.apache.kafka.common.header.internals.RecordHeader
10+
import tech.figure.coroutines.retry.flow.FlowRetry
11+
import tech.figure.coroutines.retry.store.RetryRecord
12+
import tech.figure.coroutines.retry.store.RetryRecordStore
13+
import tech.figure.kafka.coroutines.retry.DEFAULT_RECORD_REPROCESS_GROUP_SIZE
14+
import tech.figure.kafka.coroutines.retry.KAFKA_RETRY_ATTEMPTS_HEADER
15+
import tech.figure.kafka.coroutines.retry.toByteArray
1616

1717
/**
1818
* Retry a flow of kafka records.
@@ -22,10 +22,10 @@ import org.apache.kafka.common.header.internals.RecordHeader
2222
* @param groupSize Process a max of this many elements each poll loop.
2323
*/
2424
open class KafkaFlowRetry<K, V>(
25-
private val handlers: Map<String, suspend (ConsumerRecord<K, V>) -> Unit>,
26-
private val store: RetryRecordStore<ConsumerRecord<K, V>>,
25+
private val handlers: Map<String, suspend (List<ConsumerRecord<K, V>>) -> Unit>,
26+
private val store: RetryRecordStore<List<ConsumerRecord<K, V>>>,
2727
private val groupSize: Int = DEFAULT_RECORD_REPROCESS_GROUP_SIZE,
28-
) : FlowRetry<ConsumerRecord<K, V>> {
28+
) : FlowRetry<List<ConsumerRecord<K, V>>> {
2929
private val log = KotlinLogging.logger {}
3030

3131
override suspend fun hasNext(): Boolean = !store.isEmpty()
@@ -34,40 +34,52 @@ open class KafkaFlowRetry<K, V>(
3434
attemptRange: IntRange,
3535
olderThan: OffsetDateTime,
3636
limit: Int,
37-
) = store.select(attemptRange, olderThan, limit).sortedByDescending { it.lastAttempted }.asFlow()
37+
) =
38+
store
39+
.select(attemptRange, olderThan, limit)
40+
.sortedByDescending { it.lastAttempted }
41+
.asFlow()
3842

39-
override suspend fun send(
40-
item: ConsumerRecord<K, V>,
41-
e: Throwable
42-
) {
43-
log.debug { "adding record to retry queue key:${item.key()} source:${item.topic()}-${item.partition()}" }
43+
override suspend fun send(item: List<ConsumerRecord<K, V>>, e: Throwable) {
44+
log.debug {
45+
"adding record to retry queue count:${item.size} sources:${item.map { "${it.topic()}-${it.partition()}" } }"
46+
}
4447
store.insert(item, e)
4548
}
4649

47-
override suspend fun onSuccess(
48-
item: RetryRecord<ConsumerRecord<K, V>>
49-
) {
50-
log.debug { "successful reprocess attempt:${item.attempt} key:${item.data.key()} source:${item.data.topic()}-${item.data.partition()}" }
50+
override suspend fun onSuccess(item: RetryRecord<List<ConsumerRecord<K, V>>>) {
51+
log.debug {
52+
"successful reprocess attempt:${item.attempt} count:${item.data.size} sources:${item.data.map { "${it.topic()}-${it.partition()}" } }"
53+
}
5154
store.remove(item.data)
5255
}
5356

54-
override suspend fun onFailure(item: RetryRecord<ConsumerRecord<K, V>>, e: Throwable) {
55-
log.debug { "failed reprocess attempt:${item.attempt} Error: ${item.lastException} key:${item.data.key()} source:${item.data.topic()}-${item.data.partition()}" }
57+
override suspend fun onFailure(item: RetryRecord<List<ConsumerRecord<K, V>>>, e: Throwable) {
58+
log.debug {
59+
"failed reprocess attempt:${item.attempt} Error: ${item.lastException} count:${item.data.size} sources:${item.data.map { "${it.topic()}-${it.partition()}" } }"
60+
}
5661
store.update(item.data, e)
5762
}
5863

5964
override suspend fun process(
60-
item: ConsumerRecord<K, V>,
65+
item: List<ConsumerRecord<K, V>>,
6166
attempt: Int,
6267
) {
63-
val topic = item.topic()
64-
val handler = handlers[topic] ?: throw RuntimeException("topic '$topic' not handled by this retry handler")
68+
val topic = item.firstOrNull()?.topic() ?: return
69+
val handler =
70+
handlers[topic]
71+
?: throw RuntimeException("topic '$topic' not handled by this retry handler")
6572

66-
log.debug { "processing key:${item.key()} attempt:$attempt source:${item.topic()}-${item.partition()}" }
67-
handler(item.setHeader(KAFKA_RETRY_ATTEMPTS_HEADER, attempt.toByteArray()))
73+
log.debug {
74+
"processing count:${item.size} sources:${item.map { "${it.topic()}-${it.partition()}" } }"
75+
}
76+
handler(item.map { it.setHeader(KAFKA_RETRY_ATTEMPTS_HEADER, attempt.toByteArray()) })
6877
}
6978

70-
private fun <K, V> ConsumerRecord<K, V>.setHeader(key: String, value: ByteArray): ConsumerRecord<K, V> = apply {
79+
private fun <K, V> ConsumerRecord<K, V>.setHeader(
80+
key: String,
81+
value: ByteArray
82+
): ConsumerRecord<K, V> = apply {
7183
fun Headers.addOrUpdate(header: Header): Headers {
7284
val h = find { it.key() == header.key() }
7385
if (h == null) {

ft-coroutines-kafka-retry/src/main/kotlin/tech/figure/kafka/coroutines/retry/store/InMemoryConsumerRecordStore.kt

Lines changed: 39 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -6,56 +6,51 @@ import tech.figure.coroutines.retry.store.RetryRecord
66
import tech.figure.coroutines.retry.store.RetryRecordStore
77

88
fun <K, V> inMemoryConsumerRecordStore(
9-
data: MutableList<RetryRecord<ConsumerRecord<K, V>>> = mutableListOf()
10-
) = object : RetryRecordStore<ConsumerRecord<K, V>> {
11-
override suspend fun isEmpty(): Boolean =
12-
data.isEmpty()
13-
14-
override suspend fun select(
15-
attemptRange: IntRange,
16-
lastAttempted: OffsetDateTime,
17-
limit: Int,
18-
): List<RetryRecord<ConsumerRecord<K, V>>> {
19-
return data
20-
.filter { it.attempt in attemptRange && it.lastAttempted.isBefore(lastAttempted) }
21-
.sortedBy { it.data.timestamp() }
22-
.take(limit)
23-
}
9+
data: MutableList<RetryRecord<List<ConsumerRecord<K, V>>>> = mutableListOf()
10+
) =
11+
object : RetryRecordStore<List<ConsumerRecord<K, V>>> {
12+
override suspend fun isEmpty(): Boolean = data.isEmpty()
13+
14+
override suspend fun select(
15+
attemptRange: IntRange,
16+
lastAttempted: OffsetDateTime,
17+
limit: Int,
18+
): List<RetryRecord<List<ConsumerRecord<K, V>>>> {
19+
return data
20+
.filter { it.attempt in attemptRange && it.lastAttempted.isBefore(lastAttempted) }
21+
.sortedBy { it.data.firstOrNull()?.timestamp() }
22+
.take(limit)
23+
}
2424

25-
override suspend fun get(
26-
item: ConsumerRecord<K, V>
27-
): RetryRecord<ConsumerRecord<K, V>>? {
28-
return data.firstOrNull(recordMatches(item))
29-
}
25+
override suspend fun get(
26+
item: List<ConsumerRecord<K, V>>
27+
): RetryRecord<List<ConsumerRecord<K, V>>>? {
28+
return data.firstOrNull(recordMatches(item))
29+
}
3030

31-
override suspend fun insert(item: ConsumerRecord<K, V>, e: Throwable?) {
32-
data += RetryRecord(item, lastException = e?.localizedMessage.orEmpty())
33-
}
31+
override suspend fun insert(item: List<ConsumerRecord<K, V>>, e: Throwable?) {
32+
data += RetryRecord(item, lastException = e?.localizedMessage.orEmpty())
33+
}
3434

35-
override suspend fun update(item: ConsumerRecord<K, V>, e: Throwable?) {
36-
val record = get(item) ?: error("record not found")
35+
override suspend fun update(item: List<ConsumerRecord<K, V>>, e: Throwable?) {
36+
val record = get(item) ?: error("record not found")
3737

38-
// Find and update the record in the data set.
39-
data[data.indexOf(record)].also {
40-
it.data = item
41-
it.attempt++
42-
it.lastAttempted = OffsetDateTime.now()
43-
it.lastException = e?.localizedMessage.orEmpty()
38+
// Find and update the record in the data set.
39+
data[data.indexOf(record)].also {
40+
it.data = item
41+
it.attempt++
42+
it.lastAttempted = OffsetDateTime.now()
43+
it.lastException = e?.localizedMessage.orEmpty()
44+
}
4445
}
45-
}
4646

47-
override suspend fun remove(item: ConsumerRecord<K, V>) {
48-
data.removeAll(recordMatches(item))
49-
}
47+
override suspend fun remove(item: List<ConsumerRecord<K, V>>) {
48+
data.removeAll(recordMatches(item))
49+
}
5050

51-
private fun <K, V> recordMatches(other: ConsumerRecord<K, V>): (RetryRecord<ConsumerRecord<K, V>>) -> Boolean {
52-
return {
53-
with(it.data) {
54-
key() == other.key() &&
55-
value() == other.value() &&
56-
topic() == other.topic() &&
57-
partition() == other.partition()
58-
}
51+
private fun <K, V> recordMatches(
52+
other: List<ConsumerRecord<K, V>>
53+
): (RetryRecord<List<ConsumerRecord<K, V>>>) -> Boolean {
54+
return { it == other }
5955
}
6056
}
61-
}

0 commit comments

Comments
 (0)