@@ -384,6 +384,36 @@ public static String chr(int codepoint) {
return new String(result);
}

/**
* @param bytes the byte array to decode
* @param charsetName the name of the charset to decode with
* @return the decoded String, or null if the charset is not supported
*/
@ScalarFunction
public static String fromBytes(byte[] bytes, String charsetName) {
try {
return new String(bytes, charsetName);
} catch (UnsupportedEncodingException e) {
return null;
}
}

/**
* @param input the String to encode
* @param charsetName the name of the charset to encode with
* @return the encoded byte array, or null if the charset is not supported
*/
@ScalarFunction
public static byte[] toBytes(String input, String charsetName) {
try {
return input.getBytes(charsetName);
} catch (UnsupportedEncodingException e) {
return null;
}
}
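
A minimal usage sketch of the two helpers above, which are inverses when given the same charset name (the inputs and charset names here are illustrative, not part of the patch):

    // Illustrative only: round-trip through the new scalar functions.
    byte[] encoded = toBytes("héllo", "UTF-8");           // String -> byte[]
    String decoded = fromBytes(encoded, "UTF-8");          // byte[] -> String, equal to "héllo"
    String none = fromBytes(encoded, "no-such-charset");   // unsupported charset -> null rather than an exception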

/**
* @see StandardCharsets#UTF_8#encode(String)
* @param input
@@ -556,7 +586,7 @@ public static int strcmp(String input1, String input2) {
@ScalarFunction
public static String encodeUrl(String input)
throws UnsupportedEncodingException {
return URLEncoder.encode(input, StandardCharsets.UTF_8.toString());
return URLEncoder.encode(input, StandardCharsets.UTF_8.toString());
}

/**
@@ -82,6 +82,9 @@
import org.apache.pinot.spi.stream.RowMetadata;
import org.apache.pinot.spi.stream.StreamConsumerFactory;
import org.apache.pinot.spi.stream.StreamConsumerFactoryProvider;
import org.apache.pinot.spi.stream.StreamDataDecoder;
import org.apache.pinot.spi.stream.StreamDataDecoderImpl;
import org.apache.pinot.spi.stream.StreamDataDecoderResult;
import org.apache.pinot.spi.stream.StreamDecoderProvider;
import org.apache.pinot.spi.stream.StreamMessageDecoder;
import org.apache.pinot.spi.stream.StreamMetadataProvider;
@@ -212,7 +215,7 @@ public void deleteSegmentFile() {
private final SegmentZKMetadata _segmentZKMetadata;
private final TableConfig _tableConfig;
private final RealtimeTableDataManager _realtimeTableDataManager;
private final StreamMessageDecoder _messageDecoder;
private final StreamDataDecoder _streamDataDecoder;
private final int _segmentMaxRowCount;
private final String _resourceDataDir;
private final IndexLoadingConfig _indexLoadingConfig;
@@ -506,7 +509,6 @@ private boolean processStreamEvents(MessageBatch messagesAndOffsets, long idlePi
int streamMessageCount = 0;
boolean canTakeMore = true;

GenericRow reuse = new GenericRow();
TransformPipeline.Result reusedResult = new TransformPipeline.Result();
boolean prematureExit = false;
for (int index = 0; index < messageCount; index++) {
@@ -539,18 +541,18 @@ private boolean processStreamEvents(MessageBatch messagesAndOffsets, long idlePi
throw new RuntimeException("Realtime segment full");
}

// Index each message
reuse.clear();
// retrieve metadata from the message batch if available
// this can be overridden by the decoder if there is a better indicator in the message payload
RowMetadata msgMetadata = messagesAndOffsets.getMetadataAtIndex(index);

GenericRow decodedRow = _messageDecoder
.decode(messagesAndOffsets.getMessageAtIndex(index), messagesAndOffsets.getMessageOffsetAtIndex(index),
messagesAndOffsets.getMessageLengthAtIndex(index), reuse);
if (decodedRow != null) {
// Decode message
StreamDataDecoderResult decodedRow = _streamDataDecoder.decode(messagesAndOffsets.getStreamMessage(index));
RowMetadata msgMetadata = messagesAndOffsets.getStreamMessage(index).getMetadata();
if (decodedRow.getException() != null) {
// TODO: based on a config, decide whether the record should be silently dropped or stop further consumption on
// decode error
realtimeRowsDroppedMeter =
_serverMetrics.addMeteredTableValue(_metricKeyName, ServerMeter.INVALID_REALTIME_ROWS_DROPPED, 1,
realtimeRowsDroppedMeter);
} else {
try {
_transformPipeline.processRow(decodedRow, reusedResult);
_transformPipeline.processRow(decodedRow.getResult(), reusedResult);
} catch (Exception e) {
_numRowsErrored++;
// when exception happens we prefer abandoning the whole batch and not partially indexing some rows
@@ -585,10 +587,6 @@ private boolean processStreamEvents(MessageBatch messagesAndOffsets, long idlePi
new SegmentErrorInfo(now(), errorMessage, e));
}
}
} else {
realtimeRowsDroppedMeter =
_serverMetrics.addMeteredTableValue(_metricKeyName, ServerMeter.INVALID_REALTIME_ROWS_DROPPED, 1,
realtimeRowsDroppedMeter);
}

_currentOffset = messagesAndOffsets.getNextStreamPartitionMsgOffsetAtIndex(index);
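
Condensed, the new per-message path introduced above looks roughly as follows (a sketch using only the calls visible in this diff; batch iteration and offset bookkeeping are elided):

    // Sketch of the per-message decode path after this change.
    StreamMessage message = messagesAndOffsets.getStreamMessage(index);
    StreamDataDecoderResult result = _streamDataDecoder.decode(message);
    RowMetadata msgMetadata = message.getMetadata();
    if (result.getException() != null) {
      // decode failure: meter the row as dropped instead of indexing it
      realtimeRowsDroppedMeter = _serverMetrics.addMeteredTableValue(
          _metricKeyName, ServerMeter.INVALID_REALTIME_ROWS_DROPPED, 1, realtimeRowsDroppedMeter);
    } else {
      // decode success: feed the decoded GenericRow into the transform pipeline
      _transformPipeline.processRow(result.getResult(), reusedResult);
    }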
@@ -1364,7 +1362,8 @@ public LLRealtimeSegmentDataManager(SegmentZKMetadata segmentZKMetadata, TableCo

// Create message decoder
Set<String> fieldsToRead = IngestionUtils.getFieldsForRecordExtractor(_tableConfig.getIngestionConfig(), _schema);
_messageDecoder = StreamDecoderProvider.create(_partitionLevelStreamConfig, fieldsToRead);
StreamMessageDecoder streamMessageDecoder = StreamDecoderProvider.create(_partitionLevelStreamConfig, fieldsToRead);
_streamDataDecoder = new StreamDataDecoderImpl(streamMessageDecoder);
_clientId = streamTopic + "-" + _partitionGroupId;

_transformPipeline = new TransformPipeline(tableConfig, schema);
@@ -135,6 +135,7 @@ public void testSegmentMetadataApi()
segmentMetadataFromDirectEndpoint.get("segment.total.docs"));
}

// TODO: This test fails when using `llc` consumer mode. Needs investigation
@Test
public void testSegmentListApi()
throws Exception {
@@ -18,16 +18,17 @@
*/
package org.apache.pinot.plugin.stream.kafka20;

import java.nio.ByteBuffer;
import java.util.List;
import org.apache.pinot.spi.stream.LongMsgOffset;
import org.apache.pinot.spi.stream.MessageBatch;
import org.apache.pinot.spi.stream.RowMetadata;
import org.apache.pinot.spi.stream.StreamMessage;
import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;


public class KafkaMessageBatch implements MessageBatch<byte[]> {

private final List<MessageAndOffsetAndMetadata> _messageList;
public class KafkaMessageBatch implements MessageBatch<StreamMessage> {
private final List<StreamMessage> _messageList;
private final int _unfilteredMessageCount;
private final long _lastOffset;

@@ -36,7 +37,7 @@ public class KafkaMessageBatch implements MessageBatch<byte[]> {
* @param lastOffset the offset of the last message in the batch
* @param batch the messages, which may be fewer than {@code unfilteredMessageCount}
*/
public KafkaMessageBatch(int unfilteredMessageCount, long lastOffset, List<MessageAndOffsetAndMetadata> batch) {
public KafkaMessageBatch(int unfilteredMessageCount, long lastOffset, List<StreamMessage> batch) {
_messageList = batch;
_lastOffset = lastOffset;
_unfilteredMessageCount = unfilteredMessageCount;
@@ -53,18 +54,18 @@ public int getUnfilteredMessageCount() {
}

@Override
public byte[] getMessageAtIndex(int index) {
return _messageList.get(index).getMessage().array();
public StreamMessage getMessageAtIndex(int index) {
return _messageList.get(index);
}

@Override
public int getMessageOffsetAtIndex(int index) {
return _messageList.get(index).getMessage().arrayOffset();
return ByteBuffer.wrap(_messageList.get(index).getValue()).arrayOffset();
}

@Override
public int getMessageLengthAtIndex(int index) {
return _messageList.get(index).payloadSize();
return _messageList.get(index).getValue().length;
}

@Override
@@ -74,7 +75,7 @@ public long getNextStreamMessageOffsetAtIndex(int index) {

@Override
public StreamPartitionMsgOffset getNextStreamPartitionMsgOffsetAtIndex(int index) {
return new LongMsgOffset(_messageList.get(index).getNextOffset());
return new LongMsgOffset(((KafkaStreamMessage) _messageList.get(index)).getNextOffset());
}

@Override
@@ -84,6 +85,16 @@ public StreamPartitionMsgOffset getOffsetOfNextBatch() {

@Override
public RowMetadata getMetadataAtIndex(int index) {
return _messageList.get(index).getRowMetadata();
return _messageList.get(index).getMetadata();
}

@Override
public byte[] getMessageBytesAtIndex(int index) {
return _messageList.get(index).getValue();
}

@Override
public StreamMessage getStreamMessage(int index) {
return _messageList.get(index);
}
}
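
A minimal sketch of how a batch built from the new StreamMessage type behaves, assuming the constructors shown in this diff (the offset, timestamp, and payload values are illustrative):

    // Illustrative only: build a one-message batch and read it back through the MessageBatch API.
    Map<String, String> recordMeta = new HashMap<>();
    recordMeta.put(KafkaStreamMessageMetadata.METADATA_OFFSET_KEY, "42");
    recordMeta.put(KafkaStreamMessageMetadata.RECORD_TIMESTAMP_KEY, "1650000000000");
    KafkaStreamMessageMetadata metadata =
        new KafkaStreamMessageMetadata(1650000000000L, new GenericRow(), recordMeta);
    KafkaStreamMessage message = new KafkaStreamMessage("key".getBytes(StandardCharsets.UTF_8),
        "value".getBytes(StandardCharsets.UTF_8), metadata);
    KafkaMessageBatch batch = new KafkaMessageBatch(1, 42L, Collections.singletonList(message));
    batch.getMessageBytesAtIndex(0);                 // the raw value bytes
    batch.getMetadataAtIndex(0);                     // the metadata attached above
    batch.getNextStreamPartitionMsgOffsetAtIndex(0); // LongMsgOffset(43), derived from METADATA_OFFSET_KEY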
@@ -0,0 +1,57 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.plugin.stream.kafka20;

import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.pinot.spi.data.readers.GenericRow;
import org.apache.pinot.spi.stream.RowMetadata;


@FunctionalInterface
public interface KafkaMetadataExtractor {
static KafkaMetadataExtractor build(boolean populateMetadata) {
return record -> {
if (!populateMetadata) {
long recordTimestamp = record.timestamp();
Map<String, String> metadataMap = new HashMap<>();
metadataMap.put(KafkaStreamMessageMetadata.METADATA_OFFSET_KEY, String.valueOf(record.offset()));
metadataMap.put(KafkaStreamMessageMetadata.RECORD_TIMESTAMP_KEY, String.valueOf(recordTimestamp));
return new KafkaStreamMessageMetadata(recordTimestamp, RowMetadata.EMPTY_ROW, metadataMap);
}
GenericRow headerGenericRow = new GenericRow();
Headers headers = record.headers();
if (headers != null) {
Header[] headersArray = headers.toArray();
for (Header header : headersArray) {
headerGenericRow.putValue(header.key(), header.value());
}
}
Map<String, String> metadata = new HashMap<>();
metadata.put(KafkaStreamMessageMetadata.METADATA_OFFSET_KEY, String.valueOf(record.offset()));
metadata.put(KafkaStreamMessageMetadata.RECORD_TIMESTAMP_KEY, String.valueOf(record.timestamp()));
return new KafkaStreamMessageMetadata(record.timestamp(), headerGenericRow, metadata);
};
}

RowMetadata extract(ConsumerRecord<?, ?> consumerRecord);
}
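
A minimal sketch of how the extractor is exercised, mirroring its use in KafkaPartitionLevelConnectionHandler and KafkaPartitionLevelConsumer below (the hand-built ConsumerRecord is illustrative):

    // Illustrative only: run the extractor over a hand-built ConsumerRecord.
    ConsumerRecord<String, Bytes> record = new ConsumerRecord<>("myTopic", 0, 42L, "someKey",
        Bytes.wrap("payload".getBytes(StandardCharsets.UTF_8)));
    KafkaMetadataExtractor extractor = KafkaMetadataExtractor.build(/* populateMetadata */ true);
    StreamMessageMetadata metadata = (StreamMessageMetadata) extractor.extract(record);
    // metadata.getRecordMetadata() now carries METADATA_OFFSET_KEY ("42") and RECORD_TIMESTAMP_KEY;
    // with populateMetadata=true any Kafka headers (none on this record) are copied into a GenericRow.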
@@ -45,7 +45,7 @@ public abstract class KafkaPartitionLevelConnectionHandler {
protected final String _topic;
protected final Consumer<String, Bytes> _consumer;
protected final TopicPartition _topicPartition;
protected final RowMetadataExtractor _rowMetadataExtractor;
protected final KafkaMetadataExtractor _kafkaMetadataExtractor;

public KafkaPartitionLevelConnectionHandler(String clientId, StreamConfig streamConfig, int partition) {
_config = new KafkaPartitionLevelStreamConfig(streamConfig);
@@ -64,7 +64,7 @@ public KafkaPartitionLevelConnectionHandler(String clientId, StreamConfig stream
_consumer = new KafkaConsumer<>(consumerProp);
_topicPartition = new TopicPartition(_topic, _partition);
_consumer.assign(Collections.singletonList(_topicPartition));
_rowMetadataExtractor = RowMetadataExtractor.build(_config.isPopulateMetadata());
_kafkaMetadataExtractor = KafkaMetadataExtractor.build(_config.isPopulateMetadata());
}

public void close()
@@ -18,6 +18,7 @@
*/
package org.apache.pinot.plugin.stream.kafka20;

import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
@@ -28,6 +29,8 @@
import org.apache.pinot.spi.stream.MessageBatch;
import org.apache.pinot.spi.stream.PartitionLevelConsumer;
import org.apache.pinot.spi.stream.StreamConfig;
import org.apache.pinot.spi.stream.StreamMessage;
import org.apache.pinot.spi.stream.StreamMessageMetadata;
import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -43,30 +46,35 @@ public KafkaPartitionLevelConsumer(String clientId, StreamConfig streamConfig, i
}

@Override
public MessageBatch<byte[]> fetchMessages(StreamPartitionMsgOffset startMsgOffset,
public MessageBatch<StreamMessage> fetchMessages(StreamPartitionMsgOffset startMsgOffset,
StreamPartitionMsgOffset endMsgOffset, int timeoutMillis) {
final long startOffset = ((LongMsgOffset) startMsgOffset).getOffset();
final long endOffset = endMsgOffset == null ? Long.MAX_VALUE : ((LongMsgOffset) endMsgOffset).getOffset();
return fetchMessages(startOffset, endOffset, timeoutMillis);
}

public MessageBatch<byte[]> fetchMessages(long startOffset, long endOffset, int timeoutMillis) {
public MessageBatch<StreamMessage> fetchMessages(long startOffset, long endOffset, int timeoutMillis) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("poll consumer: {}, startOffset: {}, endOffset:{} timeout: {}ms", _topicPartition, startOffset,
endOffset, timeoutMillis);
}
LOGGER.warn("poll consumer: {}, startOffset: {}, endOffset:{} timeout: {}ms", _topicPartition, startOffset,
endOffset, timeoutMillis);
_consumer.seek(_topicPartition, startOffset);
ConsumerRecords<String, Bytes> consumerRecords = _consumer.poll(Duration.ofMillis(timeoutMillis));
List<ConsumerRecord<String, Bytes>> messageAndOffsets = consumerRecords.records(_topicPartition);
List<MessageAndOffsetAndMetadata> filtered = new ArrayList<>(messageAndOffsets.size());
List<StreamMessage> filtered = new ArrayList<>(messageAndOffsets.size());
long lastOffset = startOffset;
for (ConsumerRecord<String, Bytes> messageAndOffset : messageAndOffsets) {
String key = messageAndOffset.key();
byte[] keyBytes = key == null ? null : key.getBytes(StandardCharsets.UTF_8);
Bytes message = messageAndOffset.value();
long offset = messageAndOffset.offset();
if (offset >= startOffset & (endOffset > offset | endOffset == -1)) {
if (message != null) {
StreamMessageMetadata rowMetadata = (StreamMessageMetadata) _kafkaMetadataExtractor.extract(messageAndOffset);
filtered.add(
new MessageAndOffsetAndMetadata(message.get(), offset, _rowMetadataExtractor.extract(messageAndOffset)));
new KafkaStreamMessage(keyBytes, message.get(), rowMetadata));
} else if (LOGGER.isDebugEnabled()) {
LOGGER.debug("tombstone message at offset {}", offset);
}
@@ -16,38 +16,23 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.plugin.stream.kafka;
package org.apache.pinot.plugin.stream.kafka20;

import java.nio.ByteBuffer;
import javax.annotation.Nullable;
import org.apache.pinot.spi.stream.StreamMessage;
import org.apache.pinot.spi.stream.StreamMessageMetadata;


public class MessageAndOffset {

private ByteBuffer _message;
private long _offset;

public MessageAndOffset(byte[] message, long offset) {
this(ByteBuffer.wrap(message), offset);
}

public MessageAndOffset(ByteBuffer message, long offset) {
_message = message;
_offset = offset;
}

public ByteBuffer getMessage() {
return _message;
}

public long getOffset() {
return _offset;
public class KafkaStreamMessage extends StreamMessage {
public KafkaStreamMessage(@Nullable byte[] key, byte[] value, @Nullable StreamMessageMetadata metadata) {
super(key, value, metadata);
}

public long getNextOffset() {
return getOffset() + 1;
}

public int payloadSize() {
return getMessage().array().length;
if (_metadata != null) {
long offset = Long.parseLong(_metadata.getRecordMetadata().get(KafkaStreamMessageMetadata.METADATA_OFFSET_KEY));
return offset < 0 ? -1 : offset + 1;
}
return -1;
}
}