Skip to content

Commit

Permalink
adding tests and fixes
Browse files Browse the repository at this point in the history
Signed-off-by: Bharathwaj G <bharath78910@gmail.com>
  • Loading branch information
bharath-techie committed Oct 12, 2024
1 parent a48de57 commit 9238f17
Show file tree
Hide file tree
Showing 9 changed files with 859 additions and 142 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,7 @@ public Composite912DocValuesReader(DocValuesProducer producer, SegmentReadState
dimensionEntry.getKey()
);
fields.add(dimName);
dimensionFieldTypeMap.put(dimName, dimensionEntry.getValue());
}
// adding metric fields
for (Metric metric : starTreeMetadata.getMetrics()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
Expand Down Expand Up @@ -322,48 +323,64 @@ private SequentialDocValuesIterator getSequentialDocValuesIterator(
}

/**
* Returns the ordinal map based on given star-tree values across different segments
* Returns the ordinal map per field based on given star-tree values across different segments
*/
protected OrdinalMap getOrdinalMap(List<StarTreeValues> starTreeValuesSubs, MergeState mergeState) throws IOException {
protected Map<String, OrdinalMap> getOrdinalMaps(List<StarTreeValues> starTreeValuesSubs, MergeState mergeState) throws IOException {
long curr = System.currentTimeMillis();
List<SortedSetStarTreeValuesIterator> toMerge = new ArrayList<>();
Map<String, List<SortedSetStarTreeValuesIterator>> dimensionToIterators = new HashMap<>();

// Group iterators by dimension
for (StarTreeValues starTree : starTreeValuesSubs) {
for (String dimName : starTree.getStarTreeField().getDimensionNames()) {
if (starTree.getDimensionValuesIterator(dimName) instanceof SortedSetStarTreeValuesIterator) {
toMerge.add((SortedSetStarTreeValuesIterator) starTree.getDimensionValuesIterator(dimName));
dimensionToIterators.computeIfAbsent(dimName, k -> new ArrayList<>())
.add((SortedSetStarTreeValuesIterator) starTree.getDimensionValuesIterator(dimName));
}
}
}
if (toMerge.isEmpty()) return null;
// step 1: iterate through each sub and mark terms still in use
TermsEnum[] liveTerms = new TermsEnum[toMerge.size()];
long[] weights = new long[liveTerms.length];
for (int sub = 0; sub < liveTerms.length; sub++) {
SortedSetStarTreeValuesIterator dv = toMerge.get(sub);
Bits liveDocs = mergeState.liveDocs[sub];
if (liveDocs == null) {
liveTerms[sub] = dv.termsEnum();
weights[sub] = dv.getValueCount();
} else {
LongBitSet bitset = new LongBitSet(dv.getValueCount());
int docID;
while ((docID = dv.nextEntry()) != NO_MORE_ENTRIES) {
if (liveDocs.get(docID)) {
for (int i = 0; i < dv.docValueCount(); i++) {
bitset.set(dv.nextOrd());

if (dimensionToIterators.isEmpty()) return Collections.emptyMap();

Map<String, OrdinalMap> dimensionToOrdinalMap = new HashMap<>();

for (Map.Entry<String, List<SortedSetStarTreeValuesIterator>> entry : dimensionToIterators.entrySet()) {
String dimName = entry.getKey();
List<SortedSetStarTreeValuesIterator> iterators = entry.getValue();

// step 1: iterate through each sub and mark terms still in use
TermsEnum[] liveTerms = new TermsEnum[iterators.size()];
long[] weights = new long[liveTerms.length];

for (int sub = 0; sub < liveTerms.length; sub++) {
SortedSetStarTreeValuesIterator dv = iterators.get(sub);
Bits liveDocs = mergeState.liveDocs[sub];
if (liveDocs == null) {
liveTerms[sub] = dv.termsEnum();
weights[sub] = dv.getValueCount();
} else {
LongBitSet bitset = new LongBitSet(dv.getValueCount());

Check warning on line 361 in server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/builder/BaseStarTreeBuilder.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/builder/BaseStarTreeBuilder.java#L361

Added line #L361 was not covered by tests
int docID;
while ((docID = dv.nextEntry()) != NO_MORE_ENTRIES) {
if (liveDocs.get(docID)) {
for (int i = 0; i < dv.docValueCount(); i++) {
bitset.set(dv.nextOrd());

Check warning on line 366 in server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/builder/BaseStarTreeBuilder.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/builder/BaseStarTreeBuilder.java#L366

Added line #L366 was not covered by tests
}
}
}
liveTerms[sub] = new BitsFilteredTermsEnum(dv.termsEnum(), bitset);
weights[sub] = bitset.cardinality();

Check warning on line 371 in server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/builder/BaseStarTreeBuilder.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/builder/BaseStarTreeBuilder.java#L370-L371

Added lines #L370 - L371 were not covered by tests
}
liveTerms[sub] = new BitsFilteredTermsEnum(dv.termsEnum(), bitset);
weights[sub] = bitset.cardinality();
}

// step 2: create ordinal map for this dimension
OrdinalMap map = OrdinalMap.build(null, liveTerms, weights, PackedInts.COMPACT);
dimensionToOrdinalMap.put(dimName, map);

logger.debug("Ordinal map for dimension {} - Size in bytes: {}", dimName, map.ramBytesUsed());
}
// step 2: create ordinal map (this conceptually does the "merging")
final OrdinalMap map = OrdinalMap.build(null, liveTerms, weights, PackedInts.COMPACT);
logger.debug("Map size in bytes : {}", map.ramBytesUsed());
logger.debug("Ordinal map takes : {} ", System.currentTimeMillis() - curr);
return map;

logger.debug("Total time to build ordinal maps: {} ms", System.currentTimeMillis() - curr);
return dimensionToOrdinalMap;
}

/**
Expand Down Expand Up @@ -579,13 +596,13 @@ protected StarTreeDocument getStarTreeDocument(
int currentDocId,
SequentialDocValuesIterator[] dimensionReaders,
List<SequentialDocValuesIterator> metricReaders,
LongValues longValues
Map<String, LongValues> longValues
) throws IOException {
Long[] dims = new Long[numDimensions];
int i = 0;
for (SequentialDocValuesIterator dimensionValueIterator : dimensionReaders) {
dimensionValueIterator.nextEntry(currentDocId);
Long val = dimensionValueIterator.value(currentDocId, longValues);
Long val = dimensionValueIterator.value(currentDocId, longValues.get(starTreeField.getDimensionNames().get(i)));
dims[i] = val;
i++;
}
Expand Down Expand Up @@ -838,7 +855,7 @@ private static Long getLong(Object metric) {
* Sets the sortedSetDocValuesMap.
* This is needed as we need to write the ordinals and also the bytesRef associated with it
*/
protected void setSortedSetDocValuesMap(Map<String, SortedSetDocValues> sortedSetDocValuesMap) {
void setSortedSetDocValuesMap(Map<String, SortedSetDocValues> sortedSetDocValuesMap) {
this.sortedSetDocValuesMap = sortedSetDocValuesMap;
}

Expand Down Expand Up @@ -908,7 +925,6 @@ private SequentialDocValuesIterator getIteratorForNumericField(
* @throws IOException throws an exception if we are unable to add the doc
*/
private void appendToStarTree(StarTreeDocument starTreeDocument) throws IOException {
// System.out.println(starTreeDocument);
appendStarTreeDocument(starTreeDocument);
numStarTreeDocs++;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
Expand Down Expand Up @@ -146,7 +147,7 @@ public Iterator<StarTreeDocument> sortAndAggregateSegmentDocuments(
Iterator<StarTreeDocument> mergeStarTrees(List<StarTreeValues> starTreeValuesSubs, MergeState mergeState) throws IOException {
int numDocs = 0;
int[] docIds;
OrdinalMap ordinalMap = getOrdinalMap(starTreeValuesSubs, mergeState);
Map<String, OrdinalMap> ordinalMaps = getOrdinalMaps(starTreeValuesSubs, mergeState);
try {
int seg = 0;
for (StarTreeValues starTreeValues : starTreeValuesSubs) {
Expand All @@ -155,12 +156,12 @@ Iterator<StarTreeDocument> mergeStarTrees(List<StarTreeValues> starTreeValuesSub
AtomicInteger numSegmentDocs = new AtomicInteger();
setReadersAndNumSegmentDocs(dimensionReaders, metricReaders, numSegmentDocs, starTreeValues);
int currentDocId = 0;
LongValues longValues = null;
if (ordinalMap != null) {
longValues = ordinalMap.getGlobalOrds(seg);
Map<String, LongValues> longValuesMap = new LinkedHashMap<>();
for (Map.Entry<String, OrdinalMap> entry : ordinalMaps.entrySet()) {
longValuesMap.put(entry.getKey(), entry.getValue().getGlobalOrds(seg));
}
while (currentDocId < numSegmentDocs.get()) {
StarTreeDocument starTreeDocument = getStarTreeDocument(currentDocId, dimensionReaders, metricReaders, longValues);
StarTreeDocument starTreeDocument = getStarTreeDocument(currentDocId, dimensionReaders, metricReaders, longValuesMap);
segmentDocumentFileManager.writeStarTreeDocument(starTreeDocument, true);
numDocs++;
currentDocId++;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
Expand Down Expand Up @@ -134,21 +134,20 @@ Iterator<StarTreeDocument> mergeStarTrees(List<StarTreeValues> starTreeValuesSub
*/
StarTreeDocument[] getSegmentsStarTreeDocuments(List<StarTreeValues> starTreeValuesSubs, MergeState mergeState) throws IOException {
List<StarTreeDocument> starTreeDocuments = new ArrayList<>();
OrdinalMap ordinalMap = getOrdinalMap(starTreeValuesSubs, mergeState);
Map<String, OrdinalMap> ordinalMaps = getOrdinalMaps(starTreeValuesSubs, mergeState);
int seg = 0;
for (StarTreeValues starTreeValues : starTreeValuesSubs) {
Map<Long, Long> segToGlobalOrdMap = new HashMap<>();
SequentialDocValuesIterator[] dimensionReaders = new SequentialDocValuesIterator[numDimensions];
List<SequentialDocValuesIterator> metricReaders = new ArrayList<>();
AtomicInteger numSegmentDocs = new AtomicInteger();
setReadersAndNumSegmentDocs(dimensionReaders, metricReaders, numSegmentDocs, starTreeValues);
int currentDocId = 0;
LongValues longValues = null;
if (ordinalMap != null) {
longValues = ordinalMap.getGlobalOrds(seg);
Map<String, LongValues> longValuesMap = new LinkedHashMap<>();
for (Map.Entry<String, OrdinalMap> entry : ordinalMaps.entrySet()) {
longValuesMap.put(entry.getKey(), entry.getValue().getGlobalOrds(seg));
}

Check warning on line 148 in server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/builder/OnHeapStarTreeBuilder.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/builder/OnHeapStarTreeBuilder.java#L147-L148

Added lines #L147 - L148 were not covered by tests
while (currentDocId < numSegmentDocs.get()) {
starTreeDocuments.add(getStarTreeDocument(currentDocId, dimensionReaders, metricReaders, longValues));
starTreeDocuments.add(getStarTreeDocument(currentDocId, dimensionReaders, metricReaders, longValuesMap));
currentDocId++;
}
seg++;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.index.codec.composite912.datacube.startree;

import com.carrotsearch.randomizedtesting.annotations.ParametersFactory;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.codecs.Codec;
import org.apache.lucene.codecs.lucene912.Lucene912Codec;
import org.apache.lucene.tests.index.BaseDocValuesFormatTestCase;
import org.apache.lucene.tests.util.LuceneTestCase;
import org.opensearch.Version;
import org.opensearch.cluster.ClusterModule;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.CheckedConsumer;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.common.xcontent.XContentFactory;
import org.opensearch.core.common.unit.ByteSizeUnit;
import org.opensearch.core.common.unit.ByteSizeValue;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.MapperTestUtils;
import org.opensearch.index.codec.composite.composite912.Composite912Codec;
import org.opensearch.index.compositeindex.datacube.startree.StarTreeFieldConfiguration;
import org.opensearch.index.compositeindex.datacube.startree.StarTreeIndexSettings;
import org.opensearch.index.mapper.MapperService;
import org.opensearch.indices.IndicesModule;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.BeforeClass;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;

import static org.opensearch.common.util.FeatureFlags.STAR_TREE_INDEX;

/**
* Abstract star tree doc values Lucene tests
*/
@LuceneTestCase.SuppressSysoutChecks(bugUrl = "we log a lot on purpose")
public abstract class AbstractStarTreeDVFormatTests extends BaseDocValuesFormatTestCase {
MapperService mapperService = null;
StarTreeFieldConfiguration.StarTreeBuildMode buildMode;

public AbstractStarTreeDVFormatTests(StarTreeFieldConfiguration.StarTreeBuildMode buildMode) {
this.buildMode = buildMode;
}

@ParametersFactory
public static Collection<Object[]> parameters() {
List<Object[]> parameters = new ArrayList<>();
parameters.add(new Object[] { StarTreeFieldConfiguration.StarTreeBuildMode.ON_HEAP });
parameters.add(new Object[] { StarTreeFieldConfiguration.StarTreeBuildMode.OFF_HEAP });
return parameters;
}

@BeforeClass
public static void createMapper() throws Exception {
FeatureFlags.initializeFeatureFlags(Settings.builder().put(STAR_TREE_INDEX, "true").build());
}

@AfterClass
public static void clearMapper() {
FeatureFlags.initializeFeatureFlags(Settings.EMPTY);
}

@After
public void teardown() throws IOException {
mapperService.close();
}

@Override
protected Codec getCodec() {
final Logger testLogger = LogManager.getLogger(StarTreeDocValuesFormatTests.class);

try {
createMapperService(getExpandedMapping());
} catch (IOException e) {
throw new RuntimeException(e);
}
Codec codec = new Composite912Codec(Lucene912Codec.Mode.BEST_SPEED, mapperService, testLogger);
return codec;
}

private void createMapperService(XContentBuilder builder) throws IOException {
Settings settings = Settings.builder()
.put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT)
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)
.put(StarTreeIndexSettings.IS_COMPOSITE_INDEX_SETTING.getKey(), true)
.put(IndexSettings.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING.getKey(), new ByteSizeValue(512, ByteSizeUnit.MB))
.build();
IndexMetadata indexMetadata = IndexMetadata.builder("test").settings(settings).putMapping(builder.toString()).build();
IndicesModule indicesModule = new IndicesModule(Collections.emptyList());
mapperService = MapperTestUtils.newMapperServiceWithHelperAnalyzer(
new NamedXContentRegistry(ClusterModule.getNamedXWriteables()),
createTempDir(),
settings,
indicesModule,
"test"
);
mapperService.merge(indexMetadata, MapperService.MergeReason.INDEX_TEMPLATE);
}

abstract XContentBuilder getExpandedMapping() throws IOException;

XContentBuilder topMapping(CheckedConsumer<XContentBuilder, IOException> buildFields) throws IOException {
XContentBuilder builder = XContentFactory.jsonBuilder().startObject().startObject("_doc");
buildFields.accept(builder);
return builder.endObject().endObject();
}

}
Loading

0 comments on commit 9238f17

Please sign in to comment.