Skip to content

Commit bd38fc7

Browse files
authored
Add (optional) collection identifier to records going into the store. (#39)
* Increment the necessary fields to retry for in memory store * Use named consts instead of magic numbers * Add collection as a better way to organize multiple retries in one repo
1 parent 51d0520 commit bd38fc7

File tree

3 files changed

+11
-4
lines changed

3 files changed

+11
-4
lines changed

ft-coroutines-kafka-retry/src/main/kotlin/tech/figure/kafka/coroutines/retry/store/InMemoryConsumerRecordStore.kt

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,9 @@ fun <K, V> inMemoryConsumerRecordStore(
3838
// Find and update the record in the data set.
3939
data[data.indexOf(record)].also {
4040
it.data = item
41+
it.attempt++
42+
it.lastAttempted = OffsetDateTime.now()
43+
it.lastException = e?.localizedMessage.orEmpty()
4144
}
4245
}
4346

ft-coroutines-retry/src/main/kotlin/tech/figure/coroutines/retry/flow/ObjectStoreRetryFlow.kt

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,9 @@ import kotlinx.coroutines.flow.asFlow
66
import tech.figure.coroutines.retry.store.RetryRecord
77
import tech.figure.coroutines.retry.store.RetryRecordStore
88

9+
const val DEFAULT_RETRY_GROUP_SIZE = 40
10+
const val DEFAULT_RETRY_COLLECTION_NAME = ""
11+
912
/**
1013
* Retry a flow of objects using the backing [store].
1114
*
@@ -16,7 +19,7 @@ import tech.figure.coroutines.retry.store.RetryRecordStore
1619
fun <T> recordStoreFlowRetry(
1720
handler: suspend (T) -> Unit,
1821
store: RetryRecordStore<T>,
19-
groupSize: Int = 40,
22+
groupSize: Int = DEFAULT_RETRY_GROUP_SIZE,
2023
): FlowRetry<T> = RecordStoreFlowRetry(handler, store, groupSize)
2124

2225
/**
@@ -29,7 +32,7 @@ fun <T> recordStoreFlowRetry(
2932
internal open class RecordStoreFlowRetry<T>(
3033
private val handler: suspend (T) -> Unit,
3134
private val store: RetryRecordStore<T>,
32-
private val groupSize: Int = 40,
35+
private val groupSize: Int = DEFAULT_RETRY_GROUP_SIZE,
3336
) : FlowRetry<T> {
3437
override suspend fun produceNext(
3538
attemptRange: IntRange,

ft-coroutines-retry/src/main/kotlin/tech/figure/coroutines/retry/store/RetryRecord.kt

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,9 @@ open class RetryRecord<T>(
66
var data: T,
77
var attempt: Int = 0,
88
var lastAttempted: OffsetDateTime = OffsetDateTime.now(),
9-
var lastException: String = ""
9+
var lastException: String = "",
10+
var collection: String = "",
1011
) {
1112
override fun toString(): String =
12-
"RetryRecord(attempt:$attempt Exception:$lastException lastAttempted:$lastAttempted data:$data)"
13+
"RetryRecord(attempt:$attempt exception:$lastException lastAttempted:$lastAttempted data:$data collection:$collection)"
1314
}

0 commit comments

Comments
 (0)