#7250 Kafka consumption from a specific timestamp #8057
kriti-sc wants to merge 4 commits into apache:master
hi @kriti-sc could you cover this with some tests please? I would want to see both:
I recently had to fix a bug where realtime ingestion stalled indefinitely, which was caused by a very simple change to Kafka stream consumption, so going forward I would like to see proof that
    String offsetString = offsetCriteria.getOffsetString();
    Long periodToMillis = TimeUtils.convertPeriodToMillis(offsetString);
    TopicPartition tp = new TopicPartition(_topic, _partition);
    _offsetsForTimes.put(tp, System.currentTimeMillis() - periodToMillis);
Please use an injected java.time.Clock so the time can be controlled in tests. Injecting Clock.systemUTC() and calling Clock.millis() is equivalent to this logic, but lets you inject a fixed clock for testing.
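A minimal sketch of the injected-clock idea follows. The class `PeriodStartResolver` and its method name are hypothetical and not part of the PR; only `java.time.Clock` and its `millis()`, `systemUTC()` and `fixed()` methods come from the JDK.

```java
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.time.ZoneOffset;

// Hypothetical helper (not from the PR): computes "now minus period" from an injected Clock.
final class PeriodStartResolver {
  private final Clock _clock;

  PeriodStartResolver(Clock clock) {
    _clock = clock;
  }

  // Returns the epoch-millis timestamp that lies periodMillis before the clock's current time.
  long startTimestampMillis(long periodMillis) {
    return _clock.millis() - periodMillis;
  }

  public static void main(String[] args) {
    long threeHours = Duration.ofHours(3).toMillis();

    // Production wiring: the system clock reads the same wall-clock time as System.currentTimeMillis().
    PeriodStartResolver live = new PeriodStartResolver(Clock.systemUTC());
    System.out.println(live.startTimestampMillis(threeHours));

    // Test wiring: a fixed clock makes the result deterministic.
    Clock fixed = Clock.fixed(Instant.parse("2022-01-01T12:00:00Z"), ZoneOffset.UTC);
    PeriodStartResolver frozen = new PeriodStartResolver(fixed);
    // prints 2022-01-01T09:00:00Z
    System.out.println(Instant.ofEpochMilli(frozen.startTimestampMillis(threeHours)));
  }
}
```

With the clock passed through the constructor, a unit test can pin "now" and assert the exact timestamp that would be handed to Kafka.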
    SimpleDateFormat f = new SimpleDateFormat(DATE_TIME_FORMAT);
    if (timeStr != null) {
      try {
        Date d = f.parse(timeStr);
        millis = d.getTime();
      } catch (ParseException e) {
        return ERROR_FLAG;
      }
    }
    return millis;
Please use java.time instead. You can replace this with
    return timeStr == null ? 0L : Instant.parse(timeStr).toEpochMilli();
This handles ISO 8601 timestamps automatically (obviously you still need to handle parse errors).
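One way the suggested replacement could look with parse errors handled is sketched below. `TimestampParser` is a hypothetical stand-in for the PR's `TimeUtils` helper, and the value `-1L` for `ERROR_FLAG` is an assumption, since the diff shows the name but not its value. Note that `Instant.parse` expects the ISO 8601 instant form, for example `2022-01-01T00:00:00Z`.

```java
import java.time.Instant;
import java.time.format.DateTimeParseException;

// Hypothetical stand-in for the PR's TimeUtils helper.
final class TimestampParser {
  // Assumed sentinel value; the diff only shows the name ERROR_FLAG.
  static final long ERROR_FLAG = -1L;

  // Converts an ISO 8601 instant string to epoch millis; null maps to 0, bad input to ERROR_FLAG.
  static long convertDateTimeStringToMillis(String timeStr) {
    if (timeStr == null) {
      return 0L;
    }
    try {
      return Instant.parse(timeStr).toEpochMilli();
    } catch (DateTimeParseException e) {
      return ERROR_FLAG;
    }
  }
}
```

Throwing a descriptive exception instead of returning a sentinel may be the cleaner choice, because a sentinel can be mistaken for a real timestamp further down the call chain.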
    public class KafkaStreamMetadataProvider extends KafkaPartitionLevelConnectionHandler
        implements StreamMetadataProvider {

      private static Map<TopicPartition, Long> _offsetsForTimes = new HashMap<>();
sajjad-moradi left a comment:
Please add unit tests to verify your logic.
    } else if (offsetCriteria.isPeriod()) {
      String offsetString = offsetCriteria.getOffsetString();
      Long periodToMillis = TimeUtils.convertPeriodToMillis(offsetString);
      TopicPartition tp = new TopicPartition(_topic, _partition);
      _offsetsForTimes.put(tp, System.currentTimeMillis() - periodToMillis);
      if (_consumer.offsetsForTimes(_offsetsForTimes).get(tp) == null) {
        offset = _consumer.endOffsets(Collections.singletonList(_topicPartition), Duration.ofMillis(timeoutMillis))
            .get(_topicPartition);
      } else {
        offset = _consumer.offsetsForTimes(_offsetsForTimes).get(tp).offset();
      }
    } else if (offsetCriteria.isTimestamp()) {
      String offsetString = offsetCriteria.getOffsetString();
      Long timestampToMillis = TimeUtils.convertDateTimeStringToMillis(offsetString);
      TopicPartition tp = new TopicPartition(_topic, _partition);
      _offsetsForTimes.put(tp, timestampToMillis);
      if (_consumer.offsetsForTimes(_offsetsForTimes).get(tp) == null) {
        offset = _consumer.endOffsets(Collections.singletonList(_topicPartition), Duration.ofMillis(timeoutMillis))
            .get(_topicPartition);
      } else {
        offset = _consumer.offsetsForTimes(_offsetsForTimes).get(tp).offset();
      }
IMO this is a bit complicated for rather simple logic. _consumer.offsetsForTimes fetches the offset for a given timestamp in millis, and it only needs to be called once. If the offset criteria is a timestamp, use its value directly. If it's a period, subtract the period in millis from the current time in millis and use that value as the timestamp for offsetsForTimes.
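The simplification described above could be sketched roughly as follows. All names here (`StartOffsetResolver`, `Kind`, `targetTimestampMillis`, `resolveOffset`) are hypothetical. The Kafka consumer is deliberately abstracted away: `lookup` stands in for a single `offsetsForTimes` call that yields `null` when no message exists at or after the timestamp, and `endOffset` stands in for the `endOffsets` fallback already used in the diff.

```java
import java.time.Clock;
import java.util.function.LongFunction;
import java.util.function.LongSupplier;

// Hypothetical sketch: one code path for both PERIOD and TIMESTAMP criteria.
final class StartOffsetResolver {
  enum Kind { PERIOD, TIMESTAMP }

  // PERIOD: now minus the period; TIMESTAMP: the value itself.
  static long targetTimestampMillis(Kind kind, long valueMillis, Clock clock) {
    return kind == Kind.PERIOD ? clock.millis() - valueMillis : valueMillis;
  }

  // Performs exactly one timestamp lookup and falls back to the end offset when it finds nothing.
  static long resolveOffset(long timestampMillis, LongFunction<Long> lookup, LongSupplier endOffset) {
    Long found = lookup.apply(timestampMillis);
    return found != null ? found : endOffset.getAsLong();
  }

  public static void main(String[] args) {
    long ts = targetTimestampMillis(Kind.PERIOD, 3_600_000L, Clock.systemUTC());
    // Stubbed lookups; in the provider these would wrap the consumer calls.
    System.out.println(resolveOffset(ts, t -> 42L, () -> 99L));  // prints 42
    System.out.println(resolveOffset(ts, t -> null, () -> 99L)); // prints 99
  }
}
```

Keeping the timestamp computation separate from the lookup also means neither branch has to call the broker twice, and both pieces can be unit tested without a running Kafka cluster.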
    Long periodToMillis = TimeUtils.convertPeriodToMillis(offsetString);
    if (periodToMillis >= 0) {
      _offsetCriteria.setOffsetType(OffsetType.PERIOD);
      return _offsetCriteria;
    } else {
      LOGGER.error("Invalid time spec: '" + offsetString + "' (Valid examples: '3h', '4h30m')");
      Long timestampToMillis = TimeUtils.convertDateTimeStringToMillis(offsetString);
This is problematic. If offsetString is not a valid period, convertPeriodToMillis throws a parsing exception, so execution never reaches the else branch.
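A possible shape for the fix is to treat the exception, rather than a negative return value, as the signal to try the timestamp format. The sketch below is self-contained and makes several assumptions: `OffsetStringClassifier` and `parsePeriodMillis` are hypothetical, the period grammar is a simplified stand-in for whatever `TimeUtils.convertPeriodToMillis` accepts, and the stand-in throws `IllegalArgumentException` on bad input.

```java
import java.time.Instant;
import java.time.format.DateTimeParseException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical sketch: classify an offset string as a period or a timestamp.
final class OffsetStringClassifier {
  enum OffsetType { PERIOD, TIMESTAMP }

  // Simplified period grammar (assumption): optional days, hours, minutes, seconds, e.g. "3h", "4h30m".
  private static final Pattern PERIOD = Pattern.compile("(?:(\\d+)d)?(?:(\\d+)h)?(?:(\\d+)m)?(?:(\\d+)s)?");
  private static final long[] UNIT_SECONDS = {86_400L, 3_600L, 60L, 1L};

  // Stand-in for the real period parser; throws on anything that is not a period.
  static long parsePeriodMillis(String s) {
    Matcher m = PERIOD.matcher(s);
    if (s.isEmpty() || !m.matches()) {
      throw new IllegalArgumentException("Invalid period: '" + s + "'");
    }
    long seconds = 0;
    for (int i = 0; i < UNIT_SECONDS.length; i++) {
      String group = m.group(i + 1);
      if (group != null) {
        seconds += Long.parseLong(group) * UNIT_SECONDS[i];
      }
    }
    return seconds * 1000L;
  }

  static OffsetType classify(String offsetString) {
    try {
      parsePeriodMillis(offsetString);
      return OffsetType.PERIOD;
    } catch (IllegalArgumentException notAPeriod) {
      // Not a period: fall through and try the timestamp format.
    }
    try {
      Instant.parse(offsetString);
      return OffsetType.TIMESTAMP;
    } catch (DateTimeParseException notATimestamp) {
      throw new IllegalArgumentException("Invalid time spec: '" + offsetString
          + "' (valid examples: '3h', '4h30m', '2022-01-01T00:00:00Z')");
    }
  }
}
```

For example, `classify("4h30m")` yields `PERIOD`, `classify("2022-01-01T00:00:00Z")` yields `TIMESTAMP`, and `classify("bogus")` throws with a message listing the accepted forms.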
    // Consumes from the time as provided in the period string
    PERIOD,

    // Consumes from the timestamp specified
Please specify the acceptable time format.
    @@ -168,24 +183,30 @@ public OffsetCriteria withOffsetCustom(String customString) {
     */
    public OffsetCriteria withOffsetString(String offsetString) {
To be consistent with other types, could you add method withOffsetAsTimestamp?
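To illustrate the shape of the requested method, here is a trimmed-down model of a criteria class and builder. Everything in it is hypothetical: `OffsetCriteriaSketch` is not the real `OffsetCriteria`, and validating with `Instant.parse` assumes the ISO 8601 instant format is the one the PR settles on.

```java
import java.time.Instant;

// Hypothetical, trimmed-down model; only the shape of the builder methods matters here.
final class OffsetCriteriaSketch {
  enum OffsetType { PERIOD, TIMESTAMP }

  private final OffsetType _offsetType;
  private final String _offsetString;

  private OffsetCriteriaSketch(OffsetType offsetType, String offsetString) {
    _offsetType = offsetType;
    _offsetString = offsetString;
  }

  boolean isTimestamp() {
    return _offsetType == OffsetType.TIMESTAMP;
  }

  String getOffsetString() {
    return _offsetString;
  }

  static final class Builder {
    OffsetCriteriaSketch withOffsetAsPeriod(String periodString) {
      return new OffsetCriteriaSketch(OffsetType.PERIOD, periodString);
    }

    // Mirrors withOffsetAsPeriod; rejects malformed input early via Instant.parse.
    OffsetCriteriaSketch withOffsetAsTimestamp(String timestampString) {
      Instant.parse(timestampString); // throws DateTimeParseException on bad input
      return new OffsetCriteriaSketch(OffsetType.TIMESTAMP, timestampString);
    }
  }
}
```

A dedicated method per type lets callers state their intent explicitly instead of relying on string sniffing inside `withOffsetString`.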
Just noting that I have taken note of the review comments and am working on them. I have just gotten absurdly busy, hence the delay surrounding this and this note.
Fixes #7250