#7250 Kafka consumption from a specific timestamp #8057

Draft
kriti-sc wants to merge 4 commits into apache:master from kriti-sc:#7250-kafka-consumption-from-timestamp

@kriti-sc
Contributor

@kriti-sc kriti-sc commented Jan 22, 2022

Fixes #7250

@richardstartin
Member

Hi @kriti-sc, could you cover this with some tests, please? I would like to see both:

  • Unit tests at the Kafka consumer level e.g. the correct offsets are produced from a given timestamp, mocking the Consumer
  • An integration test involving LLRealtimeSegmentDataManager with TIMESTAMP offset type.

I recently had to fix a bug where realtime ingestion stalled indefinitely, which was caused by a very simple change to Kafka stream consumption, so going forwards I would like to see proof that LLRealtimeSegmentDataManager#consumeLoop can advance whenever a newly introduced branch/setting is active.

String offsetString = offsetCriteria.getOffsetString();
Long periodToMillis = TimeUtils.convertPeriodToMillis(offsetString);
TopicPartition tp = new TopicPartition(_topic, _partition);
_offsetsForTimes.put(tp, System.currentTimeMillis() - periodToMillis);
Member

Please use an injected java.time.Clock so the time can be controlled in tests. Injecting Clock.systemUTC() and calling millis() on it is equivalent to this logic, but it also lets you inject a fixed clock (Clock.fixed) when testing.
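
A minimal sketch of the injected-clock idea, for illustration only. The class name PeriodStartResolver and its method are hypothetical and not part of this PR; only java.time.Clock and its methods are real API.

```java
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.time.ZoneOffset;

/** Hypothetical helper: turns a look-back period into an absolute timestamp. */
final class PeriodStartResolver {
  private final Clock _clock;

  PeriodStartResolver(Clock clock) {
    _clock = clock;
  }

  /** Returns "now minus period" in epoch millis, where "now" comes from the injected clock. */
  long startTimestampMillis(Duration period) {
    return _clock.millis() - period.toMillis();
  }

  public static void main(String[] args) {
    // Production wiring: the system clock reads the same wall-clock time as System.currentTimeMillis().
    PeriodStartResolver live = new PeriodStartResolver(Clock.systemUTC());
    System.out.println(live.startTimestampMillis(Duration.ofHours(3)));

    // Test wiring: a fixed clock makes the result deterministic.
    Clock fixed = Clock.fixed(Instant.parse("2022-01-24T12:00:00Z"), ZoneOffset.UTC);
    PeriodStartResolver underTest = new PeriodStartResolver(fixed);
    System.out.println(Instant.ofEpochMilli(underTest.startTimestampMillis(Duration.ofHours(3))));
  }
}
```

With the clock passed in through the constructor, a unit test can assert an exact timestamp instead of a range around the current time.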

Comment on lines +206 to +215
SimpleDateFormat f = new SimpleDateFormat(DATE_TIME_FORMAT);
if (timeStr != null) {
try {
Date d = f.parse(timeStr);
millis = d.getTime();
} catch (ParseException e) {
return ERROR_FLAG;
}
}
return millis;
Member

@richardstartin richardstartin Jan 24, 2022

Please use java.time instead. You can replace this with:

return timeStr == null ? 0L : Instant.parse(timeStr).toEpochMilli();

This handles ISO 8601 instants (for example 2022-01-24T20:47:00Z) automatically.

(You still need to handle parse errors: Instant.parse throws DateTimeParseException on malformed input.)
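
For illustration, a sketch of that one-liner with the parse error handled. The class name TimestampParsing is hypothetical, and rethrowing as IllegalArgumentException rather than returning a sentinel such as ERROR_FLAG is an assumption about the desired behaviour, not something settled in this review.

```java
import java.time.Instant;
import java.time.format.DateTimeParseException;

/** Hypothetical utility class for this sketch. */
final class TimestampParsing {
  private TimestampParsing() {
  }

  /**
   * Parses an ISO 8601 instant such as "2022-01-24T20:47:00Z" into epoch millis.
   * A null input maps to 0, as in the suggested one-liner. A malformed input raises
   * IllegalArgumentException (an assumption, chosen here instead of a sentinel value).
   */
  static long toEpochMillis(String timeStr) {
    if (timeStr == null) {
      return 0L;
    }
    try {
      return Instant.parse(timeStr).toEpochMilli();
    } catch (DateTimeParseException e) {
      throw new IllegalArgumentException("Invalid ISO 8601 timestamp: '" + timeStr + "'", e);
    }
  }

  public static void main(String[] args) {
    System.out.println(toEpochMillis("2022-01-24T20:47:00Z"));
    try {
      toEpochMillis("24/01/2022 20:47");
    } catch (IllegalArgumentException e) {
      System.out.println(e.getMessage());
    }
  }
}
```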

public class KafkaStreamMetadataProvider extends KafkaPartitionLevelConnectionHandler
implements StreamMetadataProvider {

private static Map<TopicPartition, Long> _offsetsForTimes = new HashMap<>();
Member

This should be final

@kriti-sc kriti-sc marked this pull request as draft January 24, 2022 20:47
Contributor

@sajjad-moradi sajjad-moradi left a comment

Please add unit tests to verify your logic.

Comment on lines +72 to +93
} else if (offsetCriteria.isPeriod()) {
String offsetString = offsetCriteria.getOffsetString();
Long periodToMillis = TimeUtils.convertPeriodToMillis(offsetString);
TopicPartition tp = new TopicPartition(_topic, _partition);
_offsetsForTimes.put(tp, System.currentTimeMillis() - periodToMillis);
if (_consumer.offsetsForTimes(_offsetsForTimes).get(tp) == null) {
offset = _consumer.endOffsets(Collections.singletonList(_topicPartition), Duration.ofMillis(timeoutMillis))
.get(_topicPartition);
} else {
offset = _consumer.offsetsForTimes(_offsetsForTimes).get(tp).offset();
}
} else if (offsetCriteria.isTimestamp()) {
String offsetString = offsetCriteria.getOffsetString();
Long timestampToMillis = TimeUtils.convertDateTimeStringToMillis(offsetString);
TopicPartition tp = new TopicPartition(_topic, _partition);
_offsetsForTimes.put(tp, timestampToMillis);
if (_consumer.offsetsForTimes(_offsetsForTimes).get(tp) == null) {
offset = _consumer.endOffsets(Collections.singletonList(_topicPartition), Duration.ofMillis(timeoutMillis))
.get(_topicPartition);
} else {
offset = _consumer.offsetsForTimes(_offsetsForTimes).get(tp).offset();
}
Contributor

IMO this is more complicated than this fairly simple logic needs to be. _consumer.offsetsForTimes fetches the offset for a given timestamp in millis, and it only needs to be called once. If the offset criteria is a timestamp, use its value directly. If it's a period, subtract the period in millis from the current time in millis and use that value as the timestamp for offsetsForTimes.
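
A rough sketch of the simplified flow described above. To keep it self-contained, the Kafka calls are replaced by functional-interface stand-ins: offsetForTime plays the role of _consumer.offsetsForTimes(...) for a single partition (empty when Kafka would return null for it), and endOffset plays the role of _consumer.endOffsets(...). The class, enum and parameter names are hypothetical.

```java
import java.time.Clock;
import java.time.Instant;
import java.time.ZoneOffset;
import java.util.OptionalLong;
import java.util.function.LongFunction;
import java.util.function.LongSupplier;

/** Hypothetical sketch: one target timestamp, one lookup, one fallback. */
final class StartOffsetResolver {
  /** Stand-in for the two offset criteria kinds discussed in this review. */
  enum Kind { PERIOD, TIMESTAMP }

  private StartOffsetResolver() {
  }

  /** TIMESTAMP: use the value directly. PERIOD: look back from the injected clock's "now". */
  static long targetTimestampMillis(Kind kind, long valueMillis, Clock clock) {
    return kind == Kind.TIMESTAMP ? valueMillis : clock.millis() - valueMillis;
  }

  /** Performs a single lookup and falls back to the end offset if nothing is found. */
  static long resolveOffset(long targetMillis, LongFunction<OptionalLong> offsetForTime, LongSupplier endOffset) {
    return offsetForTime.apply(targetMillis).orElseGet(endOffset);
  }

  public static void main(String[] args) {
    Clock fixed = Clock.fixed(Instant.ofEpochMilli(10_000L), ZoneOffset.UTC);
    // Fake partition: records exist only at or before t = 8000 ms, and offset == timestamp / 1000.
    LongFunction<OptionalLong> lookup =
        ts -> ts <= 8_000L ? OptionalLong.of(ts / 1_000L) : OptionalLong.empty();
    LongSupplier end = () -> 9L;

    long fromPeriod = targetTimestampMillis(Kind.PERIOD, 4_000L, fixed);
    System.out.println(resolveOffset(fromPeriod, lookup, end));

    long fromTimestamp = targetTimestampMillis(Kind.TIMESTAMP, 9_500L, fixed);
    System.out.println(resolveOffset(fromTimestamp, lookup, end));
  }
}
```

Both branches differ only in how the target timestamp is computed, so the lookup and the end-offset fallback are written once.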

Comment on lines +193 to +199
Long periodToMillis = TimeUtils.convertPeriodToMillis(offsetString);
if (periodToMillis >= 0) {
_offsetCriteria.setOffsetType(OffsetType.PERIOD);
return _offsetCriteria;
} else {
LOGGER.error("Invalid time spec: '" + offsetString + "' (Valid examples: '3h', '4h30m')");
Long timestampToMillis = TimeUtils.convertDateTimeStringToMillis(offsetString);
Contributor

This is problematic. If offsetString is not a period, convertPeriodToMillis throws a parsing exception, so execution never reaches the else branch.
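
One way to structure this so that a non-period string still reaches the timestamp check is to catch the period-parsing failure explicitly. The sketch below is illustrative only: periodToMillis is a simplified, hypothetical stand-in for TimeUtils.convertPeriodToMillis (it accepts just d/h/m/s units and throws on anything else), and the timestamp format is assumed to be an ISO 8601 instant.

```java
import java.time.Instant;
import java.time.format.DateTimeParseException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical sketch: classify an offset string as a period or a timestamp. */
final class OffsetStringClassifier {
  enum OffsetType { PERIOD, TIMESTAMP }

  // Simplified period grammar for this sketch only, e.g. "3h", "4h30m", "1d2h".
  private static final Pattern PERIOD =
      Pattern.compile("(?:(\\d+)d)?(?:(\\d+)h)?(?:(\\d+)m)?(?:(\\d+)s)?");
  private static final long[] UNIT_MILLIS = {86_400_000L, 3_600_000L, 60_000L, 1_000L};

  private OffsetStringClassifier() {
  }

  /** Stand-in for a period parser that throws on bad input instead of returning a negative value. */
  static long periodToMillis(String s) {
    Matcher m = PERIOD.matcher(s);
    if (s.isEmpty() || !m.matches()) {
      throw new IllegalArgumentException("Invalid period: '" + s + "'");
    }
    long total = 0L;
    for (int i = 0; i < UNIT_MILLIS.length; i++) {
      String group = m.group(i + 1);
      if (group != null) {
        total += Long.parseLong(group) * UNIT_MILLIS[i];
      }
    }
    return total;
  }

  /** Tries the period format first, then the timestamp format, and fails only if both reject the input. */
  static OffsetType classify(String offsetString) {
    try {
      periodToMillis(offsetString);
      return OffsetType.PERIOD;
    } catch (IllegalArgumentException notAPeriod) {
      // Not a period: fall through and try the timestamp format.
    }
    try {
      Instant.parse(offsetString);
      return OffsetType.TIMESTAMP;
    } catch (DateTimeParseException notATimestamp) {
      throw new IllegalArgumentException("Invalid offset string: '" + offsetString
          + "' (valid examples: '3h', '4h30m', '2022-01-24T20:47:00Z')");
    }
  }

  public static void main(String[] args) {
    System.out.println(classify("4h30m"));
    System.out.println(classify("2022-01-24T20:47:00Z"));
  }
}
```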

// Consumes from the time as provided in the period string
PERIOD,

// Consumes from the timestamp specified
Contributor

Please specify the acceptable time format.

@@ -168,24 +183,30 @@ public OffsetCriteria withOffsetCustom(String customString) {
*/
public OffsetCriteria withOffsetString(String offsetString) {
Contributor

To be consistent with the other types, could you add a withOffsetAsTimestamp method?

@kriti-sc
Contributor Author

I have noted the review comments and am working on them. I have been absurdly busy lately, hence the delay in addressing them and in posting this note.

Development

Successfully merging this pull request may close these issues.

Configure Kafka consumption from a specific timestamp

3 participants