Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -932,6 +932,19 @@ public Object getValue(int docId, String column) {
}
}

/**
* Calls commit() on all mutable indexes. This is used in preparation for realtime segment conversion.
* .commit() can be implemented per index to perform any required actions before using mutable segment
* artifacts to optimize immutable segment build.
*/
public void commit() {
for (IndexContainer indexContainer : _indexContainerMap.values()) {
for (MutableIndex mutableIndex : indexContainer._mutableIndexes.values()) {
mutableIndex.commit();
}
}
}

@Override
public void destroy() {
_logger.info("Trying to close RealtimeSegmentImpl : {}", _segmentName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,9 @@ public void build(@Nullable SegmentVersion segmentVersion, ServerMetrics serverM
genConfig.setNullHandlingEnabled(_nullHandlingEnabled);
genConfig.setSegmentZKPropsConfig(_segmentZKPropsConfig);

// flush any artifacts to disk to improve mutable to immutable segment conversion
_realtimeSegmentImpl.commit();

SegmentIndexCreationDriverImpl driver = new SegmentIndexCreationDriverImpl();
try (PinotSegmentRecordReader recordReader = new PinotSegmentRecordReader()) {
int[] sortedDocIds = _columnIndicesForRealtimeTable.getSortedColumn() != null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ public RealtimeLuceneTextIndex(String column, File segmentIndexDir, String segme
// for realtime
_indexCreator =
new LuceneTextIndexCreator(column, new File(segmentIndexDir.getAbsolutePath() + "/" + segmentName),
false /* commitOnClose */, config);
false /* commitOnClose */, true, null, config);
IndexWriter indexWriter = _indexCreator.getIndexWriter();
_searcherManager = new SearcherManager(indexWriter, false, false, null);
_analyzer = _indexCreator.getIndexWriter().getConfig().getAnalyzer();
Expand Down Expand Up @@ -181,6 +181,17 @@ private MutableRoaringBitmap getPinotDocIds(IndexSearcher indexSearcher, Mutable
return actualDocIDs;
}

@Override
public void commit() {
try {
_indexCreator.getIndexWriter().commit();
} catch (Exception e) {
LOGGER.error("Failed to commit the realtime lucene text index for column {}, exception {}", _column,
e.getMessage());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why not just log the error e?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

didn't quite follow this, can you elaborate?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I meant LOGGER.err("Failed.. .", _column, e);

throw new RuntimeException(e);
}
}

@Override
public void close() {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,8 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {

@Override
public void init(SegmentGeneratorConfig segmentCreationSpec, SegmentIndexCreationInfo segmentIndexCreationInfo,
TreeMap<String, ColumnIndexCreationInfo> indexCreationInfoMap, Schema schema, File outDir)
TreeMap<String, ColumnIndexCreationInfo> indexCreationInfoMap, Schema schema, File outDir,
@Nullable int[] immutableToMutableIdMap)
throws Exception {
_docIdCounter = 0;
_config = segmentCreationSpec;
Expand Down Expand Up @@ -158,6 +159,8 @@ public void init(SegmentGeneratorConfig segmentCreationSpec, SegmentIndexCreatio
.onHeap(segmentCreationSpec.isOnHeap())
.withForwardIndexDisabled(forwardIndexDisabled)
.withTextCommitOnClose(true)
.withImmutableToMutableIdMap(immutableToMutableIdMap)
.withRealtimeConversion(segmentCreationSpec.isRealtimeConversion())
.build();
//@formatter:on

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import javax.annotation.Nullable;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.io.FileUtils;
import org.apache.pinot.segment.local.realtime.converter.stats.RealtimeSegmentSegmentCreationDataSource;
import org.apache.pinot.segment.local.recordtransformer.ComplexTypeTransformer;
import org.apache.pinot.segment.local.recordtransformer.RecordTransformer;
import org.apache.pinot.segment.local.segment.creator.RecordReaderSegmentCreationDataSource;
Expand Down Expand Up @@ -191,6 +192,11 @@ public void init(SegmentGeneratorConfig config, SegmentCreationDataSource dataSo
((RecordReaderSegmentCreationDataSource) dataSource).setTransformPipeline(transformPipeline);
}

// Optimization for realtime segment conversion
if (dataSource instanceof RealtimeSegmentSegmentCreationDataSource) {
_config.setRealtimeConversion(true);
}

// Initialize stats collection
_segmentStats = dataSource.gatherStats(
new StatsCollectorConfig(config.getTableConfig(), _dataSchema, config.getSegmentPartitionConfig()));
Expand Down Expand Up @@ -218,6 +224,23 @@ public void init(SegmentGeneratorConfig config, SegmentCreationDataSource dataSo
LOGGER.debug("tempIndexDir:{}", _tempIndexDir);
}

/**
* Generate a mutable docId to immutable docId mapping from the sortedDocIds iteration order
*
* @param sortedDocIds used to map sortedDocIds[immutableId] = mutableId (based on RecordReader iteration order)
* @return int[] used to map output[mutableId] = immutableId, or null if sortedDocIds is null
*/
private int[] getImmutableToMutableIdMap(@Nullable int[] sortedDocIds) {
if (sortedDocIds == null) {
return null;
}
int[] res = new int[sortedDocIds.length];
for (int i = 0; i < res.length; i++) {
res[sortedDocIds[i]] = i;
}
return res;
}

@Override
public void build()
throws Exception {
Expand All @@ -229,10 +252,19 @@ public void build()

int incompleteRowsFound = 0;
try {
// TODO: Eventually pull the doc Id sorting logic out of Record Reader so that all row oriented logic can be
// removed from this code.
int[] immutableToMutableIdMap = null;
if (_recordReader instanceof PinotSegmentRecordReader) {
immutableToMutableIdMap =
getImmutableToMutableIdMap(((PinotSegmentRecordReader) _recordReader).getSortedDocIds());
}

// Initialize the index creation using the per-column statistics information
// TODO: _indexCreationInfoMap holds the reference to all unique values on heap (ColumnIndexCreationInfo ->
// ColumnStatistics) throughout the segment creation. Find a way to release the memory early.
_indexCreator.init(_config, _segmentIndexCreationInfo, _indexCreationInfoMap, _dataSchema, _tempIndexDir);
_indexCreator.init(_config, _segmentIndexCreationInfo, _indexCreationInfoMap, _dataSchema, _tempIndexDir,
immutableToMutableIdMap);

// Build the index
_recordReader.rewind();
Expand Down Expand Up @@ -299,19 +331,22 @@ public void buildByColumn(IndexSegment indexSegment)
LOGGER.info("Collected stats for {} documents", _totalDocs);

try {
// TODO: Eventually pull the doc Id sorting logic out of Record Reader so that all row oriented logic can be
// removed from this code.
int[] sortedDocIds = ((PinotSegmentRecordReader) _recordReader).getSortedDocIds();
int[] immutableToMutableIdMap = getImmutableToMutableIdMap(sortedDocIds);

// Initialize the index creation using the per-column statistics information
// TODO: _indexCreationInfoMap holds the reference to all unique values on heap (ColumnIndexCreationInfo ->
// ColumnStatistics) throughout the segment creation. Find a way to release the memory early.
_indexCreator.init(_config, _segmentIndexCreationInfo, _indexCreationInfoMap, _dataSchema, _tempIndexDir);
_indexCreator.init(_config, _segmentIndexCreationInfo, _indexCreationInfoMap, _dataSchema, _tempIndexDir,
immutableToMutableIdMap);

// Build the indexes
LOGGER.info("Start building Index by column");

TreeSet<String> columns = _dataSchema.getPhysicalColumnNames();

// TODO: Eventually pull the doc Id sorting logic out of Record Reader so that all row oriented logic can be
// removed from this code.
int[] sortedDocIds = ((PinotSegmentRecordReader) _recordReader).getSortedDocIds();
for (String col : columns) {
_indexCreator.indexColumn(col, sortedDocIds, indexSegment);
}
Expand Down
Loading