Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport 2.x] [Star tree] Star tree merge changes #15380

Merged
merged 1 commit into from
Aug 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@
import org.apache.lucene.index.SortedNumericDocValues;
import org.apache.lucene.index.SortedSetDocValues;
import org.opensearch.common.annotation.ExperimentalApi;
import org.opensearch.index.mapper.CompositeMappedFieldType;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/**
Expand Down Expand Up @@ -74,15 +74,13 @@ public void close() throws IOException {
}

@Override
public List<String> getCompositeIndexFields() {
public List<CompositeIndexFieldInfo> getCompositeIndexFields() {
// todo : read from file formats and get the field names.
throw new UnsupportedOperationException();

return new ArrayList<>();
}

@Override
public CompositeIndexValues getCompositeIndexValues(String field, CompositeMappedFieldType.CompositeFieldType fieldType)
throws IOException {
public CompositeIndexValues getCompositeIndexValues(CompositeIndexFieldInfo compositeIndexFieldInfo) throws IOException {
// TODO : read compositeIndexValues [starTreeValues] from star tree files
throw new UnsupportedOperationException();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,20 +8,29 @@

package org.opensearch.index.codec.composite;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.codecs.DocValuesConsumer;
import org.apache.lucene.codecs.DocValuesProducer;
import org.apache.lucene.index.DocValues;
import org.apache.lucene.index.DocValuesType;
import org.apache.lucene.index.EmptyDocValuesProducer;
import org.apache.lucene.index.FieldInfo;
import org.apache.lucene.index.MergeState;
import org.apache.lucene.index.SegmentWriteState;
import org.apache.lucene.index.SortedNumericDocValues;
import org.opensearch.common.annotation.ExperimentalApi;
import org.opensearch.index.codec.composite.datacube.startree.StarTreeValues;
import org.opensearch.index.compositeindex.datacube.startree.StarTreeField;
import org.opensearch.index.compositeindex.datacube.startree.builder.StarTreesBuilder;
import org.opensearch.index.mapper.CompositeMappedFieldType;
import org.opensearch.index.mapper.MapperService;
import org.opensearch.index.mapper.StarTreeMapper;

import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
Expand All @@ -40,8 +49,10 @@
AtomicReference<MergeState> mergeState = new AtomicReference<>();
private final Set<CompositeMappedFieldType> compositeMappedFieldTypes;
private final Set<String> compositeFieldSet;
private final Set<String> segmentFieldSet;

private final Map<String, DocValuesProducer> fieldProducerMap = new HashMap<>();
private static final Logger logger = LogManager.getLogger(Composite99DocValuesWriter.class);

public Composite99DocValuesWriter(DocValuesConsumer delegate, SegmentWriteState segmentWriteState, MapperService mapperService) {

Expand All @@ -50,6 +61,12 @@
this.mapperService = mapperService;
this.compositeMappedFieldTypes = mapperService.getCompositeFieldTypes();
compositeFieldSet = new HashSet<>();
segmentFieldSet = new HashSet<>();
for (FieldInfo fi : segmentWriteState.fieldInfos) {
if (DocValuesType.SORTED_NUMERIC.equals(fi.getDocValuesType())) {
segmentFieldSet.add(fi.name);
}
}
for (CompositeMappedFieldType type : compositeMappedFieldTypes) {
compositeFieldSet.addAll(type.fields());
}
Expand Down Expand Up @@ -95,23 +112,91 @@
fieldProducerMap.put(field.name, valuesProducer);
compositeFieldSet.remove(field.name);
}
segmentFieldSet.remove(field.name);
if (segmentFieldSet.isEmpty()) {
Set<String> compositeFieldSetCopy = new HashSet<>(compositeFieldSet);
for (String compositeField : compositeFieldSetCopy) {
fieldProducerMap.put(compositeField, new EmptyDocValuesProducer() {
@Override
public SortedNumericDocValues getSortedNumeric(FieldInfo field) {
return DocValues.emptySortedNumeric();
}
});
compositeFieldSet.remove(compositeField);
}
}
// we have all the required fields to build composite fields
if (compositeFieldSet.isEmpty()) {
for (CompositeMappedFieldType mappedType : compositeMappedFieldTypes) {
if (mappedType instanceof StarTreeMapper.StarTreeFieldType) {
try (StarTreesBuilder starTreesBuilder = new StarTreesBuilder(fieldProducerMap, state, mapperService)) {
starTreesBuilder.build();
if (mappedType.getCompositeIndexType().equals(CompositeMappedFieldType.CompositeFieldType.STAR_TREE)) {
try (StarTreesBuilder starTreesBuilder = new StarTreesBuilder(state, mapperService)) {
starTreesBuilder.build(fieldProducerMap);
}
}
}
}

}

@Override
public void merge(MergeState mergeState) throws IOException {
this.mergeState.compareAndSet(null, mergeState);
super.merge(mergeState);
// TODO : handle merge star tree
// mergeStarTreeFields(mergeState);
mergeCompositeFields(mergeState);
}

/**
* Merges composite fields from multiple segments
* @param mergeState merge state
*/
private void mergeCompositeFields(MergeState mergeState) throws IOException {
mergeStarTreeFields(mergeState);
}

/**
* Merges star tree data fields from multiple segments
* @param mergeState merge state
*/
private void mergeStarTreeFields(MergeState mergeState) throws IOException {
Map<String, List<StarTreeValues>> starTreeSubsPerField = new HashMap<>();
StarTreeField starTreeField = null;
for (int i = 0; i < mergeState.docValuesProducers.length; i++) {
CompositeIndexReader reader = null;
if (mergeState.docValuesProducers[i] == null) {
continue;
}
if (mergeState.docValuesProducers[i] instanceof CompositeIndexReader) {
reader = (CompositeIndexReader) mergeState.docValuesProducers[i];
} else {
continue;
}

List<CompositeIndexFieldInfo> compositeFieldInfo = reader.getCompositeIndexFields();
for (CompositeIndexFieldInfo fieldInfo : compositeFieldInfo) {
if (fieldInfo.getType().equals(CompositeMappedFieldType.CompositeFieldType.STAR_TREE)) {
CompositeIndexValues compositeIndexValues = reader.getCompositeIndexValues(fieldInfo);

Check warning on line 177 in server/src/main/java/org/opensearch/index/codec/composite/Composite99DocValuesWriter.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/codec/composite/Composite99DocValuesWriter.java#L177

Added line #L177 was not covered by tests
if (compositeIndexValues instanceof StarTreeValues) {
StarTreeValues starTreeValues = (StarTreeValues) compositeIndexValues;
List<StarTreeValues> fieldsList = starTreeSubsPerField.getOrDefault(fieldInfo.getField(), Collections.emptyList());

Check warning on line 180 in server/src/main/java/org/opensearch/index/codec/composite/Composite99DocValuesWriter.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/codec/composite/Composite99DocValuesWriter.java#L179-L180

Added lines #L179 - L180 were not covered by tests
if (starTreeField == null) {
starTreeField = starTreeValues.getStarTreeField();

Check warning on line 182 in server/src/main/java/org/opensearch/index/codec/composite/Composite99DocValuesWriter.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/codec/composite/Composite99DocValuesWriter.java#L182

Added line #L182 was not covered by tests
}
// assert star tree configuration is same across segments
else {
if (starTreeField.equals(starTreeValues.getStarTreeField()) == false) {
throw new IllegalArgumentException(

Check warning on line 187 in server/src/main/java/org/opensearch/index/codec/composite/Composite99DocValuesWriter.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/codec/composite/Composite99DocValuesWriter.java#L187

Added line #L187 was not covered by tests
"star tree field configuration must match the configuration of the field being merged"
);
}
}
fieldsList.add(starTreeValues);
starTreeSubsPerField.put(fieldInfo.getField(), fieldsList);

Check warning on line 193 in server/src/main/java/org/opensearch/index/codec/composite/Composite99DocValuesWriter.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/codec/composite/Composite99DocValuesWriter.java#L192-L193

Added lines #L192 - L193 were not covered by tests
}
}
}

Check warning on line 196 in server/src/main/java/org/opensearch/index/codec/composite/Composite99DocValuesWriter.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/codec/composite/Composite99DocValuesWriter.java#L196

Added line #L196 was not covered by tests
}
try (StarTreesBuilder starTreesBuilder = new StarTreesBuilder(state, mapperService)) {
starTreesBuilder.buildDuringMerge(starTreeSubsPerField);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@

/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.index.codec.composite;

import org.opensearch.common.annotation.ExperimentalApi;
import org.opensearch.index.mapper.CompositeMappedFieldType;

/**
* Field info details of composite index fields
*
* @opensearch.experimental
*/
@ExperimentalApi
public class CompositeIndexFieldInfo {
private final String field;
private final CompositeMappedFieldType.CompositeFieldType type;

public CompositeIndexFieldInfo(String field, CompositeMappedFieldType.CompositeFieldType type) {
this.field = field;
this.type = type;
}

Check warning on line 28 in server/src/main/java/org/opensearch/index/codec/composite/CompositeIndexFieldInfo.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/codec/composite/CompositeIndexFieldInfo.java#L25-L28

Added lines #L25 - L28 were not covered by tests

public String getField() {
return field;

Check warning on line 31 in server/src/main/java/org/opensearch/index/codec/composite/CompositeIndexFieldInfo.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/codec/composite/CompositeIndexFieldInfo.java#L31

Added line #L31 was not covered by tests
}

public CompositeMappedFieldType.CompositeFieldType getType() {
return type;

Check warning on line 35 in server/src/main/java/org/opensearch/index/codec/composite/CompositeIndexFieldInfo.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/codec/composite/CompositeIndexFieldInfo.java#L35

Added line #L35 was not covered by tests
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
package org.opensearch.index.codec.composite;

import org.opensearch.common.annotation.ExperimentalApi;
import org.opensearch.index.mapper.CompositeMappedFieldType;

import java.io.IOException;
import java.util.List;
Expand All @@ -25,10 +24,10 @@ public interface CompositeIndexReader {
* Get list of composite index fields from the segment
*
*/
List<String> getCompositeIndexFields();
List<CompositeIndexFieldInfo> getCompositeIndexFields();

/**
* Get composite index values based on the field name and the field type
*/
CompositeIndexValues getCompositeIndexValues(String field, CompositeMappedFieldType.CompositeFieldType fieldType) throws IOException;
CompositeIndexValues getCompositeIndexValues(CompositeIndexFieldInfo fieldInfo) throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,13 @@

package org.opensearch.index.codec.composite.datacube.startree;

import org.apache.lucene.search.DocIdSetIterator;
import org.opensearch.common.annotation.ExperimentalApi;
import org.opensearch.index.codec.composite.CompositeIndexValues;
import org.opensearch.index.compositeindex.datacube.startree.StarTreeField;
import org.opensearch.index.compositeindex.datacube.startree.node.StarTreeNode;

import java.util.List;
import java.util.Map;

/**
* Concrete class that holds the star tree associated values from the segment
Expand All @@ -20,16 +23,48 @@
*/
@ExperimentalApi
public class StarTreeValues implements CompositeIndexValues {
private final List<String> dimensionsOrder;
private final StarTreeField starTreeField;
private final StarTreeNode root;
private final Map<String, DocIdSetIterator> dimensionDocValuesIteratorMap;
private final Map<String, DocIdSetIterator> metricDocValuesIteratorMap;
private final Map<String, String> attributes;

// TODO : come up with full set of vales such as dimensions and metrics doc values + star tree
public StarTreeValues(List<String> dimensionsOrder) {
super();
this.dimensionsOrder = List.copyOf(dimensionsOrder);
public StarTreeValues(
StarTreeField starTreeField,
StarTreeNode root,
Map<String, DocIdSetIterator> dimensionDocValuesIteratorMap,
Map<String, DocIdSetIterator> metricDocValuesIteratorMap,
Map<String, String> attributes
) {
this.starTreeField = starTreeField;
this.root = root;
this.dimensionDocValuesIteratorMap = dimensionDocValuesIteratorMap;
this.metricDocValuesIteratorMap = metricDocValuesIteratorMap;
this.attributes = attributes;
}

@Override
public CompositeIndexValues getValues() {
return this;
}

public StarTreeField getStarTreeField() {
return starTreeField;
}

public StarTreeNode getRoot() {
return root;

Check warning on line 56 in server/src/main/java/org/opensearch/index/codec/composite/datacube/startree/StarTreeValues.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/codec/composite/datacube/startree/StarTreeValues.java#L56

Added line #L56 was not covered by tests
}

public Map<String, DocIdSetIterator> getDimensionDocValuesIteratorMap() {
return dimensionDocValuesIteratorMap;
}

public Map<String, DocIdSetIterator> getMetricDocValuesIteratorMap() {
return metricDocValuesIteratorMap;
}

public Map<String, String> getAttributes() {
return attributes;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,11 @@
public class CountValueAggregator implements ValueAggregator<Long> {
public static final StarTreeNumericType VALUE_AGGREGATOR_TYPE = StarTreeNumericType.LONG;
public static final long DEFAULT_INITIAL_VALUE = 1L;
private StarTreeNumericType starTreeNumericType;

public CountValueAggregator(StarTreeNumericType starTreeNumericType) {
this.starTreeNumericType = starTreeNumericType;
}

@Override
public MetricStat getAggregationType() {
Expand All @@ -30,12 +35,12 @@ public StarTreeNumericType getAggregatedValueType() {
}

@Override
public Long getInitialAggregatedValueForSegmentDocValue(Long segmentDocValue, StarTreeNumericType starTreeNumericType) {
public Long getInitialAggregatedValueForSegmentDocValue(Long segmentDocValue) {
return DEFAULT_INITIAL_VALUE;
}

@Override
public Long mergeAggregatedValueAndSegmentValue(Long value, Long segmentDocValue, StarTreeNumericType starTreeNumericType) {
public Long mergeAggregatedValueAndSegmentValue(Long value, Long segmentDocValue) {
return value + 1;
}

Expand All @@ -60,7 +65,7 @@ public Long toLongValue(Long value) {
}

@Override
public Long toStarTreeNumericTypeValue(Long value, StarTreeNumericType type) {
public Long toStarTreeNumericTypeValue(Long value) {
return value;
}
}
Loading
Loading