Skip to content

Commit

Permalink
Merge pull request #488 from apache/kll_weighted_updates_group2
Browse files Browse the repository at this point in the history
Kll weighted updates group2
  • Loading branch information
leerho authored Jan 5, 2024
2 parents 1d5ea1c + 1db34d9 commit 9c2f8d9
Show file tree
Hide file tree
Showing 15 changed files with 222 additions and 161 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ public long getN() {

//restricted

@Override //returns updatable, expanded array including empty/garbage space at bottom
@Override //returns updatable, expanded array including free space at bottom
double[] getDoubleItemsArray() {
final int k = getK();
if (sketchStructure == COMPACT_EMPTY) { return new double[k]; }
Expand All @@ -196,7 +196,7 @@ public long getN() {
return doubleItemsArr;
}

@Override //returns compact items array of retained items, no empty/garbage.
@Override //returns compact items array of retained items, no free space.
double[] getDoubleRetainedItemsArray() {
if (sketchStructure == COMPACT_EMPTY) { return new double[0]; }
if (sketchStructure == COMPACT_SINGLE) { return new double[] { getDoubleSingleItem() }; }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ public long getN() {

//restricted

@Override //returns updatable, expanded array including empty/garbage space at bottom
@Override //returns updatable, expanded array including free space at bottom
float[] getFloatItemsArray() {
final int k = getK();
if (sketchStructure == COMPACT_EMPTY) { return new float[k]; }
Expand All @@ -196,7 +196,7 @@ float[] getFloatItemsArray() {
return floatItemsArr;
}

@Override //returns compact items array of retained items, no empty/garbage.
@Override //returns compact items array of retained items, no free space.
float[] getFloatRetainedItemsArray() {
if (sketchStructure == COMPACT_EMPTY) { return new float[0]; }
if (sketchStructure == COMPACT_SINGLE) { return new float[] { getFloatSingleItem() }; }
Expand Down
33 changes: 18 additions & 15 deletions src/main/java/org/apache/datasketches/kll/KllDoublesHelper.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import static org.apache.datasketches.common.Util.isEven;
import static org.apache.datasketches.common.Util.isOdd;
import static org.apache.datasketches.kll.KllHelper.findLevelToCompact;
import static org.apache.datasketches.kll.KllSketch.DEFAULT_M;
import static org.apache.datasketches.kll.KllSketch.SketchStructure.UPDATABLE;

import java.util.Arrays;
import java.util.Random;
Expand Down Expand Up @@ -313,30 +315,31 @@ private static void randomlyHalveUpDoubles(final double[] buf, final int start,
}
}

//Called from KllDoublesSketch::update and this
//Called from KllDoublesSketch::update and merge
static void updateDouble(final KllDoublesSketch dblSk, final double item) {
if (Double.isNaN(item)) { return; } //ignore
if (dblSk.isEmpty()) {
dblSk.setMinItem(item);
dblSk.setMaxItem(item);
} else {
dblSk.setMinItem(min(dblSk.getMinItem(), item));
dblSk.setMaxItem(max(dblSk.getMaxItem(), item));
}
int level0space = dblSk.levelsArr[0];
assert (level0space >= 0);
if (level0space == 0) {
int freeSpace = dblSk.levelsArr[0];
assert (freeSpace >= 0);
if (freeSpace == 0) {
compressWhileUpdatingSketch(dblSk);
level0space = dblSk.levelsArr[0];
assert (level0space > 0);
freeSpace = dblSk.levelsArr[0];
assert (freeSpace > 0);
}
dblSk.incN();
dblSk.setLevelZeroSorted(false);
final int nextPos = level0space - 1;
final int nextPos = freeSpace - 1;
dblSk.setLevelsArrayAt(0, nextPos);
dblSk.setDoubleItemsArrayAt(nextPos, item);
}

/**
 * Applies a weighted update of {@code item} to the given doubles sketch.
 *
 * <p>The weight is compared with entry 0 of the sketch's UPDATABLE levels array. When the weight
 * is strictly smaller, the item is pushed through {@code dblSk.update(item)} once per unit of
 * weight. Otherwise a temporary {@code KllHeapDoublesSketch} is constructed from the sketch's K,
 * {@code DEFAULT_M}, the item and the weight, and merged into {@code dblSk}.</p>
 *
 * @param dblSk the sketch receiving the update
 * @param item the item to insert
 * @param weight the number of repetitions of the item; this method does not validate it
 */
static void updateDouble(final KllDoublesSketch dblSk, final double item, final int weight) {
  final int levelZeroOffset = dblSk.getLevelsArray(UPDATABLE)[0];
  if (weight >= levelZeroOffset) {
    // Large weight: build a one-item weighted sketch and merge it in a single step.
    dblSk.merge(new KllHeapDoublesSketch(dblSk.getK(), DEFAULT_M, item, weight));
    return;
  }
  // Small weight: repeat the ordinary single-item update.
  int remaining = weight;
  while (remaining > 0) {
    dblSk.update(item);
    remaining--;
  }
}

/**
* Compression algorithm used to merge higher levels.
* <p>Here is what we do for each level:</p>
Expand Down
24 changes: 19 additions & 5 deletions src/main/java/org/apache/datasketches/kll/KllDoublesSketch.java
Original file line number Diff line number Diff line change
Expand Up @@ -307,19 +307,21 @@ public byte[] toByteArray() {
}

@Override
public String toString(final boolean withSummary, final boolean withData) {
public String toString(final boolean withSummary, final boolean withDetail) {
KllSketch sketch = this;
if (withData && sketchStructure != UPDATABLE) {
if (withDetail && sketchStructure != UPDATABLE) {
final Memory mem = getWritableMemory();
assert mem != null;
sketch = KllDoublesSketch.heapify(getWritableMemory());
}
return KllHelper.toStringImpl(sketch, withSummary, withData, getSerDe());
return KllHelper.toStringImpl(sketch, withSummary, withDetail, getSerDe());
}

/**
 * Updates this sketch with a single item. NaN items are silently ignored; this check happens
 * before the read-only check, so a NaN passed to a read-only sketch does not throw.
 *
 * @param item the item to insert
 * @throws SketchesArgumentException if the item is not NaN and this sketch is read-only
 */
@Override
public void update(final double item) {
  if (!Double.isNaN(item)) {
    if (readOnly) {
      throw new SketchesArgumentException(TGT_IS_READ_ONLY_MSG);
    }
    updateMinMax(item);
    KllDoublesHelper.updateDouble(this, item);
    // Reset the kllDoublesSV field now that the sketch has changed.
    kllDoublesSV = null;
  }
}
Expand All @@ -329,11 +331,13 @@ public void update(final double item) {
* @param item the item to be repeated. NaNs are ignored.
* @param weight the number of times the update of item is to be repeated. It must be &ge; one.
*/
public void weightedUpdate(final double item, final int weight) {
public void update(final double item, final int weight) {
if (Double.isNaN(item)) { return; } //ignore
if (readOnly) { throw new SketchesArgumentException(TGT_IS_READ_ONLY_MSG); }
if (weight < 1) { throw new SketchesArgumentException("Weight is less than one."); }
if (Double.isNaN(item)) { return; } //ignore
KllHeapDoublesSketch.weightedUpdateDouble(this, item, weight);
updateMinMax(item);
KllDoublesHelper.updateDouble(this, item, weight);
kllDoublesSV = null;
}

Expand Down Expand Up @@ -403,4 +407,14 @@ private final void refreshSortedView() {

abstract void setMinItem(double item);

/**
 * Folds {@code item} into the tracked minimum and maximum items.
 *
 * <p>If the sketch is empty, both bounds are set to {@code item} and the current bounds are not
 * read. Otherwise each bound becomes the min (respectively max) of its current value and
 * {@code item}.</p>
 *
 * @param item the incoming item
 */
private void updateMinMax(final double item) {
  // Emptiness is sampled once, before either setter runs.
  final boolean wasEmpty = isEmpty();
  setMinItem(wasEmpty ? item : min(getMinItem(), item));
  setMaxItem(wasEmpty ? item : max(getMaxItem(), item));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ public KllDoublesSketchSortedView(final KllDoublesSketch sketch) {
if (!sketch.hasMemory()) { sketch.setLevelZeroSorted(true); }
}

final int numQuantiles = srcLevels[srcNumLevels] - srcLevels[0]; //remove garbage
final int numQuantiles = srcLevels[srcNumLevels] - srcLevels[0]; //remove free space
quantiles = new double[numQuantiles];
cumWeights = new long[numQuantiles];
populateFromSketch(srcQuantiles, srcLevels, srcNumLevels, numQuantiles);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ public KllFloatsSketchSortedView(final KllFloatsSketch sketch) {
if (!sketch.hasMemory()) { sketch.setLevelZeroSorted(true); }
}

final int numQuantiles = srcLevels[srcNumLevels] - srcLevels[0]; //remove garbage
final int numQuantiles = srcLevels[srcNumLevels] - srcLevels[0]; //remove free space
quantiles = new float[numQuantiles];
cumWeights = new long[numQuantiles];
populateFromSketch(srcQuantiles, srcLevels, srcNumLevels, numQuantiles);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,10 +136,10 @@ else if (memStructure == COMPACT_FULL) {
maxDoubleItem = srcMem.getDouble(offsetBytes);
offsetBytes += Double.BYTES;
final int capacityItems = levelsArr[getNumLevels()];
final int garbageItems = levelsArr[0];
final int retainedItems = capacityItems - garbageItems;
final int freeSpace = levelsArr[0];
final int retainedItems = capacityItems - freeSpace;
doubleItems = new double[capacityItems];
srcMem.getDoubleArray(offsetBytes, doubleItems, garbageItems, retainedItems);
srcMem.getDoubleArray(offsetBytes, doubleItems, freeSpace, retainedItems);
}
else { //(memStructure == UPDATABLE)
int offsetBytes = DATA_START_ADR;
Expand Down Expand Up @@ -301,13 +301,4 @@ void setNumLevels(final int numLevels) {
@Override
void setWritableMemory(final WritableMemory wmem) { }

/**
 * Applies a weighted update of {@code item} to the given doubles sketch.
 *
 * <p>If {@code weight} is strictly less than entry 0 of the sketch's UPDATABLE levels array, the
 * item is inserted through {@code dblSk.update(item)} {@code weight} times. Otherwise a temporary
 * heap sketch is created from the sketch's K, {@code DEFAULT_M}, the item and the weight, and is
 * merged into {@code dblSk}.</p>
 *
 * @param dblSk the sketch receiving the update
 * @param item the item to insert
 * @param weight the number of repetitions of the item; not validated here
 */
static void weightedUpdateDouble(final KllDoublesSketch dblSk, final double item, final int weight) {
  final boolean smallWeight = weight < dblSk.getLevelsArray(UPDATABLE)[0];
  if (!smallWeight) {
    final KllHeapDoublesSketch weightedSk =
        new KllHeapDoublesSketch(dblSk.getK(), DEFAULT_M, item, weight);
    dblSk.merge(weightedSk);
    return;
  }
  for (int remaining = weight; remaining > 0; remaining--) {
    dblSk.update(item);
  }
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -117,10 +117,10 @@ else if (memStructure == COMPACT_FULL) {
maxFloatItem = srcMem.getFloat(offsetBytes);
offsetBytes += Float.BYTES;
final int capacityItems = levelsArr[getNumLevels()];
final int garbageItems = levelsArr[0];
final int retainedItems = capacityItems - garbageItems;
final int freeSpace = levelsArr[0];
final int retainedItems = capacityItems - freeSpace;
floatItems = new float[capacityItems];
srcMem.getFloatArray(offsetBytes, floatItems, garbageItems, retainedItems);
srcMem.getFloatArray(offsetBytes, floatItems, freeSpace, retainedItems);
}
else { //(memStructure == UPDATABLE)
int offsetBytes = DATA_START_ADR;
Expand Down
122 changes: 70 additions & 52 deletions src/main/java/org/apache/datasketches/kll/KllHelper.java
Original file line number Diff line number Diff line change
Expand Up @@ -365,10 +365,10 @@ private static String outputData(final KllSketch sketch) {
final int k = sketch.getK();
final int m = sketch.getM();
final StringBuilder sb = new StringBuilder();
sb.append("### KllSketch itemsArray & levelsArray data:").append(LS);
sb.append(LS + "### KLL ItemsArray & LevelsArray Detail:").append(LS);
sb.append("Index, Value").append(LS);
if (levelsArr[0] > 0) {
final String gbg = " Empty or Garbage, size = " + levelsArr[0];
final String gbg = " Free Space, Size = " + levelsArr[0];
for (int i = 0; i < levelsArr[0]; i++) {
sb.append(" ").append(i + ", ").append(sketch.getItemAsString(i));
if (i == 0) { sb.append(gbg); }
Expand All @@ -381,10 +381,10 @@ private static String outputData(final KllSketch sketch) {
final int toIndex = levelsArr[level + 1]; // exclusive
String lvlData = "";
if (fromIndex < toIndex) {
lvlData = " level[" + level + "]=" + levelsArr[level]
+ ", cap=" + KllHelper.levelCapacity(k, numLevels, level, m)
+ ", size=" + KllHelper.currentLevelSizeItems(level, numLevels, levelsArr)
+ ", wt=" + (1 << level) + LS;
lvlData = " Level[" + level + "]=" + levelsArr[level]
+ ", Cap=" + KllHelper.levelCapacity(k, numLevels, level, m)
+ ", Size=" + KllHelper.currentLevelSizeItems(level, numLevels, levelsArr)
+ ", Wt=" + (1 << level) + LS;
}

for (int i = fromIndex; i < toIndex; i++) {
Expand All @@ -393,10 +393,25 @@ private static String outputData(final KllSketch sketch) {
}
level++;
}
sb.append(" ----------level[" + level + "]=" + levelsArr[level] + ": itemsArray[].length");
sb.append(" ----------Level[" + level + "]=" + levelsArr[level] + ": ItemsArray[].length");
sb.append(LS);
sb.append("### End data").append(LS);
sb.append("### End ItemsArray & LevelsArray Detail").append(LS);
return sb.toString();
}

/**
 * Renders the levels array as a small text table.
 *
 * <p>After a header, one row is written per level in {@code [0, numLevels)} holding the level
 * index, {@code levelsArr[level]}, the result of {@code KllHelper.levelCapacity} and the result
 * of {@code KllHelper.currentLevelSizeItems} (labelled "Nominal Capacity" and "Actual Capacity"
 * in the header). A final row prints the entry that follows the last level, labelled as the
 * items array length, and a footer line closes the table.</p>
 *
 * @param k passed through to {@code KllHelper.levelCapacity}
 * @param m passed through to {@code KllHelper.levelCapacity}
 * @param numLevels the number of levels to list
 * @param levelsArr the levels array; must have at least {@code numLevels + 1} entries
 * @return the formatted table
 */
static String outputLevels(final int k, final int m, final int numLevels, final int[] levelsArr) {
  final StringBuilder out = new StringBuilder();

  // Header.
  out.append(LS + "### KLL Levels Array:");
  out.append(LS);
  out.append(" Level, Offset: Nominal Capacity, Actual Capacity");
  out.append(LS);

  // One row per level.
  int lvl = 0;
  while (lvl < numLevels) {
    out.append(" ");
    out.append(lvl);
    out.append(", ");
    out.append(levelsArr[lvl]);
    out.append(": ");
    out.append(KllHelper.levelCapacity(k, numLevels, lvl, m));
    out.append(", ");
    out.append(KllHelper.currentLevelSizeItems(lvl, numLevels, levelsArr));
    out.append(LS);
    lvl++;
  }

  // Trailing row: the entry just past the last level.
  out.append(" ");
  out.append(lvl);
  out.append(", ");
  out.append(levelsArr[lvl]);
  out.append(": ----ItemsArray[].length");
  out.append(LS);

  out.append("### End Levels Array");
  out.append(LS);
  return out.toString();
}

Expand Down Expand Up @@ -479,55 +494,58 @@ static byte[] toByteArray(final KllSketch srcSk, final boolean updatable) {

static <T> String toStringImpl(final KllSketch sketch, final boolean withSummary, final boolean withData,
final ArrayOfItemsSerDe<T> serDe) {
final SketchType sketchType = sketch.sketchType;
final boolean hasMemory = sketch.hasMemory();
final StringBuilder sb = new StringBuilder();
final int k = sketch.getK();
final int m = sketch.getM();
final long n = sketch.getN();
final int numLevels = sketch.getNumLevels();
final int[] fullLevelsArr = sketch.getLevelsArray(UPDATABLE);
//final int[] levelsArr = sketch.getLevelsArray(sketch.sketchStructure);
final String epsPct = String.format("%.3f%%", sketch.getNormalizedRankError(false) * 100);
final String epsPMFPct = String.format("%.3f%%", sketch.getNormalizedRankError(true) * 100);
final boolean compact = sketch.isCompactMemoryFormat();

final StringBuilder sb = new StringBuilder();
final String directStr = hasMemory ? "Direct" : "";
final String compactStr = compact ? "Compact" : "";
final String readOnlyStr = sketch.isReadOnly() ? "true" + ("(" + (compact ? "Format" : "Memory") + ")") : "false";
final String skTypeStr = sketchType.getName();
final String className = "Kll" + directStr + compactStr + skTypeStr;

sb.append(LS).append("### ").append(className).append(" Summary:").append(LS);
sb.append(" K : ").append(k).append(LS);
sb.append(" Dynamic min K : ").append(sketch.getMinK()).append(LS);
sb.append(" M : ").append(m).append(LS);
sb.append(" N : ").append(n).append(LS);
sb.append(" Epsilon : ").append(epsPct).append(LS);
sb.append(" Epsilon PMF : ").append(epsPMFPct).append(LS);
sb.append(" Empty : ").append(sketch.isEmpty()).append(LS);
sb.append(" Estimation Mode : ").append(sketch.isEstimationMode()).append(LS);
sb.append(" Levels : ").append(numLevels).append(LS);
sb.append(" Level 0 Sorted : ").append(sketch.isLevelZeroSorted()).append(LS);
sb.append(" Capacity Items : ").append(fullLevelsArr[numLevels]).append(LS);
sb.append(" Retained Items : ").append(sketch.getNumRetained()).append(LS);
sb.append(" Empty/Garbage Items : ").append(sketch.levelsArr[0]).append(LS);
sb.append(" ReadOnly : ").append(readOnlyStr).append(LS);
if (sketchType != ITEMS_SKETCH) {
sb.append(" Updatable Storage Bytes: ").append(sketch.currentSerializedSizeBytes(true)).append(LS);
}
sb.append(" Compact Storage Bytes : ").append(sketch.currentSerializedSizeBytes(false)).append(LS);

final String emptyStr = (sketchType == ITEMS_SKETCH) ? "Null" : "NaN";

sb.append(" Min Item : ").append(sketch.isEmpty() ? emptyStr : sketch.getMinItemAsString())
.append(LS);
sb.append(" Max Item : ").append(sketch.isEmpty() ? emptyStr : sketch.getMaxItemAsString())
.append(LS);
sb.append("### End sketch summary").append(LS);

if (! withSummary) { sb.setLength(0); }
if (withData) { sb.append(outputData(sketch)); }
if (withSummary) {
final SketchType sketchType = sketch.sketchType;
final boolean hasMemory = sketch.hasMemory();
final long n = sketch.getN();
final String epsPct = String.format("%.3f%%", sketch.getNormalizedRankError(false) * 100);
final String epsPMFPct = String.format("%.3f%%", sketch.getNormalizedRankError(true) * 100);
final boolean compact = sketch.isCompactMemoryFormat();

final String directStr = hasMemory ? "Direct" : "";
final String compactStr = compact ? "Compact" : "";
final String readOnlyStr = sketch.isReadOnly() ? "true" + ("(" + (compact ? "Format" : "Memory") + ")") : "false";
final String skTypeStr = sketchType.getName();
final String className = "Kll" + directStr + compactStr + skTypeStr;

sb.append(LS + "### ").append(className).append(" Summary:").append(LS);
sb.append(" K : ").append(k).append(LS);
sb.append(" Dynamic min K : ").append(sketch.getMinK()).append(LS);
sb.append(" M : ").append(m).append(LS);
sb.append(" N : ").append(n).append(LS);
sb.append(" Epsilon : ").append(epsPct).append(LS);
sb.append(" Epsilon PMF : ").append(epsPMFPct).append(LS);
sb.append(" Empty : ").append(sketch.isEmpty()).append(LS);
sb.append(" Estimation Mode : ").append(sketch.isEstimationMode()).append(LS);
sb.append(" Levels : ").append(numLevels).append(LS);
sb.append(" Level 0 Sorted : ").append(sketch.isLevelZeroSorted()).append(LS);
sb.append(" Capacity Items : ").append(fullLevelsArr[numLevels]).append(LS);
sb.append(" Retained Items : ").append(sketch.getNumRetained()).append(LS);
sb.append(" Free Space : ").append(sketch.levelsArr[0]).append(LS);
sb.append(" ReadOnly : ").append(readOnlyStr).append(LS);
if (sketchType != ITEMS_SKETCH) {
sb.append(" Updatable Storage Bytes: ").append(sketch.currentSerializedSizeBytes(true)).append(LS);
}
sb.append(" Compact Storage Bytes : ").append(sketch.currentSerializedSizeBytes(false)).append(LS);

final String emptyStr = (sketchType == ITEMS_SKETCH) ? "Null" : "NaN";

sb.append(" Min Item : ").append(sketch.isEmpty() ? emptyStr : sketch.getMinItemAsString())
.append(LS);
sb.append(" Max Item : ").append(sketch.isEmpty() ? emptyStr : sketch.getMaxItemAsString())
.append(LS);
sb.append("### End sketch summary").append(LS);
}
if (withData) {
sb.append(outputLevels(k, m, numLevels, fullLevelsArr));
sb.append(outputData(sketch));
}
return sb.toString();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -337,7 +337,7 @@ private final KllItemsSketchSortedView<T> refreshSortedView() {

/**
* @return a full array of items as if the sketch were in COMPACT_FULL or UPDATABLE format.
* This will include zeros and possibly some garbage items.
* This will include zeros and possibly some free space.
*/
abstract T[] getTotalItemsArray();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ public class KllItemsSketchSortedView<T> implements GenericSortedView<T>, Partit
if (!sketch.hasMemory()) { sketch.setLevelZeroSorted(true); }
}

final int numQuantiles = srcLevels[srcNumLevels] - srcLevels[0]; //remove garbage
final int numQuantiles = srcLevels[srcNumLevels] - srcLevels[0]; //remove free space
quantiles = (T[]) Array.newInstance(sketch.serDe.getClassOfT(), numQuantiles);
cumWeights = new long[numQuantiles];
populateFromSketch(srcQuantiles, srcLevels, srcNumLevels, numQuantiles);
Expand Down
Loading

0 comments on commit 9c2f8d9

Please sign in to comment.