Skip to content

Commit

Permalink
NIFI-5267 Kafka support to replay by timestamp
Browse files Browse the repository at this point in the history
This closes #3372.

Signed-off-by: Bryan Bende <bbende@apache.org>
  • Loading branch information
Sandish Kumar authored and bbende committed Mar 15, 2019
1 parent 82e2c97 commit 45ebeba
Show file tree
Hide file tree
Showing 14 changed files with 26 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
@WritesAttribute(attribute = KafkaProcessorUtils.KAFKA_KEY, description = "The key of message if present and if single message. "
+ "How the key is encoded depends on the value of the 'Key Attribute Encoding' property."),
@WritesAttribute(attribute = KafkaProcessorUtils.KAFKA_OFFSET, description = "The offset of the message in the partition of the topic."),
@WritesAttribute(attribute = KafkaProcessorUtils.KAFKA_TIMESTAMP, description = "The timestamp of the message in the partition of the topic."),
@WritesAttribute(attribute = KafkaProcessorUtils.KAFKA_PARTITION, description = "The partition of the topic the message or message bundle is from"),
@WritesAttribute(attribute = KafkaProcessorUtils.KAFKA_TOPIC, description = "The topic the message or message bundle is from")
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -442,6 +442,7 @@ private void writeRecordData(final ProcessSession session, final List<ConsumerRe
// And continue to the next message.
final Map<String, String> attributes = new HashMap<>();
attributes.put(KafkaProcessorUtils.KAFKA_OFFSET, String.valueOf(consumerRecord.offset()));
attributes.put(KafkaProcessorUtils.KAFKA_TIMESTAMP, String.valueOf(consumerRecord.timestamp()));
attributes.put(KafkaProcessorUtils.KAFKA_PARTITION, String.valueOf(topicPartition.partition()));
attributes.put(KafkaProcessorUtils.KAFKA_TOPIC, topicPartition.topic());

Expand Down Expand Up @@ -566,6 +567,7 @@ private void rollback(final TopicPartition topicPartition) {
private void populateAttributes(final BundleTracker tracker) {
final Map<String, String> kafkaAttrs = new HashMap<>();
kafkaAttrs.put(KafkaProcessorUtils.KAFKA_OFFSET, String.valueOf(tracker.initialOffset));
kafkaAttrs.put(KafkaProcessorUtils.KAFKA_TIMESTAMP, String.valueOf(tracker.initialTimestamp));
if (tracker.key != null && tracker.totalRecords == 1) {
kafkaAttrs.put(KafkaProcessorUtils.KAFKA_KEY, tracker.key);
}
Expand All @@ -590,6 +592,7 @@ private void populateAttributes(final BundleTracker tracker) {
private static class BundleTracker {

final long initialOffset;
final long initialTimestamp;
final int partition;
final String topic;
final String key;
Expand All @@ -603,6 +606,7 @@ private BundleTracker(final ConsumerRecord<byte[], byte[]> initialRecord, final

private BundleTracker(final ConsumerRecord<byte[], byte[]> initialRecord, final TopicPartition topicPartition, final String keyEncoding, final RecordSetWriter recordWriter) {
this.initialOffset = initialRecord.offset();
this.initialTimestamp = initialRecord.timestamp();
this.partition = topicPartition.partition();
this.topic = topicPartition.topic();
this.recordWriter = recordWriter;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ final class KafkaProcessorUtils {
static final String KAFKA_TOPIC = "kafka.topic";
static final String KAFKA_PARTITION = "kafka.partition";
static final String KAFKA_OFFSET = "kafka.offset";
static final String KAFKA_TIMESTAMP = "kafka.timestamp";
static final String KAFKA_COUNT = "kafka.count";
static final AllowableValue SEC_PLAINTEXT = new AllowableValue("PLAINTEXT", "PLAINTEXT", "PLAINTEXT");
static final AllowableValue SEC_SSL = new AllowableValue("SSL", "SSL", "SSL");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
@WritesAttribute(attribute = KafkaProcessorUtils.KAFKA_KEY, description = "The key of message if present and if single message. "
+ "How the key is encoded depends on the value of the 'Key Attribute Encoding' property."),
@WritesAttribute(attribute = KafkaProcessorUtils.KAFKA_OFFSET, description = "The offset of the message in the partition of the topic."),
@WritesAttribute(attribute = KafkaProcessorUtils.KAFKA_TIMESTAMP, description = "The timestamp of the message in the partition of the topic."),
@WritesAttribute(attribute = KafkaProcessorUtils.KAFKA_PARTITION, description = "The partition of the topic the message or message bundle is from"),
@WritesAttribute(attribute = KafkaProcessorUtils.KAFKA_TOPIC, description = "The topic the message or message bundle is from")
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -463,6 +463,7 @@ private void handleParseFailure(final ConsumerRecord<byte[], byte[]> consumerRec
// If we are unable to parse the data, we need to transfer it to 'parse failure' relationship
final Map<String, String> attributes = getAttributes(consumerRecord);
attributes.put(KafkaProcessorUtils.KAFKA_OFFSET, String.valueOf(consumerRecord.offset()));
attributes.put(KafkaProcessorUtils.KAFKA_TIMESTAMP, String.valueOf(consumerRecord.timestamp()));
attributes.put(KafkaProcessorUtils.KAFKA_PARTITION, String.valueOf(consumerRecord.partition()));
attributes.put(KafkaProcessorUtils.KAFKA_TOPIC, consumerRecord.topic());

Expand Down Expand Up @@ -623,6 +624,7 @@ private void rollback(final TopicPartition topicPartition) {
private void populateAttributes(final BundleTracker tracker) {
final Map<String, String> kafkaAttrs = new HashMap<>();
kafkaAttrs.put(KafkaProcessorUtils.KAFKA_OFFSET, String.valueOf(tracker.initialOffset));
kafkaAttrs.put(KafkaProcessorUtils.KAFKA_TIMESTAMP, String.valueOf(tracker.initialTimestamp));
if (tracker.key != null && tracker.totalRecords == 1) {
kafkaAttrs.put(KafkaProcessorUtils.KAFKA_KEY, tracker.key);
}
Expand All @@ -647,6 +649,7 @@ private void populateAttributes(final BundleTracker tracker) {
private static class BundleTracker {

final long initialOffset;
final long initialTimestamp;
final int partition;
final String topic;
final String key;
Expand All @@ -660,6 +663,7 @@ private BundleTracker(final ConsumerRecord<byte[], byte[]> initialRecord, final

private BundleTracker(final ConsumerRecord<byte[], byte[]> initialRecord, final TopicPartition topicPartition, final String keyEncoding, final RecordSetWriter recordWriter) {
this.initialOffset = initialRecord.offset();
this.initialTimestamp = initialRecord.timestamp();
this.partition = topicPartition.partition();
this.topic = topicPartition.topic();
this.recordWriter = recordWriter;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ final class KafkaProcessorUtils {
static final String KAFKA_TOPIC = "kafka.topic";
static final String KAFKA_PARTITION = "kafka.partition";
static final String KAFKA_OFFSET = "kafka.offset";
static final String KAFKA_TIMESTAMP = "kafka.timestamp";
static final String KAFKA_COUNT = "kafka.count";
static final AllowableValue SEC_PLAINTEXT = new AllowableValue("PLAINTEXT", "PLAINTEXT", "PLAINTEXT");
static final AllowableValue SEC_SSL = new AllowableValue("SSL", "SSL", "SSL");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@
@WritesAttribute(attribute = "record.count", description = "The number of records received"),
@WritesAttribute(attribute = "mime.type", description = "The MIME Type that is provided by the configured Record Writer"),
@WritesAttribute(attribute = KafkaProcessorUtils.KAFKA_PARTITION, description = "The partition of the topic the records are from"),
@WritesAttribute(attribute = KafkaProcessorUtils.KAFKA_TIMESTAMP, description = "The timestamp of the message in the partition of the topic."),
@WritesAttribute(attribute = KafkaProcessorUtils.KAFKA_TOPIC, description = "The topic records are from")
})
@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
@WritesAttribute(attribute = KafkaProcessorUtils.KAFKA_KEY, description = "The key of message if present and if single message. "
+ "How the key is encoded depends on the value of the 'Key Attribute Encoding' property."),
@WritesAttribute(attribute = KafkaProcessorUtils.KAFKA_OFFSET, description = "The offset of the message in the partition of the topic."),
@WritesAttribute(attribute = KafkaProcessorUtils.KAFKA_TIMESTAMP, description = "The timestamp of the message in the partition of the topic."),
@WritesAttribute(attribute = KafkaProcessorUtils.KAFKA_PARTITION, description = "The partition of the topic the message or message bundle is from"),
@WritesAttribute(attribute = KafkaProcessorUtils.KAFKA_TOPIC, description = "The topic the message or message bundle is from")
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -463,6 +463,7 @@ private void handleParseFailure(final ConsumerRecord<byte[], byte[]> consumerRec
// If we are unable to parse the data, we need to transfer it to 'parse failure' relationship
final Map<String, String> attributes = getAttributes(consumerRecord);
attributes.put(KafkaProcessorUtils.KAFKA_OFFSET, String.valueOf(consumerRecord.offset()));
attributes.put(KafkaProcessorUtils.KAFKA_TIMESTAMP, String.valueOf(consumerRecord.timestamp()));
attributes.put(KafkaProcessorUtils.KAFKA_PARTITION, String.valueOf(consumerRecord.partition()));
attributes.put(KafkaProcessorUtils.KAFKA_TOPIC, consumerRecord.topic());

Expand Down Expand Up @@ -623,6 +624,7 @@ private void rollback(final TopicPartition topicPartition) {
private void populateAttributes(final BundleTracker tracker) {
final Map<String, String> kafkaAttrs = new HashMap<>();
kafkaAttrs.put(KafkaProcessorUtils.KAFKA_OFFSET, String.valueOf(tracker.initialOffset));
kafkaAttrs.put(KafkaProcessorUtils.KAFKA_TIMESTAMP, String.valueOf(tracker.initialTimestamp));
if (tracker.key != null && tracker.totalRecords == 1) {
kafkaAttrs.put(KafkaProcessorUtils.KAFKA_KEY, tracker.key);
}
Expand All @@ -647,6 +649,7 @@ private void populateAttributes(final BundleTracker tracker) {
private static class BundleTracker {

final long initialOffset;
final long initialTimestamp;
final int partition;
final String topic;
final String key;
Expand All @@ -660,6 +663,7 @@ private BundleTracker(final ConsumerRecord<byte[], byte[]> initialRecord, final

private BundleTracker(final ConsumerRecord<byte[], byte[]> initialRecord, final TopicPartition topicPartition, final String keyEncoding, final RecordSetWriter recordWriter) {
this.initialOffset = initialRecord.offset();
this.initialTimestamp = initialRecord.timestamp();
this.partition = topicPartition.partition();
this.topic = topicPartition.topic();
this.recordWriter = recordWriter;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ final class KafkaProcessorUtils {
static final String KAFKA_TOPIC = "kafka.topic";
static final String KAFKA_PARTITION = "kafka.partition";
static final String KAFKA_OFFSET = "kafka.offset";
static final String KAFKA_TIMESTAMP = "kafka.timestamp";
static final String KAFKA_COUNT = "kafka.count";
static final AllowableValue SEC_PLAINTEXT = new AllowableValue("PLAINTEXT", "PLAINTEXT", "PLAINTEXT");
static final AllowableValue SEC_SSL = new AllowableValue("SSL", "SSL", "SSL");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@
@WritesAttribute(attribute = "record.count", description = "The number of records received"),
@WritesAttribute(attribute = "mime.type", description = "The MIME Type that is provided by the configured Record Writer"),
@WritesAttribute(attribute = KafkaProcessorUtils.KAFKA_PARTITION, description = "The partition of the topic the records are from"),
@WritesAttribute(attribute = KafkaProcessorUtils.KAFKA_TIMESTAMP, description = "The timestamp of the message in the partition of the topic."),
@WritesAttribute(attribute = KafkaProcessorUtils.KAFKA_TOPIC, description = "The topic records are from")
})
@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
@WritesAttribute(attribute = KafkaProcessorUtils.KAFKA_KEY, description = "The key of message if present and if single message. "
+ "How the key is encoded depends on the value of the 'Key Attribute Encoding' property."),
@WritesAttribute(attribute = KafkaProcessorUtils.KAFKA_OFFSET, description = "The offset of the message in the partition of the topic."),
@WritesAttribute(attribute = KafkaProcessorUtils.KAFKA_TIMESTAMP, description = "The timestamp of the message in the partition of the topic."),
@WritesAttribute(attribute = KafkaProcessorUtils.KAFKA_PARTITION, description = "The partition of the topic the message or message bundle is from"),
@WritesAttribute(attribute = KafkaProcessorUtils.KAFKA_TOPIC, description = "The topic the message or message bundle is from")
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -463,6 +463,7 @@ private void handleParseFailure(final ConsumerRecord<byte[], byte[]> consumerRec
// If we are unable to parse the data, we need to transfer it to 'parse failure' relationship
final Map<String, String> attributes = getAttributes(consumerRecord);
attributes.put(KafkaProcessorUtils.KAFKA_OFFSET, String.valueOf(consumerRecord.offset()));
attributes.put(KafkaProcessorUtils.KAFKA_TIMESTAMP, String.valueOf(consumerRecord.timestamp()));
attributes.put(KafkaProcessorUtils.KAFKA_PARTITION, String.valueOf(consumerRecord.partition()));
attributes.put(KafkaProcessorUtils.KAFKA_TOPIC, consumerRecord.topic());

Expand Down Expand Up @@ -623,6 +624,7 @@ private void rollback(final TopicPartition topicPartition) {
private void populateAttributes(final BundleTracker tracker) {
final Map<String, String> kafkaAttrs = new HashMap<>();
kafkaAttrs.put(KafkaProcessorUtils.KAFKA_OFFSET, String.valueOf(tracker.initialOffset));
kafkaAttrs.put(KafkaProcessorUtils.KAFKA_TIMESTAMP, String.valueOf(tracker.initialTimestamp));
if (tracker.key != null && tracker.totalRecords == 1) {
kafkaAttrs.put(KafkaProcessorUtils.KAFKA_KEY, tracker.key);
}
Expand All @@ -647,6 +649,7 @@ private void populateAttributes(final BundleTracker tracker) {
private static class BundleTracker {

final long initialOffset;
final long initialTimestamp;
final int partition;
final String topic;
final String key;
Expand All @@ -660,6 +663,7 @@ private BundleTracker(final ConsumerRecord<byte[], byte[]> initialRecord, final

private BundleTracker(final ConsumerRecord<byte[], byte[]> initialRecord, final TopicPartition topicPartition, final String keyEncoding, final RecordSetWriter recordWriter) {
this.initialOffset = initialRecord.offset();
this.initialTimestamp = initialRecord.timestamp();
this.partition = topicPartition.partition();
this.topic = topicPartition.topic();
this.recordWriter = recordWriter;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ final class KafkaProcessorUtils {
static final String KAFKA_TOPIC = "kafka.topic";
static final String KAFKA_PARTITION = "kafka.partition";
static final String KAFKA_OFFSET = "kafka.offset";
static final String KAFKA_TIMESTAMP = "kafka.timestamp";
static final String KAFKA_COUNT = "kafka.count";
static final AllowableValue SEC_PLAINTEXT = new AllowableValue("PLAINTEXT", "PLAINTEXT", "PLAINTEXT");
static final AllowableValue SEC_SSL = new AllowableValue("SSL", "SSL", "SSL");
Expand Down

0 comments on commit 45ebeba

Please sign in to comment.