Skip to content

Commit

Permalink
some refactors
Browse files Browse the repository at this point in the history
Signed-off-by: Sarthak Aggarwal <sarthagg@amazon.com>
  • Loading branch information
sarthakaggarwal97 committed Jul 18, 2024
1 parent faf2a1f commit 1b5009e
Show file tree
Hide file tree
Showing 17 changed files with 197 additions and 209 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public class Composite99DocValuesFormat extends DocValuesFormat {
public static final String DATA_DOC_VALUES_CODEC = "Composite99DocValuesData";

/** Meta doc values codec name for Composite Doc Values Format */
static final String META_DOC_VALUES_CODEC = "Composite99DocValuesMetadata";
public static final String META_DOC_VALUES_CODEC = "Composite99DocValuesMetadata";

/** Filename extension for the composite index data doc values */
public static final String DATA_DOC_VALUES_EXTENSION = "cidvd";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,14 @@
import org.opensearch.index.compositeindex.datacube.startree.StarTreeFieldConfiguration;
import org.opensearch.index.compositeindex.datacube.startree.aggregators.MetricEntry;
import org.opensearch.index.compositeindex.datacube.startree.meta.StarTreeMetadata;
import org.opensearch.index.compositeindex.datacube.startree.node.OffHeapStarTree;
import org.opensearch.index.compositeindex.datacube.startree.node.StarTree;
import org.opensearch.index.compositeindex.datacube.startree.node.StarTreeNode;
import org.opensearch.index.compositeindex.datacube.startree.node.Tree;
import org.opensearch.index.compositeindex.datacube.startree.utils.StarTreeHelper;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
Expand All @@ -64,7 +65,7 @@ public class Composite99DocValuesReader extends DocValuesProducer implements Com
private final DocValuesProducer delegate;
private IndexInput dataIn;
private ChecksumIndexInput metaIn;
private final Map<String, StarTree> starTreeMap = new LinkedHashMap<>();
private final Map<String, Tree> starTreeMap = new LinkedHashMap<>();
private final Map<String, CompositeIndexMetadata> compositeIndexMetadataMap = new LinkedHashMap<>();
private final Map<String, DocValuesProducer> compositeDocValuesProducerMap = new LinkedHashMap<>();
private final List<CompositeIndexFieldInfo> compositeFieldInfos = new ArrayList<>();
Expand Down Expand Up @@ -137,7 +138,7 @@ public Composite99DocValuesReader(DocValuesProducer producer, SegmentReadState r
switch (compositeIndexMetadata.getCompositeFieldType()) {
case STAR_TREE:
StarTreeMetadata starTreeMetadata = compositeIndexMetadata.getStarTreeMetadata();
StarTree starTree = new OffHeapStarTree(dataIn, starTreeMetadata);
Tree starTree = new StarTree(dataIn, starTreeMetadata);
starTreeMap.put(compositeFieldName, starTree);
compositeIndexMetadataMap.put(compositeFieldName, compositeIndexMetadata);

Expand All @@ -147,6 +148,7 @@ public Composite99DocValuesReader(DocValuesProducer producer, SegmentReadState r
dimensions.add(readState.fieldInfos.fieldInfo(fieldNumber));
}

// initialize star-tree doc values producer
StarTree99DocValuesProducer starTreeDocValuesProducer = new StarTree99DocValuesProducer(
readState,
Composite99DocValuesFormat.DATA_DOC_VALUES_CODEC,
Expand Down Expand Up @@ -240,14 +242,18 @@ public CompositeIndexValues getCompositeIndexValues(CompositeIndexFieldInfo comp

switch (compositeIndexFieldInfo.getType()) {
case STAR_TREE:
// building star tree values
CompositeIndexMetadata compositeIndexMetadata = compositeIndexMetadataMap.get(compositeIndexFieldInfo.getField());
StarTreeMetadata starTreeMetadata = compositeIndexMetadata.getStarTreeMetadata();

// build skip star node dimensions
Set<Integer> skipStarNodeCreationInDimsFieldNumbers = starTreeMetadata.getSkipStarNodeCreationInDims();
Set<String> skipStarNodeCreationInDims = new HashSet<>();
for (Integer fieldNumber : skipStarNodeCreationInDimsFieldNumbers) {
skipStarNodeCreationInDims.add(readState.fieldInfos.fieldInfo(fieldNumber).getName());
}

// build dimensions
List<Integer> dimensionFieldNumbers = starTreeMetadata.getDimensionFieldNumbers();
List<String> dimensions = new ArrayList<>();
List<Dimension> mergeDimensions = new ArrayList<>();
Expand All @@ -256,6 +262,7 @@ public CompositeIndexValues getCompositeIndexValues(CompositeIndexFieldInfo comp
mergeDimensions.add(new MergeDimension(readState.fieldInfos.fieldInfo(fieldNumber).name));
}

// build metrics
Map<String, Metric> starTreeMetricMap = new ConcurrentHashMap<>();
for (MetricEntry metricEntry : starTreeMetadata.getMetricEntries()) {
String metricName = metricEntry.getMetricName();
Expand All @@ -265,6 +272,7 @@ public CompositeIndexValues getCompositeIndexValues(CompositeIndexFieldInfo comp
}
List<Metric> starTreeMetrics = new ArrayList<>(starTreeMetricMap.values());

// star-tree field
StarTreeField starTreeField = new StarTreeField(
compositeIndexMetadata.getCompositeFieldName(),
mergeDimensions,
Expand All @@ -275,13 +283,18 @@ public CompositeIndexValues getCompositeIndexValues(CompositeIndexFieldInfo comp
starTreeMetadata.getStarTreeBuildMode()
)
);

// star-tree root node
StarTreeNode rootNode = starTreeMap.get(compositeIndexFieldInfo.getField()).getRoot();

// get doc id set iterators for metrics and dimensions
StarTree99DocValuesProducer starTree99DocValuesProducer = (StarTree99DocValuesProducer) compositeDocValuesProducerMap.get(
compositeIndexMetadata.getCompositeFieldName()
);
Map<String, DocIdSetIterator> dimensionsDocIdSetIteratorMap = new LinkedHashMap<>();
Map<String, DocIdSetIterator> metricsDocIdSetIteratorMap = new LinkedHashMap<>();

// get doc id set iterators for dimensions
for (String dimension : dimensions) {
dimensionsDocIdSetIteratorMap.put(
dimension,
Expand All @@ -291,6 +304,7 @@ public CompositeIndexValues getCompositeIndexValues(CompositeIndexFieldInfo comp
);
}

// get doc id set iterators for metrics
for (MetricEntry metricEntry : starTreeMetadata.getMetricEntries()) {
String metricFullName = StarTreeHelper.fullFieldNameForStarTreeMetricsDocValues(
starTreeField.getName(),
Expand All @@ -300,7 +314,18 @@ public CompositeIndexValues getCompositeIndexValues(CompositeIndexFieldInfo comp
metricsDocIdSetIteratorMap.put(metricFullName, starTree99DocValuesProducer.getSortedNumeric(metricFullName));
}

return new StarTreeValues(starTreeField, rootNode, dimensionsDocIdSetIteratorMap, metricsDocIdSetIteratorMap);
// create star-tree attributes map
Map<String, String> starTreeAttributes = new HashMap<>();
starTreeAttributes.put("segment_docs_count", String.valueOf(starTreeMetadata.getSegmentAggregatedDocCount()));

// return star-tree values
return new StarTreeValues(
starTreeField,
rootNode,
dimensionsDocIdSetIteratorMap,
metricsDocIdSetIteratorMap,
starTreeAttributes
);

default:
throw new CorruptIndexException("Unsupported composite index field type: ", compositeIndexFieldInfo.getType().getName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,17 +27,20 @@ public class StarTreeValues implements CompositeIndexValues {
private final StarTreeNode root;
private final Map<String, DocIdSetIterator> dimensionDocValuesIteratorMap;
private final Map<String, DocIdSetIterator> metricDocValuesIteratorMap;
private final Map<String, String> attributes;

public StarTreeValues(
StarTreeField starTreeField,
StarTreeNode root,
Map<String, DocIdSetIterator> dimensionDocValuesIteratorMap,
Map<String, DocIdSetIterator> metricDocValuesIteratorMap
Map<String, DocIdSetIterator> metricDocValuesIteratorMap,
Map<String, String> attributes
) {
this.starTreeField = starTreeField;
this.root = root;
this.dimensionDocValuesIteratorMap = dimensionDocValuesIteratorMap;
this.metricDocValuesIteratorMap = metricDocValuesIteratorMap;
this.attributes = attributes;
}

@Override
Expand All @@ -60,4 +63,8 @@ public Map<String, DocIdSetIterator> getDimensionDocValuesIteratorMap() {
public Map<String, DocIdSetIterator> getMetricDocValuesIteratorMap() {
return metricDocValuesIteratorMap;
}

public Map<String, String> getAttributes() {
return attributes;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
/**
* This class represents the metadata of a Composite Index, which includes information about
* the composite field name, type, and the specific metadata for the type of composite field
* (e.g., StarTree metadata).
* (e.g., Tree metadata).
*
* @opensearch.experimental
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ public interface ValueAggregator<A> {
A toStarTreeNumericTypeValue(Long rawValue);

/**
* Fetches an value that does not alter the result of aggregations
* Fetches a value that does not alter the result of aggregations
*/
long getIdempotentMetricValue();
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,9 @@
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;

import static org.opensearch.index.compositeindex.datacube.startree.utils.StarTreeBuilderUtils.ALL;
import static org.opensearch.index.compositeindex.datacube.startree.utils.StarTreeHelper.fullFieldNameForStarTreeDimensionsDocValues;
import static org.opensearch.index.compositeindex.datacube.startree.utils.StarTreeHelper.fullFieldNameForStarTreeMetricsDocValues;
import static org.opensearch.index.compositeindex.datacube.startree.utils.TreeNode.ALL;

/**
* Builder for star tree. Defines the algorithm to construct star-tree
Expand Down Expand Up @@ -195,8 +195,8 @@ public List<SequentialDocValuesIterator> getMetricReaders(SegmentWriteState stat
* Builds the star tree from the original segment documents
*
* @param fieldProducerMap contain s the docValues producer to get docValues associated with each field
* @param fieldNumberAcrossStarTrees
* @param starTreeDocValuesConsumer
* @param fieldNumberAcrossStarTrees maintains a counter for the number of star-tree fields
* @param starTreeDocValuesConsumer consumes the generated star-tree docValues
* @throws IOException when we are unable to build star-tree
*/
public void build(
Expand Down Expand Up @@ -227,8 +227,8 @@ public void build(
* Builds the star tree using sorted and aggregated star-tree Documents
*
* @param starTreeDocumentIterator contains the sorted and aggregated documents
* @param fieldNumberAcrossStarTrees
* @param starTreeDocValuesConsumer
* @param fieldNumberAcrossStarTrees maintains a counter for the number of star-tree fields
* @param starTreeDocValuesConsumer consumes the generated star-tree docValues
* @throws IOException when we are unable to build star-tree
*/
public void build(
Expand Down Expand Up @@ -374,7 +374,7 @@ private void createSortedDocValuesIndices(DocValuesConsumer docValuesConsumer, A
}

for (int docId = 0; docId < numStarTreeDocs; docId++) {
StarTreeDocument starTreeDocument = getStarTreeDocument(docId);
StarTreeDocument starTreeDocument = getStarTreeDocumentForCreatingDocValues(docId);
for (int i = 0; i < starTreeDocument.dimensions.length; i++) {
Long val = starTreeDocument.dimensions[i];
if (val != null) {
Expand Down Expand Up @@ -439,6 +439,13 @@ public SortedNumericDocValues getSortedNumeric(FieldInfo field) {
*/
public abstract StarTreeDocument getStarTreeDocument(int docId) throws IOException;

/**
* Returns the star-tree document for the given doc id while creating doc values
*
* @param docId document id
* @return star tree document
* @throws IOException if an I/O error occurs while fetching the star-tree document
*/
public abstract StarTreeDocument getStarTreeDocumentForCreatingDocValues(int docId) throws IOException;

/**
Expand Down Expand Up @@ -620,12 +627,12 @@ protected StarTreeDocument reduceSegmentStarTreeDocuments(
*/
private static long getLong(Object metric, long idempotentMetricValue) {

Long metricValue;
long metricValue;
try {
if (metric instanceof Long) {
metricValue = (long) metric;
} else if (metric != null) {
metricValue = Long.valueOf(String.valueOf(metric));
metricValue = Long.parseLong(String.valueOf(metric));
} else {
logger.debug("metric value is null, returning idempotent metric value for the aggregator");
return idempotentMetricValue;
Expand Down Expand Up @@ -677,75 +684,6 @@ public StarTreeDocument reduceStarTreeDocuments(StarTreeDocument aggregatedDocum
}
}

/**
* Builds the star tree from the original segment documents
*
* @param fieldProducerMap contain s the docValues producer to get docValues associated with each field
* @throws IOException when we are unable to build star-tree
*/
public void build(Map<String, DocValuesProducer> fieldProducerMap) throws IOException {
long startTime = System.currentTimeMillis();
logger.debug("Star-tree build is a go with star tree field {}", starTreeField.getName());
if (totalSegmentDocs == 0) {
logger.debug("No documents found in the segment");
return;
}
List<SequentialDocValuesIterator> metricReaders = getMetricReaders(writeState, fieldProducerMap);
List<Dimension> dimensionsSplitOrder = starTreeField.getDimensionsOrder();
SequentialDocValuesIterator[] dimensionReaders = new SequentialDocValuesIterator[dimensionsSplitOrder.size()];
for (int i = 0; i < numDimensions; i++) {
String dimension = dimensionsSplitOrder.get(i).getField();
FieldInfo dimensionFieldInfo = writeState.fieldInfos.fieldInfo(dimension);
dimensionReaders[i] = new SequentialDocValuesIterator(
fieldProducerMap.get(dimensionFieldInfo.name).getSortedNumeric(dimensionFieldInfo)
);
}
Iterator<StarTreeDocument> starTreeDocumentIterator = sortAndAggregateSegmentDocuments(dimensionReaders, metricReaders);
logger.debug("Sorting and aggregating star-tree in ms : {}", (System.currentTimeMillis() - startTime));
build(starTreeDocumentIterator);
logger.debug("Finished Building star-tree in ms : {}", (System.currentTimeMillis() - startTime));
}

/**
* Builds the star tree using Star-Tree Document
*
* @param starTreeDocumentIterator contains the sorted and aggregated documents
* @throws IOException when we are unable to build star-tree
*/
void build(Iterator<StarTreeDocument> starTreeDocumentIterator) throws IOException {
int numSegmentStarTreeDocument = totalSegmentDocs;

while (starTreeDocumentIterator.hasNext()) {
appendToStarTree(starTreeDocumentIterator.next());
}
int numStarTreeDocument = numStarTreeDocs;
logger.debug("Generated star tree docs : [{}] from segment docs : [{}]", numStarTreeDocument, numSegmentStarTreeDocument);

if (numStarTreeDocs == 0) {
// TODO: Uncomment when segment codec and file formats is ready
// StarTreeBuilderUtils.serializeTree(indexOutput, rootNode, dimensionsSplitOrder, numNodes);
return;
}

constructStarTree(rootNode, 0, numStarTreeDocs);
int numStarTreeDocumentUnderStarNode = numStarTreeDocs - numStarTreeDocument;
logger.debug(
"Finished constructing star-tree, got [ {} ] tree nodes and [ {} ] starTreeDocument under star-node",
numStarTreeNodes,
numStarTreeDocumentUnderStarNode
);

createAggregatedDocs(rootNode);
int numAggregatedStarTreeDocument = numStarTreeDocs - numStarTreeDocument - numStarTreeDocumentUnderStarNode;
logger.debug("Finished creating aggregated documents : {}", numAggregatedStarTreeDocument);

// TODO: When StarTree Codec is ready
// Create doc values indices in disk
// Serialize and save in disk
// Write star tree metadata for off heap implementation

}

/**
* Adds a document to star-tree
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ void build(
) throws IOException;

/**
* Builds the star tree using StarTree values from multiple segments
* Builds the star tree using Tree values from multiple segments
*
* @param starTreeValuesSubs contains the star tree values from multiple segments
* @param fieldNumberAcrossStarTrees maintains the unique field number across the fields in the star tree
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,13 @@ public StarTreesBuilder(SegmentWriteState segmentWriteState, MapperService mappe
}

/**
* Builds the star-trees.
* Builds all star-trees for given star-tree fields.
*
* @param metaOut an IndexInput for star-tree metadata
* @param dataOut an IndexInput for star-tree data
* @param fieldProducerMap fetches iterators for the fields (dimensions and metrics)
* @param starTreeDocValuesConsumer a consumer to write star-tree doc values
* @throws IOException
*/
public void build(
IndexOutput metaOut,
Expand Down Expand Up @@ -101,11 +107,11 @@ public void close() throws IOException {
/**
* Merges star tree fields from multiple segments
*
* @param metaOut
* @param dataOut
* @param metaOut an IndexInput for star-tree metadata
* @param dataOut an IndexInput for star-tree data
* @param starTreeFieldMap StarTreeField configuration per field
* @param starTreeValuesSubsPerField starTreeValuesSubs per field
* @param starTreeDocValuesConsumer
* @param starTreeDocValuesConsumer a consumer to write star-tree doc values
*/
public void buildDuringMerge(
IndexOutput metaOut,
Expand Down
Loading

0 comments on commit 1b5009e

Please sign in to comment.