Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,18 +22,24 @@
import java.io.IOException;
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.pinot.spi.stream.LongMsgOffset;
import org.apache.pinot.spi.stream.OffsetCriteria;
import org.apache.pinot.spi.stream.StreamConfig;
import org.apache.pinot.spi.stream.StreamMetadataProvider;
import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
import org.apache.pinot.spi.utils.TimeUtils;
import org.apache.pinot.spi.stream.TransientConsumerException;


public class KafkaStreamMetadataProvider extends KafkaPartitionLevelConnectionHandler
implements StreamMetadataProvider {

// Target timestamps (epoch millis) keyed by topic-partition, handed to the consumer's offsetsForTimes lookup.
// The reference is final so the shared map can never be swapped out from under a reader.
// NOTE(review): because this is static, every provider instance shares one map, and HashMap is not
// thread-safe; a per-call local map would avoid cross-instance interference — confirm before concurrent use.
private static final Map<TopicPartition, Long> _offsetsForTimes = new HashMap<>();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be final


/**
 * Creates a metadata provider from a client id and a stream config only.
 * Delegates to the three-argument constructor, passing {@code Integer.MIN_VALUE} as the third argument.
 * NOTE(review): the third argument is presumably a partition id, with {@code Integer.MIN_VALUE} acting as a
 * "no specific partition" sentinel — confirm against the delegate constructor.
 *
 * @param clientId client id forwarded unchanged to the delegate constructor
 * @param streamConfig stream configuration forwarded unchanged to the delegate constructor
 */
public KafkaStreamMetadataProvider(String clientId, StreamConfig streamConfig) {
this(clientId, streamConfig, Integer.MIN_VALUE);
}
Expand Down Expand Up @@ -63,6 +69,28 @@ public StreamPartitionMsgOffset fetchStreamPartitionOffset(OffsetCriteria offset
offset =
_consumer.beginningOffsets(Collections.singletonList(_topicPartition), Duration.ofMillis(timeoutMillis))
.get(_topicPartition);
} else if (offsetCriteria.isPeriod()) {
String offsetString = offsetCriteria.getOffsetString();
Long periodToMillis = TimeUtils.convertPeriodToMillis(offsetString);
TopicPartition tp = new TopicPartition(_topic, _partition);
_offsetsForTimes.put(tp, System.currentTimeMillis() - periodToMillis);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please use an injected java.time.Clock so the time can be controlled for testing purposes. Injecting Clock.systemUTC() and calling clock.millis() is equivalent to System.currentTimeMillis(), but it also lets you substitute a fixed clock (Clock.fixed) in tests.

if (_consumer.offsetsForTimes(_offsetsForTimes).get(tp) == null) {
offset = _consumer.endOffsets(Collections.singletonList(_topicPartition), Duration.ofMillis(timeoutMillis))
.get(_topicPartition);
} else {
offset = _consumer.offsetsForTimes(_offsetsForTimes).get(tp).offset();
}
} else if (offsetCriteria.isTimestamp()) {
String offsetString = offsetCriteria.getOffsetString();
Long timestampToMillis = TimeUtils.convertDateTimeStringToMillis(offsetString);
TopicPartition tp = new TopicPartition(_topic, _partition);
_offsetsForTimes.put(tp, timestampToMillis);
if (_consumer.offsetsForTimes(_offsetsForTimes).get(tp) == null) {
offset = _consumer.endOffsets(Collections.singletonList(_topicPartition), Duration.ofMillis(timeoutMillis))
.get(_topicPartition);
} else {
offset = _consumer.offsetsForTimes(_offsetsForTimes).get(tp).offset();
}
Comment on lines +72 to +93
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMO this is a bit complicated for rather simple logic. _consumer.offsetsForTimes fetches the offset for a given timestamp in millis, and it only needs to be called once. If the offset criteria is a timestamp, use its value directly. If it is a period, subtract the period in millis from the current time in millis and use the result as the timestamp for offsetsForTimes.

} else {
throw new IllegalArgumentException("Unknown initial offset value " + offsetCriteria);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,17 @@
import com.google.common.base.Preconditions;
import org.apache.pinot.spi.utils.EqualityUtils;
import org.apache.pinot.spi.utils.TimeUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


/**
* Defines and builds the offset criteria for consumption from a stream
*/
public class OffsetCriteria {

private static final Logger LOGGER = LoggerFactory.getLogger(OffsetCriteria.class);

public static final OffsetCriteria SMALLEST_OFFSET_CRITERIA =
new OffsetCriteria.OffsetCriteriaBuilder().withOffsetSmallest();

Expand All @@ -48,6 +52,9 @@ private enum OffsetType {
// Consumes from the time as provided in the period string
PERIOD,

// Consumes from the timestamp specified, formatted as TimeUtils.DATE_TIME_FORMAT (yyyy-MM-dd'T'HH:mm:ss.SSSZ)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please specify the acceptable time format.

TIMESTAMP,

// Consumes from the custom offset criteria
CUSTOM
}
Expand Down Expand Up @@ -91,6 +98,14 @@ public boolean isPeriod() {
return _offsetType != null && _offsetType.equals(OffsetType.PERIOD);
}

/**
 * Reports whether this offset criteria was resolved to the TIMESTAMP offset type.
 * @return {@code true} only when an offset type is set and it is {@code OffsetType.TIMESTAMP};
 *         {@code false} otherwise, including when no offset type has been set
 */
public boolean isTimestamp() {
  // Constant-first equals is null-safe, so an unset offset type yields false without an explicit null check.
  return OffsetType.TIMESTAMP.equals(_offsetType);
}

/**
* True if the offset criteria is defined as a custom format string
* @return
Expand Down Expand Up @@ -168,24 +183,30 @@ public OffsetCriteria withOffsetCustom(String customString) {
*/
public OffsetCriteria withOffsetString(String offsetString) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To be consistent with the other types, could you add a withOffsetAsTimestamp method?

Preconditions.checkNotNull(offsetString, "Must provide offset string");
_offsetCriteria.setOffsetString(offsetString);

if (offsetString.equalsIgnoreCase(OffsetType.SMALLEST.toString())) {
_offsetCriteria.setOffsetType(OffsetType.SMALLEST);
} else if (offsetString.equalsIgnoreCase(OffsetType.LARGEST.toString())) {
_offsetCriteria.setOffsetType(OffsetType.LARGEST);
} else {
try {
Long periodToMillis = TimeUtils.convertPeriodToMillis(offsetString);
if (periodToMillis >= 0) {
_offsetCriteria.setOffsetType(OffsetType.PERIOD);
Long periodToMillis = TimeUtils.convertPeriodToMillis(offsetString);
if (periodToMillis >= 0) {
_offsetCriteria.setOffsetType(OffsetType.PERIOD);
return _offsetCriteria;
} else {
LOGGER.error("Invalid time spec: '" + offsetString + "' (Valid examples: '3h', '4h30m')");
Long timestampToMillis = TimeUtils.convertDateTimeStringToMillis(offsetString);
Comment on lines +193 to +199
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is problematic. If offsetString is not a valid period, convertPeriodToMillis throws a parsing exception, so execution never reaches the else branch.

if (timestampToMillis >= 0) {
_offsetCriteria.setOffsetType(OffsetType.TIMESTAMP);
return _offsetCriteria;
} else {
_offsetCriteria.setOffsetType(OffsetType.CUSTOM);
LOGGER.error("Invalid time spec: '" + offsetString + "' Valid timestamp format: "
+ TimeUtils.DATE_TIME_FORMAT);
}
} catch (Exception e) {
_offsetCriteria.setOffsetType(OffsetType.CUSTOM);
}
_offsetCriteria.setOffsetType(OffsetType.CUSTOM);
}
_offsetCriteria.setOffsetString(offsetString);
return _offsetCriteria;
}
}
Expand Down
24 changes: 22 additions & 2 deletions pinot-spi/src/main/java/org/apache/pinot/spi/utils/TimeUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@
*/
package org.apache.pinot.spi.utils;

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import org.joda.time.DateTime;
Expand Down Expand Up @@ -140,6 +143,8 @@ public static long getValidMaxTimeMillis() {
return VALID_MAX_TIME_MILLIS;
}

private static final Long ERROR_FLAG = -1L;

private static final PeriodFormatter PERIOD_FORMATTER =
new PeriodFormatterBuilder().appendDays().appendSuffix("d").appendHours().appendSuffix("h").appendMinutes()
.appendSuffix("m").appendSeconds().appendSuffix("s").toFormatter();
Expand All @@ -160,8 +165,7 @@ public static Long convertPeriodToMillis(String timeStr) {
Period p = PERIOD_FORMATTER.parsePeriod(timeStr);
millis = p.toStandardDuration().getStandardSeconds() * 1000L;
} catch (IllegalArgumentException e) {
// rethrowing with more contextual information
throw new IllegalArgumentException("Invalid time spec '" + timeStr + "' (Valid examples: '3h', '4h30m')", e);
return ERROR_FLAG;
}
}
return millis;
Expand Down Expand Up @@ -194,4 +198,20 @@ public static boolean isPeriodValid(String timeStr) {
return false;
}
}

public static final String DATE_TIME_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSSZ";

public static Long convertDateTimeStringToMillis(String timeStr) {
Long millis = 0L;
SimpleDateFormat f = new SimpleDateFormat(DATE_TIME_FORMAT);
if (timeStr != null) {
try {
Date d = f.parse(timeStr);
millis = d.getTime();
} catch (ParseException e) {
return ERROR_FLAG;
}
}
return millis;
Comment on lines +206 to +215
Copy link
Member

@richardstartin richardstartin Jan 24, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please use java.time instead - you can replace this with

return timeStr == null ? 0L : Instant.parse(timeStr).toEpochMilli();

This handles ISO 8601 timestamps automatically.

(obviously you still need to handle parse errors)

}
}