For consuming segment, avoid using setter in IndexLoadingConfig
Jackie-Jiang committed Oct 9, 2024
1 parent 1ea0d35 commit 2d007f2
Showing 7 changed files with 82 additions and 55 deletions.
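
In short: the consuming-segment path previously forced an inverted index onto the sorted column by mutating the shared IndexLoadingConfig through a deprecated setter; this commit moves that decision into the segment-config Builder, so the shared config is never modified. A minimal before/after sketch assembled from the hunks below (not a compilable unit; surrounding setup elided):

// Before (RealtimeSegmentDataManager constructor): mutate the shared config,
// so every other holder of the same IndexLoadingConfig instance saw the change.
if (sortedColumn != null) {
  indexLoadingConfig.addInvertedIndexColumns(sortedColumn);
}

// After (segment-config Builder): record the inverted index for the first
// sorted column locally, leaving IndexLoadingConfig untouched.
if (CollectionUtils.isNotEmpty(sortedColumns)) {
  String sortedColumn = sortedColumns.get(0);
  FieldIndexConfigs.Builder builder =
      _indexConfigByCol.computeIfAbsent(sortedColumn, k -> new FieldIndexConfigs.Builder());
  builder.add(StandardIndexes.inverted(), new IndexConfig(false));
}

With the mutation gone, RealtimeSegmentConverter no longer needs to strip the sorted column back out of the inverted-index columns, and the deprecated addInvertedIndexColumns setter can be deleted outright (see the last two files below).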
--------------------------------------------------------------------------------
@@ -118,6 +118,7 @@
/**
* Segment data manager for low level consumer realtime segments, which manages consumption and segment completion.
*/
@SuppressWarnings("jol")
public class RealtimeSegmentDataManager extends SegmentDataManager {

@VisibleForTesting
@@ -237,7 +238,6 @@ public void deleteSegmentFile() {
private final StreamDataDecoder _streamDataDecoder;
private final int _segmentMaxRowCount;
private final String _resourceDataDir;
private final IndexLoadingConfig _indexLoadingConfig;
private final Schema _schema;
// Semaphore for each partitionGroupId, used to prevent two different stream consumers
// from consuming with the same partitionGroupId in parallel on the same host.
@@ -1446,7 +1446,6 @@ public RealtimeSegmentDataManager(SegmentZKMetadata segmentZKMetadata, TableConf
_tableNameWithType = _tableConfig.getTableName();
_realtimeTableDataManager = realtimeTableDataManager;
_resourceDataDir = resourceDataDir;
_indexLoadingConfig = indexLoadingConfig;
_schema = schema;
_serverMetrics = serverMetrics;
_partitionUpsertMetadataManager = partitionUpsertMetadataManager;
@@ -1478,7 +1477,7 @@ public RealtimeSegmentDataManager(SegmentZKMetadata segmentZKMetadata, TableConf
_segmentZKMetadata.getStatus().toString());
_partitionGroupConsumerSemaphore = partitionGroupConsumerSemaphore;
_acquiredConsumerSemaphore = new AtomicBoolean(false);
InstanceDataManagerConfig instanceDataManagerConfig = _indexLoadingConfig.getInstanceDataManagerConfig();
InstanceDataManagerConfig instanceDataManagerConfig = indexLoadingConfig.getInstanceDataManagerConfig();
String clientIdSuffix =
instanceDataManagerConfig != null ? instanceDataManagerConfig.getConsumerClientIdSuffix() : null;
if (StringUtils.isNotBlank(clientIdSuffix)) {
@@ -1488,7 +1487,7 @@ public RealtimeSegmentDataManager(SegmentZKMetadata segmentZKMetadata, TableConf
}
_segmentLogger = LoggerFactory.getLogger(RealtimeSegmentDataManager.class.getName() + "_" + _segmentNameStr);
_tableStreamName = _tableNameWithType + "_" + streamTopic;
if (_indexLoadingConfig.isRealtimeOffHeapAllocation() && !_indexLoadingConfig.isDirectRealtimeOffHeapAllocation()) {
if (indexLoadingConfig.isRealtimeOffHeapAllocation() && !indexLoadingConfig.isDirectRealtimeOffHeapAllocation()) {
_memoryManager =
new MmapMemoryManager(_realtimeTableDataManager.getConsumerDir(), _segmentNameStr, _serverMetrics);
} else {
@@ -1526,13 +1525,6 @@ public RealtimeSegmentDataManager(SegmentZKMetadata segmentZKMetadata, TableConf
sortedColumn = null;
}
}
// Inverted index columns
// We need to add sorted column into inverted index columns because when we convert realtime in memory segment into
// offline segment, we use sorted column's inverted index to maintain the order of the records so that the records
// are sorted on the sorted column.
if (sortedColumn != null) {
indexLoadingConfig.addInvertedIndexColumns(sortedColumn);
}

// Read the max number of rows
int segmentMaxRowCount = segmentZKMetadata.getSizeThresholdToFlushSegment();
--------------------------------------------------------------------------------
@@ -759,4 +759,11 @@ protected <T> T getCellValue(JsonNode jsonNode, int colIndex, int rowIndex, Func
protected long getLongCellValue(JsonNode jsonNode, int colIndex, int rowIndex) {
return getCellValue(jsonNode, colIndex, rowIndex, JsonNode::asLong).longValue();
}

protected JsonNode getColumnIndexSize(String column)
throws Exception {
return JsonUtils.stringToJsonNode(
sendGetRequest(_controllerRequestURLBuilder.forTableAggregateMetadata(getTableName(), List.of(column))))
.get("columnIndexSizeMap").get(column);
}
}
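
For context, this new shared helper parses the controller's aggregate table metadata response (the private copy in the integration test further down is removed in its favor). Judging purely from the accessors used in these tests, the relevant slice of the payload looks roughly like the following; the column name and byte sizes are illustrative, not taken from a real response:

{
  "columnIndexSizeMap": {
    "DestCityName": {
      "dictionary": 4096.0,
      "forward_index": 1048576.0
    }
  }
}

Each inner key is a StandardIndexes ID, and a key is simply absent when that index is not sealed for the column, which is what the assertions below rely on.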
--------------------------------------------------------------------------------
@@ -49,6 +49,7 @@
import org.apache.pinot.plugin.stream.kafka.KafkaMessageBatch;
import org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory;
import org.apache.pinot.plugin.stream.kafka20.KafkaPartitionLevelConsumer;
import org.apache.pinot.segment.spi.index.StandardIndexes;
import org.apache.pinot.spi.config.table.IndexingConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
@@ -73,6 +74,7 @@
import org.testng.annotations.Test;

import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;

@@ -321,6 +323,37 @@ public void testReload()
testReload(false);
}

@Test
public void testSortedColumn()
throws Exception {
// There should be no inverted index or range index sealed (persisted) because the sorted column is not configured
// with them
JsonNode columnIndexSize = getColumnIndexSize(getSortedColumn());
assertFalse(columnIndexSize.has(StandardIndexes.INVERTED_ID));
assertFalse(columnIndexSize.has(StandardIndexes.RANGE_ID));

// For a point lookup query, there should be no scan from the committed/consuming segments, but a full scan from
// the uploaded segments:
// - Committed segments have sorted index
// - Consuming segments have inverted index
// - Uploaded segments have neither of them
String query = "SELECT COUNT(*) FROM myTable WHERE Carrier = 'DL'";
JsonNode response = postQuery(query);
long numEntriesScannedInFilter = response.get("numEntriesScannedInFilter").asLong();
long numDocsInUploadedSegments = super.getCountStarResult();
assertEquals(numEntriesScannedInFilter, numDocsInUploadedSegments);

// For a range query, there should be no scan from the committed segments, but a full scan from the
// uploaded/consuming segments:
// - Committed segments have sorted index
// - Consuming/Uploaded segments do not have sorted index
query = "SELECT COUNT(*) FROM myTable WHERE Carrier > 'DL'";
response = postQuery(query);
numEntriesScannedInFilter = response.get("numEntriesScannedInFilter").asLong();
// NOTE: If this test runs after the force commit test, there will be no records in the consuming segments
assertTrue(numEntriesScannedInFilter >= numDocsInUploadedSegments);
assertTrue(numEntriesScannedInFilter < 2 * numDocsInUploadedSegments);
}

@Test(dataProvider = "useBothQueryEngines")
public void testAddRemoveDictionaryAndInvertedIndex(boolean useMultiStageQueryEngine)
throws Exception {
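To make the scan-count assertions in testSortedColumn above concrete, here is a worked sketch under the index layout its comments describe; the labels N_u and N_s (docs in uploaded and consuming segments) are introduced here for illustration and are not names from the code:

// Point lookup (Carrier = 'DL'):
//   committed segments: 0 entries scanned (sorted index -> binary search)
//   consuming segments: 0 entries scanned (inverted index -> bitmap lookup)
//   uploaded segments:  N_u entries scanned (no index -> full column scan)
// Total = N_u, matching assertEquals(numEntriesScannedInFilter, numDocsInUploadedSegments).
//
// Range query (Carrier > 'DL'):
//   committed segments:            0 entries scanned (sorted index supports range pruning)
//   consuming + uploaded segments: N_s + N_u entries scanned (full column scans)
// Total lies in [N_u, 2 * N_u) as long as N_s < N_u, matching the two assertTrue bounds.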
--------------------------------------------------------------------------------
@@ -1324,10 +1324,10 @@ public void testForwardIndexTriggering()
throws Exception {
String column = "DestCityName";
JsonNode columnIndexSize = getColumnIndexSize(column);
assertTrue(columnIndexSize.has("dictionary"));
assertTrue(columnIndexSize.has("forward_index"));
double dictionarySize = columnIndexSize.get("dictionary").asDouble();
double forwardIndexSize = columnIndexSize.get("forward_index").asDouble();
assertTrue(columnIndexSize.has(StandardIndexes.DICTIONARY_ID));
assertTrue(columnIndexSize.has(StandardIndexes.FORWARD_ID));
double dictionarySize = columnIndexSize.get(StandardIndexes.DICTIONARY_ID).asDouble();
double forwardIndexSize = columnIndexSize.get(StandardIndexes.FORWARD_ID).asDouble();

// Convert 'DestCityName' to raw index
TableConfig tableConfig = getOfflineTableConfig();
@@ -1339,9 +1339,9 @@
long numTotalDocs = getCountStarResult();
reloadAllSegments(SELECT_STAR_QUERY, false, numTotalDocs);
columnIndexSize = getColumnIndexSize(column);
assertFalse(columnIndexSize.has("dictionary"));
assertTrue(columnIndexSize.has("forward_index"));
double v2rawIndexSize = columnIndexSize.get("forward_index").asDouble();
assertFalse(columnIndexSize.has(StandardIndexes.DICTIONARY_ID));
assertTrue(columnIndexSize.has(StandardIndexes.FORWARD_ID));
double v2rawIndexSize = columnIndexSize.get(StandardIndexes.FORWARD_ID).asDouble();
assertTrue(v2rawIndexSize > forwardIndexSize);

// NOTE: Currently Pinot doesn't support directly changing raw index version, so we need to first reset it back to
Expand All @@ -1361,9 +1361,9 @@ public void testForwardIndexTriggering()
updateTableConfig(tableConfig);
reloadAllSegments(SELECT_STAR_QUERY, false, numTotalDocs);
columnIndexSize = getColumnIndexSize(column);
assertFalse(columnIndexSize.has("dictionary"));
assertTrue(columnIndexSize.has("forward_index"));
double v4RawIndexSize = columnIndexSize.get("forward_index").asDouble();
assertFalse(columnIndexSize.has(StandardIndexes.DICTIONARY_ID));
assertTrue(columnIndexSize.has(StandardIndexes.FORWARD_ID));
double v4RawIndexSize = columnIndexSize.get(StandardIndexes.FORWARD_ID).asDouble();
assertTrue(v4RawIndexSize < v2rawIndexSize && v4RawIndexSize > forwardIndexSize);

// Convert 'DestCityName' to SNAPPY compression
Expand All @@ -1377,19 +1377,19 @@ public void testForwardIndexTriggering()
updateTableConfig(tableConfig);
reloadAllSegments(SELECT_STAR_QUERY, false, numTotalDocs);
columnIndexSize = getColumnIndexSize(column);
assertFalse(columnIndexSize.has("dictionary"));
assertTrue(columnIndexSize.has("forward_index"));
double v4SnappyRawIndexSize = columnIndexSize.get("forward_index").asDouble();
assertFalse(columnIndexSize.has(StandardIndexes.DICTIONARY_ID));
assertTrue(columnIndexSize.has(StandardIndexes.FORWARD_ID));
double v4SnappyRawIndexSize = columnIndexSize.get(StandardIndexes.FORWARD_ID).asDouble();
assertTrue(v4SnappyRawIndexSize > v2rawIndexSize);

// Removing the FieldConfig should be a no-op because compression is not explicitly set
fieldConfigs.remove(fieldConfigs.size() - 1);
updateTableConfig(tableConfig);
reloadAllSegments(SELECT_STAR_QUERY, false, numTotalDocs);
columnIndexSize = getColumnIndexSize(column);
assertFalse(columnIndexSize.has("dictionary"));
assertTrue(columnIndexSize.has("forward_index"));
assertEquals(columnIndexSize.get("forward_index").asDouble(), v4SnappyRawIndexSize);
assertFalse(columnIndexSize.has(StandardIndexes.DICTIONARY_ID));
assertTrue(columnIndexSize.has(StandardIndexes.FORWARD_ID));
assertEquals(columnIndexSize.get(StandardIndexes.FORWARD_ID).asDouble(), v4SnappyRawIndexSize);

// Adding 'LZ4' compression explicitly should trigger the conversion
forwardIndexConfig = new ForwardIndexConfig.Builder().withCompressionCodec(CompressionCodec.LZ4).build();
@@ -1400,28 +1400,21 @@
updateTableConfig(tableConfig);
reloadAllSegments(SELECT_STAR_QUERY, false, numTotalDocs);
columnIndexSize = getColumnIndexSize(column);
assertFalse(columnIndexSize.has("dictionary"));
assertTrue(columnIndexSize.has("forward_index"));
assertEquals(columnIndexSize.get("forward_index").asDouble(), v2rawIndexSize);
assertFalse(columnIndexSize.has(StandardIndexes.DICTIONARY_ID));
assertTrue(columnIndexSize.has(StandardIndexes.FORWARD_ID));
assertEquals(columnIndexSize.get(StandardIndexes.FORWARD_ID).asDouble(), v2rawIndexSize);

resetForwardIndex(dictionarySize, forwardIndexSize);
}

private JsonNode getColumnIndexSize(String column)
throws Exception {
return JsonUtils.stringToJsonNode(
sendGetRequest(_controllerRequestURLBuilder.forTableAggregateMetadata(getTableName(), List.of(column))))
.get("columnIndexSizeMap").get(column);
}

private void resetForwardIndex(double expectedDictionarySize, double expectedForwardIndexSize)
throws Exception {
TableConfig tableConfig = createOfflineTableConfig();
updateTableConfig(tableConfig);
reloadAllSegments(SELECT_STAR_QUERY, false, getCountStarResult());
JsonNode columnIndexSize = getColumnIndexSize("DestCityName");
assertEquals(columnIndexSize.get("dictionary").asDouble(), expectedDictionarySize);
assertEquals(columnIndexSize.get("forward_index").asDouble(), expectedForwardIndexSize);
assertEquals(columnIndexSize.get(StandardIndexes.DICTIONARY_ID).asDouble(), expectedDictionarySize);
assertEquals(columnIndexSize.get(StandardIndexes.FORWARD_ID).asDouble(), expectedForwardIndexSize);
}

/**
--------------------------------------------------------------------------------
@@ -63,9 +63,6 @@ public RealtimeSegmentConverter(MutableSegmentImpl realtimeSegment, SegmentZKPro
_segmentZKPropsConfig = segmentZKPropsConfig;
_outputPath = outputPath;
_columnIndicesForRealtimeTable = cdc;
if (cdc.getSortedColumn() != null) {
_columnIndicesForRealtimeTable.getInvertedIndexColumns().remove(cdc.getSortedColumn());
}
_dataSchema = getUpdatedSchema(schema);
_tableName = tableName;
_tableConfig = tableConfig;
--------------------------------------------------------------------------------
@@ -23,14 +23,16 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.apache.pinot.common.utils.HashUtil;
import org.apache.pinot.segment.local.dedup.PartitionDedupMetadataManager;
import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
import org.apache.pinot.segment.local.upsert.PartitionUpsertMetadataManager;
import org.apache.pinot.segment.spi.index.FieldIndexConfigs;
import org.apache.pinot.segment.spi.index.FieldIndexConfigsUtil;
import org.apache.pinot.segment.spi.index.IndexType;
import org.apache.pinot.segment.spi.index.StandardIndexes;
import org.apache.pinot.segment.spi.memory.PinotDataBufferMemoryManager;
import org.apache.pinot.segment.spi.partition.PartitionFunction;
import org.apache.pinot.spi.config.table.FieldConfig;
@@ -279,18 +281,29 @@ public Builder() {
}

public Builder(IndexLoadingConfig indexLoadingConfig) {
this(indexLoadingConfig.getFieldIndexConfigByColName());
this(indexLoadingConfig.getFieldIndexConfigByColName(), indexLoadingConfig.getSortedColumns());
}

public Builder(TableConfig tableConfig, Schema schema) {
this(FieldIndexConfigsUtil.createIndexConfigsByColName(tableConfig, schema));
this(FieldIndexConfigsUtil.createIndexConfigsByColName(tableConfig, schema),
tableConfig.getIndexingConfig().getSortedColumn());
}

public Builder(Map<String, FieldIndexConfigs> indexConfigsByColName) {
_indexConfigByCol = new HashMap<>(HashUtil.getHashMapCapacity(indexConfigsByColName.size()));
public Builder(Map<String, FieldIndexConfigs> indexConfigsByColName, @Nullable List<String> sortedColumns) {
_indexConfigByCol = Maps.newHashMapWithExpectedSize(indexConfigsByColName.size());
for (Map.Entry<String, FieldIndexConfigs> entry : indexConfigsByColName.entrySet()) {
_indexConfigByCol.put(entry.getKey(), new FieldIndexConfigs.Builder(entry.getValue()));
}
// Add an inverted index to the sorted column for 2 reasons:
// 1. Sorted index doesn't apply to the mutable segment, so use an inverted index to get better query performance
// 2. When converting the mutable segment into an immutable segment, the sorted column's inverted index is used to
//    accelerate the index creation
if (CollectionUtils.isNotEmpty(sortedColumns)) {
String sortedColumn = sortedColumns.get(0);
FieldIndexConfigs.Builder builder =
_indexConfigByCol.computeIfAbsent(sortedColumn, k -> new FieldIndexConfigs.Builder());
builder.add(StandardIndexes.inverted(), new IndexConfig(false));
}
}

public Builder setTableNameWithType(String tableNameWithType) {
--------------------------------------------------------------------------------
@@ -19,7 +19,6 @@
package org.apache.pinot.segment.local.segment.index.loader;

import com.google.common.annotations.VisibleForTesting;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
@@ -462,13 +461,6 @@ public void setInvertedIndexColumns(Set<String> invertedIndexColumns) {
_dirty = true;
}

@Deprecated
@VisibleForTesting
public void addInvertedIndexColumns(String... invertedIndexColumns) {
_invertedIndexColumns.addAll(Arrays.asList(invertedIndexColumns));
_dirty = true;
}

@Deprecated
@VisibleForTesting
public void addNoDictionaryColumns(Collection<String> noDictionaryColumns) {
