From 2bba7c12e81ef82bdd43eab00c74cf63cee9fd37 Mon Sep 17 00:00:00 2001 From: Bharathwaj G Date: Thu, 17 Oct 2024 14:18:04 +0530 Subject: [PATCH] fixing merge Signed-off-by: Bharathwaj G --- .../lucene/index/DocValuesProducerUtil.java | 33 +++ .../Composite912DocValuesReader.java | 8 +- .../Composite912DocValuesWriter.java | 42 ++-- .../startree/builder/BaseStarTreeBuilder.java | 89 +++---- .../builder/OffHeapStarTreeBuilder.java | 16 +- .../builder/OnHeapStarTreeBuilder.java | 20 +- .../startree/builder/StarTreeBuilder.java | 7 +- .../startree/builder/StarTreesBuilder.java | 17 +- .../SortedSetDocValuesWriterWrapperTests.java | 98 ++++++++ .../StarTreeKeywordDocValuesFormatTests.java | 171 +++++++++---- .../datacube/startree/StarTreeTestUtils.java | 1 + .../builder/BaseStarTreeBuilderTests.java | 10 +- .../startree/builder/BuilderTestsUtils.java | 6 +- .../StarTreeBuilderFlushFlowTests.java | 4 +- .../StarTreeBuilderMergeFlowTests.java | 226 ++++++++++++++++-- .../builder/StarTreeBuilderTestCase.java | 12 + 16 files changed, 565 insertions(+), 195 deletions(-) create mode 100644 server/src/main/java/org/apache/lucene/index/DocValuesProducerUtil.java create mode 100644 server/src/test/java/org/opensearch/index/codec/composite/SortedSetDocValuesWriterWrapperTests.java diff --git a/server/src/main/java/org/apache/lucene/index/DocValuesProducerUtil.java b/server/src/main/java/org/apache/lucene/index/DocValuesProducerUtil.java new file mode 100644 index 0000000000000..3aebec56b82d9 --- /dev/null +++ b/server/src/main/java/org/apache/lucene/index/DocValuesProducerUtil.java @@ -0,0 +1,33 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.apache.lucene.index; + +import org.apache.lucene.codecs.DocValuesProducer; + +import java.util.Collections; +import java.util.Set; + +/** + * Utility class for DocValuesProducers + * @opensearch.internal + */ +public class DocValuesProducerUtil { + /** + * Returns the segment doc values producers for the given doc values producer. + * If the given doc values producer is not a segment doc values producer, an empty set is returned. + * @param docValuesProducer the doc values producer + * @return the segment doc values producers + */ + public static Set getSegmentDocValuesProducers(DocValuesProducer docValuesProducer) { + if (docValuesProducer instanceof SegmentDocValuesProducer) { + return (((SegmentDocValuesProducer) docValuesProducer).dvProducers); + } + return Collections.emptySet(); + } +} diff --git a/server/src/main/java/org/opensearch/index/codec/composite/composite912/Composite912DocValuesReader.java b/server/src/main/java/org/opensearch/index/codec/composite/composite912/Composite912DocValuesReader.java index e5695f9c350ca..82f425df873e6 100644 --- a/server/src/main/java/org/opensearch/index/codec/composite/composite912/Composite912DocValuesReader.java +++ b/server/src/main/java/org/opensearch/index/codec/composite/composite912/Composite912DocValuesReader.java @@ -190,7 +190,13 @@ public Composite912DocValuesReader(DocValuesProducer producer, SegmentReadState // populates the dummy list of field infos to fetch doc id set iterators for respective fields. // the dummy field info is used to fetch the doc id set iterators for respective fields based on field name FieldInfos fieldInfos = new FieldInfos(getFieldInfoList(fields, dimensionFieldTypeMap)); - this.readState = new SegmentReadState(readState.directory, readState.segmentInfo, fieldInfos, readState.context); + this.readState = new SegmentReadState( + readState.directory, + readState.segmentInfo, + fieldInfos, + readState.context, + readState.segmentSuffix + ); // initialize star-tree doc values producer diff --git a/server/src/main/java/org/opensearch/index/codec/composite/composite912/Composite912DocValuesWriter.java b/server/src/main/java/org/opensearch/index/codec/composite/composite912/Composite912DocValuesWriter.java index e352071b64651..3ea288a86b044 100644 --- a/server/src/main/java/org/opensearch/index/codec/composite/composite912/Composite912DocValuesWriter.java +++ b/server/src/main/java/org/opensearch/index/codec/composite/composite912/Composite912DocValuesWriter.java @@ -12,6 +12,7 @@ import org.apache.lucene.codecs.DocValuesConsumer; import org.apache.lucene.codecs.DocValuesProducer; import org.apache.lucene.index.DocValues; +import org.apache.lucene.index.DocValuesProducerUtil; import org.apache.lucene.index.DocValuesType; import org.apache.lucene.index.EmptyDocValuesProducer; import org.apache.lucene.index.FieldInfo; @@ -71,9 +72,7 @@ public class Composite912DocValuesWriter extends DocValuesConsumer { private final Set segmentFieldSet; private final boolean segmentHasCompositeFields; private final AtomicInteger fieldNumberAcrossCompositeFields; - private final Map fieldProducerMap = new HashMap<>(); - private final Map fieldDocIdSetIteratorMap = new HashMap<>(); public Composite912DocValuesWriter(DocValuesConsumer delegate, SegmentWriteState segmentWriteState, MapperService mapperService) throws IOException { @@ -147,11 +146,9 @@ public Composite912DocValuesWriter(DocValuesConsumer delegate, SegmentWriteState private void addStarTreeSupportedFieldsFromSegment() { // TODO : add integ test for this for (FieldInfo fi : this.state.fieldInfos) { - if (DocValuesType.SORTED_NUMERIC.equals(fi.getDocValuesType())) { - segmentFieldSet.add(fi.name); - } else if (DocValuesType.SORTED_SET.equals(fi.getDocValuesType())) { - segmentFieldSet.add(fi.name); - } else if (fi.name.equals(DocCountFieldMapper.NAME)) { + if (DocValuesType.SORTED_NUMERIC.equals(fi.getDocValuesType()) + || DocValuesType.SORTED_SET.equals(fi.getDocValuesType()) + || fi.name.equals(DocCountFieldMapper.NAME)) { segmentFieldSet.add(fi.name); } } @@ -192,11 +189,6 @@ public void addSortedSetField(FieldInfo field, DocValuesProducer valuesProducer) if (mergeState.get() == null && segmentHasCompositeFields) { createCompositeIndicesIfPossible(valuesProducer, field); } - if (mergeState.get() != null) { - if (compositeFieldSet.contains(field.name)) { - fieldDocIdSetIteratorMap.put(field.name, valuesProducer.getSortedSet(field)); - } - } } @Override @@ -317,8 +309,20 @@ private void mergeStarTreeFields(MergeState mergeState) throws IOException { if (mergeState.docValuesProducers[i] instanceof CompositeIndexReader) { reader = (CompositeIndexReader) mergeState.docValuesProducers[i]; } else { - continue; + Set docValuesProducers = DocValuesProducerUtil.getSegmentDocValuesProducers( + mergeState.docValuesProducers[i] + ); + for (DocValuesProducer docValuesProducer : docValuesProducers) { + if (docValuesProducer instanceof CompositeIndexReader) { + reader = (CompositeIndexReader) docValuesProducer; + List compositeFieldInfo = reader.getCompositeIndexFields(); + if (compositeFieldInfo.isEmpty() == false) { + break; + } + } + } } + if (reader == null) continue; List compositeFieldInfo = reader.getCompositeIndexFields(); for (CompositeIndexFieldInfo fieldInfo : compositeFieldInfo) { @@ -345,14 +349,7 @@ private void mergeStarTreeFields(MergeState mergeState) throws IOException { } } try (StarTreesBuilder starTreesBuilder = new StarTreesBuilder(state, mapperService, fieldNumberAcrossCompositeFields)) { - starTreesBuilder.buildDuringMerge( - metaOut, - dataOut, - starTreeSubsPerField, - compositeDocValuesConsumer, - mergeState, - fieldDocIdSetIteratorMap - ); + starTreesBuilder.buildDuringMerge(metaOut, dataOut, starTreeSubsPerField, compositeDocValuesConsumer); } } @@ -379,7 +376,8 @@ private static SegmentWriteState getSegmentWriteState(SegmentWriteState segmentW segmentInfo, segmentWriteState.fieldInfos, segmentWriteState.segUpdates, - segmentWriteState.context + segmentWriteState.context, + segmentWriteState.segmentSuffix ); } diff --git a/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/builder/BaseStarTreeBuilder.java b/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/builder/BaseStarTreeBuilder.java index c0dd32ef4f0a3..2cbe310c27f05 100644 --- a/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/builder/BaseStarTreeBuilder.java +++ b/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/builder/BaseStarTreeBuilder.java @@ -16,8 +16,6 @@ import org.apache.lucene.index.DocValuesWriterWrapper; import org.apache.lucene.index.EmptyDocValuesProducer; import org.apache.lucene.index.FieldInfo; -import org.apache.lucene.index.FilteredTermsEnum; -import org.apache.lucene.index.MergeState; import org.apache.lucene.index.OrdinalMap; import org.apache.lucene.index.SegmentWriteState; import org.apache.lucene.index.SortedNumericDocValues; @@ -27,11 +25,8 @@ import org.apache.lucene.index.TermsEnum; import org.apache.lucene.search.DocIdSetIterator; import org.apache.lucene.store.IndexOutput; -import org.apache.lucene.util.Bits; import org.apache.lucene.util.ByteBlockPool; -import org.apache.lucene.util.BytesRef; import org.apache.lucene.util.Counter; -import org.apache.lucene.util.LongBitSet; import org.apache.lucene.util.LongValues; import org.apache.lucene.util.NumericUtils; import org.apache.lucene.util.packed.PackedInts; @@ -74,7 +69,6 @@ import static org.opensearch.index.compositeindex.datacube.startree.utils.StarTreeUtils.fullyQualifiedFieldNameForStarTreeDimensionsDocValues; import static org.opensearch.index.compositeindex.datacube.startree.utils.StarTreeUtils.fullyQualifiedFieldNameForStarTreeMetricsDocValues; import static org.opensearch.index.compositeindex.datacube.startree.utils.StarTreeUtils.getFieldInfo; -import static org.opensearch.index.compositeindex.datacube.startree.utils.iterator.StarTreeValuesIterator.NO_MORE_ENTRIES; import static org.opensearch.index.mapper.NumberFieldMapper.NumberType.DOUBLE; import static org.opensearch.index.mapper.NumberFieldMapper.NumberType.LONG; @@ -109,7 +103,14 @@ public abstract class BaseStarTreeBuilder implements StarTreeBuilder { private final IndexOutput metaOut; private final IndexOutput dataOut; private final Counter bytesUsed = Counter.newCounter(); - Map sortedSetDocValuesMap = new HashMap<>(); + Map flushSortedSetDocValuesMap = new HashMap<>(); + // Maintains list of sortedSetDocValues for each star tree dimension field across segments during merge + Map> mergeSortedSetDimensionsMap = new HashMap<>(); + // Maintains ordinalMap for each star tree dimension field during merge + Map mergeSortedSetDimensionsOrdinalMap = new HashMap<>(); + + // This should be true for merge flows + protected boolean isMerge = false; /** * Reads all the configuration related to dimensions and metrics, builds a star-tree based on the different construction parameters. @@ -265,7 +266,7 @@ public void build( if (dimensionsSplitOrder.get(i).getDocValuesType().equals(DocValuesType.SORTED_SET)) { // This is needed as we need to write the ordinals and also the bytesRef associated with it // as part of star tree doc values file formats - sortedSetDocValuesMap.put( + flushSortedSetDocValuesMap.put( dimensionsSplitOrder.get(i).getField(), fieldProducerMap.get(dimensionFieldInfo.name).getSortedSet(dimensionFieldInfo) ); @@ -278,28 +279,6 @@ public void build( logger.debug("Finished Building star-tree in ms : {}", (System.currentTimeMillis() - startTime)); } - /** - * Copy of BitsFilteredTermsEnum from DocValuesConsumer - */ - static class BitsFilteredTermsEnum extends FilteredTermsEnum { - final LongBitSet liveTerms; - - BitsFilteredTermsEnum(TermsEnum in, LongBitSet liveTerms) { - super(in, false); - assert liveTerms != null; - this.liveTerms = liveTerms; - } - - @Override - protected AcceptStatus accept(BytesRef term) throws IOException { - if (liveTerms.get(ord())) { - return AcceptStatus.YES; - } else { - return AcceptStatus.NO; - } - } - } - /** * Returns the sequential doc values iterator for the given field based on associated docValuesType */ @@ -325,10 +304,9 @@ private SequentialDocValuesIterator getSequentialDocValuesIterator( /** * Returns the ordinal map per field based on given star-tree values across different segments */ - protected Map getOrdinalMaps(List starTreeValuesSubs, MergeState mergeState) throws IOException { + protected Map getOrdinalMaps(List starTreeValuesSubs) throws IOException { long curr = System.currentTimeMillis(); Map> dimensionToIterators = new HashMap<>(); - // Group iterators by dimension for (StarTreeValues starTree : starTreeValuesSubs) { for (String dimName : starTree.getStarTreeField().getDimensionNames()) { @@ -340,9 +318,8 @@ protected Map getOrdinalMaps(List starTreeVa } if (dimensionToIterators.isEmpty()) return Collections.emptyMap(); - + this.mergeSortedSetDimensionsMap = dimensionToIterators; Map dimensionToOrdinalMap = new HashMap<>(); - for (Map.Entry> entry : dimensionToIterators.entrySet()) { String dimName = entry.getKey(); List iterators = entry.getValue(); @@ -353,23 +330,8 @@ protected Map getOrdinalMaps(List starTreeVa for (int sub = 0; sub < liveTerms.length; sub++) { SortedSetStarTreeValuesIterator dv = iterators.get(sub); - Bits liveDocs = mergeState.liveDocs[sub]; - if (liveDocs == null) { - liveTerms[sub] = dv.termsEnum(); - weights[sub] = dv.getValueCount(); - } else { - LongBitSet bitset = new LongBitSet(dv.getValueCount()); - int docID; - while ((docID = dv.nextEntry()) != NO_MORE_ENTRIES) { - if (liveDocs.get(docID)) { - for (int i = 0; i < dv.docValueCount(); i++) { - bitset.set(dv.nextOrd()); - } - } - } - liveTerms[sub] = new BitsFilteredTermsEnum(dv.termsEnum(), bitset); - weights[sub] = bitset.cardinality(); - } + liveTerms[sub] = dv.termsEnum(); + weights[sub] = dv.getValueCount(); } // step 2: create ordinal map for this dimension @@ -378,7 +340,7 @@ protected Map getOrdinalMaps(List starTreeVa logger.debug("Ordinal map for dimension {} - Size in bytes: {}", dimName, map.ramBytesUsed()); } - + this.mergeSortedSetDimensionsOrdinalMap = dimensionToOrdinalMap; logger.debug("Total time to build ordinal maps: {} ms", System.currentTimeMillis() - curr); return dimensionToOrdinalMap; } @@ -418,7 +380,7 @@ public void build( createAggregatedDocs(rootNode); int numAggregatedStarTreeDocument = numStarTreeDocs - numStarTreeDocument - numStarTreeDocumentUnderStarNode; - logger.info("Finished creating aggregated documents : {}", numAggregatedStarTreeDocument); + logger.debug("Finished creating aggregated documents : {}", numAggregatedStarTreeDocument); // Create doc values indices in disk createSortedDocValuesIndices(starTreeDocValuesConsumer, fieldNumberAcrossStarTrees); @@ -496,7 +458,6 @@ private void createSortedDocValuesIndices(DocValuesConsumer docValuesConsumer, A metricFieldInfoList[i] = fi; metricWriters.add(new SortedNumericDocValuesWriterWrapper(fi, bytesUsed)); } - for (int docId = 0; docId < numStarTreeDocs; docId++) { StarTreeDocument starTreeDocument = getStarTreeDocument(docId); int idx = 0; @@ -544,7 +505,17 @@ private void createSortedDocValuesIndices(DocValuesConsumer docValuesConsumer, A private void indexDocValue(DocValuesWriterWrapper dvWriter, int docId, long value, String field) throws IOException { if (dvWriter instanceof SortedSetDocValuesWriterWrapper) { // TODO : cache lookupOrd to make it faster - ((SortedSetDocValuesWriterWrapper) dvWriter).addValue(docId, sortedSetDocValuesMap.get(field).lookupOrd(value)); + if (isMerge) { + OrdinalMap map = mergeSortedSetDimensionsOrdinalMap.get(field); + int segmentNumber = map.getFirstSegmentNumber(value); + long segmentOrd = map.getFirstSegmentOrd(value); + ((SortedSetDocValuesWriterWrapper) dvWriter).addValue( + docId, + mergeSortedSetDimensionsMap.get(field).get(segmentNumber).lookupOrd(segmentOrd) + ); + } else { + ((SortedSetDocValuesWriterWrapper) dvWriter).addValue(docId, flushSortedSetDocValuesMap.get(field).lookupOrd(value)); + } } else if (dvWriter instanceof SortedNumericDocValuesWriterWrapper) { ((SortedNumericDocValuesWriterWrapper) dvWriter).addValue(docId, value); } @@ -622,7 +593,7 @@ protected StarTreeDocument getStarTreeDocument( /** * Sets dimensions / metric readers nnd numSegmentDocs */ - protected void setReadersAndNumSegmentDocs( + protected void setReadersAndNumSegmentDocsDuringMerge( SequentialDocValuesIterator[] dimensionReaders, List metricReaders, AtomicInteger numSegmentDocs, @@ -855,8 +826,8 @@ private static Long getLong(Object metric) { * Sets the sortedSetDocValuesMap. * This is needed as we need to write the ordinals and also the bytesRef associated with it */ - void setSortedSetDocValuesMap(Map sortedSetDocValuesMap) { - this.sortedSetDocValuesMap = sortedSetDocValuesMap; + void setFlushSortedSetDocValuesMap(Map flushSortedSetDocValuesMap) { + this.flushSortedSetDocValuesMap = flushSortedSetDocValuesMap; } /** @@ -1128,7 +1099,7 @@ public void close() throws IOException { } - abstract Iterator mergeStarTrees(List starTreeValues, MergeState mergeState) throws IOException; + abstract Iterator mergeStarTrees(List starTreeValues) throws IOException; public InMemoryTreeNode getRootNode() { return rootNode; diff --git a/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/builder/OffHeapStarTreeBuilder.java b/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/builder/OffHeapStarTreeBuilder.java index 7daa7742937c9..00a3fd66d521f 100644 --- a/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/builder/OffHeapStarTreeBuilder.java +++ b/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/builder/OffHeapStarTreeBuilder.java @@ -11,10 +11,8 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.lucene.codecs.DocValuesConsumer; -import org.apache.lucene.index.MergeState; import org.apache.lucene.index.OrdinalMap; import org.apache.lucene.index.SegmentWriteState; -import org.apache.lucene.index.SortedSetDocValues; import org.apache.lucene.store.IndexOutput; import org.apache.lucene.util.LongValues; import org.opensearch.common.annotation.ExperimentalApi; @@ -91,14 +89,11 @@ public void appendStarTreeDocument(StarTreeDocument starTreeDocument) throws IOE public void build( List starTreeValuesSubs, AtomicInteger fieldNumberAcrossStarTrees, - DocValuesConsumer starTreeDocValuesConsumer, - MergeState mergeState, - Map sortedSetDocValuesMap + DocValuesConsumer starTreeDocValuesConsumer ) throws IOException { boolean success = false; - setSortedSetDocValuesMap(sortedSetDocValuesMap); try { - build(mergeStarTrees(starTreeValuesSubs, mergeState), fieldNumberAcrossStarTrees, starTreeDocValuesConsumer); + build(mergeStarTrees(starTreeValuesSubs), fieldNumberAcrossStarTrees, starTreeDocValuesConsumer); success = true; } finally { starTreeDocumentFileManager.deleteFiles(success); @@ -144,17 +139,18 @@ public Iterator sortAndAggregateSegmentDocuments( * @param starTreeValuesSubs StarTreeValues from multiple segments * @return iterator of star tree documents */ - Iterator mergeStarTrees(List starTreeValuesSubs, MergeState mergeState) throws IOException { + Iterator mergeStarTrees(List starTreeValuesSubs) throws IOException { int numDocs = 0; int[] docIds; - Map ordinalMaps = getOrdinalMaps(starTreeValuesSubs, mergeState); + this.isMerge = true; + Map ordinalMaps = getOrdinalMaps(starTreeValuesSubs); try { int seg = 0; for (StarTreeValues starTreeValues : starTreeValuesSubs) { SequentialDocValuesIterator[] dimensionReaders = new SequentialDocValuesIterator[numDimensions]; List metricReaders = new ArrayList<>(); AtomicInteger numSegmentDocs = new AtomicInteger(); - setReadersAndNumSegmentDocs(dimensionReaders, metricReaders, numSegmentDocs, starTreeValues); + setReadersAndNumSegmentDocsDuringMerge(dimensionReaders, metricReaders, numSegmentDocs, starTreeValues); int currentDocId = 0; Map longValuesMap = new LinkedHashMap<>(); for (Map.Entry entry : ordinalMaps.entrySet()) { diff --git a/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/builder/OnHeapStarTreeBuilder.java b/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/builder/OnHeapStarTreeBuilder.java index 9a06f8a705ae5..c91f4c5db98bb 100644 --- a/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/builder/OnHeapStarTreeBuilder.java +++ b/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/builder/OnHeapStarTreeBuilder.java @@ -8,10 +8,8 @@ package org.opensearch.index.compositeindex.datacube.startree.builder; import org.apache.lucene.codecs.DocValuesConsumer; -import org.apache.lucene.index.MergeState; import org.apache.lucene.index.OrdinalMap; import org.apache.lucene.index.SegmentWriteState; -import org.apache.lucene.index.SortedSetDocValues; import org.apache.lucene.store.IndexOutput; import org.apache.lucene.util.LongValues; import org.opensearch.common.annotation.ExperimentalApi; @@ -105,12 +103,9 @@ public Iterator sortAndAggregateSegmentDocuments( public void build( List starTreeValuesSubs, AtomicInteger fieldNumberAcrossStarTrees, - DocValuesConsumer starTreeDocValuesConsumer, - MergeState mergeState, - Map sortedSetDocValuesMap + DocValuesConsumer starTreeDocValuesConsumer ) throws IOException { - setSortedSetDocValuesMap(sortedSetDocValuesMap); - build(mergeStarTrees(starTreeValuesSubs, mergeState), fieldNumberAcrossStarTrees, starTreeDocValuesConsumer); + build(mergeStarTrees(starTreeValuesSubs), fieldNumberAcrossStarTrees, starTreeDocValuesConsumer); } /** @@ -121,8 +116,9 @@ public void build( * @return iterator of star tree documents */ @Override - Iterator mergeStarTrees(List starTreeValuesSubs, MergeState mergeState) throws IOException { - return sortAndAggregateStarTreeDocuments(getSegmentsStarTreeDocuments(starTreeValuesSubs, mergeState), true); + Iterator mergeStarTrees(List starTreeValuesSubs) throws IOException { + this.isMerge = true; + return sortAndAggregateStarTreeDocuments(getSegmentsStarTreeDocuments(starTreeValuesSubs), true); } /** @@ -132,15 +128,15 @@ Iterator mergeStarTrees(List starTreeValuesSub * @param starTreeValuesSubs StarTreeValues from multiple segments * @return array of star tree documents */ - StarTreeDocument[] getSegmentsStarTreeDocuments(List starTreeValuesSubs, MergeState mergeState) throws IOException { + StarTreeDocument[] getSegmentsStarTreeDocuments(List starTreeValuesSubs) throws IOException { List starTreeDocuments = new ArrayList<>(); - Map ordinalMaps = getOrdinalMaps(starTreeValuesSubs, mergeState); + Map ordinalMaps = getOrdinalMaps(starTreeValuesSubs); int seg = 0; for (StarTreeValues starTreeValues : starTreeValuesSubs) { SequentialDocValuesIterator[] dimensionReaders = new SequentialDocValuesIterator[numDimensions]; List metricReaders = new ArrayList<>(); AtomicInteger numSegmentDocs = new AtomicInteger(); - setReadersAndNumSegmentDocs(dimensionReaders, metricReaders, numSegmentDocs, starTreeValues); + setReadersAndNumSegmentDocsDuringMerge(dimensionReaders, metricReaders, numSegmentDocs, starTreeValues); int currentDocId = 0; Map longValuesMap = new LinkedHashMap<>(); for (Map.Entry entry : ordinalMaps.entrySet()) { diff --git a/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/builder/StarTreeBuilder.java b/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/builder/StarTreeBuilder.java index 6df8d1259835d..038164c9c842d 100644 --- a/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/builder/StarTreeBuilder.java +++ b/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/builder/StarTreeBuilder.java @@ -10,8 +10,6 @@ import org.apache.lucene.codecs.DocValuesConsumer; import org.apache.lucene.codecs.DocValuesProducer; -import org.apache.lucene.index.MergeState; -import org.apache.lucene.index.SortedSetDocValues; import org.opensearch.common.annotation.ExperimentalApi; import org.opensearch.index.compositeindex.datacube.startree.index.StarTreeValues; @@ -49,13 +47,12 @@ void build( * @param starTreeValuesSubs contains the star tree values from multiple segments * @param fieldNumberAcrossStarTrees maintains the unique field number across the fields in the star tree * @param starTreeDocValuesConsumer consumer of star-tree doc values + * * @throws IOException when we are unable to build star-tree */ void build( List starTreeValuesSubs, AtomicInteger fieldNumberAcrossStarTrees, - DocValuesConsumer starTreeDocValuesConsumer, - MergeState mergeState, - Map fieldDocIdSetIteratorMap + DocValuesConsumer starTreeDocValuesConsumer ) throws IOException; } diff --git a/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/builder/StarTreesBuilder.java b/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/builder/StarTreesBuilder.java index f975d0c8888c0..3d1a780c1c7ef 100644 --- a/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/builder/StarTreesBuilder.java +++ b/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/builder/StarTreesBuilder.java @@ -12,9 +12,7 @@ import org.apache.logging.log4j.Logger; import org.apache.lucene.codecs.DocValuesConsumer; import org.apache.lucene.codecs.DocValuesProducer; -import org.apache.lucene.index.MergeState; import org.apache.lucene.index.SegmentWriteState; -import org.apache.lucene.index.SortedSetDocValues; import org.apache.lucene.store.IndexOutput; import org.opensearch.common.annotation.ExperimentalApi; import org.opensearch.index.compositeindex.datacube.startree.StarTreeField; @@ -112,15 +110,12 @@ public void close() throws IOException { * @param dataOut an IndexInput for star-tree data * @param starTreeValuesSubsPerField starTreeValuesSubs per field * @param starTreeDocValuesConsumer a consumer to write star-tree doc values - * @param fieldDocIdSetIteratorMap */ public void buildDuringMerge( IndexOutput metaOut, IndexOutput dataOut, final Map> starTreeValuesSubsPerField, - DocValuesConsumer starTreeDocValuesConsumer, - MergeState mergeState, - Map fieldDocIdSetIteratorMap + DocValuesConsumer starTreeDocValuesConsumer ) throws IOException { logger.debug("Starting merge of {} star-trees with star-tree fields", starTreeValuesSubsPerField.size()); long startTime = System.currentTimeMillis(); @@ -132,16 +127,10 @@ public void buildDuringMerge( } StarTreeField starTreeField = starTreeValuesList.get(0).getStarTreeField(); try (StarTreeBuilder builder = getStarTreeBuilder(metaOut, dataOut, starTreeField, state, mapperService)) { - builder.build( - starTreeValuesList, - fieldNumberAcrossStarTrees, - starTreeDocValuesConsumer, - mergeState, - fieldDocIdSetIteratorMap - ); + builder.build(starTreeValuesList, fieldNumberAcrossStarTrees, starTreeDocValuesConsumer); } } - logger.info( + logger.debug( "Took {} ms to merge {} star-trees with star-tree fields", System.currentTimeMillis() - startTime, starTreeValuesSubsPerField.size() diff --git a/server/src/test/java/org/opensearch/index/codec/composite/SortedSetDocValuesWriterWrapperTests.java b/server/src/test/java/org/opensearch/index/codec/composite/SortedSetDocValuesWriterWrapperTests.java new file mode 100644 index 0000000000000..b0fdd712beafb --- /dev/null +++ b/server/src/test/java/org/opensearch/index/codec/composite/SortedSetDocValuesWriterWrapperTests.java @@ -0,0 +1,98 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.codec.composite; + +import org.apache.lucene.index.DocValuesType; +import org.apache.lucene.index.FieldInfo; +import org.apache.lucene.index.IndexOptions; +import org.apache.lucene.index.SortedSetDocValues; +import org.apache.lucene.index.SortedSetDocValuesWriterWrapper; +import org.apache.lucene.index.VectorEncoding; +import org.apache.lucene.index.VectorSimilarityFunction; +import org.apache.lucene.util.ByteBlockPool; +import org.apache.lucene.util.BytesRef; +import org.apache.lucene.util.Counter; +import org.opensearch.test.OpenSearchTestCase; + +import java.io.IOException; +import java.util.Collections; + +public class SortedSetDocValuesWriterWrapperTests extends OpenSearchTestCase { + + private SortedSetDocValuesWriterWrapper wrapper; + private FieldInfo fieldInfo; + private Counter counter; + + @Override + public void setUp() throws Exception { + super.setUp(); + fieldInfo = new FieldInfo( + "field", + 1, + false, + false, + true, + IndexOptions.NONE, + DocValuesType.NONE, + -1, + Collections.emptyMap(), + 0, + 0, + 0, + 0, + VectorEncoding.FLOAT32, + VectorSimilarityFunction.EUCLIDEAN, + false, + false + ); + counter = Counter.newCounter(); + ByteBlockPool.DirectTrackingAllocator byteBlockAllocator = new ByteBlockPool.DirectTrackingAllocator(counter); + ByteBlockPool docValuesBytePool = new ByteBlockPool(byteBlockAllocator); + wrapper = new SortedSetDocValuesWriterWrapper(fieldInfo, counter, docValuesBytePool); + } + + public void testAddValue() throws IOException { + wrapper.addValue(0, new BytesRef("text1")); + wrapper.addValue(1, new BytesRef("text2")); + wrapper.addValue(2, new BytesRef("text3")); + + SortedSetDocValues docValues = wrapper.getDocValues(); + assertNotNull(docValues); + + assertEquals(0, docValues.nextDoc()); + assertEquals(0, docValues.nextOrd()); + assertEquals(1, docValues.nextDoc()); + assertEquals(1, docValues.nextOrd()); + assertEquals(2, docValues.nextDoc()); + assertEquals(2, docValues.nextOrd()); + } + + public void testGetDocValues() { + SortedSetDocValues docValues = wrapper.getDocValues(); + assertNotNull(docValues); + } + + public void testMultipleValues() throws IOException { + wrapper.addValue(0, new BytesRef("text1")); + wrapper.addValue(0, new BytesRef("text2")); + wrapper.addValue(1, new BytesRef("text3")); + + SortedSetDocValues docValues = wrapper.getDocValues(); + assertNotNull(docValues); + + assertEquals(0, docValues.nextDoc()); + assertEquals(0, docValues.nextOrd()); + assertEquals(1, docValues.nextOrd()); + assertEquals(-1, docValues.nextOrd()); + + assertEquals(1, docValues.nextDoc()); + assertEquals(2, docValues.nextOrd()); + assertEquals(-1, docValues.nextOrd()); + } +} diff --git a/server/src/test/java/org/opensearch/index/codec/composite912/datacube/startree/StarTreeKeywordDocValuesFormatTests.java b/server/src/test/java/org/opensearch/index/codec/composite912/datacube/startree/StarTreeKeywordDocValuesFormatTests.java index bf7e7a25ebdbd..f04e209cc6c02 100644 --- a/server/src/test/java/org/opensearch/index/codec/composite912/datacube/startree/StarTreeKeywordDocValuesFormatTests.java +++ b/server/src/test/java/org/opensearch/index/codec/composite912/datacube/startree/StarTreeKeywordDocValuesFormatTests.java @@ -9,12 +9,15 @@ package org.opensearch.index.codec.composite912.datacube.startree; import org.apache.lucene.document.Document; +import org.apache.lucene.document.Field; import org.apache.lucene.document.SortedNumericDocValuesField; import org.apache.lucene.document.SortedSetDocValuesField; +import org.apache.lucene.document.StringField; import org.apache.lucene.index.DirectoryReader; import org.apache.lucene.index.IndexWriterConfig; import org.apache.lucene.index.LeafReaderContext; import org.apache.lucene.index.SegmentReader; +import org.apache.lucene.index.Term; import org.apache.lucene.store.Directory; import org.apache.lucene.tests.index.RandomIndexWriter; import org.apache.lucene.tests.util.LuceneTestCase; @@ -28,10 +31,15 @@ import org.opensearch.index.compositeindex.datacube.startree.StarTreeFieldConfiguration; import org.opensearch.index.compositeindex.datacube.startree.StarTreeTestUtils; import org.opensearch.index.compositeindex.datacube.startree.index.StarTreeValues; +import org.opensearch.index.compositeindex.datacube.startree.utils.iterator.SortedSetStarTreeValuesIterator; import org.opensearch.index.mapper.NumberFieldMapper; import java.io.IOException; +import java.util.HashMap; +import java.util.HashSet; import java.util.List; +import java.util.Map; +import java.util.Set; import static org.opensearch.index.compositeindex.CompositeIndexConstants.STAR_TREE_DOCS_COUNT; import static org.opensearch.index.compositeindex.datacube.startree.StarTreeTestUtils.assertStarTreeDocuments; @@ -52,44 +60,46 @@ public void testStarTreeKeywordDocValues() throws IOException { conf.setMergePolicy(newLogMergePolicy()); RandomIndexWriter iw = new RandomIndexWriter(random(), directory, conf); Document doc = new Document(); + doc.add(new StringField("_id", "1", Field.Store.NO)); doc.add(new SortedNumericDocValuesField("sndv", 1)); doc.add(new SortedSetDocValuesField("keyword1", new BytesRef("text1"))); doc.add(new SortedSetDocValuesField("keyword2", new BytesRef("text2"))); iw.addDocument(doc); doc = new Document(); + doc.add(new StringField("_id", "2", Field.Store.NO)); doc.add(new SortedNumericDocValuesField("sndv", 1)); doc.add(new SortedSetDocValuesField("keyword1", new BytesRef("text11"))); doc.add(new SortedSetDocValuesField("keyword2", new BytesRef("text22"))); iw.addDocument(doc); - iw.forceMerge(1); + iw.flush(); + iw.deleteDocuments(new Term("_id", "2")); + iw.flush(); doc = new Document(); + doc.add(new StringField("_id", "3", Field.Store.NO)); doc.add(new SortedNumericDocValuesField("sndv", 2)); doc.add(new SortedSetDocValuesField("keyword1", new BytesRef("text1"))); doc.add(new SortedSetDocValuesField("keyword2", new BytesRef("text2"))); iw.addDocument(doc); doc = new Document(); + doc.add(new StringField("_id", "4", Field.Store.NO)); doc.add(new SortedNumericDocValuesField("sndv", 2)); doc.add(new SortedSetDocValuesField("keyword1", new BytesRef("text11"))); doc.add(new SortedSetDocValuesField("keyword2", new BytesRef("text22"))); iw.addDocument(doc); + iw.flush(); + iw.deleteDocuments(new Term("_id", "4")); + iw.flush(); iw.forceMerge(1); + iw.close(); DirectoryReader ir = maybeWrapWithMergingReader(DirectoryReader.open(directory)); TestUtil.checkReader(ir); assertEquals(1, ir.leaves().size()); - // Segment documents - /** - * sndv dv field - * [1, 1, -1] - * [1, 1, -1] - * [2, 2, -2] - * [2, 2, -2] - */ // Star tree documents /** - SortedSet dv | [ sum, value_count, min, max[sndv]] , doc_count + keyword1 keyword2 | [ sum, value_count, min, max[sndv]] , doc_count [0, 0] | [3.0, 2.0, 1.0, 2.0, 2.0] [1, 1] | [3.0, 2.0, 1.0, 2.0, 2.0] [null, 0] | [3.0, 2.0, 1.0, 2.0, 2.0] @@ -128,6 +138,107 @@ public void testStarTreeKeywordDocValues() throws IOException { directory.close(); } + public void testStarTreeKeywordDocValuesWithDeletions() throws IOException { + Directory directory = newDirectory(); + IndexWriterConfig conf = newIndexWriterConfig(null); + conf.setMergePolicy(newLogMergePolicy()); + RandomIndexWriter iw = new RandomIndexWriter(random(), directory, conf); + + int iterations = 3; + Set allIds = new HashSet<>(); + Map documents = new HashMap<>(); + Map map = new HashMap<>(); + for (int iter = 0; iter < iterations; iter++) { + // Add 10 documents + for (int i = 0; i < 10; i++) { + String id = String.valueOf(random().nextInt()); + allIds.add(id); + Document doc = new Document(); + doc.add(new StringField("_id", id, Field.Store.YES)); + int sndvValue = random().nextInt(5) + 1; + doc.add(new SortedNumericDocValuesField("sndv", sndvValue)); + + String keyword1Value = "text" + random().nextInt(3); + + doc.add(new SortedSetDocValuesField("keyword1", new BytesRef(keyword1Value))); + String keyword2Value = "text" + random().nextInt(3); + + doc.add(new SortedSetDocValuesField("keyword2", new BytesRef(keyword2Value))); + map.put(keyword1Value + "-" + keyword2Value, sndvValue + map.getOrDefault(keyword1Value + "-" + keyword2Value, 0)); + iw.addDocument(doc); + documents.put(id, doc); + } + + iw.flush(); + + // Delete random number of documents + int docsToDelete = random().nextInt(5); // Delete up to 5 documents + for (int i = 0; i < docsToDelete; i++) { + if (!allIds.isEmpty()) { + String idToDelete = allIds.iterator().next(); + iw.deleteDocuments(new Term("_id", idToDelete)); + allIds.remove(idToDelete); + documents.remove(idToDelete); + } + } + + iw.flush(); + } + + iw.forceMerge(1); + iw.close(); + + DirectoryReader ir = maybeWrapWithMergingReader(DirectoryReader.open(directory)); + TestUtil.checkReader(ir); + assertEquals(1, ir.leaves().size()); + + // Assert star tree documents + for (LeafReaderContext context : ir.leaves()) { + SegmentReader reader = Lucene.segmentReader(context.reader()); + CompositeIndexReader starTreeDocValuesReader = (CompositeIndexReader) reader.getDocValuesReader(); + List compositeIndexFields = starTreeDocValuesReader.getCompositeIndexFields(); + + for (CompositeIndexFieldInfo compositeIndexFieldInfo : compositeIndexFields) { + StarTreeValues starTreeValues = (StarTreeValues) starTreeDocValuesReader.getCompositeIndexValues(compositeIndexFieldInfo); + StarTreeDocument[] actualStarTreeDocuments = StarTreeTestUtils.getSegmentsStarTreeDocuments( + List.of(starTreeValues), + List.of( + NumberFieldMapper.NumberType.DOUBLE, + NumberFieldMapper.NumberType.LONG, + NumberFieldMapper.NumberType.DOUBLE, + NumberFieldMapper.NumberType.DOUBLE, + NumberFieldMapper.NumberType.LONG + ), + Integer.parseInt(starTreeValues.getAttributes().get(STAR_TREE_DOCS_COUNT)) + ); + SortedSetStarTreeValuesIterator k1 = (SortedSetStarTreeValuesIterator) starTreeValues.getDimensionValuesIterator( + "keyword1" + ); + SortedSetStarTreeValuesIterator k2 = (SortedSetStarTreeValuesIterator) starTreeValues.getDimensionValuesIterator( + "keyword2" + ); + for (StarTreeDocument starDoc : actualStarTreeDocuments) { + String keyword1 = null; + if (starDoc.dimensions[0] != null) { + keyword1 = k1.lookupOrd(starDoc.dimensions[0]).utf8ToString(); + } + + String keyword2 = null; + if (starDoc.dimensions[1] != null) { + keyword2 = k2.lookupOrd(starDoc.dimensions[1]).utf8ToString(); + } + double metric = (double) starDoc.metrics[0]; + if (map.containsKey(keyword1 + "-" + keyword2)) { + assertEquals((int) map.get(keyword1 + "-" + keyword2), (int) metric); + } + } + } + } + + ir.close(); + directory.close(); + } + public void testStarKeywordDocValuesWithMissingDocs() throws IOException { Directory directory = newDirectory(); IndexWriterConfig conf = newIndexWriterConfig(null); @@ -159,17 +270,9 @@ public void testStarKeywordDocValuesWithMissingDocs() throws IOException { TestUtil.checkReader(ir); assertEquals(1, ir.leaves().size()); - // Segment documents - /** - * sndv dv field - * [1, 1, -1] - * [1, 1, -1] - * [2, 2, -2] - * [2, 2, -2] - */ // Star tree documents /** - * sndv dv | [ sum, value_count, min, max[sndv]] , doc_count + * keyword1 keyword2 | [ sum, value_count, min, max[sndv]] , doc_count [0, 0] | [2.0, 1.0, 2.0, 2.0, 1.0] [1, 1] | [2.0, 1.0, 2.0, 2.0, 1.0] [null, 0] | [1.0, 1.0, 1.0, 1.0, 1.0] @@ -243,17 +346,9 @@ public void testStarKeywordDocValuesWithMissingDocsInSegment() throws IOExceptio TestUtil.checkReader(ir); assertEquals(1, ir.leaves().size()); - // Segment documents - /** - * sndv dv field - * [1, 1, -1] - * [1, 1, -1] - * [2, 2, -2] - * [2, 2, -2] - */ // Star tree documents /** - * sndv dv | [ sum, value_count, min, max[sndv]] , doc_count + * keyword1 keyword2 | [ sum, value_count, min, max[sndv]] , doc_count [0, 0] | [2.0, 1.0, 2.0, 2.0, 1.0] [1, 1] | [2.0, 1.0, 2.0, 2.0, 1.0] [null, null] | [2.0, 2.0, 1.0, 1.0, 2.0] // This is for missing doc @@ -321,17 +416,9 @@ public void testStarKeywordDocValuesWithMissingDocsInAllSegments() throws IOExce TestUtil.checkReader(ir); assertEquals(1, ir.leaves().size()); - // Segment documents - /** - * sndv dv field - * [1, 1, -1] - * [1, 1, -1] - * [2, 2, -2] - * [2, 2, -2] - */ // Star tree documents /** - * sndv dv | [ sum, value_count, min, max[sndv]] , doc_count + * keyword1 keyword2 | [ sum, value_count, min, max[sndv]] , doc_count [null, null] | [6.0, 4.0, 1.0, 2.0, 4.0] */ @@ -389,17 +476,9 @@ public void testStarKeywordDocValuesWithMissingDocsInMixedSegments() throws IOEx TestUtil.checkReader(ir); assertEquals(1, ir.leaves().size()); - // Segment documents - /** - * sndv dv field - * [1, 1, -1] - * [1, 1, -1] - * [2, 2, -2] - * [2, 2, -2] - */ // Star tree documents /** - * sndv dv | [ sum, value_count, min, max[sndv]] , doc_count + * keyword1 keyword2 | [ sum, value_count, min, max[sndv]] , doc_count [0, 0] | [2.0, 1.0, 2.0, 2.0, 1.0] [1, 1] | [2.0, 1.0, 2.0, 2.0, 1.0] [null, 0] | [1.0, 1.0, 1.0, 1.0, 1.0] diff --git a/server/src/test/java/org/opensearch/index/compositeindex/datacube/startree/StarTreeTestUtils.java b/server/src/test/java/org/opensearch/index/compositeindex/datacube/startree/StarTreeTestUtils.java index dc8b3320f3de2..cdab20acf7fef 100644 --- a/server/src/test/java/org/opensearch/index/compositeindex/datacube/startree/StarTreeTestUtils.java +++ b/server/src/test/java/org/opensearch/index/compositeindex/datacube/startree/StarTreeTestUtils.java @@ -99,6 +99,7 @@ public static StarTreeDocument getStarTreeDocument( ) throws IOException { Long[] dims = new Long[dimensionReaders.length]; int i = 0; + for (SequentialDocValuesIterator dimensionDocValueIterator : dimensionReaders) { dimensionDocValueIterator.nextEntry(currentDocId); Long val = dimensionDocValueIterator.value(currentDocId); diff --git a/server/src/test/java/org/opensearch/index/compositeindex/datacube/startree/builder/BaseStarTreeBuilderTests.java b/server/src/test/java/org/opensearch/index/compositeindex/datacube/startree/builder/BaseStarTreeBuilderTests.java index b3ce145c6a0a6..ac729f6392f63 100644 --- a/server/src/test/java/org/opensearch/index/compositeindex/datacube/startree/builder/BaseStarTreeBuilderTests.java +++ b/server/src/test/java/org/opensearch/index/compositeindex/datacube/startree/builder/BaseStarTreeBuilderTests.java @@ -16,10 +16,8 @@ import org.apache.lucene.index.FieldInfos; import org.apache.lucene.index.IndexFileNames; import org.apache.lucene.index.IndexOptions; -import org.apache.lucene.index.MergeState; import org.apache.lucene.index.SegmentInfo; import org.apache.lucene.index.SegmentWriteState; -import org.apache.lucene.index.SortedSetDocValues; import org.apache.lucene.index.VectorEncoding; import org.apache.lucene.index.VectorSimilarityFunction; import org.apache.lucene.store.Directory; @@ -185,9 +183,7 @@ public static void setup() throws IOException { public void build( List starTreeValuesSubs, AtomicInteger fieldNumberAcrossStarTrees, - DocValuesConsumer starTreeDocValuesConsumer, - MergeState mergeState, - Map fieldDocIdSetIteratorMap + DocValuesConsumer starTreeDocValuesConsumer ) throws IOException {} @Override @@ -205,7 +201,7 @@ public List getStarTreeDocuments() { @Override public Long getDimensionValue(int docId, int dimensionId) throws IOException { - return 0l; + return 0L; } @Override @@ -223,7 +219,7 @@ public Iterator generateStarTreeDocumentsForStarNode(int start } @Override - Iterator mergeStarTrees(List starTreeValues, MergeState mergeState) throws IOException { + Iterator mergeStarTrees(List starTreeValues) throws IOException { return null; } }; diff --git a/server/src/test/java/org/opensearch/index/compositeindex/datacube/startree/builder/BuilderTestsUtils.java b/server/src/test/java/org/opensearch/index/compositeindex/datacube/startree/builder/BuilderTestsUtils.java index f9c5bb3e86975..077bf0422ab50 100644 --- a/server/src/test/java/org/opensearch/index/compositeindex/datacube/startree/builder/BuilderTestsUtils.java +++ b/server/src/test/java/org/opensearch/index/compositeindex/datacube/startree/builder/BuilderTestsUtils.java @@ -153,6 +153,10 @@ public long cost() { } public static SortedSetDocValues getSortedSetMock(List dimList, List docsWithField) { + return getSortedSetMock(dimList, docsWithField, 1); + } + + public static SortedSetDocValues getSortedSetMock(List dimList, List docsWithField, int valueCount) { return new SortedSetDocValues() { int index = -1; @@ -173,7 +177,7 @@ public BytesRef lookupOrd(long l) throws IOException { @Override public long getValueCount() { - return 0; + return valueCount; } @Override diff --git a/server/src/test/java/org/opensearch/index/compositeindex/datacube/startree/builder/StarTreeBuilderFlushFlowTests.java b/server/src/test/java/org/opensearch/index/compositeindex/datacube/startree/builder/StarTreeBuilderFlushFlowTests.java index b70ae2247aadc..440268f1f803c 100644 --- a/server/src/test/java/org/opensearch/index/compositeindex/datacube/startree/builder/StarTreeBuilderFlushFlowTests.java +++ b/server/src/test/java/org/opensearch/index/compositeindex/datacube/startree/builder/StarTreeBuilderFlushFlowTests.java @@ -447,10 +447,10 @@ public void testFlushFlowForKeywords() throws IOException { * Asserting following dim / metrics [ dim1, dim2 / Sum [metric], count [metric] ] [0, 0] | [0.0, 1] [1, 1] | [10.0, 1] + [2, 2] | [20.0, 1] [3, 3] | [30.0, 1] [4, 4] | [40.0, 1] [5, 5] | [50.0, 1] - [null, 2] | [20.0, 1] */ SegmentWriteState w = getWriteState(DocIdSetIterator.NO_MORE_DOCS, writeState.segmentInfo.getId()); @@ -464,7 +464,7 @@ public void testFlushFlowForKeywords() throws IOException { Map dv = new LinkedHashMap<>(); dv.put("field1", getSortedSetMock(dimList, docsWithField)); dv.put("field3", getSortedSetMock(dimList2, docsWithField2)); - builder.setSortedSetDocValuesMap(dv); + builder.setFlushSortedSetDocValuesMap(dv); builder.build(starTreeDocumentIterator, new AtomicInteger(), docValuesConsumer); List starTreeDocuments = builder.getStarTreeDocuments(); diff --git a/server/src/test/java/org/opensearch/index/compositeindex/datacube/startree/builder/StarTreeBuilderMergeFlowTests.java b/server/src/test/java/org/opensearch/index/compositeindex/datacube/startree/builder/StarTreeBuilderMergeFlowTests.java index ed5860838723e..be16961e781db 100644 --- a/server/src/test/java/org/opensearch/index/compositeindex/datacube/startree/builder/StarTreeBuilderMergeFlowTests.java +++ b/server/src/test/java/org/opensearch/index/compositeindex/datacube/startree/builder/StarTreeBuilderMergeFlowTests.java @@ -12,6 +12,7 @@ import org.apache.lucene.index.DocValuesType; import org.apache.lucene.index.SegmentWriteState; import org.apache.lucene.index.SortedNumericDocValues; +import org.apache.lucene.index.SortedSetDocValues; import org.apache.lucene.search.DocIdSetIterator; import org.opensearch.common.settings.Settings; import org.opensearch.index.codec.composite.LuceneDocValuesConsumerFactory; @@ -27,6 +28,7 @@ import org.opensearch.index.compositeindex.datacube.startree.fileformats.meta.StarTreeMetadata; import org.opensearch.index.compositeindex.datacube.startree.index.StarTreeValues; import org.opensearch.index.compositeindex.datacube.startree.utils.iterator.SortedNumericStarTreeValuesIterator; +import org.opensearch.index.compositeindex.datacube.startree.utils.iterator.SortedSetStarTreeValuesIterator; import org.opensearch.index.compositeindex.datacube.startree.utils.iterator.StarTreeValuesIterator; import org.opensearch.index.mapper.ContentPath; import org.opensearch.index.mapper.DocumentMapper; @@ -50,6 +52,7 @@ import static org.opensearch.index.compositeindex.CompositeIndexConstants.SEGMENT_DOCS_COUNT; import static org.opensearch.index.compositeindex.datacube.startree.builder.BuilderTestsUtils.getSortedNumericMock; +import static org.opensearch.index.compositeindex.datacube.startree.builder.BuilderTestsUtils.getSortedSetMock; import static org.opensearch.index.compositeindex.datacube.startree.builder.BuilderTestsUtils.traverseStarTree; import static org.opensearch.index.compositeindex.datacube.startree.builder.BuilderTestsUtils.validateStarTree; import static org.opensearch.index.compositeindex.datacube.startree.utils.StarTreeUtils.fullyQualifiedFieldNameForStarTreeMetricsDocValues; @@ -202,7 +205,7 @@ public void testMergeFlow() throws IOException { Composite912DocValuesFormat.META_DOC_VALUES_EXTENSION ); builder = getStarTreeBuilder(metaOut, dataOut, compositeField, writeState, mapperService); - Iterator starTreeDocumentIterator = builder.mergeStarTrees(List.of(starTreeValues, starTreeValues2), mergeState); + Iterator starTreeDocumentIterator = builder.mergeStarTrees(List.of(starTreeValues, starTreeValues2)); /** [0, 0, 0, 0] | [0.0, 2] [1, 1, 1, 1] | [20.0, 2] @@ -318,7 +321,7 @@ public void testMergeFlowWithSum() throws IOException { Composite912DocValuesFormat.META_DOC_VALUES_EXTENSION ); builder = getStarTreeBuilder(metaOut, dataOut, compositeField, writeState, mapperService); - Iterator starTreeDocumentIterator = builder.mergeStarTrees(List.of(starTreeValues, starTreeValues2), mergeState); + Iterator starTreeDocumentIterator = builder.mergeStarTrees(List.of(starTreeValues, starTreeValues2)); /** * Asserting following dim / metrics [ dim1, dim2 / Sum [ metric] ] * [0, 0] | [0.0] @@ -396,7 +399,7 @@ public void testMergeFlowWithCount() throws IOException { Composite912DocValuesFormat.META_DOC_VALUES_EXTENSION ); builder = getStarTreeBuilder(metaOut, dataOut, compositeField, writeState, mapperService); - Iterator starTreeDocumentIterator = builder.mergeStarTrees(List.of(starTreeValues, starTreeValues2), mergeState); + Iterator starTreeDocumentIterator = builder.mergeStarTrees(List.of(starTreeValues, starTreeValues2)); /** * Asserting following dim / metrics [ dim1, dim2 / Count [ metric] ] [0, 0] | [0] @@ -472,7 +475,7 @@ public void testMergeFlowNumSegmentsDocs() throws IOException { "4" ); builder = getStarTreeBuilder(metaOut, dataOut, sf, getWriteState(4, writeState.segmentInfo.getId()), mapperService); - Iterator starTreeDocumentIterator = builder.mergeStarTrees(List.of(starTreeValues, starTreeValues2), mergeState); + Iterator starTreeDocumentIterator = builder.mergeStarTrees(List.of(starTreeValues, starTreeValues2)); /** * Asserting following dim / metrics [ dim1, dim2 / Count [ metric] ] [0, 0] | [0] @@ -540,7 +543,7 @@ public void testMergeFlowWithMissingDocs() throws IOException { Composite912DocValuesFormat.META_DOC_VALUES_EXTENSION ); builder = getStarTreeBuilder(metaOut, dataOut, compositeField, writeState, mapperService); - Iterator starTreeDocumentIterator = builder.mergeStarTrees(List.of(starTreeValues, starTreeValues2), mergeState); + Iterator starTreeDocumentIterator = builder.mergeStarTrees(List.of(starTreeValues, starTreeValues2)); /** * Asserting following dim / metrics [ dim1, dim2 / Count [ metric] ] [0, 0] | [0] @@ -629,7 +632,7 @@ public void testMergeFlowWithMissingDocsWithZero() throws IOException { Composite912DocValuesFormat.META_DOC_VALUES_EXTENSION ); builder = getStarTreeBuilder(metaOut, dataOut, compositeField, writeState, mapperService); - Iterator starTreeDocumentIterator = builder.mergeStarTrees(List.of(starTreeValues, starTreeValues2), mergeState); + Iterator starTreeDocumentIterator = builder.mergeStarTrees(List.of(starTreeValues, starTreeValues2)); /** * Asserting following dim / metrics [ dim1, dim2 / Count [ metric] ] [0, 0] | [9] @@ -718,7 +721,7 @@ public void testMergeFlowWithMissingDocsWithZeroComplexCase() throws IOException Composite912DocValuesFormat.META_DOC_VALUES_EXTENSION ); builder = getStarTreeBuilder(metaOut, dataOut, compositeField, writeState, mapperService); - Iterator starTreeDocumentIterator = builder.mergeStarTrees(List.of(starTreeValues, starTreeValues2), mergeState); + Iterator starTreeDocumentIterator = builder.mergeStarTrees(List.of(starTreeValues, starTreeValues2)); /** * Asserting following dim / metrics [ dim1, dim2 / Count [ metric] ] [0, 0] | [9] @@ -811,7 +814,7 @@ public void testMergeFlowWithMissingDocsInSecondDim() throws IOException { Composite912DocValuesFormat.META_DOC_VALUES_EXTENSION ); builder = getStarTreeBuilder(metaOut, dataOut, compositeField, writeState, mapperService); - Iterator starTreeDocumentIterator = builder.mergeStarTrees(List.of(starTreeValues, starTreeValues2), mergeState); + Iterator starTreeDocumentIterator = builder.mergeStarTrees(List.of(starTreeValues, starTreeValues2)); /** * Asserting following dim / metrics [ dim1, dim2 / Count [ metric] ] [0, 0] | [0] @@ -899,7 +902,7 @@ public void testMergeFlowWithDocsMissingAtTheEnd() throws IOException { Composite912DocValuesFormat.META_DOC_VALUES_EXTENSION ); builder = getStarTreeBuilder(metaOut, dataOut, compositeField, writeState, mapperService); - Iterator starTreeDocumentIterator = builder.mergeStarTrees(List.of(starTreeValues, starTreeValues2), mergeState); + Iterator starTreeDocumentIterator = builder.mergeStarTrees(List.of(starTreeValues, starTreeValues2)); /** * Asserting following dim / metrics [ dim1, dim2 / Count [ metric] ] [0, 0] | [0] @@ -979,7 +982,7 @@ public void testMergeFlowWithEmptyFieldsInOneSegment() throws IOException { Composite912DocValuesFormat.META_DOC_VALUES_EXTENSION ); builder = getStarTreeBuilder(metaOut, dataOut, compositeField, writeState, mapperService); - Iterator starTreeDocumentIterator = builder.mergeStarTrees(List.of(starTreeValues, starTreeValues2), mergeState); + Iterator starTreeDocumentIterator = builder.mergeStarTrees(List.of(starTreeValues, starTreeValues2)); /** * Asserting following dim / metrics [ dim1, dim2 / Count [ metric] ] [0, 0] | [0] @@ -1110,7 +1113,7 @@ public void testMergeFlowWithDuplicateDimensionValues() throws IOException { Composite912DocValuesFormat.META_DOC_VALUES_EXTENSION ); builder = getStarTreeBuilder(metaOut, dataOut, compositeField, writeState, mapperService); - builder.build(builder.mergeStarTrees(List.of(starTreeValues, starTreeValues2), mergeState), new AtomicInteger(), docValuesConsumer); + builder.build(builder.mergeStarTrees(List.of(starTreeValues, starTreeValues2)), new AtomicInteger(), docValuesConsumer); List starTreeDocuments = builder.getStarTreeDocuments(); assertEquals(401, starTreeDocuments.size()); int count = 0; @@ -1258,7 +1261,7 @@ public void testMergeFlowWithMaxLeafDocs() throws IOException { Composite912DocValuesFormat.META_DOC_VALUES_EXTENSION ); builder = getStarTreeBuilder(metaOut, dataOut, compositeField, writeState, mapperService); - builder.build(builder.mergeStarTrees(List.of(starTreeValues, starTreeValues2), mergeState), new AtomicInteger(), docValuesConsumer); + builder.build(builder.mergeStarTrees(List.of(starTreeValues, starTreeValues2)), new AtomicInteger(), docValuesConsumer); List starTreeDocuments = builder.getStarTreeDocuments(); /** 635 docs get generated @@ -1376,7 +1379,7 @@ public void testMergeFlowWithDifferentDocsFromSegments() throws IOException { Composite912DocValuesFormat.META_DOC_VALUES_EXTENSION ); builder = getStarTreeBuilder(metaOut, dataOut, compositeField, writeState, mapperService); - Iterator starTreeDocumentIterator = builder.mergeStarTrees(List.of(starTreeValues, starTreeValues2), mergeState); + Iterator starTreeDocumentIterator = builder.mergeStarTrees(List.of(starTreeValues, starTreeValues2)); /** * Asserting following dim / metrics [ dim1, dim2 / Count [ metric] ] [0, 0] | [0] @@ -1515,7 +1518,7 @@ public void testMergeFlowWithDuplicateDimensionValueWithMaxLeafDocs() throws IOE Composite912DocValuesFormat.META_DOC_VALUES_EXTENSION ); builder = getStarTreeBuilder(metaOut, dataOut, compositeField, writeState, mapperService); - builder.build(builder.mergeStarTrees(List.of(starTreeValues, starTreeValues2), mergeState), new AtomicInteger(), docValuesConsumer); + builder.build(builder.mergeStarTrees(List.of(starTreeValues, starTreeValues2)), new AtomicInteger(), docValuesConsumer); List starTreeDocuments = builder.getStarTreeDocuments(); assertEquals(401, starTreeDocuments.size()); validateStarTree(builder.getRootNode(), 4, compositeField.getStarTreeConfig().maxLeafDocs(), builder.getStarTreeDocuments()); @@ -1644,7 +1647,7 @@ public void testMergeFlowWithMaxLeafDocsAndStarTreeNodesAssertion() throws IOExc Composite912DocValuesFormat.META_DOC_VALUES_EXTENSION ); builder = getStarTreeBuilder(metaOut, dataOut, compositeField, writeState, mapperService); - builder.build(builder.mergeStarTrees(List.of(starTreeValues, starTreeValues2), mergeState), new AtomicInteger(), docValuesConsumer); + builder.build(builder.mergeStarTrees(List.of(starTreeValues, starTreeValues2)), new AtomicInteger(), docValuesConsumer); List starTreeDocuments = builder.getStarTreeDocuments(); Map> dimValueToDocIdMap = new HashMap<>(); traverseStarTree(builder.rootNode, dimValueToDocIdMap, true); @@ -1753,7 +1756,7 @@ public void testMergeFlowWithTimestamps() throws IOException { Composite912DocValuesFormat.META_DOC_VALUES_EXTENSION ); builder = getStarTreeBuilder(metaOut, dataOut, compositeField, getWriteState(4, writeState.segmentInfo.getId()), mapperService); - Iterator starTreeDocumentIterator = builder.mergeStarTrees(List.of(starTreeValues, starTreeValues2), mergeState); + Iterator starTreeDocumentIterator = builder.mergeStarTrees(List.of(starTreeValues, starTreeValues2)); /** [1655287972000, 1655287972000, 1655287972000, 3] | [30.0, 3] [1655288032000, 1655288032000, 1655288032000, 2] | [20.0, 2] @@ -1795,6 +1798,110 @@ public void testMergeFlowWithTimestamps() throws IOException { ); } + public void testMergeFlowWithKeywords() throws IOException { + List dimList = List.of(0L, 1L, 2L, 3L, 4L, 5L, 6L); + List docsWithField = List.of(0, 1, 2, 3, 4, 5, 6); + List dimList2 = List.of(0L, 1L, 2L, 3L, 4L, 5L, -1L); + List docsWithField2 = List.of(0, 1, 2, 3, 4, 5, 6); + List metricsList1 = List.of( + getLongFromDouble(0.0), + getLongFromDouble(10.0), + getLongFromDouble(20.0), + getLongFromDouble(30.0), + getLongFromDouble(40.0), + getLongFromDouble(50.0), + getLongFromDouble(60.0) + ); + List metricsWithField1 = List.of(0, 1, 2, 3, 4, 5, 6); + List metricsList = List.of(0L, 1L, 2L, 3L, 4L, 5L, 6L); + List metricsWithField = List.of(0, 1, 2, 3, 4, 5, 6); + + List dimList3 = List.of(0L, 1L, 2L, 3L, -1L); + List docsWithField3 = List.of(0, 1, 2, 3, 4); + List dimList4 = List.of(0L, 1L, 2L, 3L, -1L); + List docsWithField4 = List.of(0, 1, 2, 3, 4); + List metricsList21 = List.of( + getLongFromDouble(0.0), + getLongFromDouble(10.0), + getLongFromDouble(20.0), + getLongFromDouble(30.0), + getLongFromDouble(40.0) + ); + List metricsWithField21 = List.of(0, 1, 2, 3, 4); + List metricsList2 = List.of(0L, 1L, 2L, 3L, 4L); + List metricsWithField2 = List.of(0, 1, 2, 3, 4); + + compositeField = getStarTreeFieldWithKeywords(); + StarTreeValues starTreeValues = getStarTreeValuesWithKeywords( + getSortedSetMock(dimList, docsWithField), + getSortedSetMock(dimList2, docsWithField2), + getSortedNumericMock(metricsList, metricsWithField), + getSortedNumericMock(metricsList1, metricsWithField1), + compositeField, + "6" + ); + + StarTreeValues starTreeValues2 = getStarTreeValuesWithKeywords( + getSortedSetMock(dimList3, docsWithField3), + getSortedSetMock(dimList4, docsWithField4), + getSortedNumericMock(metricsList2, metricsWithField2), + getSortedNumericMock(metricsList21, metricsWithField21), + compositeField, + "4" + ); + this.docValuesConsumer = LuceneDocValuesConsumerFactory.getDocValuesConsumerForCompositeCodec( + writeState, + Composite912DocValuesFormat.DATA_DOC_VALUES_CODEC, + Composite912DocValuesFormat.DATA_DOC_VALUES_EXTENSION, + Composite912DocValuesFormat.META_DOC_VALUES_CODEC, + Composite912DocValuesFormat.META_DOC_VALUES_EXTENSION + ); + builder = getStarTreeBuilder(metaOut, dataOut, compositeField, getWriteState(4, writeState.segmentInfo.getId()), mapperService); + // Initialize the mock MergeState within the method + + Iterator starTreeDocumentIterator = builder.mergeStarTrees(List.of(starTreeValues, starTreeValues2)); + /** + [0, 0] | [5, 50.0] + [1, 1] | [7, 70.0] + [2, 2] | [9, 90.0] + [3, 3] | [11, 110.0] + [4, 4] | [4, 40.0] + [5, 5] | [5, 50.0] + */ + int count = 0; + builder.appendDocumentsToStarTree(starTreeDocumentIterator); + for (StarTreeDocument starTreeDocument : builder.getStarTreeDocuments()) { + count++; + if (count <= 4) { + assertEquals(starTreeDocument.dimensions[0] * 2, (long) starTreeDocument.metrics[0], 0); + assertEquals(starTreeDocument.dimensions[0] * 20.0, (double) starTreeDocument.metrics[1], 0); + } else { + assertEquals(starTreeDocument.dimensions[0], (long) starTreeDocument.metrics[0], 0); + assertEquals(starTreeDocument.dimensions[0] * 10.0, (double) starTreeDocument.metrics[1], 0); + } + } + assertEquals(6, count); + builder.build(starTreeDocumentIterator, new AtomicInteger(), docValuesConsumer); + validateStarTree(builder.getRootNode(), 4, 10, builder.getStarTreeDocuments()); + metaOut.close(); + dataOut.close(); + docValuesConsumer.close(); + + StarTreeMetadata starTreeMetadata = getStarTreeMetadata( + getStarTreeDimensionNames(compositeField.getDimensionsOrder()), + 6, + compositeField.getStarTreeConfig().maxLeafDocs(), + 264 + ); + + validateStarTreeFileFormats( + builder.getRootNode(), + builder.getStarTreeDocuments().size(), + starTreeMetadata, + builder.getStarTreeDocuments() + ); + } + private StarTreeValues getStarTreeValuesWithDates( SortedNumericDocValues dimList, SortedNumericDocValues dimList2, @@ -1876,6 +1983,93 @@ private StarTreeValues getStarTreeValues( return starTreeValues; } + private StarTreeValues getStarTreeValuesWithKeywords( + SortedSetDocValues dimList, + SortedSetDocValues dimList2, + SortedNumericDocValues metricsList, + SortedNumericDocValues metricsList1, + StarTreeField sf, + String number + ) { + SortedSetDocValues d1sndv = dimList; + SortedSetDocValues d2sndv = dimList2; + SortedNumericDocValues m1sndv = metricsList; + Map> dimDocIdSetIterators = Map.of( + "field1", + () -> new SortedSetStarTreeValuesIterator(d1sndv), + "field3", + () -> new SortedSetStarTreeValuesIterator(d2sndv) + ); + + Map> metricDocIdSetIterators = new LinkedHashMap<>(); + metricDocIdSetIterators.put( + fullyQualifiedFieldNameForStarTreeMetricsDocValues( + sf.getName(), + "field2", + sf.getMetrics().get(0).getMetrics().get(0).getTypeName() + ), + () -> new SortedNumericStarTreeValuesIterator(metricsList) + ); + metricDocIdSetIterators.put( + fullyQualifiedFieldNameForStarTreeMetricsDocValues( + sf.getName(), + "field2", + sf.getMetrics().get(0).getMetrics().get(1).getTypeName() + ), + () -> new SortedNumericStarTreeValuesIterator(metricsList1) + ); + StarTreeValues starTreeValues = new StarTreeValues( + sf, + null, + dimDocIdSetIterators, + metricDocIdSetIterators, + Map.of(CompositeIndexConstants.SEGMENT_DOCS_COUNT, number), + null + ); + return starTreeValues; + } + + private StarTreeValues getStarTreeValuesWithKeywords( + SortedSetDocValues dimList, + SortedSetDocValues dimList2, + SortedSetDocValues dimList4, + SortedSetDocValues dimList3, + SortedNumericDocValues metricsList, + SortedNumericDocValues metricsList1, + StarTreeField sf, + String number + ) { + Map> dimDocIdSetIterators = Map.of( + "field1_minute", + () -> new SortedSetStarTreeValuesIterator(dimList), + "field1_half-hour", + () -> new SortedSetStarTreeValuesIterator(dimList4), + "field1_hour", + () -> new SortedSetStarTreeValuesIterator(dimList2), + "field3", + () -> new SortedSetStarTreeValuesIterator(dimList3) + ); + Map> metricDocIdSetIterators = new LinkedHashMap<>(); + + metricDocIdSetIterators.put( + fullyQualifiedFieldNameForStarTreeMetricsDocValues( + sf.getName(), + "field2", + sf.getMetrics().get(0).getMetrics().get(0).getTypeName() + ), + () -> new SortedNumericStarTreeValuesIterator(metricsList) + ); + metricDocIdSetIterators.put( + fullyQualifiedFieldNameForStarTreeMetricsDocValues( + sf.getName(), + "field2", + sf.getMetrics().get(0).getMetrics().get(1).getTypeName() + ), + () -> new SortedNumericStarTreeValuesIterator(metricsList1) + ); + return new StarTreeValues(sf, null, dimDocIdSetIterators, metricDocIdSetIterators, Map.of(SEGMENT_DOCS_COUNT, number), null); + } + private StarTreeValues getStarTreeValues( List dimList1, List docsWithField1, diff --git a/server/src/test/java/org/opensearch/index/compositeindex/datacube/startree/builder/StarTreeBuilderTestCase.java b/server/src/test/java/org/opensearch/index/compositeindex/datacube/startree/builder/StarTreeBuilderTestCase.java index f0abc2a8aa580..9c9beaea4f52c 100644 --- a/server/src/test/java/org/opensearch/index/compositeindex/datacube/startree/builder/StarTreeBuilderTestCase.java +++ b/server/src/test/java/org/opensearch/index/compositeindex/datacube/startree/builder/StarTreeBuilderTestCase.java @@ -32,6 +32,7 @@ import org.opensearch.index.compositeindex.datacube.DataCubeDateTimeUnit; import org.opensearch.index.compositeindex.datacube.DateDimension; import org.opensearch.index.compositeindex.datacube.Dimension; +import org.opensearch.index.compositeindex.datacube.KeywordDimension; import org.opensearch.index.compositeindex.datacube.Metric; import org.opensearch.index.compositeindex.datacube.MetricStat; import org.opensearch.index.compositeindex.datacube.NumericDimension; @@ -351,6 +352,17 @@ protected StarTreeMetadata getStarTreeMetadata( ); } + protected StarTreeField getStarTreeFieldWithKeywords() { + Dimension d1 = new KeywordDimension("field1"); + Dimension d2 = new KeywordDimension("field3"); + Metric m1 = new Metric("field2", List.of(MetricStat.VALUE_COUNT, MetricStat.SUM)); + List dims = List.of(d1, d2); + List metrics = List.of(m1); + StarTreeFieldConfiguration c = new StarTreeFieldConfiguration(10, new HashSet<>(), getBuildMode()); + StarTreeField sf = new StarTreeField("sf", dims, metrics, c); + return sf; + } + protected StarTreeField getStarTreeFieldWithDateDimension() { List intervals = new ArrayList<>(); intervals.add(new DateTimeUnitAdapter(Rounding.DateTimeUnit.MINUTES_OF_HOUR));