File tree Expand file tree Collapse file tree 1 file changed +30
-0
lines changed
ft-coroutines-kafka/src/main/kotlin/tech/figure/kafka/context/extensions Expand file tree Collapse file tree 1 file changed +30
-0
lines changed Original file line number Diff line number Diff line change 1+ package tech.figure.kafka.context.extensions
2+
3+ import org.apache.kafka.clients.consumer.ConsumerRecord
4+ import org.apache.kafka.common.record.TimestampType
5+ import java.time.Instant
6+ import java.time.OffsetDateTime
7+ import java.time.ZoneId
8+ import java.time.ZoneOffset
9+
10+ /* *
11+ * All valid timestamp type values that can be produced by Kafka to indicate a proper record timestamp.
12+ */
13+ private val VALID_ODT_TIMESTAMP_TYPES : Set <TimestampType > = setOf (
14+ TimestampType .CREATE_TIME ,
15+ TimestampType .LOG_APPEND_TIME ,
16+ )
17+
18+ /* *
19+ * Fetches a valid OffsetDateTime representation for the Kafka record, with regard to the valid timestamp types in the
20+ * encountered data.
21+ */
22+ fun <K , V > ConsumerRecord <K , V >.timestampTz (
23+ zone : ZoneId = ZoneOffset .systemDefault(),
24+ ): OffsetDateTime = timestampType().let { timestampType ->
25+ if (timestampType in VALID_ODT_TIMESTAMP_TYPES ) {
26+ OffsetDateTime .ofInstant(Instant .ofEpochMilli(timestamp()), zone)
27+ } else {
28+ error(" Unexpected timestamp type [$timestampType ] in record at offset [${offset()} ]" )
29+ }
30+ }
You can’t perform that action at this time.
0 commit comments