Commit

fixing merge
Signed-off-by: Bharathwaj G <bharath78910@gmail.com>
bharath-techie committed Oct 17, 2024
1 parent 537f19c commit 2bba7c1
Showing 16 changed files with 565 additions and 195 deletions.
@@ -0,0 +1,33 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.apache.lucene.index;

import org.apache.lucene.codecs.DocValuesProducer;

import java.util.Collections;
import java.util.Set;

/**
* Utility class for DocValuesProducers
* @opensearch.internal
*/
public class DocValuesProducerUtil {
/**
* Returns the segment doc values producers for the given doc values producer.
* If the given doc values producer is not a segment doc values producer, an empty set is returned.
* @param docValuesProducer the doc values producer
* @return the segment doc values producers
*/
public static Set<DocValuesProducer> getSegmentDocValuesProducers(DocValuesProducer docValuesProducer) {
if (docValuesProducer instanceof SegmentDocValuesProducer) {
return (((SegmentDocValuesProducer) docValuesProducer).dvProducers);
}
return Collections.emptySet();
}
}
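
Note: the sketch below is editorial, not part of this commit. It mirrors the unwrapping loop added to mergeStarTreeFields further down, where a merge-time DocValuesProducer may be a SegmentDocValuesProducer wrapping several per-format producers; findCompositeReader is a hypothetical helper name.

    // Illustrative only (hypothetical helper): locate a CompositeIndexReader that may be
    // nested inside a multi-format SegmentDocValuesProducer, as the merge path does below.
    static CompositeIndexReader findCompositeReader(DocValuesProducer producer) {
        if (producer instanceof CompositeIndexReader) {
            return (CompositeIndexReader) producer;
        }
        for (DocValuesProducer inner : DocValuesProducerUtil.getSegmentDocValuesProducers(producer)) {
            if (inner instanceof CompositeIndexReader
                && ((CompositeIndexReader) inner).getCompositeIndexFields().isEmpty() == false) {
                return (CompositeIndexReader) inner;
            }
        }
        return null; // no composite (star-tree) fields behind this producer
    }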
@@ -190,7 +190,13 @@ public Composite912DocValuesReader(DocValuesProducer producer, SegmentReadState
// populates the dummy list of field infos to fetch doc id set iterators for respective fields.
// the dummy field info is used to fetch the doc id set iterators for respective fields based on field name
FieldInfos fieldInfos = new FieldInfos(getFieldInfoList(fields, dimensionFieldTypeMap));
-        this.readState = new SegmentReadState(readState.directory, readState.segmentInfo, fieldInfos, readState.context);
+        this.readState = new SegmentReadState(
+            readState.directory,
+            readState.segmentInfo,
+            fieldInfos,
+            readState.context,
+            readState.segmentSuffix
+        );

// initialize star-tree doc values producer

@@ -12,6 +12,7 @@
import org.apache.lucene.codecs.DocValuesConsumer;
import org.apache.lucene.codecs.DocValuesProducer;
import org.apache.lucene.index.DocValues;
+import org.apache.lucene.index.DocValuesProducerUtil;
import org.apache.lucene.index.DocValuesType;
import org.apache.lucene.index.EmptyDocValuesProducer;
import org.apache.lucene.index.FieldInfo;
@@ -71,9 +72,7 @@ public class Composite912DocValuesWriter extends DocValuesConsumer {
private final Set<String> segmentFieldSet;
private final boolean segmentHasCompositeFields;
private final AtomicInteger fieldNumberAcrossCompositeFields;

private final Map<String, DocValuesProducer> fieldProducerMap = new HashMap<>();
-    private final Map<String, SortedSetDocValues> fieldDocIdSetIteratorMap = new HashMap<>();
-
public Composite912DocValuesWriter(DocValuesConsumer delegate, SegmentWriteState segmentWriteState, MapperService mapperService)
throws IOException {
@@ -147,11 +146,9 @@ public Composite912DocValuesWriter(DocValuesConsumer delegate, SegmentWriteState
private void addStarTreeSupportedFieldsFromSegment() {
// TODO : add integ test for this
for (FieldInfo fi : this.state.fieldInfos) {
-            if (DocValuesType.SORTED_NUMERIC.equals(fi.getDocValuesType())) {
-                segmentFieldSet.add(fi.name);
-            } else if (DocValuesType.SORTED_SET.equals(fi.getDocValuesType())) {
-                segmentFieldSet.add(fi.name);
-            } else if (fi.name.equals(DocCountFieldMapper.NAME)) {
+            if (DocValuesType.SORTED_NUMERIC.equals(fi.getDocValuesType())
+                || DocValuesType.SORTED_SET.equals(fi.getDocValuesType())
+                || fi.name.equals(DocCountFieldMapper.NAME)) {
segmentFieldSet.add(fi.name);
}
}
@@ -192,11 +189,6 @@ public void addSortedSetField(FieldInfo field, DocValuesProducer valuesProducer)
if (mergeState.get() == null && segmentHasCompositeFields) {
createCompositeIndicesIfPossible(valuesProducer, field);
}
-        if (mergeState.get() != null) {
-            if (compositeFieldSet.contains(field.name)) {
-                fieldDocIdSetIteratorMap.put(field.name, valuesProducer.getSortedSet(field));
-            }
-        }
}

@Override
@@ -317,8 +309,20 @@ private void mergeStarTreeFields(MergeState mergeState) throws IOException {
if (mergeState.docValuesProducers[i] instanceof CompositeIndexReader) {
reader = (CompositeIndexReader) mergeState.docValuesProducers[i];
} else {
-                continue;
+                Set<DocValuesProducer> docValuesProducers = DocValuesProducerUtil.getSegmentDocValuesProducers(
+                    mergeState.docValuesProducers[i]
+                );
+                for (DocValuesProducer docValuesProducer : docValuesProducers) {
+                    if (docValuesProducer instanceof CompositeIndexReader) {
+                        reader = (CompositeIndexReader) docValuesProducer;
+                        List<CompositeIndexFieldInfo> compositeFieldInfo = reader.getCompositeIndexFields();
+                        if (compositeFieldInfo.isEmpty() == false) {
+                            break;
+                        }
+                    }
+                }
            }
+            if (reader == null) continue;

List<CompositeIndexFieldInfo> compositeFieldInfo = reader.getCompositeIndexFields();
for (CompositeIndexFieldInfo fieldInfo : compositeFieldInfo) {
@@ -345,14 +349,7 @@ private void mergeStarTreeFields(MergeState mergeState) throws IOException {
}
}
try (StarTreesBuilder starTreesBuilder = new StarTreesBuilder(state, mapperService, fieldNumberAcrossCompositeFields)) {
-            starTreesBuilder.buildDuringMerge(
-                metaOut,
-                dataOut,
-                starTreeSubsPerField,
-                compositeDocValuesConsumer,
-                mergeState,
-                fieldDocIdSetIteratorMap
-            );
+            starTreesBuilder.buildDuringMerge(metaOut, dataOut, starTreeSubsPerField, compositeDocValuesConsumer);
}
}

@@ -379,7 +376,8 @@ private static SegmentWriteState getSegmentWriteState(SegmentWriteState segmentW
segmentInfo,
segmentWriteState.fieldInfos,
segmentWriteState.segUpdates,
-            segmentWriteState.context
+            segmentWriteState.context,
+            segmentWriteState.segmentSuffix
);
}

@@ -16,8 +16,6 @@
import org.apache.lucene.index.DocValuesWriterWrapper;
import org.apache.lucene.index.EmptyDocValuesProducer;
import org.apache.lucene.index.FieldInfo;
-import org.apache.lucene.index.FilteredTermsEnum;
-import org.apache.lucene.index.MergeState;
import org.apache.lucene.index.OrdinalMap;
import org.apache.lucene.index.SegmentWriteState;
import org.apache.lucene.index.SortedNumericDocValues;
@@ -27,11 +25,8 @@
import org.apache.lucene.index.TermsEnum;
import org.apache.lucene.search.DocIdSetIterator;
import org.apache.lucene.store.IndexOutput;
-import org.apache.lucene.util.Bits;
import org.apache.lucene.util.ByteBlockPool;
-import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.Counter;
-import org.apache.lucene.util.LongBitSet;
import org.apache.lucene.util.LongValues;
import org.apache.lucene.util.NumericUtils;
import org.apache.lucene.util.packed.PackedInts;
@@ -74,7 +69,6 @@
import static org.opensearch.index.compositeindex.datacube.startree.utils.StarTreeUtils.fullyQualifiedFieldNameForStarTreeDimensionsDocValues;
import static org.opensearch.index.compositeindex.datacube.startree.utils.StarTreeUtils.fullyQualifiedFieldNameForStarTreeMetricsDocValues;
import static org.opensearch.index.compositeindex.datacube.startree.utils.StarTreeUtils.getFieldInfo;
-import static org.opensearch.index.compositeindex.datacube.startree.utils.iterator.StarTreeValuesIterator.NO_MORE_ENTRIES;
import static org.opensearch.index.mapper.NumberFieldMapper.NumberType.DOUBLE;
import static org.opensearch.index.mapper.NumberFieldMapper.NumberType.LONG;

@@ -109,7 +103,14 @@ public abstract class BaseStarTreeBuilder implements StarTreeBuilder {
private final IndexOutput metaOut;
private final IndexOutput dataOut;
private final Counter bytesUsed = Counter.newCounter();
-    Map<String, SortedSetDocValues> sortedSetDocValuesMap = new HashMap<>();
+    Map<String, SortedSetDocValues> flushSortedSetDocValuesMap = new HashMap<>();
+    // Maintains list of sortedSetDocValues for each star tree dimension field across segments during merge
+    Map<String, List<SortedSetStarTreeValuesIterator>> mergeSortedSetDimensionsMap = new HashMap<>();
+    // Maintains ordinalMap for each star tree dimension field during merge
+    Map<String, OrdinalMap> mergeSortedSetDimensionsOrdinalMap = new HashMap<>();
+
+    // This should be true for merge flows
+    protected boolean isMerge = false;

/**
* Reads all the configuration related to dimensions and metrics, builds a star-tree based on the different construction parameters.
@@ -265,7 +266,7 @@ public void build(
if (dimensionsSplitOrder.get(i).getDocValuesType().equals(DocValuesType.SORTED_SET)) {
// This is needed as we need to write the ordinals and also the bytesRef associated with it
// as part of star tree doc values file formats
-                sortedSetDocValuesMap.put(
+                flushSortedSetDocValuesMap.put(
dimensionsSplitOrder.get(i).getField(),
fieldProducerMap.get(dimensionFieldInfo.name).getSortedSet(dimensionFieldInfo)
);
@@ -278,28 +279,6 @@ public void build(
logger.debug("Finished Building star-tree in ms : {}", (System.currentTimeMillis() - startTime));
}

-    /**
-     * Copy of BitsFilteredTermsEnum from DocValuesConsumer
-     */
-    static class BitsFilteredTermsEnum extends FilteredTermsEnum {
-        final LongBitSet liveTerms;
-
-        BitsFilteredTermsEnum(TermsEnum in, LongBitSet liveTerms) {
-            super(in, false);
-            assert liveTerms != null;
-            this.liveTerms = liveTerms;
-        }
-
-        @Override
-        protected AcceptStatus accept(BytesRef term) throws IOException {
-            if (liveTerms.get(ord())) {
-                return AcceptStatus.YES;
-            } else {
-                return AcceptStatus.NO;
-            }
-        }
-    }
-
/**
* Returns the sequential doc values iterator for the given field based on associated docValuesType
*/
@@ -325,10 +304,9 @@ private SequentialDocValuesIterator getSequentialDocValuesIterator(
/**
* Returns the ordinal map per field based on given star-tree values across different segments
*/
-    protected Map<String, OrdinalMap> getOrdinalMaps(List<StarTreeValues> starTreeValuesSubs, MergeState mergeState) throws IOException {
+    protected Map<String, OrdinalMap> getOrdinalMaps(List<StarTreeValues> starTreeValuesSubs) throws IOException {
        long curr = System.currentTimeMillis();
        Map<String, List<SortedSetStarTreeValuesIterator>> dimensionToIterators = new HashMap<>();
-
        // Group iterators by dimension
for (StarTreeValues starTree : starTreeValuesSubs) {
for (String dimName : starTree.getStarTreeField().getDimensionNames()) {
@@ -340,9 +318,8 @@ protected Map<String, OrdinalMap> getOrdinalMaps(List<StarTreeVa
}

        if (dimensionToIterators.isEmpty()) return Collections.emptyMap();
-
+        this.mergeSortedSetDimensionsMap = dimensionToIterators;
        Map<String, OrdinalMap> dimensionToOrdinalMap = new HashMap<>();
-
for (Map.Entry<String, List<SortedSetStarTreeValuesIterator>> entry : dimensionToIterators.entrySet()) {
String dimName = entry.getKey();
List<SortedSetStarTreeValuesIterator> iterators = entry.getValue();
@@ -353,23 +330,8 @@ protected Map<String, OrdinalMap> getOrdinalMaps(List<StarTreeVa

for (int sub = 0; sub < liveTerms.length; sub++) {
SortedSetStarTreeValuesIterator dv = iterators.get(sub);
-                Bits liveDocs = mergeState.liveDocs[sub];
-                if (liveDocs == null) {
-                    liveTerms[sub] = dv.termsEnum();
-                    weights[sub] = dv.getValueCount();
-                } else {
-                    LongBitSet bitset = new LongBitSet(dv.getValueCount());
-                    int docID;
-                    while ((docID = dv.nextEntry()) != NO_MORE_ENTRIES) {
-                        if (liveDocs.get(docID)) {
-                            for (int i = 0; i < dv.docValueCount(); i++) {
-                                bitset.set(dv.nextOrd());
-                            }
-                        }
-                    }
-                    liveTerms[sub] = new BitsFilteredTermsEnum(dv.termsEnum(), bitset);
-                    weights[sub] = bitset.cardinality();
-                }
+                liveTerms[sub] = dv.termsEnum();
+                weights[sub] = dv.getValueCount();
}

// step 2: create ordinal map for this dimension
@@ -378,7 +340,7 @@ protected Map<String, OrdinalMap> getOrdinalMaps(List<StarTreeVa

logger.debug("Ordinal map for dimension {} - Size in bytes: {}", dimName, map.ramBytesUsed());
        }
-
+        this.mergeSortedSetDimensionsOrdinalMap = dimensionToOrdinalMap;
logger.debug("Total time to build ordinal maps: {} ms", System.currentTimeMillis() - curr);
return dimensionToOrdinalMap;
}
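
Note: with the liveDocs filtering removed above, every segment's full terms enum now feeds the ordinal map. A minimal editorial sketch of "step 2", assuming Lucene's OrdinalMap.build(owner, subs, weights, acceptableOverheadRatio) API; buildDimensionOrdinalMap is a hypothetical helper, not code from this commit:

    // Editorial sketch: merge the per-segment term dictionaries of one dimension
    // into a single global-ordinal space (no per-document liveness filtering here).
    static OrdinalMap buildDimensionOrdinalMap(List<SortedSetStarTreeValuesIterator> iterators) throws IOException {
        TermsEnum[] liveTerms = new TermsEnum[iterators.size()];
        long[] weights = new long[iterators.size()];
        for (int sub = 0; sub < iterators.size(); sub++) {
            liveTerms[sub] = iterators.get(sub).termsEnum();    // all terms of this segment
            weights[sub] = iterators.get(sub).getValueCount();  // term count, used to size the map
        }
        return OrdinalMap.build(null, liveTerms, weights, PackedInts.COMPACT);
    }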
@@ -418,7 +380,7 @@ public void build(

createAggregatedDocs(rootNode);
int numAggregatedStarTreeDocument = numStarTreeDocs - numStarTreeDocument - numStarTreeDocumentUnderStarNode;
-        logger.info("Finished creating aggregated documents : {}", numAggregatedStarTreeDocument);
+        logger.debug("Finished creating aggregated documents : {}", numAggregatedStarTreeDocument);

// Create doc values indices in disk
createSortedDocValuesIndices(starTreeDocValuesConsumer, fieldNumberAcrossStarTrees);
@@ -496,7 +458,6 @@ private void createSortedDocValuesIndices(DocValuesConsumer docValuesConsumer, A
metricFieldInfoList[i] = fi;
metricWriters.add(new SortedNumericDocValuesWriterWrapper(fi, bytesUsed));
        }
-
for (int docId = 0; docId < numStarTreeDocs; docId++) {
StarTreeDocument starTreeDocument = getStarTreeDocument(docId);
int idx = 0;
@@ -544,7 +505,17 @@ private void createSortedDocValuesIndices(DocValuesConsumer docValuesConsumer, A
private void indexDocValue(DocValuesWriterWrapper<?> dvWriter, int docId, long value, String field) throws IOException {
if (dvWriter instanceof SortedSetDocValuesWriterWrapper) {
// TODO : cache lookupOrd to make it faster
-            ((SortedSetDocValuesWriterWrapper) dvWriter).addValue(docId, sortedSetDocValuesMap.get(field).lookupOrd(value));
+            if (isMerge) {
+                OrdinalMap map = mergeSortedSetDimensionsOrdinalMap.get(field);
+                int segmentNumber = map.getFirstSegmentNumber(value);
+                long segmentOrd = map.getFirstSegmentOrd(value);
+                ((SortedSetDocValuesWriterWrapper) dvWriter).addValue(
+                    docId,
+                    mergeSortedSetDimensionsMap.get(field).get(segmentNumber).lookupOrd(segmentOrd)
+                );
+            } else {
+                ((SortedSetDocValuesWriterWrapper) dvWriter).addValue(docId, flushSortedSetDocValuesMap.get(field).lookupOrd(value));
+            }
} else if (dvWriter instanceof SortedNumericDocValuesWriterWrapper) {
((SortedNumericDocValuesWriterWrapper) dvWriter).addValue(docId, value);
}
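
Note: an editorial condensation of the merge branch above, showing the two-hop resolution from a global ordinal back to term bytes; lookupGlobalOrd is a hypothetical name, not part of this commit:

    // Editorial sketch: global ordinal -> first owning segment -> segment-local ordinal -> term bytes.
    static BytesRef lookupGlobalOrd(OrdinalMap map, List<SortedSetStarTreeValuesIterator> segments, long globalOrd)
        throws IOException {
        int segmentNumber = map.getFirstSegmentNumber(globalOrd); // first segment containing the term
        long segmentOrd = map.getFirstSegmentOrd(globalOrd);      // that segment's local ordinal
        return segments.get(segmentNumber).lookupOrd(segmentOrd); // resolve bytes from that segment
    }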
@@ -622,7 +593,7 @@ protected StarTreeDocument getStarTreeDocument(
/**
* Sets dimensions / metric readers and numSegmentDocs
*/
-    protected void setReadersAndNumSegmentDocs(
+    protected void setReadersAndNumSegmentDocsDuringMerge(
SequentialDocValuesIterator[] dimensionReaders,
List<SequentialDocValuesIterator> metricReaders,
AtomicInteger numSegmentDocs,
Expand Down Expand Up @@ -855,8 +826,8 @@ private static Long getLong(Object metric) {
* Sets the sortedSetDocValuesMap.
* This is needed as we need to write the ordinals and also the bytesRef associated with it
*/
-    void setSortedSetDocValuesMap(Map<String, SortedSetDocValues> sortedSetDocValuesMap) {
-        this.sortedSetDocValuesMap = sortedSetDocValuesMap;
+    void setFlushSortedSetDocValuesMap(Map<String, SortedSetDocValues> flushSortedSetDocValuesMap) {
+        this.flushSortedSetDocValuesMap = flushSortedSetDocValuesMap;
}

/**
@@ -1128,7 +1099,7 @@ public void close() throws IOException {

}

-    abstract Iterator<StarTreeDocument> mergeStarTrees(List<StarTreeValues> starTreeValues, MergeState mergeState) throws IOException;
+    abstract Iterator<StarTreeDocument> mergeStarTrees(List<StarTreeValues> starTreeValues) throws IOException;

public InMemoryTreeNode getRootNode() {
return rootNode;
@@ -11,10 +11,8 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.codecs.DocValuesConsumer;
-import org.apache.lucene.index.MergeState;
import org.apache.lucene.index.OrdinalMap;
import org.apache.lucene.index.SegmentWriteState;
-import org.apache.lucene.index.SortedSetDocValues;
import org.apache.lucene.store.IndexOutput;
import org.apache.lucene.util.LongValues;
import org.opensearch.common.annotation.ExperimentalApi;
@@ -91,14 +89,11 @@ public void appendStarTreeDocument(StarTreeDocument starTreeDocument) throws IOE
public void build(
List<StarTreeValues> starTreeValuesSubs,
AtomicInteger fieldNumberAcrossStarTrees,
-        DocValuesConsumer starTreeDocValuesConsumer,
-        MergeState mergeState,
-        Map<String, SortedSetDocValues> sortedSetDocValuesMap
+        DocValuesConsumer starTreeDocValuesConsumer
    ) throws IOException {
        boolean success = false;
-        setSortedSetDocValuesMap(sortedSetDocValuesMap);
        try {
-            build(mergeStarTrees(starTreeValuesSubs, mergeState), fieldNumberAcrossStarTrees, starTreeDocValuesConsumer);
+            build(mergeStarTrees(starTreeValuesSubs), fieldNumberAcrossStarTrees, starTreeDocValuesConsumer);
success = true;
} finally {
starTreeDocumentFileManager.deleteFiles(success);
Expand Down Expand Up @@ -144,17 +139,18 @@ public Iterator<StarTreeDocument> sortAndAggregateSegmentDocuments(
* @param starTreeValuesSubs StarTreeValues from multiple segments
* @return iterator of star tree documents
*/
-    Iterator<StarTreeDocument> mergeStarTrees(List<StarTreeValues> starTreeValuesSubs, MergeState mergeState) throws IOException {
+    Iterator<StarTreeDocument> mergeStarTrees(List<StarTreeValues> starTreeValuesSubs) throws IOException {
int numDocs = 0;
int[] docIds;
-        Map<String, OrdinalMap> ordinalMaps = getOrdinalMaps(starTreeValuesSubs, mergeState);
+        this.isMerge = true;
+        Map<String, OrdinalMap> ordinalMaps = getOrdinalMaps(starTreeValuesSubs);
try {
int seg = 0;
for (StarTreeValues starTreeValues : starTreeValuesSubs) {
SequentialDocValuesIterator[] dimensionReaders = new SequentialDocValuesIterator[numDimensions];
List<SequentialDocValuesIterator> metricReaders = new ArrayList<>();
AtomicInteger numSegmentDocs = new AtomicInteger();
-                setReadersAndNumSegmentDocs(dimensionReaders, metricReaders, numSegmentDocs, starTreeValues);
+                setReadersAndNumSegmentDocsDuringMerge(dimensionReaders, metricReaders, numSegmentDocs, starTreeValues);
int currentDocId = 0;
Map<String, LongValues> longValuesMap = new LinkedHashMap<>();
for (Map.Entry<String, OrdinalMap> entry : ordinalMaps.entrySet()) {
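
Note: the diff is truncated at this point. Based on Lucene's OrdinalMap API, the loop presumably caches each segment's local-to-global ordinal remapping along these lines (an editorial assumption, not shown in this view):

                // Assumed continuation: getGlobalOrds(seg) yields LongValues mapping this
                // segment's local ordinals to global ordinals for dimension remapping.
                longValuesMap.put(entry.getKey(), entry.getValue().getGlobalOrds(seg));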