diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java index ecf5cb12cd8..bed8f2a3103 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java @@ -118,6 +118,7 @@ /** * Segment data manager for low level consumer realtime segments, which manages consumption and segment completion. */ +@SuppressWarnings("jol") public class RealtimeSegmentDataManager extends SegmentDataManager { @VisibleForTesting @@ -237,7 +238,6 @@ public void deleteSegmentFile() { private final StreamDataDecoder _streamDataDecoder; private final int _segmentMaxRowCount; private final String _resourceDataDir; - private final IndexLoadingConfig _indexLoadingConfig; private final Schema _schema; // Semaphore for each partitionGroupId only, which is to prevent two different stream consumers // from consuming with the same partitionGroupId in parallel in the same host. @@ -1446,7 +1446,6 @@ public RealtimeSegmentDataManager(SegmentZKMetadata segmentZKMetadata, TableConf _tableNameWithType = _tableConfig.getTableName(); _realtimeTableDataManager = realtimeTableDataManager; _resourceDataDir = resourceDataDir; - _indexLoadingConfig = indexLoadingConfig; _schema = schema; _serverMetrics = serverMetrics; _partitionUpsertMetadataManager = partitionUpsertMetadataManager; @@ -1478,7 +1477,7 @@ public RealtimeSegmentDataManager(SegmentZKMetadata segmentZKMetadata, TableConf _segmentZKMetadata.getStatus().toString()); _partitionGroupConsumerSemaphore = partitionGroupConsumerSemaphore; _acquiredConsumerSemaphore = new AtomicBoolean(false); - InstanceDataManagerConfig instanceDataManagerConfig = _indexLoadingConfig.getInstanceDataManagerConfig(); + InstanceDataManagerConfig instanceDataManagerConfig = indexLoadingConfig.getInstanceDataManagerConfig(); String clientIdSuffix = instanceDataManagerConfig != null ? instanceDataManagerConfig.getConsumerClientIdSuffix() : null; if (StringUtils.isNotBlank(clientIdSuffix)) { @@ -1488,7 +1487,7 @@ public RealtimeSegmentDataManager(SegmentZKMetadata segmentZKMetadata, TableConf } _segmentLogger = LoggerFactory.getLogger(RealtimeSegmentDataManager.class.getName() + "_" + _segmentNameStr); _tableStreamName = _tableNameWithType + "_" + streamTopic; - if (_indexLoadingConfig.isRealtimeOffHeapAllocation() && !_indexLoadingConfig.isDirectRealtimeOffHeapAllocation()) { + if (indexLoadingConfig.isRealtimeOffHeapAllocation() && !indexLoadingConfig.isDirectRealtimeOffHeapAllocation()) { _memoryManager = new MmapMemoryManager(_realtimeTableDataManager.getConsumerDir(), _segmentNameStr, _serverMetrics); } else { @@ -1526,13 +1525,6 @@ public RealtimeSegmentDataManager(SegmentZKMetadata segmentZKMetadata, TableConf sortedColumn = null; } } - // Inverted index columns - // We need to add sorted column into inverted index columns because when we convert realtime in memory segment into - // offline segment, we use sorted column's inverted index to maintain the order of the records so that the records - // are sorted on the sorted column. - if (sortedColumn != null) { - indexLoadingConfig.addInvertedIndexColumns(sortedColumn); - } // Read the max number of rows int segmentMaxRowCount = segmentZKMetadata.getSizeThresholdToFlushSegment(); diff --git a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java index 591a0308dd0..ffe846cf9c6 100644 --- a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java +++ b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java @@ -759,4 +759,11 @@ protected T getCellValue(JsonNode jsonNode, int colIndex, int rowIndex, Func protected long getLongCellValue(JsonNode jsonNode, int colIndex, int rowIndex) { return getCellValue(jsonNode, colIndex, rowIndex, JsonNode::asLong).longValue(); } + + protected JsonNode getColumnIndexSize(String column) + throws Exception { + return JsonUtils.stringToJsonNode( + sendGetRequest(_controllerRequestURLBuilder.forTableAggregateMetadata(getTableName(), List.of(column)))) + .get("columnIndexSizeMap").get(column); + } } diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LLCRealtimeClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LLCRealtimeClusterIntegrationTest.java index 31dd2d26a3e..78b34fc5633 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LLCRealtimeClusterIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LLCRealtimeClusterIntegrationTest.java @@ -49,6 +49,7 @@ import org.apache.pinot.plugin.stream.kafka.KafkaMessageBatch; import org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory; import org.apache.pinot.plugin.stream.kafka20.KafkaPartitionLevelConsumer; +import org.apache.pinot.segment.spi.index.StandardIndexes; import org.apache.pinot.spi.config.table.IndexingConfig; import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.config.table.TableType; @@ -73,6 +74,7 @@ import org.testng.annotations.Test; import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertTrue; @@ -321,6 +323,37 @@ public void testReload() testReload(false); } + @Test + public void testSortedColumn() + throws Exception { + // There should be no inverted index or range index sealed because the sorted column is not configured with them + JsonNode columnIndexSize = getColumnIndexSize(getSortedColumn()); + assertFalse(columnIndexSize.has(StandardIndexes.INVERTED_ID)); + assertFalse(columnIndexSize.has(StandardIndexes.RANGE_ID)); + + // For point lookup query, there should be no scan from the committed/consuming segments, but full scan from the + // uploaded segments: + // - Committed segments have sorted index + // - Consuming segments have inverted index + // - Uploaded segments have neither of them + String query = "SELECT COUNT(*) FROM myTable WHERE Carrier = 'DL'"; + JsonNode response = postQuery(query); + long numEntriesScannedInFilter = response.get("numEntriesScannedInFilter").asLong(); + long numDocsInUploadedSegments = super.getCountStarResult(); + assertEquals(numEntriesScannedInFilter, numDocsInUploadedSegments); + + // For range query, there should be no scan from the committed segments, but full scan from the uploaded/consuming + // segments: + // - Committed segments have sorted index + // - Consuming/Uploaded segments do not have sorted index + query = "SELECT COUNT(*) FROM myTable WHERE Carrier > 'DL'"; + response = postQuery(query); + numEntriesScannedInFilter = response.get("numEntriesScannedInFilter").asLong(); + // NOTE: If this test is running after force commit test, there will be no records in consuming segments + assertTrue(numEntriesScannedInFilter >= numDocsInUploadedSegments); + assertTrue(numEntriesScannedInFilter < 2 * numDocsInUploadedSegments); + } + @Test(dataProvider = "useBothQueryEngines") public void testAddRemoveDictionaryAndInvertedIndex(boolean useMultiStageQueryEngine) throws Exception { diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java index 97f39f66a4d..25a75352f72 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java @@ -1324,10 +1324,10 @@ public void testForwardIndexTriggering() throws Exception { String column = "DestCityName"; JsonNode columnIndexSize = getColumnIndexSize(column); - assertTrue(columnIndexSize.has("dictionary")); - assertTrue(columnIndexSize.has("forward_index")); - double dictionarySize = columnIndexSize.get("dictionary").asDouble(); - double forwardIndexSize = columnIndexSize.get("forward_index").asDouble(); + assertTrue(columnIndexSize.has(StandardIndexes.DICTIONARY_ID)); + assertTrue(columnIndexSize.has(StandardIndexes.FORWARD_ID)); + double dictionarySize = columnIndexSize.get(StandardIndexes.DICTIONARY_ID).asDouble(); + double forwardIndexSize = columnIndexSize.get(StandardIndexes.FORWARD_ID).asDouble(); // Convert 'DestCityName' to raw index TableConfig tableConfig = getOfflineTableConfig(); @@ -1339,9 +1339,9 @@ public void testForwardIndexTriggering() long numTotalDocs = getCountStarResult(); reloadAllSegments(SELECT_STAR_QUERY, false, numTotalDocs); columnIndexSize = getColumnIndexSize(column); - assertFalse(columnIndexSize.has("dictionary")); - assertTrue(columnIndexSize.has("forward_index")); - double v2rawIndexSize = columnIndexSize.get("forward_index").asDouble(); + assertFalse(columnIndexSize.has(StandardIndexes.DICTIONARY_ID)); + assertTrue(columnIndexSize.has(StandardIndexes.FORWARD_ID)); + double v2rawIndexSize = columnIndexSize.get(StandardIndexes.FORWARD_ID).asDouble(); assertTrue(v2rawIndexSize > forwardIndexSize); // NOTE: Currently Pinot doesn't support directly changing raw index version, so we need to first reset it back to @@ -1361,9 +1361,9 @@ public void testForwardIndexTriggering() updateTableConfig(tableConfig); reloadAllSegments(SELECT_STAR_QUERY, false, numTotalDocs); columnIndexSize = getColumnIndexSize(column); - assertFalse(columnIndexSize.has("dictionary")); - assertTrue(columnIndexSize.has("forward_index")); - double v4RawIndexSize = columnIndexSize.get("forward_index").asDouble(); + assertFalse(columnIndexSize.has(StandardIndexes.DICTIONARY_ID)); + assertTrue(columnIndexSize.has(StandardIndexes.FORWARD_ID)); + double v4RawIndexSize = columnIndexSize.get(StandardIndexes.FORWARD_ID).asDouble(); assertTrue(v4RawIndexSize < v2rawIndexSize && v4RawIndexSize > forwardIndexSize); // Convert 'DestCityName' to SNAPPY compression @@ -1377,9 +1377,9 @@ public void testForwardIndexTriggering() updateTableConfig(tableConfig); reloadAllSegments(SELECT_STAR_QUERY, false, numTotalDocs); columnIndexSize = getColumnIndexSize(column); - assertFalse(columnIndexSize.has("dictionary")); - assertTrue(columnIndexSize.has("forward_index")); - double v4SnappyRawIndexSize = columnIndexSize.get("forward_index").asDouble(); + assertFalse(columnIndexSize.has(StandardIndexes.DICTIONARY_ID)); + assertTrue(columnIndexSize.has(StandardIndexes.FORWARD_ID)); + double v4SnappyRawIndexSize = columnIndexSize.get(StandardIndexes.FORWARD_ID).asDouble(); assertTrue(v4SnappyRawIndexSize > v2rawIndexSize); // Removing FieldConfig should be no-op because compression is not explicitly set @@ -1387,9 +1387,9 @@ public void testForwardIndexTriggering() updateTableConfig(tableConfig); reloadAllSegments(SELECT_STAR_QUERY, false, numTotalDocs); columnIndexSize = getColumnIndexSize(column); - assertFalse(columnIndexSize.has("dictionary")); - assertTrue(columnIndexSize.has("forward_index")); - assertEquals(columnIndexSize.get("forward_index").asDouble(), v4SnappyRawIndexSize); + assertFalse(columnIndexSize.has(StandardIndexes.DICTIONARY_ID)); + assertTrue(columnIndexSize.has(StandardIndexes.FORWARD_ID)); + assertEquals(columnIndexSize.get(StandardIndexes.FORWARD_ID).asDouble(), v4SnappyRawIndexSize); // Adding 'LZ4' compression explicitly should trigger the conversion forwardIndexConfig = new ForwardIndexConfig.Builder().withCompressionCodec(CompressionCodec.LZ4).build(); @@ -1400,28 +1400,21 @@ public void testForwardIndexTriggering() updateTableConfig(tableConfig); reloadAllSegments(SELECT_STAR_QUERY, false, numTotalDocs); columnIndexSize = getColumnIndexSize(column); - assertFalse(columnIndexSize.has("dictionary")); - assertTrue(columnIndexSize.has("forward_index")); - assertEquals(columnIndexSize.get("forward_index").asDouble(), v2rawIndexSize); + assertFalse(columnIndexSize.has(StandardIndexes.DICTIONARY_ID)); + assertTrue(columnIndexSize.has(StandardIndexes.FORWARD_ID)); + assertEquals(columnIndexSize.get(StandardIndexes.FORWARD_ID).asDouble(), v2rawIndexSize); resetForwardIndex(dictionarySize, forwardIndexSize); } - private JsonNode getColumnIndexSize(String column) - throws Exception { - return JsonUtils.stringToJsonNode( - sendGetRequest(_controllerRequestURLBuilder.forTableAggregateMetadata(getTableName(), List.of(column)))) - .get("columnIndexSizeMap").get(column); - } - private void resetForwardIndex(double expectedDictionarySize, double expectedForwardIndexSize) throws Exception { TableConfig tableConfig = createOfflineTableConfig(); updateTableConfig(tableConfig); reloadAllSegments(SELECT_STAR_QUERY, false, getCountStarResult()); JsonNode columnIndexSize = getColumnIndexSize("DestCityName"); - assertEquals(columnIndexSize.get("dictionary").asDouble(), expectedDictionarySize); - assertEquals(columnIndexSize.get("forward_index").asDouble(), expectedForwardIndexSize); + assertEquals(columnIndexSize.get(StandardIndexes.DICTIONARY_ID).asDouble(), expectedDictionarySize); + assertEquals(columnIndexSize.get(StandardIndexes.FORWARD_ID).asDouble(), expectedForwardIndexSize); } /** diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/converter/RealtimeSegmentConverter.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/converter/RealtimeSegmentConverter.java index 2082f356229..65c69682f01 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/converter/RealtimeSegmentConverter.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/converter/RealtimeSegmentConverter.java @@ -63,9 +63,6 @@ public RealtimeSegmentConverter(MutableSegmentImpl realtimeSegment, SegmentZKPro _segmentZKPropsConfig = segmentZKPropsConfig; _outputPath = outputPath; _columnIndicesForRealtimeTable = cdc; - if (cdc.getSortedColumn() != null) { - _columnIndicesForRealtimeTable.getInvertedIndexColumns().remove(cdc.getSortedColumn()); - } _dataSchema = getUpdatedSchema(schema); _tableName = tableName; _tableConfig = tableConfig; diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/RealtimeSegmentConfig.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/RealtimeSegmentConfig.java index 8d196eb6453..5b3aeb26d53 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/RealtimeSegmentConfig.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/RealtimeSegmentConfig.java @@ -23,14 +23,16 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import javax.annotation.Nullable; +import org.apache.commons.collections4.CollectionUtils; import org.apache.pinot.common.metadata.segment.SegmentZKMetadata; -import org.apache.pinot.common.utils.HashUtil; import org.apache.pinot.segment.local.dedup.PartitionDedupMetadataManager; import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig; import org.apache.pinot.segment.local.upsert.PartitionUpsertMetadataManager; import org.apache.pinot.segment.spi.index.FieldIndexConfigs; import org.apache.pinot.segment.spi.index.FieldIndexConfigsUtil; import org.apache.pinot.segment.spi.index.IndexType; +import org.apache.pinot.segment.spi.index.StandardIndexes; import org.apache.pinot.segment.spi.memory.PinotDataBufferMemoryManager; import org.apache.pinot.segment.spi.partition.PartitionFunction; import org.apache.pinot.spi.config.table.FieldConfig; @@ -279,18 +281,29 @@ public Builder() { } public Builder(IndexLoadingConfig indexLoadingConfig) { - this(indexLoadingConfig.getFieldIndexConfigByColName()); + this(indexLoadingConfig.getFieldIndexConfigByColName(), indexLoadingConfig.getSortedColumns()); } public Builder(TableConfig tableConfig, Schema schema) { - this(FieldIndexConfigsUtil.createIndexConfigsByColName(tableConfig, schema)); + this(FieldIndexConfigsUtil.createIndexConfigsByColName(tableConfig, schema), + tableConfig.getIndexingConfig().getSortedColumn()); } - public Builder(Map indexConfigsByColName) { - _indexConfigByCol = new HashMap<>(HashUtil.getHashMapCapacity(indexConfigsByColName.size())); + public Builder(Map indexConfigsByColName, @Nullable List sortedColumns) { + _indexConfigByCol = Maps.newHashMapWithExpectedSize(indexConfigsByColName.size()); for (Map.Entry entry : indexConfigsByColName.entrySet()) { _indexConfigByCol.put(entry.getKey(), new FieldIndexConfigs.Builder(entry.getValue())); } + // Add inverted index to sorted column for 2 reasons: + // 1. Since sorted index doesn't apply to mutable segment, add inverted index to get better performance + // 2. When converting mutable segment to immutable segment, we use sorted column's inverted index to accelerate + // the index creation + if (CollectionUtils.isNotEmpty(sortedColumns)) { + String sortedColumn = sortedColumns.get(0); + FieldIndexConfigs.Builder builder = + _indexConfigByCol.computeIfAbsent(sortedColumn, k -> new FieldIndexConfigs.Builder()); + builder.add(StandardIndexes.inverted(), new IndexConfig(false)); + } } public Builder setTableNameWithType(String tableNameWithType) { diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/IndexLoadingConfig.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/IndexLoadingConfig.java index c24c286000d..0684fe9097d 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/IndexLoadingConfig.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/IndexLoadingConfig.java @@ -19,7 +19,6 @@ package org.apache.pinot.segment.local.segment.index.loader; import com.google.common.annotations.VisibleForTesting; -import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.HashMap; @@ -462,13 +461,6 @@ public void setInvertedIndexColumns(Set invertedIndexColumns) { _dirty = true; } - @Deprecated - @VisibleForTesting - public void addInvertedIndexColumns(String... invertedIndexColumns) { - _invertedIndexColumns.addAll(Arrays.asList(invertedIndexColumns)); - _dirty = true; - } - @Deprecated @VisibleForTesting public void addNoDictionaryColumns(Collection noDictionaryColumns) {