Skip to content

Commit

Permalink
Merge pull request #505 from apache/improve_partition_boundaries
Browse files Browse the repository at this point in the history
Improve partition boundaries
  • Loading branch information
leerho authored Feb 15, 2024
2 parents 3410e86 + cfedb14 commit e89a10e
Show file tree
Hide file tree
Showing 11 changed files with 283 additions and 76 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,5 +33,10 @@ public enum BoundsRule {
/**
* Include only the lower bound but not the upper bound
*/
INCLUDE_LOWER
INCLUDE_LOWER,
/**
* Include none
*/
INCLUDE_NEITHER;

}
22 changes: 14 additions & 8 deletions src/main/java/org/apache/datasketches/partitions/Partitioner.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@
import static java.lang.Math.pow;
import static java.lang.Math.round;
import static java.util.Collections.unmodifiableList;
import static org.apache.datasketches.partitions.BoundsRule.INCLUDE_BOTH;
import static org.apache.datasketches.partitions.BoundsRule.INCLUDE_LOWER;
import static org.apache.datasketches.partitions.BoundsRule.INCLUDE_NEITHER;
import static org.apache.datasketches.partitions.BoundsRule.INCLUDE_UPPER;
import static org.apache.datasketches.quantilescommon.QuantileSearchCriteria.INCLUSIVE;
import static org.apache.datasketches.quantilescommon.QuantilesAPI.EMPTY_MSG;

Expand Down Expand Up @@ -184,33 +188,35 @@ public static class PartitionBoundsRow<T> {

public PartitionBoundsRow(final StackElement<T> se) {
final GenericPartitionBoundaries<T> gpb = se.gpb;
this.part = se.part;
this.levelPartId = se.levelPartId + "." + part;
final QuantileSearchCriteria searchCrit = gpb.getSearchCriteria();
final T[] boundaries = gpb.getBoundaries();
final int numParts = gpb.getNumPartitions();
this.part = se.part;
this.levelPartId = se.levelPartId + "." + part;
final long num;
this.approxNumDeltaItems = num = gpb.getNumDeltaItems()[part];
if (searchCrit == INCLUSIVE) {
if (part == 1) {
lowerBound = gpb.getMinItem();
upperBound = boundaries[part];
rule = BoundsRule.INCLUDE_BOTH;
rule = (num == 0) ? INCLUDE_NEITHER : (lowerBound == upperBound) ? INCLUDE_UPPER : INCLUDE_BOTH;
} else {
lowerBound = boundaries[part - 1];
upperBound = boundaries[part];
rule = BoundsRule.INCLUDE_UPPER;
rule = (num == 0) ? INCLUDE_NEITHER : INCLUDE_UPPER;
}
} else { //EXCLUSIVE
}
else { //EXCLUSIVE
if (part == numParts) {
lowerBound = boundaries[part - 1];
upperBound = gpb.getMaxItem();
rule = BoundsRule.INCLUDE_BOTH;
rule = (num == 0) ? INCLUDE_NEITHER : (lowerBound == upperBound) ? INCLUDE_LOWER : INCLUDE_BOTH;
} else {
lowerBound = boundaries[part - 1];
upperBound = boundaries[part];
rule = BoundsRule.INCLUDE_LOWER;
rule = (num == 0) ? INCLUDE_NEITHER : INCLUDE_LOWER;
}
}
approxNumDeltaItems = gpb.getNumDeltaItems()[part];
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.apache.datasketches.quantiles;

import static org.apache.datasketches.quantiles.ClassicUtil.getNormalizedRankError;
import static org.apache.datasketches.quantilescommon.GenericInequalitySearch.find;
import static org.apache.datasketches.quantilescommon.QuantileSearchCriteria.INCLUSIVE;
import static org.apache.datasketches.quantilescommon.QuantilesAPI.EMPTY_MSG;
Expand Down Expand Up @@ -55,6 +56,7 @@ public class ItemsSketchSortedView<T> implements GenericSortedView<T>, Partition
private final T maxItem;
private final T minItem;
private final Class<T> clazz;
private final int k;

/**
* Construct from elements for testing.
Expand All @@ -70,14 +72,16 @@ public class ItemsSketchSortedView<T> implements GenericSortedView<T>, Partition
final long totalN,
final Comparator<T> comparator,
final T maxItem,
final T minItem) {
final T minItem,
final int k) {
this.quantiles = quantiles;
this.cumWeights = cumWeights;
this.totalN = totalN;
this.comparator = comparator;
this.maxItem = maxItem;
this.minItem = minItem;
this.clazz = (Class<T>)quantiles[0].getClass();
this.k = k;
}

/**
Expand All @@ -88,14 +92,14 @@ public class ItemsSketchSortedView<T> implements GenericSortedView<T>, Partition
ItemsSketchSortedView(final ItemsSketch<T> sketch) {
if (sketch.isEmpty()) { throw new SketchesArgumentException(EMPTY_MSG); }
this.totalN = sketch.getN();
final int k = sketch.getK();
final int numQuantiles = sketch.getNumRetained();
this.quantiles = (T[]) Array.newInstance(sketch.clazz, numQuantiles);
this.minItem = sketch.minItem_;
this.maxItem = sketch.maxItem_;
cumWeights = new long[numQuantiles];
comparator = sketch.getComparator();
clazz = sketch.clazz;
this.cumWeights = new long[numQuantiles];
this.comparator = sketch.getComparator();
this.clazz = sketch.clazz;
this.k = sketch.getK();

final Object[] combinedBuffer = sketch.getCombinedBuffer();
final int baseBufferCount = sketch.getBaseBufferCount();
Expand Down Expand Up @@ -155,27 +159,84 @@ public GenericPartitionBoundaries<T> getPartitionBoundaries(final int numEqually
final QuantileSearchCriteria searchCrit) {
if (isEmpty()) { throw new IllegalArgumentException(QuantilesAPI.EMPTY_MSG); }
final long totalN = this.totalN;
final double delta = getNormalizedRankError(k, true) * totalN;
final int maxParts = (int) (totalN / Math.ceil(delta * 2) );
final int svLen = cumWeights.length;
//adjust ends of sortedView arrays
cumWeights[0] = 1L;
cumWeights[svLen - 1] = totalN;
quantiles[0] = this.getMinItem();
quantiles[svLen - 1] = this.getMaxItem();

final double[] evSpNormRanks = evenlySpacedDoubles(0, 1.0, numEquallySized + 1);
final int len = evSpNormRanks.length;
final T[] evSpQuantiles = (T[]) Array.newInstance(clazz, len);
final long[] evSpNatRanks = new long[len];
for (int i = 0; i < len; i++) {
final int index = getQuantileIndex(evSpNormRanks[i], searchCrit);
evSpQuantiles[i] = quantiles[index];
evSpNatRanks[i] = cumWeights[index];
if (numEquallySized > maxParts) {
throw new SketchesArgumentException(QuantilesAPI.UNSUPPORTED_MSG
+ "The requested number of partitions is too large for the 'k' of this sketch "
+ "if it exceeds the maximum number of partitions allowed by the error threshold for the 'k' of this sketch."
+ "Requested Partitions: " + numEquallySized + " > " + maxParts);
}
if (numEquallySized > svLen / 2.0) {
throw new SketchesArgumentException(QuantilesAPI.UNSUPPORTED_MSG
+ "The requested number of partitions is too large for the number of retained items "
+ "if it exceeds maximum number of retained items divided by 2."
+ "Requested Partitions: " + numEquallySized + " > "
+ "Retained Items / 2: " + (svLen / 2));
}

final double[] searchNormRanks = evenlySpacedDoubles(0, 1.0, numEquallySized + 1);
final int partArrLen = searchNormRanks.length;
final T[] partQuantiles = (T[]) Array.newInstance(clazz, partArrLen);
final long[] partNatRanks = new long[partArrLen];
final double[] partNormRanks = new double[partArrLen];

//Adjust End Points: The ends of the Sorted View arrays may be missing the actual MinItem and MaxItem bounds,
// which are absolutely required when partitioning, especially inner partitions.

//Are the minItem and maxItem already in place?
int adjLen = svLen; //this will be the length of the local copies of quantiles and cumWeights
final boolean adjLow = quantiles[0] != minItem; //if true, adjust the low end
final boolean adjHigh = quantiles[svLen - 1] != maxItem; //if true, adjust the high end
adjLen += adjLow ? 1 : 0;
adjLen += adjHigh ? 1 : 0;

//These are local copies of the quantiles and cumWeights arrays just for partitioning.
//The rest of the SV remains unchanged.
final T[] adjQuantiles;
final long[] adjCumWeights;
if (adjLen > svLen) { //is any adjustment required at all?
adjQuantiles = (T[]) new Object[adjLen];
adjCumWeights = new long[adjLen];
final int offset = adjLow ? 1 : 0;
System.arraycopy(quantiles, 0, adjQuantiles, offset, svLen);
System.arraycopy(cumWeights,0, adjCumWeights, offset, svLen);

//Adjust the low end if required.
if (adjLow) {
adjQuantiles[0] = minItem;
adjCumWeights[0] = 1;
}
//When inserting a MaxItem, if required, we can't just add it at the top of the quantiles array,
// we have to adjust the cumulative weight of the item just before it as well so that the maximum cumulative
// weight at the upper end still equals totalN. (This is not the case at the low end. Quiz #1: Why? )
// If the maxItem is missing, the quantile that is currently in the top
// position must have a weight >= 2. (Quiz #2: Why?). Thus, it is safe to subtract 1.
if (adjHigh) {
adjQuantiles[adjLen - 1] = maxItem;
adjCumWeights[adjLen - 1] = cumWeights[svLen - 1];
adjCumWeights[adjLen - 2] = cumWeights[svLen - 1] - 1;
}
} else { //both min and max are already in place, no adjustments are required.
adjQuantiles = quantiles;
adjCumWeights = cumWeights;
} //END of Adjust End Points

//compute the quantiles and natural and normalized ranks for the partition boundaries.
for (int i = 0; i < partArrLen; i++) {
final int index = getQuantileIndex(searchNormRanks[i], adjCumWeights, searchCrit);
partQuantiles[i] = adjQuantiles[index];
final long cumWt = adjCumWeights[index];
partNatRanks[i] = cumWt;
partNormRanks[i] = (double)cumWt / totalN;
}
//Return the GPB of the complete specification of the boundaries.
final GenericPartitionBoundaries<T> gpb = new GenericPartitionBoundaries<>(
this.totalN,
evSpQuantiles,
evSpNatRanks,
evSpNormRanks,
partQuantiles,
partNatRanks,
partNormRanks,
getMaxItem(),
getMinItem(),
searchCrit);
Expand All @@ -198,13 +259,14 @@ public double[] getPMF(final T[] splitPoints, final QuantileSearchCriteria searc
public T getQuantile(final double rank, final QuantileSearchCriteria searchCrit) {
if (isEmpty()) { throw new IllegalArgumentException(EMPTY_MSG); }
QuantilesUtil.checkNormalizedRankBounds(rank);
final int index = getQuantileIndex(rank, searchCrit);
final int index = getQuantileIndex(rank, cumWeights, searchCrit);
return quantiles[index];
}

private int getQuantileIndex(final double rank, final QuantileSearchCriteria searchCrit) {
private int getQuantileIndex(final double normRank, final long[] cumWeights,
final QuantileSearchCriteria searchCrit) {
final int len = cumWeights.length;
final double naturalRank = getNaturalRank(rank, totalN, searchCrit);
final double naturalRank = getNaturalRank(normRank, totalN, searchCrit);
final InequalitySearch crit = (searchCrit == INCLUSIVE) ? InequalitySearch.GE : InequalitySearch.GT;
final int index = InequalitySearch.find(cumWeights, 0, len - 1, naturalRank, crit);
if (index == -1) { return len - 1; }
Expand Down
54 changes: 28 additions & 26 deletions src/main/java/org/apache/datasketches/quantiles/ItemsUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@
package org.apache.datasketches.quantiles;

import static org.apache.datasketches.common.Util.LS;
import static org.apache.datasketches.quantiles.ClassicUtil.computeNumLevelsNeeded;
import static org.apache.datasketches.quantiles.ClassicUtil.computeValidLevels;
import static org.apache.datasketches.quantiles.ClassicUtil.getNormalizedRankError;

import java.util.Arrays;

Expand Down Expand Up @@ -85,7 +88,7 @@ static <T> String toString(final boolean sketchSummary, final boolean dataDetail
final long bitPattern = sketch.getBitPattern();

if (dataDetail) {
sb.append(ClassicUtil.LS).append("### ").append(thisSimpleName).append(" DATA DETAIL: ").append(ClassicUtil.LS);
sb.append(LS).append("### ").append(thisSimpleName).append(" DATA DETAIL: ").append(LS);
final Object[] items = sketch.getCombinedBuffer();

//output the base buffer
Expand All @@ -95,7 +98,7 @@ static <T> String toString(final boolean sketchSummary, final boolean dataDetail
sb.append(' ').append(items[i]);
}
}
sb.append(ClassicUtil.LS);
sb.append(LS);
//output all the levels
final int numItems = combAllocCount;
if (numItems > (2 * k)) {
Expand All @@ -105,46 +108,45 @@ static <T> String toString(final boolean sketchSummary, final boolean dataDetail
final int levelNum = j > (2 * k) ? (j - (2 * k)) / k : 0;
final String validLvl = ((1L << levelNum) & bitPattern) > 0 ? " T " : " F ";
final String lvl = String.format("%5d", levelNum);
sb.append(ClassicUtil.LS).append(" ").append(validLvl).append(" ").append(lvl).append(":");
sb.append(LS).append(" ").append(validLvl).append(" ").append(lvl).append(":");
}
sb.append(' ').append(items[j]);
}
sb.append(ClassicUtil.LS);
sb.append(LS);
}
sb.append("### END DATA DETAIL").append(ClassicUtil.LS);
sb.append("### END DATA DETAIL").append(LS);
}

if (sketchSummary) {
final long n = sketch.getN();
final String nStr = String.format("%,d", n);
final int numLevels = ClassicUtil.computeNumLevelsNeeded(k, n);
final String bufCntStr = String.format("%,d", combAllocCount);
final String baseBufCntStr = String.format("%,d", bbCount);
final String cBufCntStr = String.format("%,d", combAllocCount);
final int totNumLevels = computeNumLevelsNeeded(k, n);
final int numValidSamples = sketch.getNumRetained();
final String numValidSampStr = String.format("%,d", numValidSamples);
final int preBytes = sketch.isEmpty() ? Long.BYTES : 2 * Long.BYTES;
final double epsPmf = ClassicUtil.getNormalizedRankError(k, true);
final double epsPmf = getNormalizedRankError(k, true);
final String epsPmfPctStr = String.format("%.3f%%", epsPmf * 100.0);
final double eps = ClassicUtil.getNormalizedRankError(k, false);
final double eps = getNormalizedRankError(k, false);
final String epsPctStr = String.format("%.3f%%", eps * 100.0);
final int numSamples = sketch.getNumRetained();
final String numSampStr = String.format("%,d", numSamples);
final T minItem = sketch.isEmpty() ? null : sketch.getMinItem();
final T maxItem = sketch.isEmpty() ? null : sketch.getMaxItem();
sb.append(ClassicUtil.LS).append("### ").append(thisSimpleName).append(" SUMMARY: ").append(ClassicUtil.LS);
sb.append(" K : ").append(k).append(ClassicUtil.LS);
sb.append(" N : ").append(nStr).append(ClassicUtil.LS);
sb.append(" BaseBufferCount : ").append(bbCount).append(ClassicUtil.LS);
sb.append(" CombinedBufferAllocatedCount : ").append(bufCntStr).append(ClassicUtil.LS);
sb.append(" Total Levels : ").append(numLevels).append(ClassicUtil.LS);
sb.append(" Valid Levels : ").append(ClassicUtil.computeValidLevels(bitPattern))
.append(ClassicUtil.LS);
sb.append(" Level Bit Pattern : ").append(Long.toBinaryString(bitPattern))
.append(ClassicUtil.LS);
sb.append(" Valid Samples : ").append(numSampStr).append(ClassicUtil.LS);
sb.append(" Preamble Bytes : ").append(preBytes).append(ClassicUtil.LS);
sb.append(LS).append("### ").append(thisSimpleName).append(" SUMMARY: ").append(LS);
sb.append(" K : ").append(k).append(LS);
sb.append(" N : ").append(nStr).append(LS);
sb.append(" BaseBufferCount : ").append(baseBufCntStr).append(LS);
sb.append(" CombinedBufferAllocatedCount : ").append(cBufCntStr).append(LS);
sb.append(" Total Levels : ").append(totNumLevels).append(LS);
sb.append(" Valid Levels : ").append(computeValidLevels(bitPattern)).append(LS);
sb.append(" Level Bit Pattern : ").append(Long.toBinaryString(bitPattern)).append(LS);
sb.append(" Valid Samples : ").append(numValidSampStr).append(LS);
sb.append(" Preamble Bytes : ").append(preBytes).append(LS);
sb.append(" Normalized Rank Error : ").append(epsPctStr).append(LS);
sb.append(" Normalized Rank Error (PMF) : ").append(epsPmfPctStr).append(LS);
sb.append(" Min Quantile : ").append(minItem).append(ClassicUtil.LS);
sb.append(" Max Quantile : ").append(maxItem).append(ClassicUtil.LS);
sb.append("### END SKETCH SUMMARY").append(ClassicUtil.LS);
sb.append(" Min Quantile : ").append(minItem).append(LS);
sb.append(" Max Quantile : ").append(maxItem).append(LS);
sb.append("### END SKETCH SUMMARY").append(LS);
}
return sb.toString();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

package org.apache.datasketches.quantilescommon;

import static org.apache.datasketches.quantilescommon.QuantileSearchCriteria.INCLUSIVE;

/**
* Iterator over quantile sketches of generic type.
* @param <T> The generic quantile type
Expand All @@ -32,15 +34,32 @@ public GenericSortedViewIterator(final T[] quantiles, final long[] cumWeights) {
}

/**
* Gets the quantile at the current index.
* Gets the quantile at the current index
* This is equivalent to <i>getQuantile(INCLUSIVE)</i>.
*
* <p>Don't call this before calling next() for the first time
* or after getting false from next().</p>
* <p>Don't call this before calling next() for the first time or after getting false from next().</p>
*
* @return the quantile at the current index.
*/
public T getQuantile() {
return quantiles[index];
}

/**
* Gets the quantile at the current index (or previous index)
* based on the chosen search criterion.
*
* <p>Don't call this before calling next() for the first time or after getting false from next().</p>
*
* @param searchCrit if INCLUSIVE, includes the quantile at the current index.
* Otherwise, returns the quantile of the previous index.
*
* @return the quantile at the current index (or previous index)
* based on the chosen search criterion. If the chosen search criterion is <i>EXCLUSIVE</i> and
* the current index is at zero, this will return <i>null</i>.
*/
public T getQuantile(final QuantileSearchCriteria searchCrit) {
if (searchCrit == INCLUSIVE) { return quantiles[index]; }
return (index == 0) ? null : quantiles[index - 1];
}
}
Loading

0 comments on commit e89a10e

Please sign in to comment.