-
Notifications
You must be signed in to change notification settings - Fork 1.5k
#7250 Kafka consumption from a specific timestamp #8057
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -22,18 +22,24 @@ | |
| import java.io.IOException; | ||
| import java.time.Duration; | ||
| import java.util.Collections; | ||
| import java.util.HashMap; | ||
| import java.util.Map; | ||
| import org.apache.kafka.common.TopicPartition; | ||
| import org.apache.kafka.common.errors.TimeoutException; | ||
| import org.apache.pinot.spi.stream.LongMsgOffset; | ||
| import org.apache.pinot.spi.stream.OffsetCriteria; | ||
| import org.apache.pinot.spi.stream.StreamConfig; | ||
| import org.apache.pinot.spi.stream.StreamMetadataProvider; | ||
| import org.apache.pinot.spi.stream.StreamPartitionMsgOffset; | ||
| import org.apache.pinot.spi.utils.TimeUtils; | ||
| import org.apache.pinot.spi.stream.TransientConsumerException; | ||
|
|
||
|
|
||
| public class KafkaStreamMetadataProvider extends KafkaPartitionLevelConnectionHandler | ||
| implements StreamMetadataProvider { | ||
|
|
||
| private static Map<TopicPartition, Long> _offsetsForTimes = new HashMap<>(); | ||
|
|
||
| public KafkaStreamMetadataProvider(String clientId, StreamConfig streamConfig) { | ||
| this(clientId, streamConfig, Integer.MIN_VALUE); | ||
| } | ||
|
|
@@ -63,6 +69,28 @@ public StreamPartitionMsgOffset fetchStreamPartitionOffset(OffsetCriteria offset | |
| offset = | ||
| _consumer.beginningOffsets(Collections.singletonList(_topicPartition), Duration.ofMillis(timeoutMillis)) | ||
| .get(_topicPartition); | ||
| } else if (offsetCriteria.isPeriod()) { | ||
| String offsetString = offsetCriteria.getOffsetString(); | ||
| Long periodToMillis = TimeUtils.convertPeriodToMillis(offsetString); | ||
| TopicPartition tp = new TopicPartition(_topic, _partition); | ||
| _offsetsForTimes.put(tp, System.currentTimeMillis() - periodToMillis); | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please use an injected |
||
| if (_consumer.offsetsForTimes(_offsetsForTimes).get(tp) == null) { | ||
| offset = _consumer.endOffsets(Collections.singletonList(_topicPartition), Duration.ofMillis(timeoutMillis)) | ||
| .get(_topicPartition); | ||
| } else { | ||
| offset = _consumer.offsetsForTimes(_offsetsForTimes).get(tp).offset(); | ||
| } | ||
| } else if (offsetCriteria.isTimestamp()) { | ||
| String offsetString = offsetCriteria.getOffsetString(); | ||
| Long timestampToMillis = TimeUtils.convertDateTimeStringToMillis(offsetString); | ||
| TopicPartition tp = new TopicPartition(_topic, _partition); | ||
| _offsetsForTimes.put(tp, timestampToMillis); | ||
| if (_consumer.offsetsForTimes(_offsetsForTimes).get(tp) == null) { | ||
| offset = _consumer.endOffsets(Collections.singletonList(_topicPartition), Duration.ofMillis(timeoutMillis)) | ||
| .get(_topicPartition); | ||
| } else { | ||
| offset = _consumer.offsetsForTimes(_offsetsForTimes).get(tp).offset(); | ||
| } | ||
|
Comment on lines
+72
to
+93
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. IMO this is a bit complicated for a rather simple logic. |
||
| } else { | ||
| throw new IllegalArgumentException("Unknown initial offset value " + offsetCriteria); | ||
| } | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -21,13 +21,17 @@ | |
| import com.google.common.base.Preconditions; | ||
| import org.apache.pinot.spi.utils.EqualityUtils; | ||
| import org.apache.pinot.spi.utils.TimeUtils; | ||
| import org.slf4j.Logger; | ||
| import org.slf4j.LoggerFactory; | ||
|
|
||
|
|
||
| /** | ||
| * Defines and builds the offset criteria for consumption from a stream | ||
| */ | ||
| public class OffsetCriteria { | ||
|
|
||
| private static final Logger LOGGER = LoggerFactory.getLogger(OffsetCriteria.class); | ||
|
|
||
| public static final OffsetCriteria SMALLEST_OFFSET_CRITERIA = | ||
| new OffsetCriteria.OffsetCriteriaBuilder().withOffsetSmallest(); | ||
|
|
||
|
|
@@ -48,6 +52,9 @@ private enum OffsetType { | |
| // Consumes from the time as provided in the period string | ||
| PERIOD, | ||
|
|
||
| // Consumes from the timestamp specified | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please specify the acceptable time format. |
||
| TIMESTAMP, | ||
|
|
||
| // Consumes from the custom offset criteria */ | ||
| CUSTOM | ||
| } | ||
|
|
@@ -91,6 +98,14 @@ public boolean isPeriod() { | |
| return _offsetType != null && _offsetType.equals(OffsetType.PERIOD); | ||
| } | ||
|
|
||
| /** | ||
| * True if the offset criteria is defined as a timestamp format string | ||
| * @return | ||
| */ | ||
| public boolean isTimestamp() { | ||
| return _offsetType != null && _offsetType.equals(OffsetType.TIMESTAMP); | ||
| } | ||
|
|
||
| /** | ||
| * True if the offset criteria is defined as a custom format string | ||
| * @return | ||
|
|
@@ -168,24 +183,30 @@ public OffsetCriteria withOffsetCustom(String customString) { | |
| */ | ||
| public OffsetCriteria withOffsetString(String offsetString) { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. To be consistent with other types, could you add method |
||
| Preconditions.checkNotNull(offsetString, "Must provide offset string"); | ||
| _offsetCriteria.setOffsetString(offsetString); | ||
|
|
||
| if (offsetString.equalsIgnoreCase(OffsetType.SMALLEST.toString())) { | ||
| _offsetCriteria.setOffsetType(OffsetType.SMALLEST); | ||
| } else if (offsetString.equalsIgnoreCase(OffsetType.LARGEST.toString())) { | ||
| _offsetCriteria.setOffsetType(OffsetType.LARGEST); | ||
| } else { | ||
| try { | ||
| Long periodToMillis = TimeUtils.convertPeriodToMillis(offsetString); | ||
| if (periodToMillis >= 0) { | ||
| _offsetCriteria.setOffsetType(OffsetType.PERIOD); | ||
| Long periodToMillis = TimeUtils.convertPeriodToMillis(offsetString); | ||
| if (periodToMillis >= 0) { | ||
| _offsetCriteria.setOffsetType(OffsetType.PERIOD); | ||
| return _offsetCriteria; | ||
| } else { | ||
| LOGGER.error("Invalid time spec: '" + offsetString + "' (Valid examples: '3h', '4h30m')"); | ||
| Long timestampToMillis = TimeUtils.convertDateTimeStringToMillis(offsetString); | ||
|
Comment on lines
+193
to
+199
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is problematic. If offsetString is not of type period, convertPeriodToMillis throws parsing exception and it doesn't get to the else part. |
||
| if (timestampToMillis >= 0) { | ||
| _offsetCriteria.setOffsetType(OffsetType.TIMESTAMP); | ||
| return _offsetCriteria; | ||
| } else { | ||
| _offsetCriteria.setOffsetType(OffsetType.CUSTOM); | ||
| LOGGER.error("Invalid time spec: '" + offsetString + "' Valid timestamp format: " | ||
| + TimeUtils.DATE_TIME_FORMAT); | ||
| } | ||
| } catch (Exception e) { | ||
| _offsetCriteria.setOffsetType(OffsetType.CUSTOM); | ||
| } | ||
| _offsetCriteria.setOffsetType(OffsetType.CUSTOM); | ||
| } | ||
| _offsetCriteria.setOffsetString(offsetString); | ||
| return _offsetCriteria; | ||
| } | ||
| } | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -18,6 +18,9 @@ | |
| */ | ||
| package org.apache.pinot.spi.utils; | ||
|
|
||
| import java.text.ParseException; | ||
| import java.text.SimpleDateFormat; | ||
| import java.util.Date; | ||
| import java.util.concurrent.TimeUnit; | ||
| import javax.annotation.Nullable; | ||
| import org.joda.time.DateTime; | ||
|
|
@@ -140,6 +143,8 @@ public static long getValidMaxTimeMillis() { | |
| return VALID_MAX_TIME_MILLIS; | ||
| } | ||
|
|
||
| private static final Long ERROR_FLAG = -1L; | ||
|
|
||
| private static final PeriodFormatter PERIOD_FORMATTER = | ||
| new PeriodFormatterBuilder().appendDays().appendSuffix("d").appendHours().appendSuffix("h").appendMinutes() | ||
| .appendSuffix("m").appendSeconds().appendSuffix("s").toFormatter(); | ||
|
|
@@ -160,8 +165,7 @@ public static Long convertPeriodToMillis(String timeStr) { | |
| Period p = PERIOD_FORMATTER.parsePeriod(timeStr); | ||
| millis = p.toStandardDuration().getStandardSeconds() * 1000L; | ||
| } catch (IllegalArgumentException e) { | ||
| // rethrowing with more contextual information | ||
| throw new IllegalArgumentException("Invalid time spec '" + timeStr + "' (Valid examples: '3h', '4h30m')", e); | ||
| return ERROR_FLAG; | ||
| } | ||
| } | ||
| return millis; | ||
|
|
@@ -194,4 +198,20 @@ public static boolean isPeriodValid(String timeStr) { | |
| return false; | ||
| } | ||
| } | ||
|
|
||
| public static final String DATE_TIME_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSSZ"; | ||
|
|
||
| public static Long convertDateTimeStringToMillis(String timeStr) { | ||
richardstartin marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| Long millis = 0L; | ||
| SimpleDateFormat f = new SimpleDateFormat(DATE_TIME_FORMAT); | ||
| if (timeStr != null) { | ||
| try { | ||
| Date d = f.parse(timeStr); | ||
| millis = d.getTime(); | ||
| } catch (ParseException e) { | ||
| return ERROR_FLAG; | ||
| } | ||
| } | ||
| return millis; | ||
|
Comment on lines
+206
to
+215
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please use return timeStr == null ? 0L : Instant.parse(timeStr).toEpochMilli();This handles ISO 8601 timestamps automatically. (obviously you still need to handle parse errors) |
||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This should be final