Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve partition boundaries #505

Merged
merged 13 commits into from
Feb 15, 2024
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -33,5 +33,10 @@ public enum BoundsRule {
/**
* Include only the lower bound but not the upper bound
*/
INCLUDE_LOWER
INCLUDE_LOWER,
/**
* Include neither the lower bound nor the upper bound
*/
INCLUDE_NEITHER;

}
22 changes: 14 additions & 8 deletions src/main/java/org/apache/datasketches/partitions/Partitioner.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@
import static java.lang.Math.pow;
import static java.lang.Math.round;
import static java.util.Collections.unmodifiableList;
import static org.apache.datasketches.partitions.BoundsRule.INCLUDE_BOTH;
import static org.apache.datasketches.partitions.BoundsRule.INCLUDE_LOWER;
import static org.apache.datasketches.partitions.BoundsRule.INCLUDE_NEITHER;
import static org.apache.datasketches.partitions.BoundsRule.INCLUDE_UPPER;
import static org.apache.datasketches.quantilescommon.QuantileSearchCriteria.INCLUSIVE;
import static org.apache.datasketches.quantilescommon.QuantilesAPI.EMPTY_MSG;

Expand Down Expand Up @@ -184,33 +188,35 @@ public static class PartitionBoundsRow<T> {

public PartitionBoundsRow(final StackElement<T> se) {
final GenericPartitionBoundaries<T> gpb = se.gpb;
this.part = se.part;
this.levelPartId = se.levelPartId + "." + part;
final QuantileSearchCriteria searchCrit = gpb.getSearchCriteria();
final T[] boundaries = gpb.getBoundaries();
final int numParts = gpb.getNumPartitions();
this.part = se.part;
this.levelPartId = se.levelPartId + "." + part;
final long num;
this.approxNumDeltaItems = num = gpb.getNumDeltaItems()[part];
if (searchCrit == INCLUSIVE) {
if (part == 1) {
lowerBound = gpb.getMinItem();
upperBound = boundaries[part];
rule = BoundsRule.INCLUDE_BOTH;
rule = (num == 0) ? INCLUDE_NEITHER : (lowerBound == upperBound) ? INCLUDE_UPPER : INCLUDE_BOTH;
} else {
lowerBound = boundaries[part - 1];
upperBound = boundaries[part];
rule = BoundsRule.INCLUDE_UPPER;
rule = (num == 0) ? INCLUDE_NEITHER : INCLUDE_UPPER;
}
} else { //EXCLUSIVE
}
else { //EXCLUSIVE
if (part == numParts) {
lowerBound = boundaries[part - 1];
upperBound = gpb.getMaxItem();
rule = BoundsRule.INCLUDE_BOTH;
rule = (num == 0) ? INCLUDE_NEITHER : (lowerBound == upperBound) ? INCLUDE_LOWER : INCLUDE_BOTH;
} else {
lowerBound = boundaries[part - 1];
upperBound = boundaries[part];
rule = BoundsRule.INCLUDE_LOWER;
rule = (num == 0) ? INCLUDE_NEITHER : INCLUDE_LOWER;
}
}
approxNumDeltaItems = gpb.getNumDeltaItems()[part];
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.apache.datasketches.quantiles;

import static org.apache.datasketches.quantiles.ClassicUtil.getNormalizedRankError;
import static org.apache.datasketches.quantilescommon.GenericInequalitySearch.find;
import static org.apache.datasketches.quantilescommon.QuantileSearchCriteria.INCLUSIVE;
import static org.apache.datasketches.quantilescommon.QuantilesAPI.EMPTY_MSG;
Expand Down Expand Up @@ -55,6 +56,7 @@ public class ItemsSketchSortedView<T> implements GenericSortedView<T>, Partition
private final T maxItem;
private final T minItem;
private final Class<T> clazz;
private final int k;

/**
* Construct from elements for testing.
Expand All @@ -70,14 +72,16 @@ public class ItemsSketchSortedView<T> implements GenericSortedView<T>, Partition
final long totalN,
final Comparator<T> comparator,
final T maxItem,
final T minItem) {
final T minItem,
final int k) {
this.quantiles = quantiles;
this.cumWeights = cumWeights;
this.totalN = totalN;
this.comparator = comparator;
this.maxItem = maxItem;
this.minItem = minItem;
this.clazz = (Class<T>)quantiles[0].getClass();
this.k = k;
}

/**
Expand All @@ -88,14 +92,14 @@ public class ItemsSketchSortedView<T> implements GenericSortedView<T>, Partition
ItemsSketchSortedView(final ItemsSketch<T> sketch) {
if (sketch.isEmpty()) { throw new SketchesArgumentException(EMPTY_MSG); }
this.totalN = sketch.getN();
final int k = sketch.getK();
final int numQuantiles = sketch.getNumRetained();
this.quantiles = (T[]) Array.newInstance(sketch.clazz, numQuantiles);
this.minItem = sketch.minItem_;
this.maxItem = sketch.maxItem_;
cumWeights = new long[numQuantiles];
comparator = sketch.getComparator();
clazz = sketch.clazz;
this.cumWeights = new long[numQuantiles];
this.comparator = sketch.getComparator();
this.clazz = sketch.clazz;
this.k = sketch.getK();

final Object[] combinedBuffer = sketch.getCombinedBuffer();
final int baseBufferCount = sketch.getBaseBufferCount();
Expand Down Expand Up @@ -155,27 +159,68 @@ public GenericPartitionBoundaries<T> getPartitionBoundaries(final int numEqually
final QuantileSearchCriteria searchCrit) {
if (isEmpty()) { throw new IllegalArgumentException(QuantilesAPI.EMPTY_MSG); }
final long totalN = this.totalN;
final double delta = getNormalizedRankError(k, true) * totalN;
final int maxParts = (int) (totalN / Math.ceil(delta * 2) );
final int svLen = cumWeights.length;
//adjust ends of sortedView arrays
cumWeights[0] = 1L;
cumWeights[svLen - 1] = totalN;
quantiles[0] = this.getMinItem();
quantiles[svLen - 1] = this.getMaxItem();

final double[] evSpNormRanks = evenlySpacedDoubles(0, 1.0, numEquallySized + 1);
final int len = evSpNormRanks.length;
final T[] evSpQuantiles = (T[]) Array.newInstance(clazz, len);
final long[] evSpNatRanks = new long[len];
for (int i = 0; i < len; i++) {
final int index = getQuantileIndex(evSpNormRanks[i], searchCrit);
evSpQuantiles[i] = quantiles[index];
evSpNatRanks[i] = cumWeights[index];
if (numEquallySized > maxParts) {
throw new SketchesArgumentException(QuantilesAPI.UNSUPPORTED_MSG
+ "Requested number of partitions is too large for this sized sketch. "
+ "It exceeds maximum number of partitions allowed by the error threshold for this size sketch."
+ "Requested Partitions: " + numEquallySized + " > " + maxParts);
}
if (numEquallySized > svLen / 2) {
throw new SketchesArgumentException(QuantilesAPI.UNSUPPORTED_MSG
+ "Requested number of partitions is too large for this sized sketch. "
+ "It exceeds maximum number of retained items divided by 2."
+ "Requested Partitions: " + numEquallySized + " > "
+ "Retained Items / 2: " + (svLen / 2));
}

final double[] searchNormRanks = evenlySpacedDoubles(0, 1.0, numEquallySized + 1);
final int partArrLen = searchNormRanks.length;
final T[] partQuantiles = (T[]) Array.newInstance(clazz, partArrLen);
final long[] partNatRanks = new long[partArrLen];
final double[] partNormRanks = new double[partArrLen];
//adjust ends
int adjLen = svLen;
final boolean adjLow = quantiles[0] != minItem;
final boolean adjHigh = quantiles[svLen - 1] != maxItem;
adjLen += adjLow ? 1 : 0;
adjLen += adjHigh ? 1 : 0;

final T[] adjQuantiles;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the start of a long code block with no indication of what it's doing. It'd be useful to have a little description of what's going on here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good feedback. Will do.

final long[] adjCumWeights;
if (adjLen > svLen) {
adjQuantiles = (T[]) new Object[adjLen];
adjCumWeights = new long[adjLen];
final int offset = adjLow ? 1 : 0;
System.arraycopy(quantiles, 0, adjQuantiles, offset, svLen);
System.arraycopy(cumWeights,0, adjCumWeights, offset, svLen);
if (adjLow) {
adjQuantiles[0] = minItem;
adjCumWeights[0] = 1;
}
if (adjHigh) {
adjQuantiles[adjLen - 1] = maxItem;
adjCumWeights[adjLen - 1] = cumWeights[svLen - 1];
adjCumWeights[adjLen - 2] = cumWeights[svLen - 1] - 1;
}
} else {
adjQuantiles = quantiles;
adjCumWeights = cumWeights;
}
for (int i = 0; i < partArrLen; i++) {
final int index = getQuantileIndex(searchNormRanks[i], adjCumWeights, searchCrit);
partQuantiles[i] = adjQuantiles[index];
final long cumWt = adjCumWeights[index];
partNatRanks[i] = cumWt;
partNormRanks[i] = (double)cumWt / totalN;
}
final GenericPartitionBoundaries<T> gpb = new GenericPartitionBoundaries<>(
this.totalN,
evSpQuantiles,
evSpNatRanks,
evSpNormRanks,
partQuantiles,
partNatRanks,
partNormRanks,
getMaxItem(),
getMinItem(),
searchCrit);
Expand All @@ -198,13 +243,14 @@ public double[] getPMF(final T[] splitPoints, final QuantileSearchCriteria searc
public T getQuantile(final double rank, final QuantileSearchCriteria searchCrit) {
if (isEmpty()) { throw new IllegalArgumentException(EMPTY_MSG); }
QuantilesUtil.checkNormalizedRankBounds(rank);
final int index = getQuantileIndex(rank, searchCrit);
final int index = getQuantileIndex(rank, cumWeights, searchCrit);
return quantiles[index];
}

private int getQuantileIndex(final double rank, final QuantileSearchCriteria searchCrit) {
private int getQuantileIndex(final double normRank, final long[] cumWeights,
final QuantileSearchCriteria searchCrit) {
final int len = cumWeights.length;
final double naturalRank = getNaturalRank(rank, totalN, searchCrit);
final double naturalRank = getNaturalRank(normRank, totalN, searchCrit);
final InequalitySearch crit = (searchCrit == INCLUSIVE) ? InequalitySearch.GE : InequalitySearch.GT;
final int index = InequalitySearch.find(cumWeights, 0, len - 1, naturalRank, crit);
if (index == -1) { return len - 1; }
Expand Down
54 changes: 28 additions & 26 deletions src/main/java/org/apache/datasketches/quantiles/ItemsUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@
package org.apache.datasketches.quantiles;

import static org.apache.datasketches.common.Util.LS;
import static org.apache.datasketches.quantiles.ClassicUtil.computeNumLevelsNeeded;
import static org.apache.datasketches.quantiles.ClassicUtil.computeValidLevels;
import static org.apache.datasketches.quantiles.ClassicUtil.getNormalizedRankError;

import java.util.Arrays;

Expand Down Expand Up @@ -85,7 +88,7 @@ static <T> String toString(final boolean sketchSummary, final boolean dataDetail
final long bitPattern = sketch.getBitPattern();

if (dataDetail) {
sb.append(ClassicUtil.LS).append("### ").append(thisSimpleName).append(" DATA DETAIL: ").append(ClassicUtil.LS);
sb.append(LS).append("### ").append(thisSimpleName).append(" DATA DETAIL: ").append(LS);
final Object[] items = sketch.getCombinedBuffer();

//output the base buffer
Expand All @@ -95,7 +98,7 @@ static <T> String toString(final boolean sketchSummary, final boolean dataDetail
sb.append(' ').append(items[i]);
}
}
sb.append(ClassicUtil.LS);
sb.append(LS);
//output all the levels
final int numItems = combAllocCount;
if (numItems > (2 * k)) {
Expand All @@ -105,46 +108,45 @@ static <T> String toString(final boolean sketchSummary, final boolean dataDetail
final int levelNum = j > (2 * k) ? (j - (2 * k)) / k : 0;
final String validLvl = ((1L << levelNum) & bitPattern) > 0 ? " T " : " F ";
final String lvl = String.format("%5d", levelNum);
sb.append(ClassicUtil.LS).append(" ").append(validLvl).append(" ").append(lvl).append(":");
sb.append(LS).append(" ").append(validLvl).append(" ").append(lvl).append(":");
}
sb.append(' ').append(items[j]);
}
sb.append(ClassicUtil.LS);
sb.append(LS);
}
sb.append("### END DATA DETAIL").append(ClassicUtil.LS);
sb.append("### END DATA DETAIL").append(LS);
}

if (sketchSummary) {
final long n = sketch.getN();
final String nStr = String.format("%,d", n);
final int numLevels = ClassicUtil.computeNumLevelsNeeded(k, n);
final String bufCntStr = String.format("%,d", combAllocCount);
final String baseBufCntStr = String.format("%,d", bbCount);
final String cBufCntStr = String.format("%,d", combAllocCount);
final int totNumLevels = computeNumLevelsNeeded(k, n);
final int numValidSamples = sketch.getNumRetained();
final String numValidSampStr = String.format("%,d", numValidSamples);
final int preBytes = sketch.isEmpty() ? Long.BYTES : 2 * Long.BYTES;
final double epsPmf = ClassicUtil.getNormalizedRankError(k, true);
final double epsPmf = getNormalizedRankError(k, true);
final String epsPmfPctStr = String.format("%.3f%%", epsPmf * 100.0);
final double eps = ClassicUtil.getNormalizedRankError(k, false);
final double eps = getNormalizedRankError(k, false);
final String epsPctStr = String.format("%.3f%%", eps * 100.0);
final int numSamples = sketch.getNumRetained();
final String numSampStr = String.format("%,d", numSamples);
final T minItem = sketch.isEmpty() ? null : sketch.getMinItem();
final T maxItem = sketch.isEmpty() ? null : sketch.getMaxItem();
sb.append(ClassicUtil.LS).append("### ").append(thisSimpleName).append(" SUMMARY: ").append(ClassicUtil.LS);
sb.append(" K : ").append(k).append(ClassicUtil.LS);
sb.append(" N : ").append(nStr).append(ClassicUtil.LS);
sb.append(" BaseBufferCount : ").append(bbCount).append(ClassicUtil.LS);
sb.append(" CombinedBufferAllocatedCount : ").append(bufCntStr).append(ClassicUtil.LS);
sb.append(" Total Levels : ").append(numLevels).append(ClassicUtil.LS);
sb.append(" Valid Levels : ").append(ClassicUtil.computeValidLevels(bitPattern))
.append(ClassicUtil.LS);
sb.append(" Level Bit Pattern : ").append(Long.toBinaryString(bitPattern))
.append(ClassicUtil.LS);
sb.append(" Valid Samples : ").append(numSampStr).append(ClassicUtil.LS);
sb.append(" Preamble Bytes : ").append(preBytes).append(ClassicUtil.LS);
sb.append(LS).append("### ").append(thisSimpleName).append(" SUMMARY: ").append(LS);
sb.append(" K : ").append(k).append(LS);
sb.append(" N : ").append(nStr).append(LS);
sb.append(" BaseBufferCount : ").append(baseBufCntStr).append(LS);
sb.append(" CombinedBufferAllocatedCount : ").append(cBufCntStr).append(LS);
sb.append(" Total Levels : ").append(totNumLevels).append(LS);
sb.append(" Valid Levels : ").append(computeValidLevels(bitPattern)).append(LS);
sb.append(" Level Bit Pattern : ").append(Long.toBinaryString(bitPattern)).append(LS);
sb.append(" Valid Samples : ").append(numValidSampStr).append(LS);
sb.append(" Preamble Bytes : ").append(preBytes).append(LS);
sb.append(" Normalized Rank Error : ").append(epsPctStr).append(LS);
sb.append(" Normalized Rank Error (PMF) : ").append(epsPmfPctStr).append(LS);
sb.append(" Min Quantile : ").append(minItem).append(ClassicUtil.LS);
sb.append(" Max Quantile : ").append(maxItem).append(ClassicUtil.LS);
sb.append("### END SKETCH SUMMARY").append(ClassicUtil.LS);
sb.append(" Min Quantile : ").append(minItem).append(LS);
sb.append(" Max Quantile : ").append(maxItem).append(LS);
sb.append("### END SKETCH SUMMARY").append(LS);
}
return sb.toString();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

package org.apache.datasketches.quantilescommon;

import static org.apache.datasketches.quantilescommon.QuantileSearchCriteria.INCLUSIVE;

/**
* Iterator over quantile sketches of generic type.
* @param <T> The generic quantile type
Expand All @@ -32,15 +34,32 @@ public GenericSortedViewIterator(final T[] quantiles, final long[] cumWeights) {
}

/**
* Gets the quantile at the current index.
* Gets the quantile at the current index
* This is equivalent to <i>getQuantile(INCLUSIVE)</i>.
*
* <p>Don't call this before calling next() for the first time
* or after getting false from next().</p>
* <p>Don't call this before calling next() for the first time or after getting false from next().</p>
*
* @return the quantile at the current index.
*/
public T getQuantile() {
// Reads the entry at the iterator's current index; no bounds check is performed here.
return quantiles[index];
}

/**
 * Returns the quantile selected by the given search criterion, relative to the current index.
 *
 * <p>Don't call this before calling next() for the first time or after getting false from next().</p>
 *
 * @param searchCrit if INCLUSIVE, the quantile at the current index is selected.
 * For any other criterion, the quantile at the previous index is selected.
 *
 * @return the quantile at the current index when <i>searchCrit</i> is <i>INCLUSIVE</i>;
 * otherwise the quantile at the previous index, or <i>null</i> when the current index is zero
 * and there is therefore no previous index.
 */
public T getQuantile(final QuantileSearchCriteria searchCrit) {
  final int pos;
  if (searchCrit == INCLUSIVE) {
    pos = index;
  } else if (index == 0) {
    // Non-inclusive lookup at the very first position: there is no previous entry to return.
    return null;
  } else {
    pos = index - 1;
  }
  return quantiles[pos];
}
}
Loading
Loading