Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,13 @@ public void setUp()
Map<String, String> props = new HashMap<>();
props.put(FieldConfig.TEXT_INDEX_USE_AND_FOR_MULTI_TERM_QUERIES, "true");
columnProperties.put(SKILLS_TEXT_COL_MULTI_TERM_NAME, props);
props = new HashMap<>();
props.put(FieldConfig.TEXT_INDEX_STOP_WORD_INCLUDE_KEY, "coordinator");
props.put(FieldConfig.TEXT_INDEX_STOP_WORD_EXCLUDE_KEY, "it, those");
columnProperties.put(SKILLS_TEXT_COL_NAME, props);
props = new HashMap<>();
props.put(FieldConfig.TEXT_INDEX_STOP_WORD_EXCLUDE_KEY, "");
columnProperties.put(SKILLS_TEXT_COL_DICT_NAME, props);
indexLoadingConfig.setColumnProperties(columnProperties);
ImmutableSegment immutableSegment =
ImmutableSegmentLoader.load(new File(INDEX_DIR, SEGMENT_NAME), indexLoadingConfig);
Expand Down Expand Up @@ -193,6 +200,13 @@ private void buildSegment()
props.put(FieldConfig.TEXT_INDEX_NO_RAW_DATA, "true");
props.put(FieldConfig.TEXT_INDEX_RAW_VALUE, "ILoveCoding");
columnProperties.put(SKILLS_TEXT_NO_RAW_NAME, props);
props = new HashMap<>();
props.put(FieldConfig.TEXT_INDEX_STOP_WORD_INCLUDE_KEY, "coordinator");
props.put(FieldConfig.TEXT_INDEX_STOP_WORD_EXCLUDE_KEY, "it, those");
columnProperties.put(SKILLS_TEXT_COL_NAME, props);
props = new HashMap<>();
props.put(FieldConfig.TEXT_INDEX_STOP_WORD_EXCLUDE_KEY, "");
columnProperties.put(SKILLS_TEXT_COL_DICT_NAME, props);
config.setColumnProperties(columnProperties);
SegmentIndexCreationDriverImpl driver = new SegmentIndexCreationDriverImpl();
try (RecordReader recordReader = new GenericRowRecordReader(rows)) {
Expand All @@ -206,7 +220,7 @@ private List<GenericRow> createTestData()
List<GenericRow> rows = new ArrayList<>();

// read the skills file
String[] skills = new String[24];
String[] skills = new String[28];
List<String[]> multiValueStringList = new ArrayList<>();
int skillCount = 0;
try (BufferedReader reader = new BufferedReader(new InputStreamReader(
Expand All @@ -217,7 +231,7 @@ private List<GenericRow> createTestData()
multiValueStringList.add(StringUtils.splitByWholeSeparator(line, ", "));
}
}
assertEquals(skillCount, 24);
assertEquals(skillCount, 28);

// read the query log file (24k queries) and build dataset
int counter = 0;
Expand Down Expand Up @@ -1864,6 +1878,34 @@ public void testInterSegment() {
query = "SELECT count(*) FROM MyTable WHERE TEXT_MATCH(SKILLS_TEXT_COL_DICT, 'a and or in the are')";
testInterSegmentAggregationQueryHelper(query, 0);

// query with words excluded from default stop-words. they should be indexed
query = "SELECT count(*) FROM MyTable WHERE TEXT_MATCH(SKILLS_TEXT_COL, '\"IT support\" or \"IT manager\"')";
testInterSegmentAggregationQueryHelper(query, 8);

// query with words excluded from default stop-words. they should be indexed
query = "SELECT count(*) FROM MyTable WHERE TEXT_MATCH(SKILLS_TEXT_COL, '\"IT\"')";
testInterSegmentAggregationQueryHelper(query, 16);

// query without stop-words
query = "SELECT count(*) FROM MyTable WHERE TEXT_MATCH(SKILLS_TEXT_COL, '\"support\" or \"manager\"')";
testInterSegmentAggregationQueryHelper(query, 12);

// query without stop-words
query = "SELECT count(*) FROM MyTable WHERE TEXT_MATCH(SKILLS_TEXT_COL, '\"supporting\"')";
testInterSegmentAggregationQueryHelper(query, 4);

// query with included stop-words. they should not be indexed
query = "SELECT count(*) FROM MyTable WHERE TEXT_MATCH(SKILLS_TEXT_COL, 'coordinator')";
testInterSegmentAggregationQueryHelper(query, 0);

// query with default stop-words. they should not be indexed
query = "SELECT count(*) FROM MyTable WHERE TEXT_MATCH(SKILLS_TEXT_COL_DICT, '\"IT support\" or \"IT manager\"')";
testInterSegmentAggregationQueryHelper(query, 12);
query = "SELECT count(*) FROM MyTable WHERE TEXT_MATCH(SKILLS_TEXT_COL_DICT, '\"IT\"')";
testInterSegmentAggregationQueryHelper(query, 0);
query = "SELECT count(*) FROM MyTable WHERE TEXT_MATCH(SKILLS_TEXT_COL_DICT, '\"support\" or \"manager\"')";
testInterSegmentAggregationQueryHelper(query, 12);

// analyzer should prune/ignore the stop words from search expression and consider everything else for a match
query = "SELECT count(*) FROM MyTable WHERE TEXT_MATCH(SKILLS_TEXT_COL, '\"learned a lot\"')";
testInterSegmentAggregationQueryHelper(query, 4);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,4 +21,8 @@ C++, Java, Python, realtime streaming systems, Machine learning, spark, Kubernet
Databases, columnar query processing, Apache Arrow, distributed systems, Machine learning, cluster management, docker image building and distribution
Database engine, OLAP systems, OLTP transaction processing at large scale, concurrency, multi-threading, GO, building large scale systems
GET /administrator/ HTTP/1.1 200 4263 - Mozilla/5.0 (Windows NT 6.0; rv:34.0) Gecko/20100101 Firefox/34.0 - NullPointerException
Foo worked in a lot of places and learned a lot of things
Foo worked in a lot of places and learned a lot of things
IT support, python, hardware debugging
IT manager, workspace coordinator
manager, coordinator, IT
IT supporting
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public class TextIndicesRealtimeClusterIntegrationTest extends BaseClusterIntegr
private static final String TEXT_COLUMN_NAME = "skills";
private static final String TEXT_COLUMN_NAME_NATIVE = "skills_native";
private static final String TIME_COLUMN_NAME = "millisSinceEpoch";
private static final int NUM_SKILLS = 24;
private static final int NUM_SKILLS = 28;
private static final int NUM_MATCHING_SKILLS = 4;
private static final int NUM_RECORDS = NUM_SKILLS * 1000;
private static final int NUM_MATCHING_RECORDS = NUM_MATCHING_SKILLS * 1000;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -319,12 +319,16 @@ public long getLatestIngestionTimestamp() {

// Text index
MutableTextIndex textIndex;
List<String> stopWordsInclude = null;
List<String> stopWordsExclude = null;
if (textIndexColumns.contains(column)) {
boolean useNativeTextIndex = false;
if (_fieldConfigList != null) {
for (FieldConfig fieldConfig : _fieldConfigList) {
if (fieldConfig.getName().equals(column)) {
Map<String, String> properties = fieldConfig.getProperties();
stopWordsInclude = TextIndexUtils.extractStopWordsInclude(properties);
stopWordsExclude = TextIndexUtils.extractStopWordsExclude(properties);
if (TextIndexUtils.isFstTypeNative(properties)) {
useNativeTextIndex = true;
}
Expand All @@ -340,7 +344,8 @@ public long getLatestIngestionTimestamp() {
// it is beyond the scope of realtime index pluggability to do this refactoring, so realtime
// text indexes remain statically defined. Revisit this after this refactoring has been done.
RealtimeLuceneTextIndex luceneTextIndex =
new RealtimeLuceneTextIndex(column, new File(config.getConsumerDir()), _segmentName);
new RealtimeLuceneTextIndex(column, new File(config.getConsumerDir()), _segmentName,
stopWordsInclude, stopWordsExclude);
if (_realtimeLuceneReaders == null) {
_realtimeLuceneReaders = new RealtimeLuceneIndexRefreshState.RealtimeLuceneReaders(_segmentName);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.pinot.segment.local.realtime.impl.invertedindex;

import java.io.File;
import java.util.List;
import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.index.IndexWriter;
Expand Down Expand Up @@ -57,8 +58,11 @@ public class RealtimeLuceneTextIndex implements MutableTextIndex {
* @param column column name
* @param segmentIndexDir realtime segment consumer dir
* @param segmentName realtime segment name
* @param stopWordsInclude the words to include in addition to the default stop word list
* @param stopWordsExclude stop words to exclude from default stop words
*/
public RealtimeLuceneTextIndex(String column, File segmentIndexDir, String segmentName) {
public RealtimeLuceneTextIndex(String column, File segmentIndexDir, String segmentName,
List<String> stopWordsInclude, List<String> stopWordsExclude) {
_column = column;
_segmentName = segmentName;
try {
Expand All @@ -72,7 +76,7 @@ public RealtimeLuceneTextIndex(String column, File segmentIndexDir, String segme
// for realtime
_indexCreator =
new LuceneTextIndexCreator(column, new File(segmentIndexDir.getAbsolutePath() + "/" + segmentName),
false /* commitOnClose */);
false /* commitOnClose */, stopWordsInclude, stopWordsExclude);
IndexWriter indexWriter = _indexCreator.getIndexWriter();
_searcherManager = new SearcherManager(indexWriter, false, false, null);
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ public TextIndexCreator newTextIndexCreator(IndexCreationContext.Text context)
return new NativeTextIndexCreator(context.getFieldSpec().getName(), context.getIndexDir());
} else {
return new LuceneTextIndexCreator(context.getFieldSpec().getName(), context.getIndexDir(),
context.isCommitOnClose());
context.isCommitOnClose(), context.getStopWordsInclude(), context.getStopWordsExclude());
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,9 @@ public void init(SegmentGeneratorConfig segmentCreationSpec, SegmentIndexCreatio
}
}
_textIndexCreatorMap.put(columnName,
_indexCreatorProvider.newTextIndexCreator(context.forTextIndex(fstType, true)));
_indexCreatorProvider.newTextIndexCreator(context.forTextIndex(fstType, true,
TextIndexUtils.extractStopWordsInclude(columnName, _columnProperties),
TextIndexUtils.extractStopWordsExclude(columnName, _columnProperties))));
}

if (fstIndexColumns.contains(columnName)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@
import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import javax.annotation.Nullable;
import org.apache.lucene.analysis.CharArraySet;
import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.document.Document;
Expand All @@ -33,6 +36,7 @@
import org.apache.lucene.store.FSDirectory;
import org.apache.pinot.segment.local.realtime.impl.invertedindex.RealtimeLuceneTextIndex;
import org.apache.pinot.segment.local.segment.creator.impl.SegmentColumnarIndexCreator;
import org.apache.pinot.segment.local.segment.store.TextIndexUtils;
import org.apache.pinot.segment.spi.V1Constants;
import org.apache.pinot.segment.spi.index.creator.DictionaryBasedInvertedIndexCreator;
import org.apache.pinot.segment.spi.index.creator.TextIndexCreator;
Expand All @@ -55,10 +59,15 @@ public class LuceneTextIndexCreator implements TextIndexCreator {

private int _nextDocId = 0;

public static final CharArraySet ENGLISH_STOP_WORDS_SET = new CharArraySet(Arrays
.asList("a", "an", "and", "are", "as", "at", "be", "but", "by", "for", "if", "in", "into", "is", "it", "no",
"not", "of", "on", "or", "such", "that", "the", "their", "then", "than", "there", "these", "they", "this",
"to", "was", "will", "with", "those"), true);
/**
 * Returns the default English stop-word list as a fresh, mutable set. A new copy is handed out
 * on every call so that callers may add or remove words without affecting other users of the
 * default list.
 */
public static HashSet<String> getDefaultEnglishStopWordsSet() {
  String[] defaultStopWords = {
      "a", "an", "and", "are", "as", "at", "be", "but", "by", "for", "if", "in", "into", "is", "it",
      "no", "not", "of", "on", "or", "such", "that", "the", "their", "then", "than", "there", "these",
      "they", "this", "to", "was", "will", "with", "those"
  };
  return new HashSet<>(Arrays.asList(defaultStopWords));
}

public static final CharArraySet ENGLISH_STOP_WORDS_SET = new CharArraySet(getDefaultEnglishStopWordsSet(), true);


/**
* Called by {@link SegmentColumnarIndexCreator}
Expand All @@ -82,16 +91,20 @@ public class LuceneTextIndexCreator implements TextIndexCreator {
* no need to commit the index from the realtime side. So when the realtime segment
* is destroyed (which is after the realtime segment has been committed and converted
* to offline), we close this lucene index writer to release resources but don't commit.
* This is the reason to have commit flag part of the constructor.
* @param stopWordsInclude the words to include in addition to the default stop word list
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if stopWordsInclude and stopWordsExclude have the same word, will the code throw any exception on that?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The current behavior is to prioritize excluding such duplicate words. I'm planning to document the default ENGLISH_STOP_WORDS_SET, the config keys, and this behavior in the user doc.

* @param stopWordsExclude the words to exclude from the default stop word list
*/
public LuceneTextIndexCreator(String column, File segmentIndexDir, boolean commit) {
public LuceneTextIndexCreator(String column, File segmentIndexDir, boolean commit,
@Nullable List<String> stopWordsInclude, @Nullable List<String> stopWordsExclude) {
_textColumn = column;
try {
// segment generation is always in V1 and later we convert (as part of post creation processing)
// to V3 if segmentVersion is set to V3 in SegmentGeneratorConfig.
File indexFile = getV1TextIndexFile(segmentIndexDir);
_indexDirectory = FSDirectory.open(indexFile.toPath());
StandardAnalyzer standardAnalyzer = new StandardAnalyzer(ENGLISH_STOP_WORDS_SET);

StandardAnalyzer standardAnalyzer =
TextIndexUtils.getStandardAnalyzerWithCustomizedStopWords(stopWordsInclude, stopWordsExclude);
IndexWriterConfig indexWriterConfig = new IndexWriterConfig(standardAnalyzer);
indexWriterConfig.setRAMBufferSizeMB(LUCENE_INDEX_MAX_BUFFER_SIZE_MB);
indexWriterConfig.setCommitOnClose(commit);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,13 @@
import java.io.File;
import java.io.IOException;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.apache.pinot.segment.local.segment.index.loader.IndexHandler;
import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
import org.apache.pinot.segment.local.segment.index.loader.LoaderUtils;
import org.apache.pinot.segment.local.segment.index.loader.SegmentPreProcessor;
import org.apache.pinot.segment.local.segment.store.TextIndexUtils;
import org.apache.pinot.segment.spi.ColumnMetadata;
import org.apache.pinot.segment.spi.SegmentMetadata;
import org.apache.pinot.segment.spi.creator.IndexCreationContext;
Expand Down Expand Up @@ -87,11 +89,13 @@ public class TextIndexHandler implements IndexHandler {
private final SegmentMetadata _segmentMetadata;
private final Set<String> _columnsToAddIdx;
private final FSTType _fstType;
private final Map<String, Map<String, String>> _columnProperties;

public TextIndexHandler(SegmentMetadata segmentMetadata, IndexLoadingConfig indexLoadingConfig) {
_segmentMetadata = segmentMetadata;
_fstType = indexLoadingConfig.getFSTIndexType();
_columnsToAddIdx = indexLoadingConfig.getTextIndexColumns();
_columnProperties = indexLoadingConfig.getColumnProperties();
}

@Override
Expand Down Expand Up @@ -179,7 +183,9 @@ private void createTextIndexForColumn(SegmentDirectory.Writer segmentWriter, Col
try (ForwardIndexReader forwardIndexReader = LoaderUtils.getForwardIndexReader(segmentWriter, columnMetadata);
ForwardIndexReaderContext readerContext = forwardIndexReader.createContext();
TextIndexCreator textIndexCreator = indexCreatorProvider.newTextIndexCreator(IndexCreationContext.builder()
.withColumnMetadata(columnMetadata).withIndexDir(segmentDirectory).build().forTextIndex(_fstType, true))) {
.withColumnMetadata(columnMetadata).withIndexDir(segmentDirectory).build().forTextIndex(_fstType, true,
TextIndexUtils.extractStopWordsInclude(columnName, _columnProperties),
TextIndexUtils.extractStopWordsExclude(columnName, _columnProperties)))) {
if (columnMetadata.isSingleValue()) {
processSVField(segmentWriter, hasDictionary, forwardIndexReader, readerContext, textIndexCreator, numDocs,
columnMetadata);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader;
import org.apache.pinot.segment.local.segment.creator.impl.text.LuceneTextIndexCreator;
import org.apache.pinot.segment.local.segment.index.column.PhysicalColumnIndexContainer;
import org.apache.pinot.segment.local.segment.store.TextIndexUtils;
import org.apache.pinot.segment.spi.V1Constants;
import org.apache.pinot.segment.spi.index.reader.TextIndexReader;
import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
Expand Down Expand Up @@ -93,7 +94,10 @@ public LuceneTextIndexReader(String column, File indexDir, int numDocs,
// TODO: consider using a threshold of num docs per segment to decide between building
// mapping file upfront on segment load v/s on-the-fly during query processing
_docIdTranslator = new DocIdTranslator(indexDir, _column, numDocs, _indexSearcher);
_standardAnalyzer = new StandardAnalyzer(LuceneTextIndexCreator.ENGLISH_STOP_WORDS_SET);
_standardAnalyzer = TextIndexUtils.getStandardAnalyzerWithCustomizedStopWords(
TextIndexUtils.extractStopWordsInclude(textIndexProperties),
TextIndexUtils.extractStopWordsExclude(textIndexProperties)
);
} catch (Exception e) {
LOGGER
.error("Failed to instantiate Lucene text index reader for column {}, exception {}", column, e.getMessage());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,18 @@
package org.apache.pinot.segment.local.segment.store;

import java.io.File;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.commons.io.FileUtils;
import org.apache.lucene.analysis.CharArraySet;
import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.pinot.segment.local.segment.creator.impl.text.LuceneTextIndexCreator;
import org.apache.pinot.segment.spi.V1Constants.Indexes;
import org.apache.pinot.segment.spi.store.SegmentDirectoryPaths;
import org.apache.pinot.spi.config.table.FSTType;
Expand Down Expand Up @@ -64,4 +73,49 @@ public static boolean isFstTypeNative(@Nullable Map<String, String> textIndexPro
public static FSTType getFSTTypeOfIndex(File indexDir, String column) {
return SegmentDirectoryPaths.findTextIndexIndexFile(indexDir, column) != null ? FSTType.LUCENE : FSTType.NATIVE;
}

/**
 * Extracts the stop words to include for the given column from the per-column text-index
 * properties. Returns an empty list when the column has no properties or no include key.
 *
 * @param colName column whose properties to look up
 * @param columnProperties per-column property maps keyed by column name
 */
@Nonnull
public static List<String> extractStopWordsInclude(String colName,
    Map<String, Map<String, String>> columnProperties) {
  // Map.get already returns null for an absent key; getOrDefault(colName, null) was redundant.
  return extractStopWordsInclude(columnProperties.get(colName));
}

/**
 * Extracts the stop words to exclude for the given column from the per-column text-index
 * properties. Returns an empty list when the column has no properties or no exclude key.
 *
 * @param colName column whose properties to look up
 * @param columnProperties per-column property maps keyed by column name
 */
@Nonnull
public static List<String> extractStopWordsExclude(String colName,
    Map<String, Map<String, String>> columnProperties) {
  // Map.get already returns null for an absent key; getOrDefault(colName, null) was redundant.
  return extractStopWordsExclude(columnProperties.get(colName));
}

/**
 * Reads the stop words to include (on top of the default English list) from a single column's
 * text-index property map. Never returns null.
 */
@Nonnull
public static List<String> extractStopWordsInclude(Map<String, String> columnProperty) {
  List<String> wordsToInclude = parseEntryAsString(columnProperty, FieldConfig.TEXT_INDEX_STOP_WORD_INCLUDE_KEY);
  return wordsToInclude;
}

/**
 * Reads the stop words to exclude (removed from the default English list) from a single column's
 * text-index property map. Never returns null.
 */
@Nonnull
public static List<String> extractStopWordsExclude(Map<String, String> columnProperty) {
  List<String> wordsToExclude = parseEntryAsString(columnProperty, FieldConfig.TEXT_INDEX_STOP_WORD_EXCLUDE_KEY);
  return wordsToExclude;
}

/**
 * Parses a comma-separated stop-word entry from the given property map into a list of trimmed,
 * non-empty words. Used for both the include and the exclude key.
 *
 * @param columnProperties per-column text-index properties, may be null
 * @param stopWordKey the property key to read (include or exclude)
 * @return the parsed words; empty list when the map is null or the entry is absent/blank
 */
@Nonnull
private static List<String> parseEntryAsString(@Nullable Map<String, String> columnProperties,
    String stopWordKey) {
  if (columnProperties == null) {
    // Collections.emptyList() is type-safe; the raw Collections.EMPTY_LIST caused an
    // unchecked-assignment warning here.
    return Collections.emptyList();
  }
  String stopWords = columnProperties.getOrDefault(stopWordKey, "");
  // Trim each token and drop empties so a blank/empty config value yields an empty list
  // rather than a list containing "".
  return Arrays.stream(stopWords.split(FieldConfig.TEXT_INDEX_STOP_WORD_SEPERATOR))
      .map(String::trim)
      .filter(word -> !word.isEmpty())
      .collect(Collectors.toList());
}

/**
 * Builds a {@link StandardAnalyzer} whose stop-word set is the default English list, plus the
 * given words to include, minus the given words to exclude. Exclusions are applied last, so a
 * word appearing in both lists ends up excluded. The resulting set is case-insensitive.
 *
 * @param stopWordsInclude extra stop words to add, may be null
 * @param stopWordsExclude stop words to remove from the default list, may be null
 */
public static StandardAnalyzer getStandardAnalyzerWithCustomizedStopWords(@Nullable List<String> stopWordsInclude,
    @Nullable List<String> stopWordsExclude) {
  HashSet<String> stopWords = LuceneTextIndexCreator.getDefaultEnglishStopWordsSet();
  if (stopWordsInclude != null) {
    stopWords.addAll(stopWordsInclude);
  }
  if (stopWordsExclude != null) {
    stopWords.removeAll(stopWordsExclude);
  }
  return new StandardAnalyzer(new CharArraySet(stopWords, true));
}
}
Loading