Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1425,6 +1425,7 @@ public LLRealtimeSegmentDataManager(SegmentZKMetadata segmentZKMetadata, TableCo
.setPartitionUpsertMetadataManager(partitionUpsertMetadataManager)
.setPartitionDedupMetadataManager(partitionDedupMetadataManager)
.setUpsertComparisonColumns(tableConfig.getUpsertComparisonColumns())
.setUpsertDeleteRecordColumn(tableConfig.getUpsertDeleteRecordColumn())
.setFieldConfigList(tableConfig.getFieldConfigList());

// Create message decoder
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,24 +81,33 @@ public FilterPlanNode(IndexSegment indexSegment, QueryContext queryContext, @Nul

@Override
public BaseFilterOperator run() {
// NOTE: Snapshot the validDocIds before reading the numDocs to prevent the latest updates getting lost
ThreadSafeMutableRoaringBitmap validDocIds = _indexSegment.getValidDocIds();
MutableRoaringBitmap validDocIdsSnapshot =
validDocIds != null && !_queryContext.isSkipUpsert() ? validDocIds.getMutableRoaringBitmap() : null;
// NOTE: Snapshot the queryableDocIds before reading the numDocs to prevent the latest updates getting lost
MutableRoaringBitmap queryableDocIdSnapshot = null;
if (!_queryContext.isSkipUpsert()) {
ThreadSafeMutableRoaringBitmap queryableDocIds = _indexSegment.getQueryableDocIds();
if (queryableDocIds != null) {
queryableDocIdSnapshot = queryableDocIds.getMutableRoaringBitmap();
} else {
ThreadSafeMutableRoaringBitmap validDocIds = _indexSegment.getValidDocIds();
if (validDocIds != null) {
queryableDocIdSnapshot = validDocIds.getMutableRoaringBitmap();
}
}
}
int numDocs = _indexSegment.getSegmentMetadata().getTotalDocs();

FilterContext filter = _filter != null ? _filter : _queryContext.getFilter();
if (filter != null) {
BaseFilterOperator filterOperator = constructPhysicalOperator(filter, numDocs);
if (validDocIdsSnapshot != null) {
BaseFilterOperator validDocFilter = new BitmapBasedFilterOperator(validDocIdsSnapshot, false, numDocs);
if (queryableDocIdSnapshot != null) {
BaseFilterOperator validDocFilter = new BitmapBasedFilterOperator(queryableDocIdSnapshot, false, numDocs);
return FilterOperatorUtils.getAndFilterOperator(_queryContext, Arrays.asList(filterOperator, validDocFilter),
numDocs);
} else {
return filterOperator;
}
} else if (validDocIdsSnapshot != null) {
return new BitmapBasedFilterOperator(validDocIdsSnapshot, false, numDocs);
} else if (queryableDocIdSnapshot != null) {
return new BitmapBasedFilterOperator(queryableDocIdSnapshot, false, numDocs);
} else {
return new MatchAllFilterOperator(numDocs);
}
Expand Down Expand Up @@ -250,9 +259,8 @@ private BaseFilterOperator constructPhysicalOperator(FilterContext filter, int n
return new TextContainsFilterOperator(textIndexReader, (TextContainsPredicate) predicate, numDocs);
case TEXT_MATCH:
textIndexReader = dataSource.getTextIndex();
Preconditions
.checkState(textIndexReader != null, "Cannot apply TEXT_MATCH on column: %s without text index",
column);
Preconditions.checkState(textIndexReader != null,
"Cannot apply TEXT_MATCH on column: %s without text index", column);
// We could check for real time and segment Lucene reader, but easier to check the other way round
if (textIndexReader instanceof NativeTextIndexReader
|| textIndexReader instanceof NativeMutableTextIndex) {
Expand Down Expand Up @@ -300,8 +308,8 @@ private BaseFilterOperator constructPhysicalOperator(FilterContext filter, int n
return new MatchAllFilterOperator(numDocs);
}
default:
predicateEvaluator = PredicateEvaluatorProvider.getPredicateEvaluator(predicate, dataSource,
_queryContext);
predicateEvaluator =
PredicateEvaluatorProvider.getPredicateEvaluator(predicate, dataSource, _queryContext);
_predicateEvaluators.add(Pair.of(predicate, predicateEvaluator));
return FilterOperatorUtils.getLeafFilterOperator(predicateEvaluator, dataSource, numDocs,
_queryContext.isNullHandlingEnabled());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,8 +114,8 @@ public void buildSegment()
segmentGeneratorConfig.setTableName("testTable");
segmentGeneratorConfig.setSegmentName(SEGMENT_NAME);
segmentGeneratorConfig.setOutDir(INDEX_DIR.getAbsolutePath());
segmentGeneratorConfig.setIndexOn(StandardIndexes.inverted(), IndexConfig.ENABLED,
"column6", "column7", "column11", "column17", "column18");
segmentGeneratorConfig.setIndexOn(StandardIndexes.inverted(), IndexConfig.ENABLED, "column6", "column7", "column11",
"column17", "column18");

// Build the index segment.
SegmentIndexCreationDriver driver = new SegmentIndexCreationDriverImpl();
Expand All @@ -131,8 +131,8 @@ public void loadSegment()
_upsertIndexSegment = ImmutableSegmentLoader.load(new File(INDEX_DIR, SEGMENT_NAME), ReadMode.heap);
((ImmutableSegmentImpl) _upsertIndexSegment).enableUpsert(
new ConcurrentMapPartitionUpsertMetadataManager("testTable_REALTIME", 0, Collections.singletonList("column6"),
Collections.singletonList("daysSinceEpoch"), HashFunction.NONE, null, false, serverMetrics),
new ThreadSafeMutableRoaringBitmap());
Collections.singletonList("daysSinceEpoch"), null, HashFunction.NONE, null, false, serverMetrics),
new ThreadSafeMutableRoaringBitmap(), null);
}

@AfterClass
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,12 @@ public ThreadSafeMutableRoaringBitmap getValidDocIds() {
return null;
}

@Nullable
@Override
public ThreadSafeMutableRoaringBitmap getQueryableDocIds() {
// This stub segment exposes no queryable-doc-id bitmap (no upsert/delete metadata is
// tracked here), so callers must handle the null return.
return null;
}

@Override
public GenericRow getRecord(int docId, GenericRow reuse) {
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.apache.pinot.client.ConnectionFactory;
import org.apache.pinot.common.utils.TarGzCompressionUtils;
import org.apache.pinot.common.utils.config.TagNameUtils;
import org.apache.pinot.plugin.inputformat.csv.CSVMessageDecoder;
import org.apache.pinot.plugin.stream.kafka.KafkaStreamConfigProperties;
import org.apache.pinot.spi.config.table.ColumnPartitionConfig;
import org.apache.pinot.spi.config.table.DedupConfig;
Expand All @@ -54,6 +55,8 @@
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.stream.StreamConfig;
import org.apache.pinot.spi.stream.StreamConfigProperties;
import org.apache.pinot.spi.stream.StreamDataProducer;
import org.apache.pinot.spi.stream.StreamDataProvider;
import org.apache.pinot.spi.stream.StreamDataServerStartable;
import org.apache.pinot.spi.utils.JsonUtils;
import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
Expand Down Expand Up @@ -396,11 +399,15 @@ protected TableConfig createRealtimeTableConfig(File sampleAvroFile) {
/**
* Creates a new Upsert enabled table config.
*/
protected TableConfig createUpsertTableConfig(File sampleAvroFile, String primaryKeyColumn, int numPartitions) {
protected TableConfig createUpsertTableConfig(File sampleAvroFile, String primaryKeyColumn, String deleteColumn,
int numPartitions) {
AvroFileSchemaKafkaAvroMessageDecoder._avroFile = sampleAvroFile;
Map<String, ColumnPartitionConfig> columnPartitionConfigMap = new HashMap<>();
columnPartitionConfigMap.put(primaryKeyColumn, new ColumnPartitionConfig("Murmur", numPartitions));

UpsertConfig upsertConfig = new UpsertConfig(UpsertConfig.Mode.FULL);
upsertConfig.setDeleteRecordColumn(deleteColumn);

return new TableConfigBuilder(TableType.REALTIME).setTableName(getTableName()).setSchemaName(getSchemaName())
.setTimeColumnName(getTimeColumnName()).setFieldConfigList(getFieldConfigs()).setNumReplicas(getNumReplicas())
.setSegmentVersion(getSegmentVersion()).setLoadMode(getLoadMode()).setTaskConfig(getTaskConfig())
Expand All @@ -409,7 +416,62 @@ protected TableConfig createUpsertTableConfig(File sampleAvroFile, String primar
.setRoutingConfig(new RoutingConfig(null, null, RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE))
.setSegmentPartitionConfig(new SegmentPartitionConfig(columnPartitionConfigMap))
.setReplicaGroupStrategyConfig(new ReplicaGroupStrategyConfig(primaryKeyColumn, 1))
.setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.FULL)).build();
.setUpsertConfig(upsertConfig).build();
}

/**
 * Builds the stream-level decoder properties needed to consume CSV messages from Kafka.
 *
 * @param delimiter optional CSV field delimiter; left out of the map when null
 * @param csvHeaderProperty optional CSV header definition; left out of the map when null
 * @return map of fully-qualified stream decoder properties
 */
protected Map<String, String> getCSVDecoderProperties(@Nullable String delimiter,
    @Nullable String csvHeaderProperty) {
  String streamType = "kafka";
  Map<String, String> decoderProps = new HashMap<>();
  // The decoder class is always required; delimiter/header are optional overrides.
  decoderProps.put(
      StreamConfigProperties.constructStreamProperty(streamType, StreamConfigProperties.STREAM_DECODER_CLASS),
      CSVMessageDecoder.class.getName());
  if (delimiter != null) {
    decoderProps.put(StreamConfigProperties.constructStreamProperty(streamType, "decoder.prop.delimiter"),
        delimiter);
  }
  if (csvHeaderProperty != null) {
    decoderProps.put(StreamConfigProperties.constructStreamProperty(streamType, "decoder.prop.header"),
        csvHeaderProperty);
  }
  return decoderProps;
}

/**
 * Creates an upsert-enabled REALTIME table config that consumes CSV messages from Kafka.
 *
 * @param tableName name of the table
 * @param schemaName schema name; falls back to {@link #getSchemaName()} when null
 * @param kafkaTopicName Kafka topic; falls back to {@link #getKafkaTopic()} when null
 * @param numPartitions number of partitions for the primary-key partition config
 * @param streamDecoderProperties decoder properties (e.g. built by getCSVDecoderProperties)
 * @param upsertConfig upsert config; falls back to FULL upsert mode when null
 * @param primaryKeyColumn primary-key column, also used for partitioning and replica grouping
 * @return the realtime table config
 */
protected TableConfig createCSVUpsertTableConfig(String tableName, @Nullable String schemaName,
    @Nullable String kafkaTopicName, int numPartitions, Map<String, String> streamDecoderProperties,
    @Nullable UpsertConfig upsertConfig, String primaryKeyColumn) {
  if (schemaName == null) {
    schemaName = getSchemaName();
  }
  Map<String, ColumnPartitionConfig> columnPartitionConfigMap = new HashMap<>();
  columnPartitionConfigMap.put(primaryKeyColumn, new ColumnPartitionConfig("Murmur", numPartitions));

  if (upsertConfig == null) {
    upsertConfig = new UpsertConfig(UpsertConfig.Mode.FULL);
  }
  if (kafkaTopicName == null) {
    kafkaTopicName = getKafkaTopic();
  }

  // Point the stream config at the requested topic, then layer on the decoder properties.
  Map<String, String> streamConfigsMap = getStreamConfigMap();
  streamConfigsMap.put(
      StreamConfigProperties.constructStreamProperty("kafka", StreamConfigProperties.STREAM_TOPIC_NAME),
      kafkaTopicName);
  streamConfigsMap.putAll(streamDecoderProperties);

  return new TableConfigBuilder(TableType.REALTIME).setTableName(tableName).setSchemaName(schemaName)
      .setTimeColumnName(getTimeColumnName()).setFieldConfigList(getFieldConfigs()).setNumReplicas(getNumReplicas())
      .setSegmentVersion(getSegmentVersion()).setLoadMode(getLoadMode()).setTaskConfig(getTaskConfig())
      .setBrokerTenant(getBrokerTenant()).setServerTenant(getServerTenant()).setIngestionConfig(getIngestionConfig())
      .setLLC(useLlc()).setStreamConfigs(streamConfigsMap)
      // Partial upsert needs null handling enabled to distinguish missing from null values.
      .setNullHandlingEnabled(UpsertConfig.Mode.PARTIAL.equals(upsertConfig.getMode()) || getNullHandlingEnabled())
      .setRoutingConfig(new RoutingConfig(null, null, RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE))
      .setSegmentPartitionConfig(new SegmentPartitionConfig(columnPartitionConfigMap))
      .setReplicaGroupStrategyConfig(new ReplicaGroupStrategyConfig(primaryKeyColumn, 1))
      .setUpsertConfig(upsertConfig).build();
}

/**
Expand Down Expand Up @@ -498,33 +560,84 @@ protected void setUpQueryGenerator(List<File> avroFiles) {
_queryGenerator = new QueryGenerator(avroFiles, tableName, tableName);
}

/**
 * Unpacks the tarred Avro data (named by {@code getAvroTarFileName()}) into the given directory.
 *
 * @param outputDir Output directory
 * @return List of files unpacked
 * @throws Exception if the tar resource is missing or extraction fails
 */
protected List<File> unpackAvroData(File outputDir)
throws Exception {
return unpackTarData(getAvroTarFileName(), outputDir);
}

/**
* Unpack the tarred Avro data into the given directory.
* Unpack the tarred data into the given directory.
*
* @param tarFileName Input tar filename
* @param outputDir Output directory
* @return List of files unpacked.
* @throws Exception
*/
protected List<File> unpackAvroData(File outputDir)
protected List<File> unpackTarData(String tarFileName, File outputDir)
throws Exception {
InputStream inputStream =
BaseClusterIntegrationTest.class.getClassLoader().getResourceAsStream(getAvroTarFileName());
BaseClusterIntegrationTest.class.getClassLoader().getResourceAsStream(tarFileName);
Assert.assertNotNull(inputStream);
return TarGzCompressionUtils.untar(inputStream, outputDir);
}

/**
 * Publishes the records from the given Avro files to the test Kafka topic.
 *
 * @param avroFiles List of Avro files
 */
protected void pushAvroIntoKafka(List<File> avroFiles)
    throws Exception {
  String kafkaBroker = "localhost:" + getKafkaPort();
  ClusterIntegrationTestUtils.pushAvroIntoKafka(avroFiles, kafkaBroker, getKafkaTopic(),
      getMaxNumKafkaMessagesPerBatch(), getKafkaMessageHeader(), getPartitionColumn(), injectTombstones());
}

/**
 * Pushes the records from the given CSV file into a Kafka stream.
 *
 * @param csvFile CSV file to read records from
 * @param kafkaTopic Kafka topic to push into
 * @param partitionColumnIndex Optional index of the column used as the Kafka record key
 * @throws Exception if creating the producer or pushing the records fails
 */
protected void pushCsvIntoKafka(File csvFile, String kafkaTopic, @Nullable Integer partitionColumnIndex)
    throws Exception {
  String kafkaBroker = "localhost:" + getKafkaPort();
  StreamDataProducer producer = null;
  try {
    producer = StreamDataProvider.getStreamDataProducer(KafkaStarterUtils.KAFKA_PRODUCER_CLASS_NAME,
        getDefaultKafkaProducerProperties(kafkaBroker));
    ClusterIntegrationTestUtils.pushCsvIntoKafka(csvFile, kafkaTopic, partitionColumnIndex, injectTombstones(),
        producer);
  } finally {
    // Always release the producer: the previous version only closed it on failure,
    // leaking it on the success path.
    if (producer != null) {
      producer.close();
    }
  }
}

/**
 * Pushes the given CSV record strings into a Kafka stream.
 *
 * @param csvRecords List of CSV record strings to push
 * @param kafkaTopic Kafka topic to push into
 * @param partitionColumnIndex Optional index of the column used as the Kafka record key
 */
protected void pushCsvIntoKafka(List<String> csvRecords, String kafkaTopic, @Nullable Integer partitionColumnIndex) {
  String kafkaBroker = "localhost:" + getKafkaPort();
  StreamDataProducer producer = null;
  try {
    producer = StreamDataProvider.getStreamDataProducer(KafkaStarterUtils.KAFKA_PRODUCER_CLASS_NAME,
        getDefaultKafkaProducerProperties(kafkaBroker));
    ClusterIntegrationTestUtils.pushCsvIntoKafka(csvRecords, kafkaTopic, partitionColumnIndex, injectTombstones(),
        producer);
  } catch (Exception e) {
    // The previous version swallowed the exception, silently losing push failures; the method
    // declares no checked exceptions, so surface it as an unchecked one with the cause attached.
    throw new RuntimeException("Failed to push CSV records into Kafka topic: " + kafkaTopic, e);
  } finally {
    if (producer != null) {
      producer.close();
    }
  }
}
/**
 * Default configuration for the Kafka producer used by the CSV/Avro push helpers.
 *
 * @param kafkaBroker broker address in host:port form
 * @return producer configuration properties
 */
private Properties getDefaultKafkaProducerProperties(String kafkaBroker) {
  Properties producerProps = new Properties();
  producerProps.setProperty("metadata.broker.list", kafkaBroker);
  producerProps.setProperty("serializer.class", "kafka.serializer.DefaultEncoder");
  producerProps.setProperty("request.required.acks", "1");
  producerProps.setProperty("partitioner.class", "kafka.producer.ByteArrayPartitioner");
  return producerProps;
}

/**
 * Whether the push helpers should publish a burst of tombstone (null-payload) messages into
 * Kafka before the real records. Defaults to false; subclasses override to exercise consumer
 * handling of tombstones.
 */
protected boolean injectTombstones() {
return false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.io.File;
import java.io.IOException;
import java.math.BigDecimal;
import java.nio.charset.StandardCharsets;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
Expand Down Expand Up @@ -55,6 +56,9 @@
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.util.Utf8;
import org.apache.commons.csv.CSVFormat;
import org.apache.commons.csv.CSVParser;
import org.apache.commons.csv.CSVRecord;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
Expand Down Expand Up @@ -337,6 +341,76 @@ public static void buildSegmentFromAvro(File avroFile, TableConfig tableConfig,
TarGzCompressionUtils.createTarGzFile(indexDir, segmentTarFile);
}

/**
 * Push the records from the given CSV file into a Kafka stream.
 *
 * @param csvFile CSV file to read records from
 * @param kafkaTopic Kafka topic
 * @param partitionColumnIndex Optional Index of the partition column
 * @param injectTombstones whether to first publish a burst of tombstone (null payload) messages
 * @param producer stream data producer used to publish the messages
 * @throws Exception
 */
public static void pushCsvIntoKafka(File csvFile, String kafkaTopic,
@Nullable Integer partitionColumnIndex, boolean injectTombstones, StreamDataProducer producer)
throws Exception {

if (injectTombstones) {
// publish lots of tombstones to livelock the consumer if it can't handle this properly
for (int i = 0; i < 1000; i++) {
// publish a tombstone first
producer.produce(kafkaTopic, Longs.toByteArray(System.currentTimeMillis()), null);
}
}
CSVFormat csvFormat = CSVFormat.DEFAULT.withSkipHeaderRecord(true);
try (CSVParser parser = CSVParser.parse(csvFile, StandardCharsets.UTF_8, csvFormat)) {
for (CSVRecord csv : parser) {
// Key by the partition column value when provided, otherwise by the current timestamp
byte[] keyBytes = (partitionColumnIndex == null) ? Longs.toByteArray(System.currentTimeMillis())
: csv.get(partitionColumnIndex).getBytes(StandardCharsets.UTF_8);
List<String> cols = new ArrayList<>();
for (String col : csv) {
cols.add(col);
}
// Re-serialize the parsed record with comma separators as the message payload
byte[] bytes = String.join(",", cols).getBytes(StandardCharsets.UTF_8);
producer.produce(kafkaTopic, keyBytes, bytes);
}
}
}

/**
 * Push the given CSV record strings into a Kafka stream.
 *
 * @param csvRecords List of CSV record strings
 * @param kafkaTopic Kafka topic
 * @param partitionColumnIndex Optional index of the partition column used as the message key
 * @param injectTombstones whether to first publish a burst of tombstone (null payload) messages
 * @param producer stream data producer used to publish the messages
 * @throws Exception if parsing a record or producing to Kafka fails
 */
public static void pushCsvIntoKafka(List<String> csvRecords, String kafkaTopic,
    @Nullable Integer partitionColumnIndex, boolean injectTombstones, StreamDataProducer producer)
    throws Exception {
  if (injectTombstones) {
    // publish lots of tombstones to livelock the consumer if it can't handle this properly
    for (int i = 0; i < 1000; i++) {
      producer.produce(kafkaTopic, Longs.toByteArray(System.currentTimeMillis()), null);
    }
  }
  CSVFormat format = CSVFormat.DEFAULT.withSkipHeaderRecord(true);
  for (String csvRecord : csvRecords) {
    try (CSVParser parser = CSVParser.parse(csvRecord, format)) {
      for (CSVRecord parsed : parser) {
        // Key by the partition column value when provided, otherwise by the current timestamp.
        byte[] key = (partitionColumnIndex != null)
            ? parsed.get(partitionColumnIndex).getBytes(StandardCharsets.UTF_8)
            : Longs.toByteArray(System.currentTimeMillis());
        // Re-serialize the parsed record with comma separators as the message payload.
        StringBuilder row = new StringBuilder();
        for (int i = 0; i < parsed.size(); i++) {
          if (i > 0) {
            row.append(',');
          }
          row.append(parsed.get(i));
        }
        producer.produce(kafkaTopic, key, row.toString().getBytes(StandardCharsets.UTF_8));
      }
    }
  }
}

/**
* Push the records from the given Avro files into a Kafka stream.
*
Expand Down
Loading