Skip to content

Commit 9d3a38d

Browse files
authored
Retry bugfixes (#35)
* Update retry store to separate out update and add and remove mutator. Leave the retry attempt increment up to the flow processor.
1 parent 80953cd commit 9d3a38d

File tree

5 files changed

+81
-40
lines changed

5 files changed

+81
-40
lines changed

ft-coroutines-kafka-retry/src/main/kotlin/tech/figure/kafka/coroutines/retry/flow/KafkaFlowRetry.kt

Lines changed: 2 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -41,11 +41,7 @@ open class KafkaFlowRetry<K, V>(
4141
e: Throwable
4242
) {
4343
log.debug { "adding record to retry queue key:${item.key()} source:${item.topic()}-${item.partition()}" }
44-
store.putOne(item, e) {
45-
attempt = 0
46-
lastAttempted = OffsetDateTime.now()
47-
lastException = e.localizedMessage
48-
}
44+
store.insert(item, e)
4945
}
5046

5147
override suspend fun onSuccess(
@@ -57,11 +53,7 @@ open class KafkaFlowRetry<K, V>(
5753

5854
override suspend fun onFailure(item: RetryRecord<ConsumerRecord<K, V>>, e: Throwable) {
5955
log.debug { "failed reprocess attempt:${item.attempt} Error: ${item.lastException} key:${item.data.key()} source:${item.data.topic()}-${item.data.partition()}" }
60-
store.putOne(item.data, e) {
61-
attempt.inc()
62-
lastAttempted = OffsetDateTime.now()
63-
lastException = e.localizedMessage
64-
}
56+
store.update(item.data, e)
6557
}
6658

6759
override suspend fun process(

ft-coroutines-kafka-retry/src/main/kotlin/tech/figure/kafka/coroutines/retry/store/InMemoryConsumerRecordStore.kt

Lines changed: 12 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,13 @@
11
package tech.figure.kafka.coroutines.retry.store
22

3-
import tech.figure.coroutines.retry.store.RetryRecord
4-
import tech.figure.coroutines.retry.store.RetryRecordStore
53
import java.time.OffsetDateTime
6-
import mu.KotlinLogging
74
import org.apache.kafka.clients.consumer.ConsumerRecord
5+
import tech.figure.coroutines.retry.store.RetryRecord
6+
import tech.figure.coroutines.retry.store.RetryRecordStore
87

98
fun <K, V> inMemoryConsumerRecordStore(
109
data: MutableList<RetryRecord<ConsumerRecord<K, V>>> = mutableListOf()
1110
) = object : RetryRecordStore<ConsumerRecord<K, V>> {
12-
val log = KotlinLogging.logger {}
13-
1411
override suspend fun isEmpty(): Boolean =
1512
data.isEmpty()
1613

@@ -25,27 +22,22 @@ fun <K, V> inMemoryConsumerRecordStore(
2522
.take(limit)
2623
}
2724

28-
override suspend fun getOne(
25+
override suspend fun get(
2926
item: ConsumerRecord<K, V>
3027
): RetryRecord<ConsumerRecord<K, V>>? {
3128
return data.firstOrNull(recordMatches(item))
3229
}
3330

34-
override suspend fun putOne(
35-
item: ConsumerRecord<K, V>,
36-
lastException: Throwable?,
37-
mutator: RetryRecord<ConsumerRecord<K, V>>.() -> Unit
38-
) {
39-
val record = getOne(item)
40-
if (record == null) {
41-
data += RetryRecord(item, 0, OffsetDateTime.now(), lastException?.message.orEmpty()).also {
42-
log.debug { "putting new entry for ${item.key()}" }
43-
}
44-
return
45-
}
31+
override suspend fun insert(item: ConsumerRecord<K, V>, e: Throwable?) {
32+
data += RetryRecord(item, lastException = e?.localizedMessage.orEmpty())
33+
}
34+
35+
override suspend fun update(item: ConsumerRecord<K, V>, e: Throwable?) {
36+
val record = get(item) ?: error("record not found")
4637

47-
data[data.indexOf(record)].mutator().also {
48-
log.debug { "incrementing attempt for $record" }
38+
// Find and update the record in the data set.
39+
data[data.indexOf(record)].also {
40+
it.data = item
4941
}
5042
}
5143

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
package tech.figure.coroutines.retry.flow
2+
3+
import java.time.OffsetDateTime
4+
import kotlinx.coroutines.flow.Flow
5+
import kotlinx.coroutines.flow.asFlow
6+
import tech.figure.coroutines.retry.store.RetryRecord
7+
import tech.figure.coroutines.retry.store.RetryRecordStore
8+
9+
/**
10+
* Retry a flow of objects using the backing [store].
11+
*
12+
* @param handler The handler to reprocess with.
13+
* @param store [RetryRecordStore] to save and retrieve data object of type [T] from.
14+
* @param groupSize Process a max of this many elements each poll loop (aka: limit).
15+
*/
16+
fun <T> recordStoreFlowRetry(
17+
handler: suspend (T) -> Unit,
18+
store: RetryRecordStore<T>,
19+
groupSize: Int = 40,
20+
): FlowRetry<T> = RecordStoreFlowRetry(handler, store, groupSize)
21+
22+
/**
23+
* Retry a flow of objects using the backing [store].
24+
*
25+
* @param handler The handler to reprocess with.
26+
* @param store [RetryRecordStore] to save and retrieve data object of type [T] from.
27+
* @param groupSize Process a max of this many elements each poll loop (aka: limit).
28+
*/
29+
internal open class RecordStoreFlowRetry<T>(
30+
private val handler: suspend (T) -> Unit,
31+
private val store: RetryRecordStore<T>,
32+
private val groupSize: Int = 40,
33+
) : FlowRetry<T> {
34+
override suspend fun produceNext(
35+
attemptRange: IntRange,
36+
olderThan: OffsetDateTime,
37+
limit: Int
38+
): Flow<RetryRecord<T>> =
39+
store
40+
.select(attemptRange, olderThan, limit)
41+
.sortedByDescending { it.lastAttempted }
42+
.take(groupSize)
43+
.asFlow()
44+
45+
override suspend fun send(item: T, e: Throwable) =
46+
store.update(item, e)
47+
48+
override suspend fun onSuccess(item: RetryRecord<T>) =
49+
store.remove(item.data)
50+
51+
override suspend fun onFailure(item: RetryRecord<T>, e: Throwable) =
52+
store.update(item.data, e)
53+
54+
override suspend fun process(item: T, attempt: Int) =
55+
handler(item)
56+
57+
override suspend fun hasNext() =
58+
!store.isEmpty()
59+
}

ft-coroutines-retry/src/main/kotlin/tech/figure/coroutines/retry/flow/RetryFlow.kt

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ import kotlin.time.toJavaDuration
77
import kotlinx.coroutines.ExperimentalCoroutinesApi
88
import kotlinx.coroutines.flow.Flow
99
import kotlinx.coroutines.flow.collect
10-
import kotlinx.coroutines.flow.map
1110
import kotlinx.coroutines.flow.onStart
1211
import mu.KotlinLogging
1312
import tech.figure.coroutines.retry.RetryStrategy
@@ -37,28 +36,26 @@ fun <T> retryFlow(
3736
val log = KotlinLogging.logger {}
3837
val strategies = retryStrategies.invert()
3938

39+
log.info { "initializing polling retry flow ${flowRetry.javaClass.name}" }
4040
return pollingFlow(retryInterval) {
4141
if (!flowRetry.hasNext()) {
4242
return@pollingFlow
4343
}
4444

4545
for (strategy in strategies) {
46-
val lastAttempted = OffsetDateTime.now().minus(strategy.value.lastAttempted.toJavaDuration())
46+
val attemptedBefore = OffsetDateTime.now().minus(strategy.value.lastAttempted.toJavaDuration())
4747

4848
val onFailure: suspend (RetryRecord<T>, Throwable) -> Unit = { rec, it ->
4949
strategy.value.onFailure("", it)
5050
flowRetry.onFailure(rec, it)
5151
}
5252

53-
flowRetry.produceNext(strategy.key, lastAttempted, batchSize)
53+
flowRetry.produceNext(strategy.key, attemptedBefore, batchSize)
5454
.onStart {
55-
log.trace { "${strategy.value.name} --> Retrying records in group:${strategy.key} lastAttempted:$lastAttempted" }
56-
}
57-
.map {
58-
it.attempt = it.attempt.inc()
59-
it
55+
log.trace { "${strategy.value.name} --> Retrying records in group:${strategy.key} lastAttempted:$attemptedBefore" }
6056
}
6157
.tryMap(onFailure) {
58+
log.debug { "retry processing attempt:${it.attempt} rec:${it.data}" }
6259
flowRetry.process(it.data, it.attempt)
6360

6461
log.debug { "retry succeeded on attempt:${it.attempt} rec:${it.data}" }

ft-coroutines-retry/src/main/kotlin/tech/figure/coroutines/retry/store/RetryRecordStore.kt

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,8 @@ import java.time.OffsetDateTime
66
interface RetryRecordStore<T> {
77
suspend fun isEmpty(): Boolean
88
suspend fun select(attemptRange: IntRange, lastAttempted: OffsetDateTime, limit: Int = DEFAULT_FETCH_LIMIT): List<RetryRecord<T>>
9-
suspend fun getOne(item: T): RetryRecord<T>?
10-
suspend fun putOne(item: T, lastException: Throwable? = null, mutator: RetryRecord<T>.() -> Unit)
9+
suspend fun get(item: T): RetryRecord<T>?
10+
suspend fun insert(item: T, e: Throwable? = null)
11+
suspend fun update(item: T, e: Throwable? = null)
1112
suspend fun remove(item: T)
1213
}

0 commit comments

Comments
 (0)